How PySpark processes your pipeline – data size, partition count, shuffle impact, memory usage, time estimates, and executor configuration

๐Ÿ” Complete Spark Job Memory + Performance Analysis (Based on Your Use Case)

🧱 Context Overview

  • Transactional Tables: 4 large tables (12 months of data), ~25 Cr rows per month (1 Cr = 10 million)
  • Additional Fact Tables: 20 tables, 5–8 Cr rows each
  • Dimensional + Lookup Tables: ~20 small-to-medium tables
  • Final Fact Table Goal: ~5.5 Cr unique cust_id
  • Transformations: .pivot(), .groupBy(), .withColumn(), window functions, .join, .filter, aggregations (quarterly, 6-month, 12-month rollups), MDAB daily → monthly rollup
  • Output Storage: Parquet in Cloudera BDL (on HDFS or S3)
  • Max Executors Available: 20

🧠 Stepwise Processing Breakdown (Memory + Shuffle + Partition Estimation)

🔹 Source Data Read (from Parquet or ORC)

| Step | Table Type | Size per Month | Rows | Columns | Read Size (Compressed) | Read Size (In-Memory) |
|---|---|---|---|---|---|---|
| 1 | Transactional | ~20 GB | 25 Cr | 40 | ~20 GB | ~80–100 GB (uncompressed / Parquet overhead) |
  • Memory used: disk-to-RAM load (~100 GB)
  • Suggest .persist(StorageLevel.MEMORY_AND_DISK_SER) only after heavy compute ops (see the sketch below)
  • Executors' off-heap memory is not used here
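A minimal persist sketch, assuming a single month has been read into txn_df (path illustrative). Note that in the PySpark API, StorageLevel.MEMORY_AND_DISK is defined with deserialized=False, so it already stores blocks serialized and plays the role of the MEMORY_AND_DISK_SER level named above:

from pyspark.sql import SparkSession
from pyspark import StorageLevel

spark = SparkSession.builder.appName("txn-read").getOrCreate()

# Read one month of transactional data (illustrative path)
txn_df = spark.read.parquet("/bdl/txn_data/2024_01.parquet")

# In the real pipeline the persist would sit after the heavy compute step being reused;
# MEMORY_AND_DISK keeps serialized blocks in RAM and spills the remainder to local disk.
txn_df = txn_df.persist(StorageLevel.MEMORY_AND_DISK)
txn_df.count()  # action that materializes the cache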

🔹 Data Cleaning / Filtering

  • Column pruning, bad records dropped, null checks, etc. (a cleaning sketch follows the config table below)
  • Approx retained rows: ~5 Cr per month
  • No shuffle yet
| Config Suggestion | Value |
|---|---|
| Input Partitions | 500–800 |
| Executor Cores | 5 |
| Executor Memory | 8–12 GB |
| Num Executors | 20 |
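A minimal cleaning sketch under the settings above; txn_df is the DataFrame read in the previous step, and the column names are illustrative:

from pyspark.sql.functions import col

# Tune the read-split size to steer the number of input partitions without a shuffle
# (value illustrative; larger splits -> fewer, bigger partitions).
spark.conf.set("spark.sql.files.maxPartitionBytes", str(128 * 1024 * 1024))

needed_cols = ["cust_id", "txn_date", "amount", "status", "daily_metric"]

clean_df = (
    txn_df
    .select(*needed_cols)                               # column pruning
    .filter(col("status") == "valid")                   # drop bad records
    .dropna(subset=["cust_id", "txn_date", "amount"])   # null checks
)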

🔹 Pivot (e.g., .groupBy(cust_id).pivot(month_col).agg(sum(...)))

  • Pivot on month → adds 12 new columns per metric (10 metrics × 12 months = 120 pivoted columns); see the sketch below the table
  • Internally causes a wide shuffle
| Operation | Shuffle Size | Est. Memory Usage | Notes |
|---|---|---|---|
| Pivot | ~50–60 GB | ~6–8 GB / executor | Spill likely if not cached |
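A sketch of the pivot, assuming clean_df from the previous step carries a txn_date column; passing the twelve month values explicitly spares Spark the extra job it would otherwise run to collect the distinct pivot values:

from pyspark.sql.functions import month, sum as sum_

monthly_spend_df = (
    clean_df
    .withColumn("month", month("txn_date"))
    .groupBy("cust_id")
    .pivot("month", list(range(1, 13)))   # explicit values: no extra distinct-scan job
    .agg(sum_("amount").alias("spend"))   # yields one spend column per month (1-12)
)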

🔹 Feature Aggregation (Quarterly, 6-Month, 12-Month Rollups)

  • Window functions (partitionBy('cust_id')) + date filtering, as sketched below the table
  • Multi-pass aggregations
  • Causes another shuffle stage, hash-partitioned on cust_id
| Window Step | Records | Intermediate Shuffle | Execution Notes |
|---|---|---|---|
| QTR | 5 Cr | ~20 GB | Narrow-dependency window |
| 6 Month | 5 Cr | ~30 GB | Wide shuffle |
| 12 Month | 5 Cr | ~40 GB | Most expensive |
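A hedged sketch of the rollups using range-based frames, so the quarterly / 6-month / 12-month windows follow the calendar even when a customer skips days; clean_df and the column names are the illustrative ones from the earlier sketches:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

SECONDS_PER_DAY = 86400
base = clean_df.withColumn("txn_ts", F.unix_timestamp("txn_date"))

# All three specs share the same partitioning and ordering, so Spark can
# evaluate them after a single shuffle on cust_id.
w_qtr  = Window.partitionBy("cust_id").orderBy("txn_ts").rangeBetween(-90 * SECONDS_PER_DAY, 0)
w_6mo  = Window.partitionBy("cust_id").orderBy("txn_ts").rangeBetween(-180 * SECONDS_PER_DAY, 0)
w_12mo = Window.partitionBy("cust_id").orderBy("txn_ts").rangeBetween(-365 * SECONDS_PER_DAY, 0)

rollup_df = (
    base
    .withColumn("qtr_spend", F.sum("amount").over(w_qtr))
    .withColumn("spend_6m",  F.sum("amount").over(w_6mo))
    .withColumn("spend_12m", F.sum("amount").over(w_12mo))
)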

🔹 MDAB Rollups (Daily → Monthly)

  • Uses .groupBy(cust_id, month).agg(...) (see the sketch below the table)
  • Another shuffle stage
| Records | Grouping Keys | Shuffle Est. | Memory Req. |
|---|---|---|---|
| ~30 Cr | cust_id + month | ~50 GB | 6–8 GB / executor |
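A short sketch of the daily → monthly rollup; date_trunc gives a proper year-month key so months from different years never collapse together. The aggregate (sum here, to match the sample code further down) should follow whatever your MDAB definition requires:

from pyspark.sql import functions as F

mdab_df = (
    clean_df
    .withColumn("month", F.date_trunc("month", "txn_date"))  # year-month key, e.g. 2024-03-01 00:00:00
    .groupBy("cust_id", "month")
    .agg(F.sum("daily_metric").alias("monthly_metric"))       # swap sum for avg if MDAB is an average
)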

🔹 Join with Fact/Dim Tables

  • Mostly left joins with small lookup and fact tables (20–100 MB compressed each)
  • Final join with the 20 large fact tables drives ~200 GB of shuffle (see the sketch below the table)
| Join Type | Broadcast? | Shuffle? | Notes |
|---|---|---|---|
| Dim join | ✅ | ❌ | Use broadcast hint |
| Big fact joins | ❌ | ✅ | Use repartition on join keys |
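A sketch of the two join patterns from the table; the paths and table names are illustrative, and rollup_df stands for the feature DataFrame built in the earlier sketches:

from pyspark.sql.functions import broadcast

dim_df  = spark.read.parquet("/bdl/dim/cust_dim")        # small lookup, tens of MB
fact_df = spark.read.parquet("/bdl/fact/fact_table_01")  # one of the large fact tables

joined_df = (
    rollup_df
    # small dimension: broadcast hint -> map-side join, no shuffle
    .join(broadcast(dim_df), "cust_id", "left")
    # large fact: shuffle join; pre-partitioning on the join key keeps partitions even
    .join(fact_df.repartition(2000, "cust_id"), "cust_id", "left")
)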

🔹 Final Write

  • Partition by month, region, or cust_segment
  • .write.mode("overwrite").partitionBy(...).parquet() (see the sketch below)
  • Output table size: 200–300 GB uncompressed, ~15–20 GB compressed per month
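A sketch of the write, assuming the fully joined feature table final_df carries the month partition column; the output path and rows-per-file cap are illustrative. Snappy-compressed Parquet matches the recommendation in the final suggestions table, and maxRecordsPerFile keeps individual files inside each month partition from growing unwieldy:

(
    final_df
    .write
    .mode("overwrite")
    .partitionBy("month")
    .option("compression", "snappy")         # Parquet + Snappy
    .option("maxRecordsPerFile", 5_000_000)  # cap file size inside each partition
    .parquet("/bdl/final_fact")
)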

โ›“๏ธ Execution DAG (Wave-wise Estimation)

| Wave | Stage | Operation | Input Rows | Partitions | Time Est. (min) |
|---|---|---|---|---|---|
| 1 | Stage 1 | Read + Filter | 25 Cr | 800 | 3–4 |
| 2 | Stage 2 | Pivot + Agg | 5 Cr | 1000 | 10–15 |
| 3 | Stage 3 | Window (QTR, 6M, 12M) | 5 Cr | 1200 | 15–20 |
| 4 | Stage 4 | MDAB + GroupBy | 30 Cr | 1600 | 12–15 |
| 5 | Stage 5 | Joins with Fact + Dim tables | 5 Cr | 2000 | 10–12 |
| 6 | Stage 6 | Final Write to Parquet | 5.5 Cr | 1000 | 3–5 |

Total time estimate: ~55 min

🧰 Sample Spark Submit Command

spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --num-executors 20 \
  --executor-memory 10G \
  --executor-cores 5 \
  --conf spark.sql.shuffle.partitions=2000 \
  --conf spark.sql.autoBroadcastJoinThreshold=50MB \
  --conf spark.sql.adaptive.enabled=true \
  --conf spark.sql.adaptive.shuffle.targetPostShuffleInputSize=64MB \
  --conf spark.executor.memoryOverhead=2G \
  --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
  main_script.py

🧪 Sample Code (Core Sections)

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum, when, expr, month, year, broadcast
from pyspark.sql.window import Window

spark = SparkSession.builder.appName("Transactional ETL").getOrCreate()

# 1. Read data
trans_df = spark.read.parquet("/bdl/txn_data/2024_*.parquet")

# 2. Filter & Preprocess
trans_df = trans_df.filter(col("status") == "valid")
trans_df = trans_df.withColumn("month", month("txn_date"))  # the pivot below needs a month column

# 3. Pivot (Monthly Spend)
pivot_df = trans_df.groupBy("cust_id").pivot("month", list(range(1, 13))).agg(sum("amount").alias("spend"))

# 4. Rolling Features
# Note: a 90-row frame approximates a quarter only if there is one row per customer per day;
# use rangeBetween over a date/epoch column for true calendar windows.
windowSpec = Window.partitionBy("cust_id").orderBy("txn_date").rowsBetween(-90, 0)
rolling_df = trans_df.withColumn("qtr_spend", sum("amount").over(windowSpec))

# 5. MDAB Rollup
mdab_df = trans_df.groupBy("cust_id", month("txn_date").alias("month")).agg(sum("daily_metric").alias("monthly_metric"))

# 6. Join with Lookup/Dim
dim_table = spark.read.parquet("/bdl/dim/cust_dim")  # dimension/lookup table (illustrative path)
final_df = pivot_df.join(mdab_df, ["cust_id"], "left")
final_df = final_df.join(broadcast(dim_table), ["cust_id"], "left")

# 7. Write
final_df.write.mode("overwrite").partitionBy("month").parquet("/final_fact")

🧠 Tips to Debug & Monitor

  • Set spark.sql.shuffle.partitions deliberately (target ~64 MB per shuffle partition)
  • Enable the Spark UI or History Server to review stage duration, shuffle size, and task spill
  • Use df.explain(True) to inspect the physical plan before triggering the job
  • Track GC time via the Executors tab of the Spark UI / executor logs
  • Use adaptive query execution (AQE) for late-stage re-optimization; on Spark 3.x, spark.sql.adaptive.advisoryPartitionSizeInBytes replaces the targetPostShuffleInputSize setting used above (see the snippet below)
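For reference, the same knobs can also be set at runtime from the driver (Spark 3.x names shown where they differ from the spark-submit command above):

spark.conf.set("spark.sql.shuffle.partitions", "2000")
spark.conf.set("spark.sql.adaptive.enabled", "true")
# Spark 3.x successor to spark.sql.adaptive.shuffle.targetPostShuffleInputSize
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "64MB")

final_df.explain(True)  # inspect the physical plan before triggering the job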

✅ Final Suggestions

| Area | Suggestion |
|---|---|
| Pivot / GroupBy | Repartition before heavy shuffles |
| Joins | Broadcast only if < 50 MB |
| File Format | Use Parquet with Snappy compression |
| Task Size Target | ~64–100 MB (not too small) |
| Cache Layer | Avoid unless the DataFrame is reused multiple times |
| Monitoring | Use the Spark History Server for long-run stats |
