Let me give you a complete, stage-by-stage breakdown of how Spark processes your pipeline — including data size, partition count, shuffle impact, memory usage, time estimates, and executor configuration.
This will simulate a real-world Spark job in Cloudera BDL using Parquet input → Transformations → Joins → Final write to HDFS/S3, operating on ~5.5 crore final customer rows.
💡 CONFIG OVERVIEW (Used for all Estimations)
Parameter | Value |
---|---|
Executor Instances | 20 |
Executor Cores | 5 |
Executor Memory | 16 GB |
Total Cores | 100 (20 × 5) |
Shuffle Parallelism | 800 (spark.sql.shuffle.partitions, tuned up from the 200 default) |
Memory Available (exec) | ~11 GB usable (after overheads) |
Compression Format | Parquet + Snappy |
Storage | HDFS (Cloudera BDL) or S3 |
⚙️ STAGE-WISE ANALYSIS: Memory, Shuffle, Time, Partitions
✅ STAGE 1: Read & Filter + Add Month Column
🔸 Action
txn_df = spark.read.parquet("txn/*.parquet")
txn_df = txn_df.withColumn("month", F.date_format("txn_date", "yyyy-MM"))
🔹 Details
- Input: 4 txn tables × 3B rows → ~1.2 TB compressed Parquet
- Pruned to the relevant columns (say ~30) via column pruning / projection pushdown
🔸 Memory & Execution
Aspect | Estimate |
---|---|
Partitions | ~2400 (based on input file splits or spark.sql.files.maxPartitionBytes) |
Memory/Executor | ~1–2 GB per executor buffer |
Shuffle | ❌ No shuffle here (narrow transformations only) |
Time | ~2–4 minutes to parse schema & infer partitioning |
Optimization | Use .select() to reduce I/O early |
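As a sketch of that early-pruning tip (the column list below is illustrative; substitute the ~30 columns the downstream logic actually needs):

```python
from pyspark.sql import functions as F

# Hypothetical subset of columns; Parquet column pruning means only these are read from disk
needed_cols = ["cust_id", "txn_date", "txn_type", "txn_amount"]

txn_df = (
    spark.read.parquet("txn/*.parquet")
         .select(*needed_cols)                                    # prune before anything else
         .withColumn("month", F.date_format("txn_date", "yyyy-MM"))
)
```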
✅ STAGE 2: GroupBy + Pivot on Month + txn_type
🔸 Action
pivot_df = txn_df.groupBy("cust_id", "month").pivot("txn_type").agg(F.sum("txn_amount"))
🔹 Details
- Input: 3B rows → grouped to ~6 crore rows
- Each (cust_id, month) row gains ~10 pivoted columns; across 12 monthly rows that is ~120 pivoted values per customer (assuming 10 txn types)
🔸 Memory & Execution
Aspect | Estimate |
---|---|
Partitions | 800–1200 (shuffle partitions) |
Shuffle | ✅ Yes — heavy shuffle |
Shuffle Size | ~200–300 GB (intermediate files) |
Peak Mem Needed | 8–10 GB per executor |
Time | 10–15 minutes |
Notes | Skew on cust_id can slow this down |
🔧 Tips
- Pre-partition by cust_id (see the sketch below)
- Use AQE: spark.sql.adaptive.enabled=true
- Monitor skewed partitions in the Spark UI
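A minimal sketch of those tips, assuming Spark 3.x (the 1000-partition figure is an assumption; tune it so shuffle partitions stay around 100–200 MB):

```python
from pyspark.sql import functions as F

# AQE lets Spark coalesce small shuffle partitions and split skewed ones at runtime (Spark 3.x)
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")

# Pre-partition by cust_id so each customer's rows are co-located before the groupBy/pivot
txn_df = txn_df.repartition(1000, "cust_id")   # partition count is an assumption

pivot_df = (
    txn_df.groupBy("cust_id", "month")
          .pivot("txn_type")
          .agg(F.sum("txn_amount"))
)
```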
✅ STAGE 3: Window Function (Rolling Average)
🔸 Action
window_spec = Window.partitionBy("cust_id").orderBy("month").rowsBetween(-2, 0)
pivot_df = pivot_df.withColumn("rolling_avg", F.avg("DEBIT").over(window_spec))
🔹 Details
- Operates on ~6 crore rows, sorted within each cust_id
- Costly if cust_id cardinality is high or the distribution is skewed
🔸 Memory & Execution
Aspect | Estimate |
---|---|
Partitions | Inherits previous (~800–1000) |
Shuffle | ✅ Yes — repartition by cust_id and sort within partitions |
Shuffle Size | 100–150 GB |
Peak Mem Needed | 6–8 GB per executor |
Time | 5–10 minutes |
✅ STAGE 4: Join with Dimension & Fact Tables
🔸 Action
enriched_df = pivot_df.join(broadcast(dim_df), "cust_id", "left")
🔹 Details
- Join ~6 crore rows with 20 small dim tables (broadcasted)
- Fact tables: 20 × ~6 crore rows → join selectively (e.g., left semi or left join on a few tables)
🔸 Memory & Execution
Join Type | Estimate |
---|---|
Broadcast Join | No shuffle if the dim fits under the broadcast threshold (tens of MB, per the config below) |
Fact Table Join | Shuffle if not broadcastable |
Total Joins | 20 joins: 12–15 broadcasted |
Partitions | Repartitioned by cust_id : 800–1000 |
Time | 10–12 minutes |
Shuffle | ~150–200 GB total (if some joins not broadcasted) |
Memory | 7–10 GB / executor if multiple joins happen together |
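A hedged sketch of how the broadcast vs. shuffle split might look in code (small_dims, big_fact_df, and fact_metric are placeholder names, not from the source):

```python
from pyspark.sql.functions import broadcast

# Small dimension tables: explicit broadcast hint, so no shuffle on the 6-crore side
for dim_df in small_dims:                       # hypothetical list of broadcastable dims
    enriched_df = enriched_df.join(broadcast(dim_df), "cust_id", "left")

# Larger fact tables fall back to a shuffle (sort-merge) join; pruning the fact table's
# columns before the join keeps the shuffled volume down
enriched_df = enriched_df.join(
    big_fact_df.select("cust_id", "fact_metric"),   # placeholder column name
    on="cust_id",
    how="left",
)
```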
✅ STAGE 5: Final Transform + Filter
🔸 Action
final_df = enriched_df.filter(...).withColumn("flag", ...)
🔹 Details
- Adds 1–2 lightweight columns
- Applies filter on amount fields
🔸 Memory & Execution
Aspect | Estimate |
---|---|
Shuffle | ❌ No shuffle |
Memory | ~1 GB/executor |
Time | ~1–2 minutes |
✅ STAGE 6: Final Repartition + Write to Parquet
🔸 Action
final_df.repartition(200).write.mode("overwrite").parquet("final_fact/...")
🔹 Details
- Final row count: ~5.5 crore
- Final columns: ~150
- Output file size: ~20 GB/month
🔸 Memory & Execution
Aspect | Estimate |
---|---|
Output Partitions | 200 |
File Size | ~100 MB per part (Snappy Parquet) |
Write Time | ~3–5 minutes |
Shuffle | ✅ Yes — repartitioning involved |
Memory Needed | 4–6 GB per executor |
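The repartition(200) choice follows directly from the output size and a ~100 MB target part file; a quick sketch of that arithmetic (sizes are the estimates above):

```python
# ~20 GB compressed output / ~100 MB target per part file ≈ 200 output files
output_gb = 20          # estimated compressed output size (from the table above)
target_file_mb = 100    # comfortable Parquet part-file size on HDFS/S3

num_files = round(output_gb * 1024 / target_file_mb)   # ≈ 205, rounded to 200 in practice

final_df.repartition(200).write.mode("overwrite").parquet("final_fact/...")
```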
🧠 TOTAL PIPELINE SUMMARY
Stage | Shuffle | Time (est.) | Memory/Exec | Partitions | Shuffle Size |
---|---|---|---|---|---|
Read + Month | ❌ No | 2–4 min | ~2 GB | 2400 | — |
Pivot + GroupBy | ✅ Yes | 10–15 min | ~10 GB | 800–1200 | ~300 GB |
Window | ✅ Yes | 5–10 min | ~8 GB | 800 | ~150 GB |
Joins | ✅ Yes | 10–12 min | ~8–10 GB | 1000 | ~200 GB |
Filter + Columns | ❌ No | 2 min | ~2 GB | 1000 | — |
Final Write | ✅ Yes | 3–5 min | ~6 GB | 200 | ~20 GB Output |
🧪 CLUSTER SIZING RECOMMENDATION (20 Executors)
Config Key | Suggested Value |
---|---|
spark.executor.instances | 20 |
spark.executor.memory | 16G |
spark.executor.cores | 5 |
spark.sql.shuffle.partitions | 800–1200 |
spark.sql.adaptive.enabled | true |
spark.sql.autoBroadcastJoinThreshold | 50MB |
spark.memory.fraction | 0.6 |
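For reference, a sketch of these settings wired into a SparkSession (the app name is illustrative; on YARN the executor settings are usually passed via spark-submit instead):

```python
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("customer_fact_pipeline")                      # hypothetical app name
    .config("spark.executor.instances", "20")
    .config("spark.executor.memory", "16g")
    .config("spark.executor.cores", "5")
    .config("spark.sql.shuffle.partitions", "1000")         # midpoint of the 800–1200 range
    .config("spark.sql.adaptive.enabled", "true")
    .config("spark.sql.autoBroadcastJoinThreshold", "50MB")
    .config("spark.memory.fraction", "0.6")
    .getOrCreate()
)
```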
✅ Final Output Characteristics
Metric | Value |
---|---|
Final Row Count | ~5.5 crore |
Final Columns | ~150 |
Compressed Row Size | ~270–360 bytes (≈15–20 GB ÷ 5.5 Cr rows) |
Output Format | Parquet + Snappy |
Output Monthly Size | ~15–20 GB |
Final Files | ~200 files (~100 MB each) |
Time Estimation
Here’s how the time was estimated for each job/stage/wave in your Spark pipeline, based on reasonable production-like assumptions and formulas. Let’s break it down:
⚙️ Assumptions for Estimation
Parameter | Value | Notes |
---|---|---|
Executors | 20 | You mentioned max 20 |
Cores per executor | 5 | Typical on Cloudera BDL |
Total Cores | 100 | 20 x 5 |
Executor Memory | 16 GB | Common on BDL |
Available shuffle memory/executor | 6–8 GB | After overhead/reserved memory |
I/O Speed | ~100 MB/s per parallel task | Disk read/write; shuffle speed varies |
Network Throughput | ~100 MB/s effective per parallel task (≈1 Gbps) | Used during shuffles & joins |
Parquet Compression Ratio | ~3x | Common with Snappy |
⏱ Time Estimation Formula
For each stage:
Time (sec) = Total Input Size / (Parallel Tasks × Effective Read Speed per Task), where Parallel Tasks = Executors × Cores = 100
If there's shuffle or a wide transformation:
Time += Shuffle Size / (Parallel Tasks × Effective Network Bandwidth per Task)
For CPU-heavy stages (e.g., window, pivot), we add a CPU term:
Time += Compute Factor × Number of Partitions / Total Cores
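A small helper that encodes those three formulas (the rates and the 100 parallel-task figure come straight from the assumptions table; the compute factor is a rough fudge constant, not a measured number):

```python
def estimate_stage_seconds(input_gb, shuffle_gb=0.0, partitions=0,
                           parallel_tasks=100, io_mb_s=100, net_mb_s=100,
                           compute_factor=0.0, total_cores=100):
    """Back-of-the-envelope stage time from the three formulas above."""
    seconds = (input_gb * 1000) / (parallel_tasks * io_mb_s)       # scan term (decimal GB -> MB)
    seconds += (shuffle_gb * 1000) / (parallel_tasks * net_mb_s)   # shuffle term
    seconds += compute_factor * partitions / total_cores           # CPU term for wide stages
    return seconds

# e.g. the 800 GB parallel read in Step 1: 800,000 MB / (100 × 100 MB/s) = 80 s
print(estimate_stage_seconds(800))   # -> 80.0
```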
✅ Estimation Example Breakdown
Let's walk step by step through the transformation pipeline.
💾 Step 1: Read 12-Month Transaction Table (Parquet)
Metric | Value |
---|---|
Rows | 12 months × 25 Cr = 300 Cr |
Columns | 40 |
Raw Size (uncompressed) | ~2.4 TB |
Parquet (Snappy) | ~800 GB (3x compression) |
Input Partitions | 800 GB / 256 MB = 3200 partitions |
📌 Execution:
- Spark parallel read → 20 executors × 5 cores = 100 tasks in parallel
- Time = 800 GB / (100 parallel tasks × 100 MB/s) = 80 seconds
🧹 Step 2: Filter & Cleanse (light select + filter)
Metric | Value |
---|---|
Output | ~60% of rows retained |
New Size | 180 Cr rows → ~480 GB compressed |
Partitions | ~1900 |
📌 Execution:
- Minimal CPU
- Time = 480 GB / (100 × 100 MB/s) = ~48 seconds
🔄 Step 3: Pivot (10 columns × 12 months = 120 new cols)
Metric | Value |
---|---|
Rows After Pivot | 6 Cr |
New Columns | 120 extra, total ~160 |
New Size | ~15 GB compressed |
📌 Execution:
- CPU-heavy + wide transformation
- Needs shuffle
- Compute cost: 6 Cr / 100 cores = ~5–8 seconds
- Shuffle size: 15 GB → same as output size
- Shuffle time: 15 GB / (100 × 100 MB/s) = 1.5 seconds
- Total: ~10 seconds
🪟 Step 4: Window Functions (per cust_id over 12-month data)
Metric | Value |
---|---|
Input Rows | 6 Cr |
Output Rows | 6 Cr |
Shuffle | Yes, needs full repartition by cust_id |
Estimated Shuffle Size | ~15 GB |
📌 Execution:
- Repartition 6 Cr rows → 100 partitions
- Time = 15 GB / (100 × 100 MB/s) = 1.5 seconds
- Compute time = ~8 seconds
- Total: ~10 seconds
🔗 Step 5: Join with 4 Big Transactional Tables (each 5–6 Cr rows)
Metric | Value |
---|---|
Method | Broadcast or shuffle join |
Join Output | 6 Cr rows |
Intermediate Join Shuffle Size | ~20–25 GB per join |
Total Shuffle | ~100 GB |
📌 Execution:
- Time per join = 25 GB / (100 × 100 MB/s) = 2.5 sec
- For 4 joins: 2.5 × 4 = 10 seconds
- CPU = 6–8 seconds extra
- Total: ~18 seconds
📊 Step 6: Join with 20 Small Dim Tables (lookup)
Metric | Value |
---|---|
Type | Broadcast |
Rows | 10K–50K each |
Shuffle | No |
Output | 6 Cr rows |
Time | Negligible (~5–10 sec total) |
📦 Final Write: Save 6 Cr rows to Parquet
Metric | Value |
---|---|
Final Rows | 6 Cr |
Final Size | ~20 GB |
Write Time | 20 GB / (100 × 100 MB/s) = 2 seconds |
📉 Summary Table
Step | Input Size | Output Size | Shuffle | Time (sec) | Notes |
---|---|---|---|---|---|
Read 12M Parquet | 800 GB | 800 GB | No | 80 | Parallel read |
Filter | 800 → 480 GB | 480 GB | No | 48 | Minimal CPU |
Pivot | 480 GB | ~15 GB | Yes | 10 | CPU + shuffle |
Window | 15 GB | 15 GB | Yes | 10 | Per-key CPU |
4 Large Joins | ~15 GB | ~15 GB | Yes | 18 | Shuffle + CPU |
20 Dim Joins | – | – | Broadcast | 10 | Fast |
Write Parquet | 15 GB | 15 GB | No | 2 | Final save |
🕒 Total Time Estimate: ~180–200 seconds (~3 mins) end-to-end.
🧠 Notes on Memory Use
- During wide transformations (pivot, window, join), shuffle memory is critical.
- Each executor will need:
  - 6–8 GB for shuffle buffers
  - 2–3 GB for task computation
  - the remainder for cached broadcast tables or temporary results
Another Version With Some Additions
This version covers:
- 12 months of pivoted data (~5–6 Cr rows)
- Multiple aggregation layers: quarterly, 6-month, yearly summaries
- Per-user metrics (cust_id level)
- 30–40 custom metric calculations using Window + Pivot + Filters
- Final write to HDFS/S3 in Parquet (Cloudera BDL)
I’ll now provide a complete estimation:
✅ Spark Job Runtime & Memory Analysis (Inline Estimation)
⚙️ Environment Configuration:
Parameter | Value |
---|---|
Executors | 20 |
Executor Cores | 5 |
Total Cores | 100 |
Executor Memory | 16 GB |
Executor Memory Overhead | 3 GB (approx) |
Shuffle Partitions | 3000 (tuned; Spark default is 200) |
Input Table Size (monthly) | 15–20 GB compressed Parquet (~100–120 GB in memory) |
Final Fact Table | 5.5 Cr unique cust_id |
📊 Job Breakdown and Stage Estimation
Stage | Description | Input Size (Uncompressed) | Output Size | Est. Shuffle Data | Est. Time | Memory Pressure |
---|---|---|---|---|---|---|
1. Read + Initial Filter | Read parquet from HDFS/S3, apply initial filters | 100–120 GB | 20–30 GB | ❌ | ~3–5 mins | Low |
2. Pivot by Month | Monthly pivot (e.g., txn_amt_01, txn_amt_02…) | 5 Cr rows | 5 Cr rows x 10–12 cols = ~5–8 GB | ❌ | ~5–7 mins | Medium |
3. Monthly Rolling Aggregates | Window functions per cust_id for 12-month, QTR, 6M | 5 Cr x 12 months | Intermediate ≈ 30–40 GB | 10–15 GB shuffle | ~12–18 mins | High (spill likely) |
4. Daily to Monthly MDAB Rollup | per-day customer data → monthly metrics | 5 Cr x 30 days = 150 Cr rows est. | Output ≈ 5 Cr x 12 cols = 5–8 GB | 10–15 GB | ~10–15 mins | High |
5. Additional 30–40 Custom Metrics | Complex aggregations (UDAF, window, filters) | From step 3 & 4 | ≈ 5–10 GB more | 20–30 GB shuffle | ~15–20 mins | Very High |
6. Join With Dim / Lookup / Other Fact | 20–30 joins with 5–8 Cr dimension/fact tables | Input ≈ 30–40 GB | Output ≈ 5.5 Cr rows | ~20–30 GB | ~10–15 mins | Medium–High |
7. Final Fact Write | Write 5.5 Cr rows Parquet to BDL | ~20–25 GB parquet | Compressed: 15–20 GB | ❌ | ~5–10 mins | Medium |
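A hedged sketch of how the stage-3 rolling aggregates (quarterly / 6-month / 12-month per cust_id) could be expressed, reusing pivot_df and the DEBIT column from the earlier stages; the output column names are illustrative:

```python
from pyspark.sql import Window, functions as F

# One window per customer, ordered by month; rowsBetween picks the rolling span
w = Window.partitionBy("cust_id").orderBy("month")

rolled_df = (
    pivot_df
    .withColumn("debit_qtr_avg", F.avg("DEBIT").over(w.rowsBetween(-2, 0)))    # 3-month
    .withColumn("debit_6m_avg",  F.avg("DEBIT").over(w.rowsBetween(-5, 0)))    # 6-month
    .withColumn("debit_12m_sum", F.sum("DEBIT").over(w.rowsBetween(-11, 0)))   # 12-month
)
```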
🧠 Estimated Total Memory Pressure
- Peak In-Memory Footprint per Executor: ~6–10 GB working set
- Shuffle Spill Risk: Medium to High
- Total Shuffle Data across stages: ~70–100 GB
- Temp Storage Needed (Shuffle + Spill): At least 100–150 GB on disk
- Ideal Executor Memory: 16 GB + 3 GB overhead
⏱️ Final Time Estimation Breakdown
Category | Est. Time |
---|---|
Read + Initial Filter | 5 mins |
Pivot + Aggregations + Rolling + Custom Metrics | 45–55 mins |
Joins | 10–15 mins |
Write Final Fact | 5–10 mins |
Total | ~65–85 mins |
⚠️ Note: This aligns with your observed runtime (~1 hour), especially due to:
- Wide aggregations per cust_id over 12 months × N metrics
- Costly joins and large shuffles
- Memory spill in the heavy window/pivot stages
📌 Optimization Pointers (Realistic for Cloudera BDL)
- Use .persist(StorageLevel.MEMORY_AND_DISK_SER) on intermediate wide DataFrames to reduce recomputation (sketched below).
- Avoid wide shuffles during the pivot by processing monthly batches where possible (e.g., process 3 months, then union all).
- Repartition by cust_id before window/pivot-heavy jobs to localize memory usage.
- Use broadcast() joins for smaller dim/lookup tables (< 500 MB).
- Enable Spark SQL Adaptive Execution (AQE) if not already enabled.
- Tune shuffle partitions to the cluster and shuffle volume: with 100 cores and ~70–100 GB of shuffle data, roughly 1000–3000 partitions keeps each task at a manageable ~30–100 MB.
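A minimal sketch combining those pointers (the partition count is an assumption, and MEMORY_AND_DISK is used here as the storage level commonly available for DataFrames in PySpark):

```python
from pyspark import StorageLevel
from pyspark.sql.functions import broadcast

# Cache the wide intermediate so the 30–40 metric calculations don't recompute the pivot
pivot_df = pivot_df.persist(StorageLevel.MEMORY_AND_DISK)

# Localize per-customer work before the window/pivot-heavy stages (count is an assumption)
pivot_df = pivot_df.repartition(1000, "cust_id")

# Explicit broadcast for small dim/lookup tables
enriched_df = pivot_df.join(broadcast(dim_df), "cust_id", "left")

# AQE + shuffle-partition tuning at runtime
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.shuffle.partitions", "1000")
```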