How PySpark processes your pipeline – data size, partition count, shuffle impact, memory usage, time estimates, and executor configuration

Let me give you a complete, intuitive breakdown of how Spark processes your pipeline stage by stage — including data size, partition count, shuffle impact, memory usage, time estimates, and executor configuration.

This simulates a real-world Spark job on Cloudera BDL: Parquet input → transformations → joins → final write to HDFS/S3, operating on ~5.5 crore final customer rows.


💡 CONFIG OVERVIEW (Used for all Estimations)

| Parameter | Value |
| --- | --- |
| Executors | 20 |
| Executor Cores | 5 |
| Executor Memory | 16 GB |
| Total Cores | 100 |
| Parallelism | 800 (default) |
| Memory Available (per executor) | ~11 GB usable (after overheads) |
| Compression Format | Parquet + Snappy |
| Storage | HDFS (Cloudera BDL) or S3 |

⚙️ STAGE-WISE ANALYSIS: Memory, Shuffle, Time, Partitions


STAGE 1: Read & Filter + Add Month Column

🔸 Action

from pyspark.sql import functions as F  # needed for F.date_format below

txn_df = spark.read.parquet("txn/*.parquet")
txn_df = txn_df.withColumn("month", F.date_format("txn_date", "yyyy-MM"))

🔹 Details

  • Input: 4 txn tables × 3B rows → ~1.2 TB compressed Parquet
  • Pruned down to the relevant columns (~30) via Parquet column pruning

🔸 Memory & Execution

| Aspect | Estimate |
| --- | --- |
| Partitions | ~2400 (based on input files or spark.sql.files.maxPartitionBytes) |
| Memory/Executor | ~1–2 GB read buffer per executor |
| Shuffle | ❌ No shuffle here (narrow, lazy transformations) |
| Time | ~2–4 minutes to parse the schema & plan input partitions |
| Optimization | Use .select() early to reduce I/O |
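As an illustration of that last optimization, here is a minimal sketch of pruning columns right after the read; the column subset is illustrative, not the actual schema:

# Select only the columns the pipeline actually needs; Parquet then reads
# just those column chunks instead of the full ~40-column schema.
needed_cols = ["cust_id", "txn_date", "txn_type", "txn_amount"]  # illustrative subset
txn_df = txn_df.select(*needed_cols)
txn_df = txn_df.withColumn("month", F.date_format("txn_date", "yyyy-MM"))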

STAGE 2: GroupBy + Pivot on Month + txn_type

🔸 Action

pivot_df = txn_df.groupBy("cust_id", "month").pivot("txn_type").agg(F.sum("txn_amount"))

🔹 Details

  • Input: 3B rows → grouped to ~6 crore rows
  • Each group becomes ~120 columns after pivot (assuming 10 txn types over 12 months)

🔸 Memory & Execution

| Aspect | Estimate |
| --- | --- |
| Partitions | 800–1200 (shuffle partitions) |
| Shuffle | ✅ Yes — heavy shuffle |
| Shuffle Size | ~200–300 GB (intermediate files) |
| Peak Memory Needed | 8–10 GB per executor |
| Time | 10–15 minutes |
| Notes | Skew on cust_id can slow this down |

🔧 Tips

  • Pre-partition by cust_id
  • Use AQE: spark.sql.adaptive.enabled=true
  • Monitor skewed partitions in Spark UI
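A minimal sketch of the first two tips, assuming the DataFrame names used above (the skew-join flag assumes Spark 3.x AQE, and the partition count is illustrative):

# Enable AQE so skewed shuffle partitions can be split/coalesced at runtime
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")

# Pre-partition by cust_id so the groupBy/pivot shuffle is better balanced
txn_df = txn_df.repartition(1000, "cust_id")  # 1000 is an assumed partition count

pivot_df = (
    txn_df.groupBy("cust_id", "month")
          .pivot("txn_type")   # passing an explicit value list here avoids an extra distinct pass
          .agg(F.sum("txn_amount"))
)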

STAGE 3: Window Function (Rolling Average)

🔸 Action

from pyspark.sql.window import Window  # needed for the window spec below

window_spec = Window.partitionBy("cust_id").orderBy("month").rowsBetween(-2, 0)
pivot_df = pivot_df.withColumn("rolling_avg", F.avg("DEBIT").over(window_spec))

🔹 Details

  • Operates on ~6 crore rows, sorted within each cust_id
  • Costly if cust_id cardinality is high or unbalanced

🔸 Memory & Execution

| Aspect | Estimate |
| --- | --- |
| Partitions | Inherits previous (~800–1000) |
| Shuffle | ✅ Yes — Spark sorts data within each cust_id |
| Shuffle Size | 100–150 GB |
| Peak Memory Needed | 6–8 GB per executor |
| Time | 5–10 minutes |

STAGE 4: Join with Dimension & Fact Tables

🔸 Action

from pyspark.sql.functions import broadcast
enriched_df = pivot_df.join(broadcast(dim_df), "cust_id", "left")

🔹 Details

  • Join ~6 crore rows with 20 small dim tables (broadcasted)
  • Fact tables: 20 × ~6 crore rows → join selectively (e.g., left semi or left join on only a few tables)

🔸 Memory & Execution

| Aspect | Estimate |
| --- | --- |
| Broadcast Join | No shuffle if the dim table fits under the broadcast threshold (see spark.sql.autoBroadcastJoinThreshold below) |
| Fact Table Join | Shuffle if not broadcastable |
| Total Joins | 20 joins, 12–15 of them broadcasted |
| Partitions | Repartitioned by cust_id: 800–1000 |
| Time | 10–12 minutes |
| Shuffle | ~150–200 GB total (if some joins are not broadcasted) |
| Memory | 7–10 GB per executor if multiple joins run together |
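A minimal sketch of how the dimension joins could be chained, assuming a hypothetical dict of already-loaded small dimension DataFrames:

from pyspark.sql.functions import broadcast

# dim_tables is a hypothetical {name: DataFrame} mapping of small lookup tables
enriched_df = pivot_df
for dim_df in dim_tables.values():
    # Broadcasting the small side avoids shuffling the ~6-crore-row DataFrame
    enriched_df = enriched_df.join(broadcast(dim_df), on="cust_id", how="left")

# Fact tables too large to broadcast fall back to shuffle joins; repartitioning
# both sides by cust_id beforehand keeps those shuffles balanced.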

STAGE 5: Final Transform + Filter

🔸 Action

final_df = enriched_df.filter(...).withColumn("flag", ...)

🔹 Details

  • Adds 1–2 lightweight columns
  • Applies filter on amount fields

🔸 Memory & Execution

| Aspect | Estimate |
| --- | --- |
| Shuffle | ❌ No shuffle |
| Memory | ~1 GB per executor |
| Time | ~1–2 minutes |

STAGE 6: Final Repartition + Write to Parquet

🔸 Action

final_df.repartition(200).write.mode("overwrite").parquet("final_fact/...")

🔹 Details

  • Final row count: ~5.5 crore
  • Final columns: ~150
  • Output file size: ~20 GB/month

🔸 Memory & Execution

| Aspect | Estimate |
| --- | --- |
| Output Partitions | 200 |
| File Size | ~100 MB per part file (Snappy Parquet) |
| Write Time | ~3–5 minutes |
| Shuffle | ✅ Yes — repartitioning involved |
| Memory Needed | 4–6 GB per executor |
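The 200-partition figure follows from targeting roughly 100 MB of compressed output per file; a minimal sketch of that sizing arithmetic (the 20 GB figure is the estimate above, and output_path is a placeholder for the real destination):

estimated_output_gb = 20              # assumed compressed monthly output size
target_file_mb = 100                  # desired size per Parquet part file
num_files = int(estimated_output_gb * 1000 / target_file_mb)   # = 200, matching repartition(200)

output_path = "final_fact/"           # placeholder path
final_df.repartition(num_files).write.mode("overwrite").parquet(output_path)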

🧠 TOTAL PIPELINE SUMMARY

| Stage | Shuffle | Time (est.) | Memory/Exec | Partitions | Shuffle Size |
| --- | --- | --- | --- | --- | --- |
| Read + Month | ❌ No | 2–4 min | ~2 GB | 2400 | — |
| Pivot + GroupBy | ✅ Yes | 10–15 min | ~10 GB | 800–1200 | ~300 GB |
| Window | ✅ Yes | 5–10 min | ~8 GB | 800 | ~150 GB |
| Joins | ✅ Yes | 10–12 min | ~8–10 GB | 1000 | ~200 GB |
| Filter + Columns | ❌ No | 2 min | ~2 GB | 1000 | — |
| Final Write | ✅ Yes | 3–5 min | ~6 GB | 200 | ~20 GB output |

🧪 CLUSTER SIZING RECOMMENDATION (20 Executors)

| Config Key | Suggested Value |
| --- | --- |
| spark.executor.instances | 20 |
| spark.executor.memory | 16G |
| spark.executor.cores | 5 |
| spark.sql.shuffle.partitions | 800–1200 |
| spark.sql.adaptive.enabled | true |
| spark.sql.autoBroadcastJoinThreshold | 50MB |
| spark.memory.fraction | 0.6 |
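A minimal sketch of wiring these values into the session builder; the app name is illustrative, and on YARN the executor-level and spark.memory.fraction settings normally have to be fixed at submit time rather than changed mid-application:

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("customer_fact_pipeline")                 # illustrative name
    .config("spark.executor.instances", "20")
    .config("spark.executor.cores", "5")
    .config("spark.executor.memory", "16g")
    .config("spark.sql.shuffle.partitions", "1000")    # within the 800–1200 band
    .config("spark.sql.adaptive.enabled", "true")
    .config("spark.sql.autoBroadcastJoinThreshold", str(50 * 1024 * 1024))  # 50 MB
    .config("spark.memory.fraction", "0.6")
    .getOrCreate()
)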

✅ Final Output Characteristics

| Metric | Value |
| --- | --- |
| Final Row Count | ~5.5 crore |
| Final Columns | ~150 |
| Compressed Row Size | ~40–60 bytes |
| Output Format | Parquet + Snappy |
| Output Monthly Size | ~15–20 GB |
| Final Files | ~200 files (~100 MB each) |

Time Estimation

Here’s how the time was estimated for each job/stage/wave in your Spark pipeline, based on reasonable production-like assumptions and formulas. Let’s break it down:


⚙️ Assumptions for Estimation

| Parameter | Value | Notes |
| --- | --- | --- |
| Executors | 20 | You mentioned max 20 |
| Cores per executor | 5 | Typical on Cloudera BDL |
| Total Cores | 100 | 20 × 5 |
| Executor Memory | 16 GB | Common on BDL |
| Available shuffle memory per executor | 6–8 GB | After overhead/reserved memory |
| Effective I/O Speed | 100 MB/s per parallel task | Disk read/write + shuffle varies |
| Network Speed | 1 Gbps ≈ 100 MB/s | Used during shuffles & joins |
| Parquet Compression Ratio | ~3x | Common with Snappy |

⏱ Time Estimation Formula

For each stage:

Time (sec) = Total Input Size / (Parallel Tasks × Effective I/O Speed per Task)

where Parallel Tasks = executors × cores per executor = 100 here.

If there's a shuffle or a wide transformation:

Time += Shuffle Size / (Parallel Tasks × Network Bandwidth per Task)

For CPU-heavy jobs (e.g., window, pivot), we add a compute term:

Time += Compute Factor × Number of Partitions / Total Cores
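A minimal sketch of these formulas as a helper function, using the assumption table's 100 parallel tasks and 100 MB/s per task; the compute factor is an assumed constant, not a measured value:

def estimate_stage_seconds(input_gb, shuffle_gb=0.0, partitions=0,
                           parallel_tasks=100, io_mb_s=100,
                           net_mb_s=100, compute_factor=0.5):
    """Rough stage time in seconds, following the formulas above (sizes in GB)."""
    seconds = (input_gb * 1000) / (parallel_tasks * io_mb_s)       # read/scan term
    seconds += (shuffle_gb * 1000) / (parallel_tasks * net_mb_s)   # shuffle term
    seconds += compute_factor * partitions / parallel_tasks        # rough CPU term
    return seconds

print(estimate_stage_seconds(input_gb=800))   # -> 80.0, matching the read step below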

✅ Estimation Example Breakdown

Let’s walk through the transformation pipeline step by step.


💾 Step 1: Read 12-Month Transaction Table (Parquet)

| Metric | Value |
| --- | --- |
| Rows | 12 months × 25 Cr = 300 Cr |
| Columns | 40 |
| Raw Size (uncompressed) | ~2.4 TB |
| Parquet (Snappy) | ~800 GB (3x compression) |
| Input Partitions | 800 GB / 256 MB ≈ 3200 partitions |
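The 3200 figure assumes ~256 MB input splits; a minimal sketch of pinning that split size explicitly (Spark's out-of-the-box default is 128 MB):

# Use ~256 MB input splits, as assumed in the partition estimate above
spark.conf.set("spark.sql.files.maxPartitionBytes", str(256 * 1024 * 1024))

# 800 GB of Parquet / 256 MB per split ≈ 3200 input partitions
print(int(800 * 1024 / 256))   # -> 3200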

📌 Execution:

  • Spark parallel read → uses 20 executors × 5 = 100 tasks in parallel
  • Time = 800 GB / (100 tasks × 100 MB/s) = 80 seconds

🧹 Step 2: Filter & Cleanse (Light Select + filter)

| Metric | Value |
| --- | --- |
| Output | ~60% of rows retained |
| New Size | 180 Cr rows → ~480 GB compressed |
| Partitions | ~1900 |

📌 Execution:

  • Minimal CPU
  • Time = 480 GB / (100 × 100 MB/s) = ~48 seconds

🔄 Step 3: Pivot (10 columns × 12 months = 120 new cols)

| Metric | Value |
| --- | --- |
| Rows After Pivot | 6 Cr |
| New Columns | 120 extra, total ~160 |
| New Size | ~15 GB compressed |

📌 Execution:

  • CPU-heavy + wide transformation
  • Needs shuffle
  • Compute cost: 6 Cr / 100 cores = ~5–8 seconds
  • Shuffle size: 15 GB → same as output size
  • Shuffle time: 15 GB / (100 × 100 MB/s) = 1.5 seconds
  • Total: ~10 seconds

🪟 Step 4: Window Functions (per cust_id over 12-month data)

| Metric | Value |
| --- | --- |
| Input Rows | 6 Cr |
| Output Rows | 6 Cr |
| Shuffle | Yes, needs full repartition by cust_id |
| Estimated Shuffle Size | ~15 GB |

📌 Execution:

  • Repartition 6 Cr rows → 100 partitions
  • Time = 15 GB / (100 × 100 MB/s) = 1.5 seconds
  • Compute time = ~8 seconds
  • Total: ~10 seconds

🔗 Step 5: Join with 4 Big Transactional Tables (each 5–6 Cr rows)

| Metric | Value |
| --- | --- |
| Method | Broadcast or shuffle join |
| Join Output | 6 Cr rows |
| Intermediate Join Shuffle Size | ~20–25 GB per join |
| Total Shuffle | ~100 GB |

📌 Execution:

  • Time per join = 25 GB / (100 × 100 MB/s) = 2.5 sec
  • For 4 joins: 2.5 × 4 = 10 seconds
  • CPU = 6–8 seconds extra
  • Total: ~18 seconds

📊 Step 6: Join with 20 Small Dim Tables (lookup)

| Metric | Value |
| --- | --- |
| Type | Broadcast |
| Rows | 10K–50K each |
| Shuffle | No |
| Output | 6 Cr |
| Time | Negligible (~5–10 sec total) |


📦 Final Write: Save 6 Cr rows to Parquet

| Metric | Value |
| --- | --- |
| Final Rows | 6 Cr |
| Final Size | ~20 GB |
| Time | 20 GB / (100 × 100 MB/s) = 2 seconds |


📉 Summary Table

| Step | Input Size | Output Size | Shuffle | Time (sec) | Notes |
| --- | --- | --- | --- | --- | --- |
| Read 12M Parquet | 800 GB | 800 GB | No | 80 | Parallel read |
| Filter | 800 → 480 GB | 480 GB | No | 48 | Minimal CPU |
| Pivot | 480 GB | ~15 GB | Yes | 10 | CPU + shuffle |
| Window | 15 GB | 15 GB | Yes | 10 | Per-key CPU |
| 4 Large Joins | ~15 GB | ~15 GB | Yes | 18 | Shuffle + CPU |
| 20 Dim Joins | — | — | Broadcast | 10 | Fast |
| Write Parquet | 15 GB | 15 GB | No | 2 | Final save |

🕒 Total Time Estimate: ~180–200 seconds (~3 mins) end-to-end.


🧠 Notes on Memory Use

  • During wide transformations (pivot, window, join), shuffle memory is critical.
  • Each executor will need:
    • 6–8 GB for shuffle buffers
    • 2–3 GB for task computation
    • Remainder for cached broadcast joins or temporary results.
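To ground those per-executor numbers, here is a minimal sketch of Spark's unified-memory arithmetic for a 16 GB heap; the 300 MB reservation and spark.memory.fraction = 0.6 are the standard defaults, and the execution/storage split shifts dynamically at runtime:

heap_mb = 16 * 1024          # spark.executor.memory = 16g
reserved_mb = 300            # fixed JVM-heap reservation
memory_fraction = 0.6        # spark.memory.fraction

unified_mb = (heap_mb - reserved_mb) * memory_fraction
print(round(unified_mb / 1024, 1))   # ~9.4 GB shared by execution (shuffle) and storage;
                                     # the rest of the heap is user/task memory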

Another Version, With Some Additions

  • 12 months of pivoted data (~5–6 Cr rows)
  • Multiple aggregation layers: quarterly, 6-month, yearly summaries
  • Per-user metrics (cust_id level)
  • 30–40 custom metric calculations using Window + Pivot + Filters
  • Final write to HDFS/S3 in Parquet (Cloudera BDL)
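As one example of the per-user metrics listed above, here is a minimal sketch of a quarterly rollup on the pivoted data; the DEBIT column comes from the earlier snippets, everything else is an assumption:

from pyspark.sql import functions as F

month_num = F.substring("month", 6, 2).cast("int")      # "yyyy-MM" -> month number

quarterly_df = (
    pivot_df
    .withColumn("quarter", F.floor((month_num - 1) / 3) + 1)   # quarter 1–4
    .groupBy("cust_id", "quarter")
    .agg(
        F.sum("DEBIT").alias("qtr_debit_total"),
        F.avg("DEBIT").alias("qtr_debit_avg"),
    )
)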

I’ll now provide a complete estimation:


✅ Spark Job Runtime & Memory Analysis (Inline Estimation)

⚙️ Environment Configuration:

| Parameter | Value |
| --- | --- |
| Executors | 20 |
| Executor Cores | 5 |
| Total Cores | 100 |
| Executor Memory | 16 GB |
| Executor Memory Overhead | 3 GB (approx.) |
| Shuffle Partitions | 3000 (default or tuned) |
| Input Table Size (monthly) | 15–20 GB compressed Parquet (~100–120 GB in memory) |
| Final Fact Table | 5.5 Cr unique cust_id |

📊 Job Breakdown and Stage Estimation

| Stage | Description | Input Size (Uncompressed) | Output Size | Est. Shuffle Data | Est. Time | Memory Pressure |
| --- | --- | --- | --- | --- | --- | --- |
| 1. Read + Initial Filter | Read Parquet from HDFS/S3, apply initial filters | 100–120 GB | 20–30 GB | — | ~3–5 mins | Low |
| 2. Pivot by Month | Monthly pivot (e.g., txn_amt_01, txn_amt_02…) | 5 Cr rows | 5 Cr rows × 10–12 cols ≈ 5–8 GB | — | ~5–7 mins | Medium |
| 3. Monthly Rolling Aggregates | Window functions per cust_id for 12-month, QTR, 6M | 5 Cr × 12 months | Intermediate ≈ 30–40 GB | 10–15 GB | ~12–18 mins | High (spill likely) |
| 4. Daily to Monthly MDAB Rollup | Per-day customer data → monthly metrics | 5 Cr × 30 days ≈ 150 Cr rows | ≈ 5 Cr × 12 cols = 5–8 GB | 10–15 GB | ~10–15 mins | High |
| 5. Additional 30–40 Custom Metrics | Complex aggregations (UDAF, window, filters) | From steps 3 & 4 | ≈ 5–10 GB more | 20–30 GB | ~15–20 mins | Very High |
| 6. Join with Dim / Lookup / Other Fact | 20–30 joins with 5–8 Cr dimension/fact tables | ≈ 30–40 GB | ≈ 5.5 Cr rows | ~20–30 GB | ~10–15 mins | Medium–High |
| 7. Final Fact Write | Write 5.5 Cr rows as Parquet to BDL | ~20–25 GB | Compressed: 15–20 GB | — | ~5–10 mins | Medium |

🧠 Estimated Total Memory Pressure

  • Peak In-Memory Footprint per Executor: ~6–10 GB working set
  • Shuffle Spill Risk: Medium to High
  • Total Shuffle Data across stages: ~70–100 GB
  • Temp Storage Needed (Shuffle + Spill): At least 100–150 GB on disk
  • Ideal Executor Memory: 16 GB + 3 GB overhead

⏱️ Final Time Estimation Breakdown

| Category | Est. Time |
| --- | --- |
| Read + Initial Filter | 5 mins |
| Pivot + Aggregations + Rolling + Custom Metrics | 45–55 mins |
| Joins | 10–15 mins |
| Write Final Fact | 5–10 mins |
| Total | ~60–75 mins |

⚠️ Note: This aligns with your observed time (~1 hour), especially due to:

  • Wide aggregations per cust_id over 12 months × N metrics
  • Costly joins and large shuffle
  • Memory spill due to heavy window/pivot stages

📌 Optimization Pointers (Realistic for Cloudera BDL)

  1. Use .persist(StorageLevel.MEMORY_AND_DISK_SER) on intermediate wide DataFrames to reduce recomputation.
  2. Avoid wide shuffles during pivot by doing monthly batches if possible (e.g., process 3 months, union all).
  3. Repartition based on cust_id before window/pivot-heavy jobs to localize memory usage.
  4. Use broadcast() joins for smaller dims/lookup tables < 500 MB.
  5. Enable Spark SQL Adaptive Execution (AQE) if not enabled.
  6. Tune shuffle partitions based on cluster: for 100 cores, ~3000 is ideal.
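A minimal sketch combining pointers 1, 3, 4, and 5, assuming the DataFrame names used earlier; note that in PySpark the data is already serialized, so MEMORY_AND_DISK plays the role of the MEMORY_AND_DISK_SER level mentioned in pointer 1:

from pyspark import StorageLevel
from pyspark.sql.functions import broadcast

# (3) Localize the heavy window/pivot work by partitioning on cust_id first
pivot_df = pivot_df.repartition("cust_id")

# (1) Persist the wide intermediate result so later metric layers don't recompute it
pivot_df = pivot_df.persist(StorageLevel.MEMORY_AND_DISK)

# (5) Let AQE coalesce small shuffle partitions and mitigate skew at runtime
spark.conf.set("spark.sql.adaptive.enabled", "true")

# (4) Broadcast small dims/lookups (< 500 MB per the pointer above)
enriched_df = pivot_df.join(broadcast(dim_df), "cust_id", "left")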
