Question:
"What really happens inside the Spark engine when I run a simple .read() or .join() on a file?"
Let me break this down into a clear, interview-ready, cluster-level Spark execution flow, step by step:
Spark Cluster Background Process (Example: spark.read.csv(...))
Imagine this code:
df = spark.read.csv("/path/to/1GB_file.csv")
df2 = df.repartition(16).groupBy("col").count()
df2.write.mode("overwrite").parquet("/output")
Let's analyze it in chronological order:
1. Driver Program Starts the Spark Application
- Your code is submitted to the Spark Driver.
- The Driver:
  - Initializes a SparkSession
  - Connects to the Cluster Manager (YARN, Kubernetes, or Spark Standalone)
  - Requests resources → Executors
2. Cluster Manager Allocates Executors
| Component | Description |
|---|---|
| Driver | Main controller that coordinates the job |
| Executors | Workers that do the actual task processing |
| Cluster Manager | Allocates resources (executors, cores, memory) |
Example:
- You request: 4 Executors × 4 Cores × 8 GB RAM
- The Cluster Manager provisions them and tells the Driver it is ready (see the configuration sketch below)
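As a rough sketch, this is how the driver-side session could be configured to request those resources (the app name and the exact numbers are illustrative and mirror the example above; spark.executor.instances is honored on YARN and Kubernetes):

```python
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("ExecutionFlowDemo")              # illustrative name; shows up in the cluster manager and Spark UI
    .config("spark.executor.instances", "4")   # 4 executors
    .config("spark.executor.cores", "4")       # 4 cores per executor -> 16 task slots total
    .config("spark.executor.memory", "8g")     # 8 GB per executor
    .getOrCreate()                             # the driver connects to the cluster manager here
)
```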
3. Logical Plan → Physical Plan
When you run:
df = spark.read.csv("/path/to/1GB_file.csv")
Spark performs:
- Logical Plan: Describes “what” you want
- Optimized Logical Plan: Catalyst optimizer prunes & simplifies
- Physical Plan: Describes “how” to execute → e.g., 8 file splits → 8 tasks (you can inspect these plans, as shown below)
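You can print all of these plans yourself; a quick sketch, assuming the df read above:

```python
# Prints the parsed logical, analyzed, optimized logical, and physical plans.
df.explain(True)

# Spark 3.x also supports a more readable output mode:
# df.explain(mode="formatted")
```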
4. Spark Reads the File
- The file is split based on:
  - HDFS block size (e.g., 128 MB)
  - Number of input files
- Each split → 1 task
- Spark schedules these tasks on available executor cores

Stage 0 → Task 0 to Task 7 (one per file split; see the partition check below)
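A quick way to confirm the split count, assuming the df read above (spark.sql.files.maxPartitionBytes, default 128 MB, is the main knob for split size):

```python
# Number of input partitions = number of Stage 0 tasks.
print(df.rdd.getNumPartitions())   # roughly 8 for a 1 GB file with 128 MB splits

# To change the split size (value in bytes), set it before reading:
# spark.conf.set("spark.sql.files.maxPartitionBytes", str(64 * 1024 * 1024))
```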
5. Transformations (Repartition + GroupBy)
df2 = df.repartition(16).groupBy("col").count()
- repartition(16) → full shuffle
- groupBy("col").count() → triggers another wide transformation (needs data redistribution)
- Spark inserts shuffle stages into the physical plan

Result:
- Stage 1 = repartition(16)
- Stage 2 = groupBy/count() → another shuffle
- Number of tasks = based on spark.sql.shuffle.partitions (default = 200); see the sketch below
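A small sketch of tuning that setting before the wide transformations run (the value 64 is only illustrative):

```python
spark.conf.set("spark.sql.shuffle.partitions", "64")   # task count for shuffle stages
df2 = df.repartition(16).groupBy("col").count()
# Note: with adaptive query execution enabled (the default in Spark 3.x), Spark may
# coalesce small shuffle partitions at runtime, so the UI can show fewer tasks.
```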
6. Write the Output
df2.write.parquet("/output")
- Final stage: Stage 3
- 200 tasks → output is written as 200 Parquet files (one per partition)
- Can be optimized with .coalesce() before writing (see the sketch below)
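For example, a sketch of coalescing before the write, reusing df2 and /output from above (the target of 8 files is illustrative):

```python
# coalesce() merges partitions without a full shuffle (unlike repartition()),
# but it also caps write parallelism at the chosen partition count.
(
    df2.coalesce(8)
       .write.mode("overwrite")
       .parquet("/output")        # produces ~8 Parquet part files instead of 200
)
```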
7. Stages, Jobs, and Tasks Breakdown
| Stage | Action | Example Task Count |
|---|---|---|
| 0 | Read CSV (input splits) | 8 (for 1 GB) |
| 1 | Repartition | 16 |
| 2 | GroupBy → Shuffle → Aggregation | 200 (default shuffle partitions) |
| 3 | Write to Parquet | 200 |
Stage boundaries are created at shuffles; each stage is a set of tasks that run the same work on different partitions.
8. Task Scheduling Across Executors
If you have:
- 4 Executors × 4 Cores = 16 parallel task slots
- 200 tasks → 200 / 16 = 12.5, i.e. 13 task waves (see the arithmetic sketch below)
Execution:
Round 1: Tasks 0–15 → run on all cores
Round 2: Tasks 16–31 → next wave
...
Round 13: Tasks 192–199 → final wave
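The wave count is just a ceiling division, as this small sketch shows:

```python
import math

executors, cores_per_executor, tasks = 4, 4, 200
slots = executors * cores_per_executor   # 16 tasks can run at the same time
waves = math.ceil(tasks / slots)         # ceil(200 / 16) = 13 waves
print(f"{slots} slots -> {waves} waves")
```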
9. How Fault Tolerance Works in Spark
- Spark stores the RDD lineage as a Directed Acyclic Graph (DAG)
- If a task fails:
  - Spark retries it (default = 4 times; see the config sketch below)
  - Only the failed task is re-executed, not the whole job
- If an executor fails:
  - The cluster manager launches a new executor
  - Spark reschedules the pending tasks
No data is lost, thanks to:
- Lazy evaluation
- Immutability
- DAG recomputation
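The retry count comes from spark.task.maxFailures (default 4), which must be set when the application starts; a minimal sketch with a hypothetical app name:

```python
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("FaultToleranceDemo")           # hypothetical name, for illustration only
    .config("spark.task.maxFailures", "4")   # retry a failed task up to 4 times before aborting the stage
    .getOrCreate()
)
```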
10. Monitoring the Whole Process (Spark UI)
| Tab | What You See |
|---|---|
| Jobs | Total jobs triggered (1 action = 1 job) |
| Stages | Shuffle boundaries, task counts |
| Tasks | Task status, run time, executor used |
| Storage | Cached/persisted RDDs/DataFrames |
| Executors | CPU/memory usage, number of tasks |
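If you are unsure where the UI is served, the driver can tell you (the UI listens on port 4040 by default):

```python
print(spark.sparkContext.uiWebUrl)   # e.g. http://<driver-host>:4040
```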
Visual Flow Summary
Driver Program
  ↓
Cluster Manager → Executors
  ↓
Logical Plan → Optimized Plan → Physical Plan
  ↓
Stage 0: Read File → 8 Tasks
  ↓
Stage 1: Repartition → 16 Tasks
  ↓
Stage 2: GroupBy/Count → 200 Tasks (Shuffle)
  ↓
Stage 3: Write Parquet → 200 Tasks
Conclusion
| Component | Description |
|---|---|
| Driver | Orchestrates the job and breaks it into a DAG |
| Executors | Run tasks and store shuffle data |
| Stages | Created at shuffle boundaries |
| Tasks | One unit of execution per input split or partition |
| Jobs | Triggered per action (.show(), .count(), .write()) |
| Fault Tolerance | Achieved via task retry and DAG recomputation |
# Advanced Spark Execution Flow with Code
# Covers: caching, broadcast joins, shuffle stages, write modes, and checkpointing
from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast
spark = SparkSession.builder \
.appName("AdvancedSparkFlow") \
.config("spark.sql.shuffle.partitions", 8) \
.getOrCreate()
# --------------------------------------------
# Step 1: Load Data & Observe Initial Stages
# --------------------------------------------
# Let's load a large dataset (~100MB)
df_large = spark.range(10_000_000).withColumnRenamed("id", "user_id")
df_large.write.mode("overwrite").parquet("/tmp/large_users")
# Small lookup table
df_lookup = spark.createDataFrame([(i, f"Name_{i}") for i in range(1000)], ["user_id", "name"])
df_lookup.write.mode("overwrite").parquet("/tmp/lookup")
# Read both
df1 = spark.read.parquet("/tmp/large_users")
df2 = spark.read.parquet("/tmp/lookup")
# --------------------------------------------
# Step 2: Caching the Large DataFrame
# --------------------------------------------
# Cache the large table
# This will appear in Spark UI -> "Storage" tab
df1.cache()
df1.count() # Action to materialize the cache
# You can verify with:
# spark.catalog.isCached("<table/view>") β for SQL views
# --------------------------------------------
# Step 3: Broadcast Join Optimization
# --------------------------------------------
# Force a broadcast join
joined = df1.join(broadcast(df2), on="user_id", how="inner")
# Another way Spark broadcasts automatically:
# spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 10 * 1024 * 1024) # 10 MB
# View in Spark UI > SQL > Join plan shows BroadcastHashJoin
# --------------------------------------------
# Step 4: Understanding Stages
# --------------------------------------------
# The join + aggregation below trigger multiple stages once an action runs:
# - a stage that reads and broadcasts the lookup table
# - a stage that scans the cached large table (df1)
# - a shuffle stage for the groupBy aggregation
# Let's perform an aggregation after join
agg_df = joined.groupBy("name").count()
# --------------------------------------------
# Step 5: Saving the Output
# --------------------------------------------
# Write output with partitioning
agg_df.write.mode("overwrite").partitionBy("name").parquet("/tmp/output_partitioned")
# Use .coalesce(1) to reduce output files:
# agg_df.coalesce(1).write.mode("overwrite").parquet("/tmp/output_single_file")
# --------------------------------------------
# Step 6: Checkpointing (for lineage cutoff)
# --------------------------------------------
# Must set checkpoint directory
spark.sparkContext.setCheckpointDir("/tmp/checkpoint")
# Apply checkpointing
chk_df = agg_df.checkpoint()
chk_df.count()
# Why checkpoint? Cuts off lineage, avoids long DAG recomputation
# --------------------------------------------
# Bonus: View Execution Plans
# --------------------------------------------
# Logical + physical plan
joined.explain(True)
# Check broadcast
# Look for "BroadcastHashJoin" in explain output
# --------------------------------------------
# Recap:
# - Caching: `df.cache()` and trigger an action
# - Broadcast Join: use `broadcast(df)` or auto if <10MB
# - Shuffle Stages: triggered by wide transformations (join, groupBy)
# - Checkpointing: cuts lineage, must call `setCheckpointDir()`
# - Writing: control file count using `.coalesce()` and `partitionBy()`
# Monitor in Spark UI β Jobs, Stages, Storage, SQL tabs