# Complete Spark Job Memory + Performance Analysis (Based on Your Use Case)
## Context Overview
- Transactional Tables: 4 large tables spanning 12 months, ~25 Cr rows per month (1 Cr = 1 crore = 10 million)
- Additional Fact Tables: 20 tables, 5–8 Cr rows each
- Dimensional + Lookup Tables: ~20 small-to-medium tables
- Final Fact Table Goal: ~5.5 Cr unique `cust_id` values
- Transformations: `.pivot()`, `.groupBy()`, `.withColumn()`, window functions, `.join()`, `.filter()`, aggregations (quarterly, 6-month, and 12-month rollups), MDAB daily → monthly rollup
- Output Storage: Parquet in Cloudera BDL (on HDFS or S3)
- Max Executors Available: 20
## Stepwise Processing Breakdown (Memory + Shuffle + Partition Estimation)

### Source Data Read (from Parquet or ORC)
| Step | Table Type | Size per Month | Rows | Columns | Read Size (Compressed) | Read Size (In-Memory) |
|---|---|---|---|---|---|---|
| 1 | Transactional | ~20 GB | 25 Cr | 40 | ~20 GB | ~80–100 GB (uncompressed / Parquet decoding overhead) |
- Memory used: disk-to-RAM load (~100 GB)
- Suggest `.persist(StorageLevel.MEMORY_AND_DISK_SER)` only after heavy compute ops
- Executors' off-heap memory is not used here
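A minimal sketch of the read step and the deferred-persist pattern; the path is illustrative, and the persist line uses `MEMORY_AND_DISK`, the PySpark `StorageLevel` counterpart of the `MEMORY_AND_DISK_SER` level suggested above.

```python
from pyspark.sql import SparkSession
from pyspark import StorageLevel

spark = SparkSession.builder.appName("txn-read").getOrCreate()

# Read 12 months of transactional Parquet (~20 GB compressed per month).
trans_df = spark.read.parquet("/bdl/txn_data/2024_*.parquet")

# Do not persist the raw read. Cache only a DataFrame that several downstream
# stages reuse, and only after the expensive transformations have been applied:
# heavy_df = <pivot / window / rollup output>
# heavy_df.persist(StorageLevel.MEMORY_AND_DISK)  # spills to disk under memory pressure
```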
### Data Cleaning / Filtering
- Column pruning, dropping bad records, null checks, etc.
- Approx. retained rows: ~5 Cr per month
- No shuffle yet (see the sketch after the config table below)
| Config Suggestion | Value |
|---|---|
| Input Partitions | 500–800 |
| Executor Cores | 5 |
| Executor Memory | 8–12 GB |
| Num Executors | 20 |
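A hedged sketch of the cleaning step under these settings, continuing from `trans_df` above; the retained column list is an assumed schema, and the `maxPartitionBytes` value is only a starting point for landing in the 500–800 input-partition range without introducing a shuffle.

```python
from pyspark.sql.functions import col, month

# Larger input splits => fewer, bigger scan partitions (no shuffle involved).
# Set before the first action so the file scan picks it up.
spark.conf.set("spark.sql.files.maxPartitionBytes", str(256 * 1024 * 1024))

# Prune columns and drop bad records as early as possible.
keep_cols = ["cust_id", "txn_date", "amount", "daily_metric", "status"]  # assumed schema
clean_df = (
    trans_df
    .select(*keep_cols)
    .filter(col("status") == "valid")
    .filter(col("cust_id").isNotNull() & col("amount").isNotNull())
    .withColumn("month", month(col("txn_date")))  # reused by the pivot and MDAB rollup
)
```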
### Pivot (e.g., `.groupBy(cust_id).pivot(month_col).agg(sum(...))`)
- Pivot on month expands the table by 12 new columns per metric (10 metric columns × 12 months = 120 pivoted columns)
- Internally causes a wide shuffle (see the sketch after the table below)
| Operation | Shuffle Size | Est. Memory Usage | Notes |
|---|---|---|---|
| Pivot | ~50–60 GB | ~6–8 GB per executor | Spill likely if not cached |
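A sketch of the pivot described above, continuing from `clean_df`; `amount` is an assumed metric column, and passing the explicit month list to `.pivot()` skips the extra job Spark would otherwise run to discover the distinct pivot values.

```python
from pyspark.sql.functions import sum as spark_sum

# Monthly spend pivoted into one column per month and customer.
pivot_df = (
    clean_df
    .groupBy("cust_id")
    .pivot("month", list(range(1, 13)))
    .agg(spark_sum("amount").alias("spend"))
)
```

With 10 metrics this yields roughly 120 pivoted columns, which is what pushes the per-executor footprint into the 6–8 GB range estimated above.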
### Feature Aggregation (Quarterly, 6-Month, 12-Month Rollups)
- Window functions (`partitionBy('cust_id')`) + date filtering
- Multi-pass aggregations
- Causes another shuffle stage keyed on `cust_id` (sketch after the table below)
| Window Step | Records | Intermediate Shuffle | Execution Notes |
|---|---|---|---|
| QTR | 5 Cr | ~20 GB | Narrow-dependency window |
| 6-Month | 5 Cr | ~30 GB | Wide shuffle |
| 12-Month | 5 Cr | ~40 GB | Most expensive |
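A sketch of the rolling windows, continuing from `clean_df`; the anchor date and the output column names are illustrative. Because all three frames share the same `partitionBy("cust_id")` and ordering, Spark can evaluate them off a single shuffle and sort.

```python
from pyspark.sql.functions import col, datediff, lit, sum as spark_sum
from pyspark.sql.window import Window

# Express txn_date as a day offset so rangeBetween counts calendar days,
# not rows (a row-based frame breaks when customers have gaps or multiple
# transactions per day).
daily_df = clean_df.withColumn("day_nr", datediff(col("txn_date"), lit("2024-01-01")))

def trailing_window(days):
    return (Window.partitionBy("cust_id")
                  .orderBy("day_nr")
                  .rangeBetween(-(days - 1), 0))

rolling_df = (
    daily_df
    .withColumn("qtr_spend", spark_sum("amount").over(trailing_window(90)))   # ~quarter
    .withColumn("hy_spend",  spark_sum("amount").over(trailing_window(180)))  # ~6 months
    .withColumn("yr_spend",  spark_sum("amount").over(trailing_window(365)))  # ~12 months
)
```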
### MDAB Rollups (Daily → Monthly)
- Uses `.groupBy(cust_id, month).agg(...)`
- Another shuffle stage (sketch after the table below)
| Records | Grouping Keys | Shuffle Est. | Memory Req. |
|---|---|---|---|
| ~30 Cr | cust_id + month | ~50 GB | 6–8 GB |
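A sketch of the daily → monthly rollup, continuing from `clean_df`; `daily_metric` is an assumed column name, and the `avg` assumes MDAB means a monthly average of the daily balance metric, alongside the plain sum used in the sample code.

```python
from pyspark.sql.functions import avg, sum as spark_sum

# One row per customer per month.
mdab_df = (
    clean_df
    .groupBy("cust_id", "month")
    .agg(
        avg("daily_metric").alias("mdab"),               # monthly average of the daily metric (assumption)
        spark_sum("daily_metric").alias("monthly_metric"),
    )
)
```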
### Join with Fact/Dim Tables
- Mostly left joins with small lookup and fact tables (20–100 MB compressed each)
- The joins against the 20 larger fact tables generate roughly 200 GB of shuffle in total (see the sketch after the table below)
| Join Type | Broadcast? | Shuffle? | Notes |
|---|---|---|---|
| Dim joins | Yes | No | Use a broadcast hint |
| Big fact joins | No | Yes | Repartition on the join keys |
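A sketch of the two join patterns in the table above, continuing from `rolling_df`; the paths and table names are illustrative.

```python
from pyspark.sql.functions import broadcast

dim_df      = spark.read.parquet("/bdl/dim/cust_segment/")    # small lookup, ~20-100 MB
big_fact_df = spark.read.parquet("/bdl/fact/usage_summary/")  # one of the 20 large fact tables

# Small dimension: the broadcast hint removes the shuffle entirely.
joined_df = rolling_df.join(broadcast(dim_df), ["cust_id"], "left")

# Large fact: repartition both sides on the join key so the sort-merge join
# shuffles each side once into co-located partitions.
joined_df = (
    joined_df.repartition(2000, "cust_id")
             .join(big_fact_df.repartition(2000, "cust_id"), ["cust_id"], "left")
)
```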
### Final Write
- Partition by `month`, `region`, or `cust_segment`
- `.write.mode("overwrite").partitionBy(...).parquet()`
- Output table size: 200–300 GB uncompressed, ~15–20 GB compressed per month
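A sketch of the write with an illustrative output path; shuffling once on (`month`, `cust_id`) before `partitionBy("month")` keeps each month's files near the ~64–100 MB task-size target instead of producing thousands of tiny fragments (at ~15–20 GB compressed per month, 200 writer tasks give files of roughly 75–100 MB).

```python
(
    final_df
    .repartition(200, "month", "cust_id")  # each writer task emits at most one file per month it holds
    .write
    .mode("overwrite")
    .partitionBy("month")
    .parquet("/bdl/final_fact/")
)
```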
## Execution DAG (Wave-wise Estimation)

| Wave | Stage | Operation | Input Rows | Partitions | Time Est. (min) |
|---|---|---|---|---|---|
| 1 | Stage 1 | Read + Filter | 25 Cr | 800 | 3–4 |
| 2 | Stage 2 | Pivot + Agg | 5 Cr | 1000 | 10–15 |
| 3 | Stage 3 | Window (QTR, 6M, 12M) | 5 Cr | 1200 | 15–20 |
| 4 | Stage 4 | MDAB + GroupBy | 30 Cr | 1600 | 12–15 |
| 5 | Stage 5 | Joins with Fact + Dim Tables | 5 Cr | 2000 | 10–12 |
| 6 | Stage 6 | Final Write to Parquet | 5.5 Cr | 1000 | 3–5 |
| | | **Total Time Est.** | | | ~55 |
## Sample Spark Submit Command
```bash
spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --num-executors 20 \
  --executor-memory 10G \
  --executor-cores 5 \
  --conf spark.sql.shuffle.partitions=2000 \
  --conf spark.sql.autoBroadcastJoinThreshold=50MB \
  --conf spark.sql.adaptive.enabled=true \
  --conf spark.sql.adaptive.shuffle.targetPostShuffleInputSize=64MB \
  --conf spark.executor.memoryOverhead=2G \
  --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
  main_script.py
```

Note: `spark.sql.adaptive.shuffle.targetPostShuffleInputSize` is the older Spark 2.x property name; on Spark 3.x the equivalent AQE setting is `spark.sql.adaptive.advisoryPartitionSizeInBytes`.
## Sample Code (Core Sections)
```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum as spark_sum, month, broadcast
from pyspark.sql.window import Window

spark = SparkSession.builder.appName("Transactional ETL").getOrCreate()

# 1. Read data
trans_df = spark.read.parquet("/bdl/txn_data/2024_*.parquet")

# 2. Filter & preprocess (derive the month column used by the pivot and rollup)
trans_df = (trans_df
            .filter(col("status") == "valid")
            .withColumn("month", month(col("txn_date"))))

# 3. Pivot (monthly spend); the explicit value list avoids a distinct-values scan
pivot_df = (trans_df.groupBy("cust_id")
            .pivot("month", list(range(1, 13)))
            .agg(spark_sum("amount").alias("spend")))

# 4. Rolling features: trailing 90 days per customer (range-based, not row-based)
window_spec = (Window.partitionBy("cust_id")
               .orderBy(col("txn_date").cast("timestamp").cast("long"))
               .rangeBetween(-90 * 86400, 0))
rolling_df = trans_df.withColumn("qtr_spend", spark_sum("amount").over(window_spec))

# 5. MDAB rollup: daily metric aggregated to customer-month level
mdab_df = (trans_df.groupBy("cust_id", "month")
           .agg(spark_sum("daily_metric").alias("monthly_metric")))

# 6. Join with lookup/dim tables (mdab_df keeps one row per customer-month)
dim_table = spark.read.parquet("/bdl/dim/cust_dim/")  # illustrative path; table was undefined before
final_df = pivot_df.join(mdab_df, ["cust_id"], "left")
final_df = final_df.join(broadcast(dim_table), ["cust_id"], "left")

# 7. Write, partitioned by month
final_df.write.mode("overwrite").partitionBy("month").parquet("/final_fact")
```
## Tips to Debug & Monitor
- Set `spark.sql.shuffle.partitions` deliberately (target ~64 MB per post-shuffle partition)
- Enable the Spark UI or History Server to review stage duration, shuffle size, and task spill
- Use `df.explain(True)` to inspect the physical plan
- Track GC time via the Spark UI executor metrics
- Use adaptive query execution (AQE) for late-stage re-optimization (a small sketch of these calls follows)
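The same knobs as runtime calls, assuming a Spark 3.x session and the `final_df` from the sample code above.

```python
# Size the shuffle so post-shuffle partitions land near the ~64 MB target.
spark.conf.set("spark.sql.shuffle.partitions", "2000")
spark.conf.set("spark.sql.adaptive.enabled", "true")

# Inspect the physical plan: Exchange nodes mark shuffles, and the chosen join
# strategy (BroadcastHashJoin vs SortMergeJoin) is visible per join.
final_df.explain(True)
```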
## Final Suggestions

| Area | Suggestion |
|---|---|
| Pivot / GroupBy | Repartition before heavy shuffles |
| Joins | Broadcast only if < 50 MB |
| File Format | Use Parquet with Snappy compression |
| Task Size Target | ~64–100 MB (not too small) |
| Cache Layer | Avoid unless reused multiple times |
| Monitoring | Use the Spark History Server for long-run stats |