Here is a detailed, step-by-step walkthrough of how a Spark job is initiated, executed, and managed in a cluster, including how components interact, how programs are called, how memory is allocated, and how executors are assigned.
🔥 Apache Spark Job Execution – Complete Lifecycle & Architecture
📌 Step 0: Your Program → Spark Application Code
You write a PySpark or Scala/Java application like:
spark = SparkSession.builder.appName("MyJob").getOrCreate()
df = spark.read.csv("input.csv")
result = df.filter(...).groupBy(...).agg(...)
result.write.parquet("output/")
This script constitutes a Spark Application.
✅ Step 1: Driver Process Launch
- The driver is the master JVM process that initiates SparkContext and orchestrates the entire job.
- This is where the SparkSession lives.
- Driver responsibilities:
- Reads your code
- Builds DAG of transformations
- Converts to physical plan
- Schedules and coordinates executors
✅ Step 2: Cluster Manager Interaction
When you call .getOrCreate(), Spark communicates with a cluster manager:
Cluster Manager | Description |
---|---|
Standalone | Built-in Spark cluster manager |
YARN (Hadoop) | External cluster manager (very common) |
Kubernetes | Container-based cluster manager |
Mesos (rare now) | Another general-purpose manager |
The cluster manager allocates:
- 1 driver container/process
- N executor containers/processes
⚙️ Resource Allocation Parameters
--num-executors 4
--executor-memory 4G
--executor-cores 2
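The same resources can also be requested programmatically when the session is built; a minimal sketch (values are illustrative, and spark.executor.instances corresponds to --num-executors on YARN/Kubernetes):
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("MyJob")
    .config("spark.executor.instances", "4")   # same as --num-executors 4
    .config("spark.executor.memory", "4g")     # same as --executor-memory 4G
    .config("spark.executor.cores", "2")       # same as --executor-cores 2
    .getOrCreate()
)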
✅ Step 3: SparkContext + DAG Creation
Logical Plan
- Spark creates a DAG (Directed Acyclic Graph) of transformations (e.g., map, filter, groupBy).
- This is the logical plan: it represents what to do, not how to do it.
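You can inspect the plans Spark builds from your code with .explain(); a small sketch, assuming input.csv has value and category columns:
df = spark.read.option("header", True).csv("input.csv")
agg = df.filter("value > 100").groupBy("category").count()

# extended=True prints the parsed, analyzed, and optimized logical plans
# followed by the physical plan that will actually run.
agg.explain(True)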
✅ Step 4: DAG Scheduler → Stage Creation
- Spark divides the DAG into stages based on shuffle boundaries.
- Narrow transformations (map, filter): stay in same stage.
- Wide transformations (groupBy, join): new stage, causes a shuffle.
- Each stage is broken into tasks: one task per partition.
Example:
df = spark.read.csv("...").filter(...).groupBy(...).count()
Stages:
- Stage 1: Read & Filter
- Stage 2: GroupBy (shuffle required)
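Because each stage runs one task per partition, the partition count tells you how many tasks a stage will launch; an illustrative check (path and column name are hypothetical):
df = spark.read.csv("data.csv")
print(df.rdd.getNumPartitions())     # tasks in the read/filter stage

# The post-shuffle stage uses spark.sql.shuffle.partitions tasks
# (default 200; adaptive query execution may coalesce them at runtime).
counts = df.groupBy("_c0").count()
print(counts.rdd.getNumPartitions())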
✅ Step 5: TaskScheduler → Task Execution
The TaskScheduler sends tasks to executors.
🔹 Executors
- JVM processes launched on worker nodes.
- Run tasks assigned by the driver.
- Store data in memory/disk using block manager.
- Cache/persist RDDs/DataFrames.
- Communicate results back to driver.
Each executor has:
- N cores (executor-cores)
- M memory (executor-memory)
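You can verify the resolved settings from the driver; a small sketch (the fallback values are illustrative):
conf = spark.sparkContext.getConf()
print(conf.get("spark.executor.cores", "1"))     # N cores per executor
print(conf.get("spark.executor.memory", "1g"))   # M memory per executor
print(spark.sparkContext.defaultParallelism)     # roughly total executor cores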
✅ Step 6: Execution Flow
- Driver sends tasks to executors.
- Executors run tasks in parallel (one per core).
- Intermediate data is:
- Held in memory (if caching)
- Written to disk (if shuffling)
- Shuffled data is pulled by downstream stages.
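For example, caching keeps an intermediate result in executor memory between reuses, spilling to disk if it does not fit; a sketch assuming hypothetical value and category columns:
from pyspark.sql import functions as F
from pyspark import StorageLevel

filtered = df.filter(F.col("value") > 100).persist(StorageLevel.MEMORY_AND_DISK)

print(filtered.count())                        # first action materializes the cache
filtered.groupBy("category").count().show()    # second action reuses cached blocks
filtered.unpersist()                           # releases executor storage memory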
✅ Step 7: Memory Management
Spark divides executor memory:
Memory Pool | Purpose |
---|---|
Execution Memory | For shuffles, joins, aggregations |
Storage Memory | For caching/persisting data |
User Memory | For user variables |
Reserved Memory | Always reserved internally |
Since Spark 1.6, the Unified Memory Manager lets execution and storage memory share the same pool dynamically.
Let's now take a deep dive into Step 7: memory allocation per executor, including how memory is calculated, which config values matter, and how dynamic memory sharing works in Spark's Unified Memory Manager.
📦 Step 7: Memory Allocation per Executor (Spark 3.x+)
🔥 What Is Executor Memory?
Each Spark executor runs on a worker node and has a JVM with its own memory space.
This memory is divided into:
Memory Category | Purpose |
---|---|
Reserved Memory | Safety buffer Spark always keeps (not user configurable) |
User Memory | For user variables, closures, data structures (10%) |
Spark Memory | Split into two types (combined as Unified Memory) |
– Execution Memory | Used for joins, aggregations, sorting, shuffles |
– Storage Memory | Used for caching (df.cache() , persist() ) |
✅ Key Configuration Parameters
Parameter | Description | Default |
---|---|---|
spark.executor.memory | Total memory allocated per executor | (e.g., 4g ) |
spark.memory.fraction | Fraction of usable memory for Spark (exec+storage) | 0.6 |
spark.memory.storageFraction | Portion of Spark memory reserved for storage | 0.5 of Spark mem |
spark.executor.memoryOverhead | Extra off-heap memory for non-JVM needs (shuffle buffers, Python workers) | auto (max(384 MB, 0.1 × executor memory)) |
spark.driver.memory | Memory allocated to driver process | (e.g., 4g ) |
🧮 How Spark Memory Is Calculated
Let's assume:
spark.executor.memory = 4g
🟢 Breakdown:
Type | Formula | Amount (Approx) |
---|---|---|
Reserved | Fixed (~300 MB) | ~300 MB |
Usable JVM Heap | 4 GB - reserved | ~3.7 GB |
Unified Spark Memory | spark.memory.fraction × usable | 0.6 × 3.7 GB ≈ 2.2 GB |
↳ Execution Memory | 50% of Unified Memory | ~1.1 GB |
↳ Storage Memory | 50% of Unified Memory | ~1.1 GB |
User Memory | Remaining JVM heap (1 - fraction) | ~1.5 GB |
Non-heap Overhead | spark.executor.memoryOverhead | max(384 MB, 0.1 × 4 GB) ≈ 410 MB |
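The same arithmetic, written out as a plain-Python sketch of the formulas above (not a Spark API):
def executor_memory_breakdown(executor_gb=4, memory_fraction=0.6,
                              storage_fraction=0.5, reserved_mb=300):
    """Approximate the heap split used by Spark's Unified Memory Manager."""
    usable = executor_gb - reserved_mb / 1024        # heap minus reserved
    unified = usable * memory_fraction               # execution + storage pool
    storage = unified * storage_fraction             # soft cap for caching
    execution = unified - storage
    user = usable * (1 - memory_fraction)            # user data structures
    overhead = max(0.384, 0.10 * executor_gb)        # off-heap, default rule
    return {"usable": usable, "unified": unified, "execution": execution,
            "storage": storage, "user": user, "overhead": overhead}

print(executor_memory_breakdown())
# ≈ usable 3.7, unified 2.2, execution 1.1, storage 1.1, user 1.5, overhead 0.4 (GB)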
✅ Unified Memory Manager (Spark 1.6+)
Execution and storage memory are not fixed:
- If your job does no caching, storage memory is released to execution.
- If execution needs memory and storage is underused, it borrows from storage.
- If caching is needed and there is free space, storage can grow.
- Eviction happens if execution needs more and storage has cached data.
๐ Dynamic Memory Sharing
Scenario A: Execution-intensive stage
- Sorts, joins, and shuffles use most of the Unified Memory
- Spark evicts cached data if needed (LRU policy)
Scenario B: Heavy caching
- Many .cache() or .persist() calls
- Execution will fail (OOM) if it can't get enough memory
✅ Memory Tuning Tips
Issue | Fix / Action |
---|---|
Frequent spills | Increase executor.memory or memory.fraction |
OOM during joins/sorts | Tune spark.sql.shuffle.partitions , avoid skew |
Cached data eviction | Use .persist(DISK_ONLY) or increase executor memory |
Python jobs (PySpark) | Increase spark.executor.memoryOverhead |
YARN/K8s container OOM | Sum of memory + overhead must fit inside container limit |
🧠 Spark UI & Monitoring
- Executors tab: shows memory used for storage, execution, and overhead.
- Storage tab: shows cached RDD/DataFrame sizes.
- Stage/task details: show spills to disk (a sign there is not enough memory).
🧪 Example:
spark-submit \
--executor-memory 4G \
--executor-cores 4 \
--conf spark.memory.fraction=0.6 \
--conf spark.memory.storageFraction=0.5 \
--conf spark.executor.memoryOverhead=512M \
app.py
Memory layout:
4 GB executor heap
- 300 MB reserved
= 3.7 GB usable
0.6 × 3.7 = 2.2 GB Unified Memory
↳ 1.1 GB Execution
↳ 1.1 GB Storage
~ 1.5 GB User Memory
+ 512 MB memoryOverhead (off-heap)
📊 Summary
Component | Description |
---|---|
Reserved Memory | System overhead (~300MB) |
User Memory | Non-Spark JVM usage |
Unified Memory | Execution + Storage |
Execution Memory | Joins, aggregates, shuffles |
Storage Memory | Caching and broadcast variables |
Memory Overhead | Off-heap: shuffle buffers, Python |
✅ Step 8: Final Result Collection or Write
- If .collect(): data is pulled to the driver
- If .write(): data is written from executors to storage (HDFS, S3, DB)
Here’s a deep dive into Step 8: Final Result Collection or Write in Apache Spark, covering:
- What happens when results are collected or written
- Differences in Databricks vs. on-prem BDL (Big Data Lake) Spark
- Memory, network, disk behavior
- Failure scenarios and performance tips
Operation | Description |
---|---|
.collect() | Brings result to Driver node memory |
.write() | Writes result from Executors to external sink (e.g. file, DB) |
⚠️ .collect() → Result Collection to Driver
🔍 What happens:
- Spark triggers a job.
- Tasks run on executors.
- Results are pulled back to the driver node via network.
- Returned as a list/array in local driver memory.
result = df.filter(...).collect()
🔥 Internals:
- Uses network (not HDFS) to transfer data from executors.
- If result size exceeds spark.driver.memory → 💥 OutOfMemoryError
- Can overwhelm the driver if the data is more than a few MB.
✅ Use when:
- You need small datasets (like < 100K rows).
- For debugging, testing, or local export.
❌ Avoid when:
- Result is large (millions of rows or >1GB).
- You are working in a distributed compute flow.
⚙️ Common Configs:
Config Name | Description |
---|---|
spark.driver.memory | Memory available to hold .collect() |
spark.driver.maxResultSize | Max result size allowed (default: 1G) |
--conf spark.driver.maxResultSize=2g
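If the result might be large, two safer patterns than a bare .collect() (a sketch; process() is a hypothetical downstream function):
# Pull only a bounded sample to the driver.
preview = df.limit(1000).collect()

# Or iterate partition by partition instead of materializing everything at once,
# which keeps driver memory to roughly one partition's worth of rows.
for row in df.toLocalIterator():
    process(row)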
✅ .write() → Write Results to External Storage
This is the most production-safe way to handle large outputs.
df.write.mode("overwrite").parquet("s3://my-bucket/output/")
🔍 What happens:
- Data is partitioned across executors.
- Each task writes its portion of data:
- HDFS / DBFS / S3 / ADLS
- JDBC / Hive / Delta Lake
- Metadata (schema, partition info) is optionally written.
⏳ Output Paths and Formats
Format | File Types | Parallel Write | Splittable | Typical Use |
---|---|---|---|---|
Parquet | .parquet | ✅ | ✅ | Analytics, ML |
ORC | .orc | ✅ | ✅ | Hive + compression |
Delta Lake | .parquet + txn log | ✅ | ✅ | ACID tables in Databricks |
CSV | .csv | ✅ | ✅ | Raw exports |
JDBC | – | ✅ (one connection per partition) | – | Writes to relational databases |
✅ Behavior in Databricks vs. On-Prem BDL
🟦 In Databricks
- Uses DBFS or ADLS/S3 paths:
df.write.format("delta").save("/mnt/datalake/output/")
- Uses Delta Lake format by default for tables
- Can write to:
- Managed/External Delta Tables
- Unity Catalog tables
- SQL Warehouses
Example:
df.write.format("delta").mode("overwrite").saveAsTable("sales_summary")
Benefits:
- Auto-commits via Delta Transaction Log
- Can use VACUUM, OPTIMIZE, ZORDER post-write
- Fully scalable and trackable via the job UI
🟩 In On-Prem BDL or Hadoop Cluster
- Uses HDFS or NFS-mounted output paths:
df.write.format("orc").save("/data/warehouse/output/")
- Needs correct permissions + HDFS quotas
- More manual cleanup and checkpointing
🔧 Behind the Scenes: Write Path
- Executor tasks write files in parallel (1 file per partition).
- Temporary files are written to a _temporary directory.
- Upon success:
- A _SUCCESS marker file is added
- Temp files are renamed and committed
- Driver finalizes metadata (for Hive/Delta)
⚠️ Failure Points
Point of Failure | What Happens | Fix |
---|---|---|
Driver crash during collect | Job fails | Avoid .collect() on large data |
Network timeout | File write stalls | Use retries or increase spark.network.timeout |
Partition skew | Some partitions take too long | Use repartition() or salting |
Too many small files | Metadata overhead in storage | Use coalesce() or repartition() |
Overwrite in use | Data loss if misused | Use mode("append") for incremental writes |
✅ Best Practices
Goal | Recommendation |
---|---|
Small sample to local | df.limit(100).collect() |
Full export to disk/S3/HDFS | .write.format(...).save(...) |
Reduce number of files | df.coalesce(1) or repartition(n) |
Save with schema & ACID | Use Delta Lake format in Databricks |
SQL insert | df.write.insertInto("table") or saveAsTable |
Avoid driver memory issues | NEVER .collect() large DataFrames |
🧪 Example: Complete Script in Databricks
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("WriteExample").getOrCreate()
df = spark.read.option("header", True).csv("/mnt/raw/sales.csv")
df_clean = df.filter("amount > 1000").withColumnRenamed("amount", "total")
# Write as a Delta Table
df_clean.write.format("delta").mode("overwrite").saveAsTable("sales_summary")
# Optional: Optimize
spark.sql("OPTIMIZE sales_summary ZORDER BY (region)")
🧪 Example: BDL On-Prem Script (HDFS Output)
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("OnPremWrite") \
.getOrCreate()
df = spark.read.json("hdfs:///data/in/orders")
df2 = df.select("order_id", "user_id", "amount")
# Repartition to reduce small files
df2.repartition(4).write \
.mode("overwrite") \
.parquet("hdfs:///warehouse/processed/orders/")
✅ Step 9: Job/Stage/Task Completion
Spark shows the progress in:
- Spark UI (Driver web UI): available at http://<driver-host>:4040
- Databricks: Task DAG + Job metrics
- Includes: Stages, Tasks, Shuffle Reads/Writes, GC Time, Input Size, etc.
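The same metrics are also exposed by the driver's monitoring REST API under /api/v1, which is useful for scripted checks; a minimal sketch assuming the UI is reachable at localhost:4040:
import requests

base = "http://localhost:4040/api/v1"
app_id = requests.get(f"{base}/applications").json()[0]["id"]

# Per-stage metrics: status, completed task counts, shuffle read/write, etc.
for stage in requests.get(f"{base}/applications/{app_id}/stages").json():
    print(stage["stageId"], stage["status"], stage["numCompleteTasks"])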
✅ Step 10: Cleanup
- Driver process exits
- Executors shut down
- Cluster resources are released
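Managed platforms such as Databricks handle this for you, but in a standalone script it is good practice to trigger the cleanup explicitly:
# Stops the SparkContext, releases executors back to the cluster manager,
# and shuts down driver-side services (UI, block manager, etc.).
spark.stop()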
🧠 Diagram: Spark Architecture (Simplified)
+---------------------+    submits job to     +----------------------+
| Your Spark Program  |---------------------->| Spark Driver         |
+---------------------+                       +----------------------+
                                                         |
                                                         v
                                              +--------------------+
                                              | DAG Scheduler      |  ← divides DAG into stages
                                              +--------------------+
                                                         |
                                                         v
                                              +--------------------+  ← tasks created per partition
                                              | Task Scheduler     |
                                              +--------------------+
                                                         |
                                                         v
+--------------------+    +--------------------+    +--------------------+
| Executor on Node1  |    | Executor on Node2  |    | Executor on NodeN  |
+--------------------+    +--------------------+    +--------------------+
✅ Example: Real Job Flow
df = spark.read.csv("data.csv") # Stage 1: File read
df2 = df.filter("value > 100") # Still Stage 1 (narrow)
df3 = df2.groupBy("category").count() # Stage 2 (wide, shuffle)
df3.write.parquet("out/") # Stage 3: File write
📊 Monitoring Spark Execution
- Driver UI (port 4040): Stages, DAG, Storage, Executors
- Databricks: "View" → "Job/Task Graph"
- Use .explain() on DataFrames to inspect the physical plan.
⚠️ Spark Optimization Tips
Task | Recommendation |
---|---|
Shuffle joins | Use broadcast() for small table |
Partition skew | Use salting or repartition() |
Memory management | Avoid .collect() on large datasets |
Resource allocation | Tune executor memory & cores |
Caching reused datasets | Use .cache() or .persist() |
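For instance, the broadcast-join hint from the first row of the table looks like this (paths and column names are illustrative):
from pyspark.sql.functions import broadcast

orders = spark.read.parquet("hdfs:///warehouse/processed/orders/")
regions = spark.read.parquet("hdfs:///warehouse/dim/regions/")   # small dimension table

# Ships the small table to every executor, avoiding a shuffle of the large side.
joined = orders.join(broadcast(regions), on="region_id", how="left")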