Here is a detailed, step-by-step walkthrough of how a Spark job is initiated, executed, and managed on a cluster, including how the components interact, how programs are invoked, how memory is allocated, and how executors are assigned.
Apache Spark Job Execution: Complete Lifecycle & Architecture
Step 0: Your Program → Spark Application Code
You write a PySpark or Scala/Java application like:
spark = SparkSession.builder.appName("MyJob").getOrCreate()
df = spark.read.csv("input.csv")
result = df.filter(...).groupBy(...).agg(...)
result.write.parquet("output/")
This script constitutes a Spark Application.
Step 1: Driver Process Launch
- The driver is the master JVM process; it creates the SparkContext and orchestrates the entire job.
- This is where the `SparkSession` lives.
- Driver responsibilities (a minimal sketch follows this list):
- Reads your code
- Builds DAG of transformations
- Converts to physical plan
- Schedules and coordinates executors
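As a minimal sketch (assuming a PySpark script submitted with defaults), the driver-side handles are visible through the SparkSession and its SparkContext:

```python
# Sketch: the SparkSession and SparkContext live in the driver process.
# Printed values are illustrative; they depend on how the application was submitted.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("MyJob").getOrCreate()
sc = spark.sparkContext

print(sc.applicationId)        # ID the cluster manager assigned to this application
print(sc.master)               # e.g. "yarn", "k8s://...", or "local[*]"
print(sc.defaultParallelism)   # default partition count for parallelize()
```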
Step 2: Cluster Manager Interaction
When you call `.getOrCreate()`, Spark communicates with a cluster manager:
Cluster Manager | Description |
---|---|
Standalone | Built-in Spark cluster manager |
YARN (Hadoop) | External cluster manager (very common) |
Kubernetes | Container-based cluster manager |
Mesos (rare now) | Another general-purpose manager |
The cluster manager allocates:
- 1 driver container/process
- N executor containers/processes
Resource Allocation Parameters
--num-executors 4
--executor-memory 4G
--executor-cores 2
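The same resources can also be requested programmatically when the session is built; a hedged sketch using standard Spark config keys (the values mirror the illustrative flags above, and on YARN/Kubernetes they must be set before executors are requested):

```python
# Sketch: programmatic equivalent of the spark-submit flags above.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("MyJob")
    .config("spark.executor.instances", "4")  # --num-executors 4
    .config("spark.executor.memory", "4g")    # --executor-memory 4G
    .config("spark.executor.cores", "2")      # --executor-cores 2
    .getOrCreate()
)
```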
Step 3: SparkContext + DAG Creation
Logical Plan
- Spark creates a DAG (Directed Acyclic Graph) of transformations (e.g., map, filter, groupBy).
- This is the logical plan: it represents what to do, not how to do it.
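You can inspect the plan Spark builds before anything runs; a small sketch (the column names are placeholders for whatever your CSV actually contains):

```python
# Sketch: explain() prints the logical plan(s) and the physical plan Spark derives
# from the lazy transformations; nothing is executed yet.
df = spark.read.csv("input.csv", header=True, inferSchema=True)
result = df.filter(df["amount"] > 0).groupBy("category").count()  # placeholder columns
result.explain(True)  # extended output: parsed/analyzed/optimized logical + physical plan
```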
Step 4: DAG Scheduler → Stage Creation
- Spark divides the DAG into stages based on shuffle boundaries.
- Narrow transformations (map, filter): stay in same stage.
- Wide transformations (groupBy, join): new stage, causes a shuffle.
- Each stage is broken into tasks: one task per partition.
Example:
df = spark.read.csv("...").filter(...).groupBy(...).count()
Stages:
- Stage 1: Read & Filter
- Stage 2: GroupBy (shuffle required)
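A sketch of how task counts relate to partitions: the pre-shuffle stage gets one task per input partition, and the post-shuffle stage gets spark.sql.shuffle.partitions tasks (default 200; with adaptive query execution Spark 3.x may coalesce these at runtime). Column names here are placeholders:

```python
# Sketch: one task per partition in each stage.
raw = spark.read.csv("input.csv", header=True, inferSchema=True)
print(raw.rdd.getNumPartitions())                 # ~ number of tasks in Stage 1 (read & filter)

spark.conf.set("spark.sql.shuffle.partitions", "64")
counts = raw.filter(raw["amount"] > 0).groupBy("category").count()
# Stage 2 (after the shuffle) will now run 64 tasks, one per shuffle partition.
```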
Step 5: TaskScheduler → Task Execution
The TaskScheduler sends tasks to executors.
Executors
- JVM processes launched on worker nodes.
- Run tasks assigned by the driver.
- Store data in memory/disk using block manager.
- Cache/persist RDDs/DataFrames.
- Communicate results back to driver.
Each executor has:
- N cores (`executor-cores`)
- M memory (`executor-memory`)
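A small sketch for checking what sizing the executors were actually given, assuming those values were set at submit time:

```python
# Sketch: reading executor sizing back from the active session's configuration.
conf = spark.sparkContext.getConf()
print(conf.get("spark.executor.cores", "not set"))    # N cores per executor
print(conf.get("spark.executor.memory", "not set"))   # M heap memory per executor
```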
Step 6: Execution Flow
- Driver sends tasks to executors.
- Executors run tasks in parallel (one per core).
- Intermediate data is:
- Held in memory (if caching)
- Written to disk (if shuffling)
- Shuffled data is pulled by downstream stages.
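A sketch of how caching keeps intermediate results on the executors so later actions reuse them instead of recomputing (the column name is a placeholder):

```python
# Sketch: cache() asks the executors' block managers to keep the data in memory.
filtered = df.filter(df["amount"] > 0)   # placeholder column name
filtered.cache()
filtered.count()   # first action computes the data and populates the cache
filtered.count()   # second action reads the cached blocks instead of re-reading the CSV
```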
Step 7: Memory Management
Spark divides executor memory:
Memory Pool | Purpose |
---|---|
Execution Memory | For shuffles, joins, aggregations |
Storage Memory | For caching/persisting data |
User Memory | For user variables |
Reserved Memory | Always reserved internally |
Since Spark 1.6, the Unified Memory Manager lets execution and storage share memory dynamically.
Let's now take a deeper dive into memory allocation per executor: how memory is calculated, which config values matter, and how dynamic memory sharing works in Spark's Unified Memory Manager.
Step 7 (Deep Dive): Memory Allocation per Executor (Spark 3.x+)
What Is Executor Memory?
Each Spark executor runs on a worker node and has a JVM with its own memory space.
This memory is divided into:
Memory Category | Purpose |
---|---|
Reserved Memory | Safety buffer Spark always keeps (not user configurable) |
User Memory | For user variables, closures, data structures (~40% of usable heap by default) |
Spark Memory | Split into two types (combined as Unified Memory) |
– Execution Memory | Used for joins, aggregations, sorting, shuffles |
– Storage Memory | Used for caching (df.cache(), persist()) |
Key Configuration Parameters
Parameter | Description | Default |
---|---|---|
spark.executor.memory | Total JVM heap memory allocated per executor | (e.g., 4g) |
spark.memory.fraction | Fraction of usable heap for Spark (execution + storage) | 0.6 |
spark.memory.storageFraction | Portion of Spark memory protected for storage | 0.5 of Spark memory |
spark.executor.memoryOverhead | Extra off-heap memory for non-JVM needs (shuffle buffers, Python workers) | auto: max(384 MB, 0.10 × executor memory) |
spark.driver.memory | Memory allocated to the driver process | (e.g., 4g) |
How Spark Memory Is Calculated
Let's assume:
spark.executor.memory = 4g
Breakdown:
Type | Formula | Amount (approx.) |
---|---|---|
Reserved | Fixed (~300 MB) | ~300 MB |
Usable JVM heap | 4 GB − reserved | ~3.7 GB |
Unified Spark memory | spark.memory.fraction × usable | 0.6 × 3.7 GB ≈ 2.2 GB |
→ Execution memory | 50% of Unified Memory | ~1.1 GB |
→ Storage memory | 50% of Unified Memory | ~1.1 GB |
User memory | usable × (1 − spark.memory.fraction) | ~1.5 GB |
Non-heap overhead (off-heap) | spark.executor.memoryOverhead | ~410 MB by default (max(384 MB, 10% of 4 GB)) |
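A small sketch that reproduces the arithmetic above in code, assuming the default reserved size (~300 MB) and the default fractions:

```python
# Sketch: back-of-the-envelope executor memory breakdown (in MB), mirroring the table above.
RESERVED_MB = 300          # fixed safety buffer Spark keeps on the heap

def memory_breakdown(executor_mem_gb, fraction=0.6, storage_fraction=0.5):
    heap = executor_mem_gb * 1024
    usable = heap - RESERVED_MB
    unified = usable * fraction
    return {
        "usable_heap": usable,                        # ~3796 MB for a 4 GB executor
        "unified": unified,                           # ~2278 MB
        "execution": unified * (1 - storage_fraction),
        "storage": unified * storage_fraction,        # ~1139 MB each
        "user": usable * (1 - fraction),              # ~1518 MB
        "overhead": max(384, 0.10 * heap),            # default memoryOverhead, off-heap
    }

print(memory_breakdown(4))
```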
Unified Memory Manager (Spark 1.6+)
Execution and storage memory are not fixed:
- If your job does no caching, storage memory is released to execution.
- If execution needs memory and storage is under-used, execution borrows from storage.
- If caching needs space and there is room, storage can grow.
- Eviction happens if execution needs more and storage has cached data.
Dynamic Memory Sharing
Scenario A: Execution-intensive stage
- Sorts, joins, and shuffles use most of the Unified Memory
- Spark evicts cached data if needed (LRU policy)
Scenario B: Heavy caching
- Many `.cache()` or `.persist()` calls
- Execution will fail with an OOM if it can't get enough memory
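A sketch of sidestepping eviction pressure by choosing an explicit storage level (the aggregation and column name are placeholders):

```python
# Sketch: DISK_ONLY keeps persisted data out of the storage memory pool entirely,
# so it does not compete with execution memory and cannot be evicted from it.
from pyspark import StorageLevel

hot = df.groupBy("category").count()      # placeholder aggregation
hot.persist(StorageLevel.DISK_ONLY)
hot.count()                               # materializes the persisted blocks on disk
```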
Memory Tuning Tips
Issue | Fix / Action |
---|---|
Frequent spills to disk | Increase spark.executor.memory or spark.memory.fraction |
OOM during joins/sorts | Tune spark.sql.shuffle.partitions, address data skew |
Cached data eviction | Use .persist(StorageLevel.DISK_ONLY) or increase executor memory |
Python jobs (PySpark) | Increase spark.executor.memoryOverhead |
YARN/K8s container OOM | Executor memory + overhead must fit within the container limit |
Spark UI & Monitoring
- Executors tab: shows storage memory used, on-heap/off-heap usage, and task activity per executor.
- Storage tab: shows cached RDD/DataFrame sizes.
- Stages/Tasks pages: show spill (memory and disk) metrics; frequent spills signal insufficient execution memory.
Example:
spark-submit \
--executor-memory 4G \
--executor-cores 4 \
--conf spark.memory.fraction=0.6 \
--conf spark.memory.storageFraction=0.5 \
--conf spark.executor.memoryOverhead=512M \
app.py
Memory layout:
4 GB executor heap
- 300 MB reserved
= 3.7 GB usable
0.6 × 3.7 GB ≈ 2.2 GB Unified Memory
  → 1.1 GB Execution
  → 1.1 GB Storage
~ 1.5 GB User Memory
+ 512 MB memoryOverhead (off-heap)
Summary
Component | Description |
---|---|
Reserved Memory | System overhead (~300MB) |
User Memory | Non-Spark JVM usage |
Unified Memory | Execution + Storage |
Execution Memory | Joins, aggregates, shuffles |
Storage Memory | Caching and broadcast variables |
Memory Overhead | Off-heap: shuffle buffers, Python |