Complete Spark Job Memory + Performance Analysis (Based on Your Use Case)
Context Overview
- Transactional Tables: 4 large tables (12 months), each month: ~25 Cr rows
- Additional Fact Tables: 20 tables, 5–8 Cr rows each
- Dimensional + Lookup Tables: ~20 small-medium tables
- Final Fact Table Goal: ~5.5 Cr unique cust_id
- Transformations: .pivot(), .groupBy(), .withColumn(), window functions, .join(), .filter(), aggregations (quarterly, 6-month, 12-month rollups), MDAB daily → monthly
- Output Storage: Parquet in Cloudera BDL (on HDFS or S3)
- Max Executors Available: 20
Stepwise Processing Breakdown (Memory + Shuffle + Partition Estimation)
Source Data Read (from Parquet or ORC)
| Step | Table Type | Size per Month | Rows | Columns | Read Size (Compressed) | Read Size (In-Memory) |
|---|---|---|---|---|---|---|
| 1 | Transactional | ~20 GB | 25 Cr | 40 | ~20 GB | ~80–100 GB (uncompressed / Parquet decode overhead) |
- Memory used: Disk to RAM load (~100 GB)
- Suggest: .persist(StorageLevel.MEMORY_AND_DISK_SER) only after heavy compute ops
- Executors' off-heap memory is not used here
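A minimal sketch of the read stage, assuming an illustrative path and column names (cust_id, txn_date, amount, status); prune columns at read time and defer persistence until after the heavy transformations:

from pyspark.sql import SparkSession
from pyspark import StorageLevel

spark = SparkSession.builder.appName("txn_read").getOrCreate()

# Read one month of transactional data (path is illustrative)
txn_df = spark.read.parquet("/bdl/txn_data/2024_01")

# Prune to the columns the pipeline actually needs so Parquet reads less data
txn_df = txn_df.select("cust_id", "txn_date", "amount", "status")

# Later, persist only the output of the expensive pivot/window steps, not the raw read:
# heavy_df = heavy_df.persist(StorageLevel.MEMORY_AND_DISK)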
Data Cleaning / Filtering
- Column pruning, bad records dropped, null checks etc.
- Approx retained rows: ~5 Cr per month
- No shuffle yet
| Config Suggestion | Value |
|---|---|
| Input Partitions | 500–800 |
| Executor Cores | 5 |
| Executor Memory | 8–12 GB |
| Num Executors | 20 |
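A sketch of the cleaning stage under the same illustrative column names; input parallelism is tuned through the Parquet split size rather than an explicit repartition, so no shuffle is triggered yet:

from pyspark.sql.functions import col

# Smaller splits (e.g., 32 MB) push ~20 GB of compressed input toward the
# suggested 500-800 input partitions (20 GB / 32 MB = 640 splits)
spark.conf.set("spark.sql.files.maxPartitionBytes", str(32 * 1024 * 1024))

clean_df = (
    spark.read.parquet("/bdl/txn_data/2024_01")            # path is illustrative
         .select("cust_id", "txn_date", "amount", "status")  # column pruning
         .filter(col("status") == "valid")                  # drop bad records
         .filter(col("cust_id").isNotNull())                # null checks
)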
Pivot (e.g., .groupBy(cust_id).pivot(month_col).agg(sum(...)))
- Pivot on month → 12 new columns per metric (10 metric columns × 12 months = 120 pivoted columns)
- Internally causes a wide shuffle
| Operation | Shuffle Size | Est Memory Usage | Notes |
|---|---|---|---|
| Pivot | ~50–60 GB | ~6–8 GB / executor | Spill likely if not cached |
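A sketch of the pivot, continuing from clean_df above; the month column is derived in case the source lacks it, repartitioning on cust_id beforehand helps keep the wide shuffle's tasks evenly sized, and passing the explicit value list to pivot() avoids an extra pass to collect distinct months:

from pyspark.sql.functions import month, sum as _sum

monthly_pivot = (
    clean_df
    .withColumn("month", month("txn_date"))    # derive month if not already a column
    .repartition(2000, "cust_id")              # align with spark.sql.shuffle.partitions
    .groupBy("cust_id")
    .pivot("month", list(range(1, 13)))        # fixed values: no extra distinct-scan job
    .agg(_sum("amount").alias("spend"))        # one spend column per month (1..12)
)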
Feature Aggregation (Quarterly, 6-Month, 12-Month Rollups)
- Window functions (partitionBy('cust_id')) + date filtering
- Multi-pass aggregations
- Causes another stage with a group-by shuffle
| Window Step | Records | Intermediate Shuffle | Execution Notes |
|---|---|---|---|
| QTR | 5 Cr | ~20 GB | Narrow-dependency window |
| 6 Month | 5 Cr | ~30 GB | Wide shuffle |
| 12 Month | 5 Cr | ~40 GB | Most expensive |
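A sketch of the rollups using date-based trailing windows, continuing from clean_df; the anchor date and column names are illustrative. Because all three frames share the same partitionBy/orderBy, Spark can reuse a single shuffle by cust_id:

from pyspark.sql.functions import col, sum as _sum, datediff, lit
from pyspark.sql.window import Window

# Numeric day index so rangeBetween works in days rather than rows
base = clean_df.withColumn("day_idx", datediff(col("txn_date"), lit("2024-01-01")))

def trailing_spend(n_days):
    w = (Window.partitionBy("cust_id")
               .orderBy("day_idx")
               .rangeBetween(-n_days, 0))
    return _sum("amount").over(w)

rollup_df = (base
    .withColumn("qtr_spend", trailing_spend(90))
    .withColumn("spend_6m",  trailing_spend(182))
    .withColumn("spend_12m", trailing_spend(365)))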
MDAB Rollups (Daily → Monthly)
- Uses .groupBy(cust_id, month).agg(...)
- Another shuffle stage
| Records | Grouping Keys | Shuffle Est | Memory Req (per executor) |
|---|---|---|---|
| ~30 Cr | cust_id + month | ~50 GB | 6–8 GB |
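A back-of-the-envelope sizing check for this stage: at ~64 MB per shuffle partition, a ~50 GB shuffle wants roughly 800 post-shuffle partitions:

# 50 GiB of shuffle data / 64 MiB per partition = 800 partitions
shuffle_bytes = 50 * 1024**3
target_bytes = 64 * 1024**2
num_parts = shuffle_bytes // target_bytes        # 800
spark.conf.set("spark.sql.shuffle.partitions", str(num_parts))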
Join with Fact/Dim Tables
- Mostly left joins with small lookup and fact tables (20–100 MB compressed each)
- Final join with the 20 fact tables: ~200 GB of shuffle
| Join Type | Broadcast? | Shuffle? | Notes |
|---|---|---|---|
| dim join | Yes | No | Use broadcast hint |
| big fact joins | No | Yes | Use repartition on join keys |
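A sketch of both join patterns; dim_df and big_fact_df are hypothetical names for a small dimension table and a large fact table:

from pyspark.sql.functions import broadcast

# Small dimension: the explicit hint forces a broadcast join regardless of
# spark.sql.autoBroadcastJoinThreshold, so use it only for tables known to be small
enriched_df = rollup_df.join(broadcast(dim_df), ["cust_id"], "left")

# Large fact: pre-partition both sides on the join key with the same partition
# count as spark.sql.shuffle.partitions so the sort-merge join can reuse the exchange
left = enriched_df.repartition(2000, "cust_id")
right = big_fact_df.repartition(2000, "cust_id")
joined_df = left.join(right, ["cust_id"], "left")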
Final Write
- Partition by month, region, or cust_segment: .write.mode("overwrite").partitionBy(...).parquet()
- Output table size: 200–300 GB uncompressed, ~15–20 GB compressed per month
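A sketch of the write, where final_df stands for the assembled final fact table (built in the sample code further below) and the output path is illustrative. Repartitioning on (month, cust_id) before partitionBy keeps per-month file counts and sizes bounded (~180–240 GB compressed / 2000 partitions ≈ 100 MB per file):

(final_df
    .repartition(2000, "month", "cust_id")   # spread each month across many ~100 MB files
    .write
    .mode("overwrite")
    .option("compression", "snappy")         # Parquet + Snappy, as suggested below
    .partitionBy("month")
    .parquet("/bdl/final_fact"))             # output path is illustrative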
Execution DAG (Wave-wise Estimation)
| Wave | Stage | Operation | Input Rows | Partitions | Time Est (min) |
|---|---|---|---|---|---|
| 1 | Stage 1 | Read + Filter | 25 Cr | 800 | 3–4 |
| 2 | Stage 2 | Pivot + Agg | 5 Cr | 1000 | 10–15 |
| 3 | Stage 3 | Window (QTR, 6M, 12M) | 5 Cr | 1200 | 15–20 |
| 4 | Stage 4 | MDAB + GroupBy | 30 Cr | 1600 | 12–15 |
| 5 | Stage 5 | Joins with Fact + Dim tables | 5 Cr | 2000 | 10–12 |
| 6 | Stage 6 | Final Write to Parquet | 5.5 Cr | 1000 | 3–5 |
| | | Total Time Est | | | ~55–70 |
Sample Spark Submit Command
spark-submit \
--master yarn \
--deploy-mode cluster \
--num-executors 20 \
--executor-memory 10G \
--executor-cores 5 \
--conf spark.sql.shuffle.partitions=2000 \
--conf spark.sql.autoBroadcastJoinThreshold=50MB \
--conf spark.sql.adaptive.enabled=true \
--conf spark.sql.adaptive.shuffle.targetPostShuffleInputSize=64MB \
--conf spark.executor.memoryOverhead=2G \
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
main_script.py
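With these settings the job asks YARN for 20 × (10 GB heap + 2 GB overhead) = 240 GB of memory and 20 × 5 = 100 cores, i.e. roughly 2 GB of heap per concurrently running task.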
Sample Code (Core Sections)
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum, when, expr, month, year, broadcast
from pyspark.sql.window import Window
spark = SparkSession.builder.appName("Transactional ETL").getOrCreate()
# 1. Read data
trans_df = spark.read.parquet("/bdl/txn_data/2024_*.parquet")
# 2. Filter & Preprocess
trans_df = trans_df.filter(col("status") == "valid")
# 3. Pivot (Monthly Spend)
pivot_df = trans_df.groupBy("cust_id").pivot("month", list(range(1,13))).agg(sum("amount").alias("spend"))
# 4. Rolling Features
# Note: rowsBetween(-90, 0) counts rows, not days; it approximates a quarterly
# window only if there is roughly one row per customer per day. For a true
# date-based window, order by a numeric day index and use rangeBetween.
windowSpec = Window.partitionBy("cust_id").orderBy("txn_date").rowsBetween(-90, 0)
rolling_df = trans_df.withColumn("qtr_spend", sum("amount").over(windowSpec))
# 5. MDAB Rollup
mdab_df = trans_df.groupBy("cust_id", month("txn_date").alias("month")).agg(sum("daily_metric").alias("monthly_metric"))
# 6. Join with Lookup/Dim (dim table path below is illustrative)
dim_table = spark.read.parquet("/bdl/dim/customer_dim")
final_df = pivot_df.join(mdab_df, ["cust_id"], "left")
final_df = final_df.join(broadcast(dim_table), ["cust_id"], "left")  # small dim -> broadcast join
# 7. Write
final_df.write.mode("overwrite").partitionBy("month").parquet("/final_fact")
Tips to Debug & Monitor
- Use spark.sql.shuffle.partitions wisely (target ~64 MB per shuffle partition)
- Enable the Spark UI or History Server to review stage duration, shuffle size, and task spill
- Use df.explain(True) to inspect the physical plan
- Track GC time via the Spark UI Executors tab
- Use adaptive query execution for late-stage re-optimization
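A small sketch of those checks in session form:

# Inspect the parsed/analyzed/optimized/physical plans before a long run
final_df.explain(True)

# Post-shuffle parallelism actually feeding the write
print(final_df.rdd.getNumPartitions())

# AQE can also be toggled per session rather than only via spark-submit
spark.conf.set("spark.sql.adaptive.enabled", "true")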
Final Suggestions
| Area | Suggestion |
|---|---|
| Pivot / GroupBy | Repartition before heavy shuffles |
| Joins | Broadcast only if < 50 MB |
| File Format | Use Parquet with Snappy |
| Task Size Target | ~64–100 MB (not too small) |
| Cache Layer | Avoid unless reused multiple times |
| Monitoring | Use Spark History Server for long-run stats |