Here is a detailed step-by-step walkthrough of how a Spark job is initiated, executed, and managed in a cluster, including how components interact, how programs are called, how memory is allocated, and how executors are assigned.
🔥 Apache Spark Job Execution: Complete Lifecycle & Architecture
🟢 Step 0: Your Program → Spark Application Code
You write a PySpark or Scala/Java application like:
spark = SparkSession.builder.appName("MyJob").getOrCreate()
df = spark.read.csv("input.csv")
result = df.filter(...).groupBy(...).agg(...)
result.write.parquet("output/")
This script constitutes a Spark Application.
✅ Step 1: Driver Process Launch
- The driver is the master JVM process that initiates SparkContext and orchestrates the entire job.
- This is where the `SparkSession` lives.
- Driver responsibilities (see the sketch after this list):
- Reads your code
- Builds DAG of transformations
- Converts to physical plan
- Schedules and coordinates executors
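These responsibilities show up directly in code: transformations only build a plan on the driver, and nothing runs until an action. A minimal sketch (the `input.csv` file and `value` column are just placeholders):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("DriverDemo").getOrCreate()

# Transformations are lazy: the driver only records them in a logical plan.
df = spark.read.csv("input.csv", header=True, inferSchema=True)
filtered = df.filter(df["value"] > 100)

# No job has run yet; explain() prints the plan the driver has built so far.
filtered.explain()

# Only an action (count, collect, write, ...) makes the driver submit a job
# and schedule tasks on executors.
print(filtered.count())
```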
✅ Step 2: Cluster Manager Interaction
When you call .getOrCreate(), Spark communicates with a cluster manager:
| Cluster Manager | Description |
|---|---|
| Standalone | Built-in Spark cluster manager |
| YARN (Hadoop) | External cluster manager (very common) |
| Kubernetes | Container-based cluster manager |
| Mesos (rare now) | Another general-purpose manager |
The cluster manager allocates:
- 1 driver container/process
- N executor containers/processes
⚙️ Resource Allocation Parameters
--num-executors 4
--executor-memory 4G
--executor-cores 2
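The same resources can also be requested from application code when the session is created; a minimal sketch, assuming a YARN or Kubernetes cluster where `spark.executor.instances` corresponds to `--num-executors`:

```python
from pyspark.sql import SparkSession

# Equivalent of: --num-executors 4 --executor-memory 4G --executor-cores 2
spark = (
    SparkSession.builder
    .appName("MyJob")
    .config("spark.executor.instances", "4")  # number of executors
    .config("spark.executor.memory", "4g")    # heap per executor
    .config("spark.executor.cores", "2")      # task slots per executor
    .getOrCreate()
)
```

Note that executor sizing must be set before the SparkContext starts; changing these values on an already-running session has no effect.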
✅ Step 3: SparkContext + DAG Creation
Logical Plan
- Spark creates a DAG (Directed Acyclic Graph) of transformations (e.g., map, filter, groupBy).
- This is the logical plan: it represents what to do, not how to do it.
✅ Step 4: DAG Scheduler → Stage Creation
- Spark divides the DAG into stages based on shuffle boundaries.
- Narrow transformations (map, filter): stay in same stage.
- Wide transformations (groupBy, join): new stage, causes a shuffle.
- Each stage is broken into tasks: one task per partition.
Example:
df = spark.read.csv("...").filter(...).groupBy(...).count()
Stages:
- Stage 1: Read & Filter
- Stage 2: GroupBy (shuffle required)
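One way to confirm where the stage boundary falls is to print the physical plan: the `Exchange` node marks the shuffle between Stage 1 and Stage 2. A small sketch (file path and column names are placeholders):

```python
df = (
    spark.read.csv("input.csv", header=True, inferSchema=True)
         .filter("amount > 100")   # narrow: stays in the read/filter stage
         .groupBy("category")      # wide: requires a shuffle
         .count()
)

# The plan contains an Exchange (hashpartitioning) operator: that is the
# shuffle boundary where Stage 1 ends and Stage 2 begins.
df.explain()
```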
✅ Step 5: TaskScheduler → Task Execution
The TaskScheduler sends tasks to executors.
🔹 Executors
- JVM processes launched on worker nodes.
- Run tasks assigned by the driver.
- Store data in memory/disk using block manager.
- Cache/persist RDDs/DataFrames.
- Communicate results back to driver.
Each executor has:
- N cores (`executor-cores`)
- M memory (`executor-memory`)
These settings can be read back at runtime, as shown below.
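A minimal sketch for reading those settings back on the driver, using `SparkConf.get` with a fallback for keys that were never set explicitly:

```python
# Read back the executor sizing the driver negotiated at startup.
conf = spark.sparkContext.getConf()

executor_memory = conf.get("spark.executor.memory", "1g")  # Spark's default is 1g
executor_cores = conf.get("spark.executor.cores", "1")     # default depends on the cluster manager

print(f"executor memory: {executor_memory}, executor cores: {executor_cores}")
```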
✅ Step 6: Execution Flow
- Driver sends tasks to executors.
- Executors run tasks in parallel (one per core).
- Intermediate data is:
- Held in memory (if caching)
- Written to disk (if shuffling)
- Shuffled data is pulled by downstream stages.
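The "held in memory (if caching)" case above is opt-in: an intermediate DataFrame stays in executor memory only if it is explicitly cached, otherwise every action recomputes it from the source. A small sketch (the `events/` path and column names are placeholders):

```python
filtered = spark.read.parquet("events/").filter("status = 'ok'")

# Without cache(), both actions below would re-read and re-filter the input.
filtered.cache()

total = filtered.count()                         # first action materializes the cache
by_day = filtered.groupBy("event_date").count()  # second action reuses cached partitions
by_day.show()

filtered.unpersist()  # release storage memory when done
```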
✅ Step 7: Memory Management
Spark divides executor memory:
| Memory Pool | Purpose |
|---|---|
| Execution Memory | For shuffles, joins, aggregations |
| Storage Memory | For caching/persisting data |
| User Memory | For user variables |
| Reserved Memory | Always reserved internally |
With the Unified Memory Manager (the default since Spark 1.6, including all of Spark 3.x), execution and storage share memory dynamically.
Let's now take a deeper look at Step 7: Memory Allocation per Executor in Spark, including how memory is calculated, which config values matter, and how dynamic memory sharing works in Spark's Unified Memory Manager.
📦 Step 7: Memory Allocation per Executor (Spark 3.x+)
🔥 What Is Executor Memory?
Each Spark executor runs on a worker node and has a JVM with its own memory space.
This memory is divided into:
| Memory Category | Purpose |
|---|---|
| Reserved Memory | Safety buffer Spark always keeps (not user configurable) |
| User Memory | For user variables, closures, data structures (10%) |
| Spark Memory | Split into two types (combined as Unified Memory) |
| – Execution Memory | Used for joins, aggregations, sorting, shuffles |
| – Storage Memory | Used for caching (df.cache(), persist()) |
✅ Key Configuration Parameters
| Parameter | Description | Default |
|---|---|---|
| `spark.executor.memory` | Total heap memory allocated per executor | (e.g., 4g) |
| `spark.memory.fraction` | Fraction of usable heap for Spark (execution + storage) | 0.6 |
| `spark.memory.storageFraction` | Portion of Spark memory reserved for storage | 0.5 of Spark memory |
| `spark.executor.memoryOverhead` | Extra off-heap memory (shuffle buffers, Python workers) | max(384 MB, 10% of executor memory) |
| `spark.driver.memory` | Memory allocated to the driver process | (e.g., 4g) |
🧮 How Spark Memory Is Calculated
Let's assume:
spark.executor.memory = 4g
🟢 Breakdown:
| Type | Formula | Amount (Approx) |
|---|---|---|
| Reserved | Fixed (~300MB) | ~300 MB |
| Usable JVM Heap | 4g - reserved | ~3.7 GB |
| Unified Spark Memory | spark.memory.fraction × usable | 0.6 × 3.7 GB ≈ 2.2 GB |
| – Execution Memory | 50% of Unified Memory | ~1.1 GB |
| – Storage Memory | 50% of Unified Memory | ~1.1 GB |
| User Memory | usable × (1 - spark.memory.fraction) | ~1.5 GB |
And, off-heap:
| Non-heap Overhead | spark.executor.memoryOverhead | ~410 MB (10% of 4 GB by default) |
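The arithmetic above can be reproduced with a small helper; a sketch that assumes the ~300 MB reserved constant and the default fractions from the table:

```python
def executor_memory_breakdown(executor_mem_gb,
                              memory_fraction=0.6,    # spark.memory.fraction
                              storage_fraction=0.5,   # spark.memory.storageFraction
                              reserved_mb=300):       # fixed reserved memory
    """Approximate heap split for one executor (all values in GB)."""
    usable = executor_mem_gb - reserved_mb / 1024.0
    unified = usable * memory_fraction
    storage = unified * storage_fraction   # soft boundary: can shrink or grow at runtime
    execution = unified - storage          # soft boundary: can shrink or grow at runtime
    user = usable * (1 - memory_fraction)
    return {
        "usable_heap": round(usable, 2),
        "unified": round(unified, 2),
        "execution": round(execution, 2),
        "storage": round(storage, 2),
        "user": round(user, 2),
    }

# For spark.executor.memory = 4g:
print(executor_memory_breakdown(4))
# {'usable_heap': 3.71, 'unified': 2.22, 'execution': 1.11, 'storage': 1.11, 'user': 1.48}
```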
✅ Unified Memory Manager (Spark 1.6+)
Execution and storage memory are not fixed:
- If your job does no caching, storage memory is released to execution.
- If execution needs memory and storage is underused → it borrows it.
- If caching is needed and there's space → storage can grow.
- Eviction happens if execution needs more and storage has cached data.
🔄 Dynamic Memory Sharing
Scenario A: Execution-intensive stage
- Sorts, joins, shuffles → these use most of Unified Memory
- Spark evicts cached data if needed (LRU policy)
Scenario B: Heavy caching
- Many `.cache()` or `.persist()` calls
- Execution will fail if it can't get enough memory (OOM); see the persist sketch below
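If eviction in scenario A is a concern, a persistence level that spills to disk avoids silently losing (and later recomputing) cached blocks; a minimal sketch with a hypothetical `reference/` dataset:

```python
from pyspark import StorageLevel

lookup = spark.read.parquet("reference/")

# MEMORY_AND_DISK: blocks evicted from storage memory are spilled to local
# disk instead of being dropped and recomputed on the next access.
lookup.persist(StorageLevel.MEMORY_AND_DISK)

lookup.count()  # materialize the cache before the execution-heavy stage
```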
✅ Memory Tuning Tips
| Issue | Fix / Action |
|---|---|
| Frequent spills | Increase executor.memory or memory.fraction |
| OOM during joins/sorts | Tune spark.sql.shuffle.partitions, avoid skew |
| Cached data eviction | Use .persist(DISK_ONLY) or increase executor memory |
| Python jobs (PySpark) | Increase spark.executor.memoryOverhead |
| YARN/K8s container OOM | Sum of memory + overhead must fit inside container limit |
🧠 Spark UI & Monitoring
- Executors tab: shows memory used for storage, execution, and overhead per executor.
- Storage tab: shows cached RDD/DataFrame sizes.
- Stage/task metrics: show spills to disk (a sign there is not enough execution memory).
🧪 Example:
spark-submit \
--executor-memory 4G \
--executor-cores 4 \
--conf spark.memory.fraction=0.6 \
--conf spark.memory.storageFraction=0.5 \
--conf spark.executor.memoryOverhead=512M \
app.py
Memory layout:
4 GB executor heap
- 300 MB reserved
= 3.7 GB usable
0.6 × 3.7 ≈ 2.2 GB Unified Memory
→ 1.1 GB Execution
→ 1.1 GB Storage
~ 1.5 GB User Memory
+ 512 MB memoryOverhead (off-heap)
📌 Summary
| Component | Description |
|---|---|
| Reserved Memory | System overhead (~300MB) |
| User Memory | Non-Spark JVM usage |
| Unified Memory | Execution + Storage |
| Execution Memory | Joins, aggregates, shuffles |
| Storage Memory | Caching and broadcast variables |
| Memory Overhead | Off-heap: shuffle buffers, Python |
✅ Step 8: Final Result Collection or Write
- If `.collect()`: data is pulled to the driver
- If `.write()`: data is written from executors to storage (HDFS, S3, DB)
Here’s a deep dive into Step 8: Final Result Collection or Write in Apache Spark, covering:
- What happens when results are collected or written
- Differences in Databricks vs. on-prem BDL (Big Data Lake) Spark
- Memory, network, disk behavior
- Failure scenarios and performance tips
| Operation | Description |
|---|---|
| `.collect()` | Brings the result into driver node memory |
| `.write()` | Writes the result from executors to an external sink (e.g. file, DB) |
⚠️ `.collect()`: Result Collection to Driver
🔍 What happens:
- Spark triggers a job.
- Tasks run on executors.
- Results are pulled back to the driver node via network.
- Returned as a list/array in local driver memory.
result = df.filter(...).collect()
🔥 Internals:
- Uses the network (not HDFS) to transfer data from executors.
- If the result size exceeds `driver.memory` → 💥 OutOfMemoryError
- Can overwhelm the driver if the result is more than a few MB.
✅ Use when:
- You need small datasets (like < 100K rows).
- For debugging, testing, or local export.
❌ Avoid when:
- Result is large (millions of rows or >1GB).
- You are working in a distributed compute flow.
⚙️ Common Configs:
| Config Name | Description |
|---|---|
| `spark.driver.memory` | Memory available on the driver to hold `.collect()` results |
| `spark.driver.maxResultSize` | Max total result size allowed (default: 1g) |
--conf spark.driver.maxResultSize=2g
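When the full result is too large for the driver but some rows are still needed locally, there are gentler options than a blanket `.collect()`; a sketch:

```python
# 1. Pull only a bounded sample to the driver.
preview = df.limit(1000).collect()

# 2. Stream one partition at a time instead of materializing the whole
#    result in driver memory at once.
for row in df.toLocalIterator():
    print(row)  # handle one row at a time

# 3. Or keep the data distributed and write it out instead.
df.write.mode("overwrite").parquet("s3://my-bucket/output/")
```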
✅ `.write()`: Write Results to External Storage
This is the most production-safe way to handle large outputs.
df.write.mode("overwrite").parquet("s3://my-bucket/output/")
🔍 What happens:
- Data is partitioned across executors.
- Each task writes its portion of data:
- HDFS / DBFS / S3 / ADLS
- JDBC / Hive / Delta Lake
- Metadata (schema, partition info) is optionally written.
🗂️ Output Paths and Formats
| Format | File Types | Parallel Write | Splittable | Typical Use |
|---|---|---|---|---|
| Parquet | .parquet | ✅ | ✅ | Analytics, ML |
| ORC | .orc | ✅ | ✅ | Hive + compression |
| Delta Lake | .parquet + txn log | ✅ | ✅ | ACID tables in Databricks |
| CSV | .csv | ✅ | ✅ (if uncompressed) | Raw exports |
| JDBC | – | ✅ (one connection per partition) | ❌ | Row-by-row inserts into databases |
✅ Behavior in Databricks vs. On-Prem BDL
🟦 In Databricks
- Uses DBFS or ADLS/S3 paths: `df.write.format("delta").save("/mnt/datalake/output/")`
- Uses the Delta Lake format by default for tables
- Can write to:
- Managed/External Delta Tables
- Unity Catalog tables
- SQL Warehouses
Example:
df.write.format("delta").mode("overwrite").saveAsTable("sales_summary")
Benefits:
- Auto-commits via Delta Transaction Log
- Can use `VACUUM`, `OPTIMIZE`, `ZORDER` post-write
- Fully scalable + trackable via the job UI
🟩 In On-Prem BDL or Hadoop Cluster
- Uses HDFS or NFS-mounted output paths: `df.write.format("orc").save("/data/warehouse/output/")`
- Needs correct permissions + HDFS quotas
- More manual cleanup and checkpointing
🧠 Behind the Scenes: Write Path
- Executor tasks write files in parallel (1 file per partition).
- Temporary files are written to a `_temporary` directory.
- Upon success:
  - A `_SUCCESS` marker file is added
  - Temp files are renamed and committed
- Driver finalizes metadata (for Hive/Delta)
⚠️ Failure Points
| Point of Failure | What Happens | Fix |
|---|---|---|
| Driver crash during collect | Job fails | Avoid .collect() on large data |
| Network timeout | File write stalls | Use retries or increase spark.network.timeout |
| Partition skew | Some partitions take too long | Use repartition() or salting |
| Too many small files | Metadata overhead in storage | Use coalesce() or repartition() |
| Overwrite in use | Data loss if misused | Use mode("append") for incremental writes |
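For the "too many small files" row, a common fix is to reduce the number of output partitions just before the write; a minimal sketch showing the trade-off (the paths and the `region` column are placeholders):

```python
# coalesce() merges partitions without a shuffle: cheap, but it also lowers
# the parallelism of the final write stage.
df.coalesce(8).write.mode("overwrite").parquet("hdfs:///warehouse/out/")

# repartition() triggers a shuffle but rebalances data evenly, which also
# helps with the partition-skew row above.
df.repartition(8, "region").write.mode("overwrite").parquet("hdfs:///warehouse/out_balanced/")
```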
✅ Best Practices
| Goal | Recommendation |
|---|---|
| Small sample to local | df.limit(100).collect() |
| Full export to disk/S3/HDFS | .write.format(...).save(...) |
| Reduce number of files | df.coalesce(1) or repartition(n) |
| Save with schema & ACID | Use Delta Lake format in Databricks |
| SQL insert | df.write.insertInto("table") or saveAsTable |
| Avoid driver memory issues | NEVER .collect() large DataFrames |
🧪 Example: Complete Script in Databricks
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("WriteExample").getOrCreate()
df = spark.read.option("header", True).csv("/mnt/raw/sales.csv")
df_clean = df.filter("amount > 1000").withColumnRenamed("amount", "total")
# Write as a Delta Table
df_clean.write.format("delta").mode("overwrite").saveAsTable("sales_summary")
# Optional: Optimize
spark.sql("OPTIMIZE sales_summary ZORDER BY (region)")
🧪 Example: BDL On-Prem Script (HDFS Output)
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("OnPremWrite") \
.getOrCreate()
df = spark.read.json("hdfs:///data/in/orders")
df2 = df.select("order_id", "user_id", "amount")
# Repartition to reduce small files
df2.repartition(4).write \
.mode("overwrite") \
.parquet("hdfs:///warehouse/processed/orders/")
✅ Step 9: Job/Stage/Task Completion
Spark shows the progress in:
- Spark UI (driver web UI), available at `http://<driver-host>:4040`
- Databricks: Task DAG + Job metrics
- Includes: Stages, Tasks, Shuffle Reads/Writes, GC Time, Input Size, etc.
✅ Step 10: Cleanup
- Driver process exits
- Executors shut down
- Cluster resources are released
🧠 Diagram: Spark Architecture (Simplified)
```
+---------------------+   submits job to   +----------------------+
| Your Spark Program  |------------------->|     Spark Driver     |
+---------------------+                    +----------------------+
                                                      |
                                                      v
                                            +--------------------+
                                            |   DAG Scheduler    |  <- divides the job into stages
                                            +--------------------+
                                                      |
                                                      v
                                            +--------------------+
                                            |   Task Scheduler   |  <- creates one task per partition
                                            +--------------------+
                                                      |
                                                      v
                    +--------------------+  +--------------------+  +--------------------+
                    | Executor on Node1  |  | Executor on Node2  |  | Executor on NodeN  |
                    +--------------------+  +--------------------+  +--------------------+
```
✅ Example: Real Job Flow
df = spark.read.csv("data.csv") # Stage 1: File read
df2 = df.filter("value > 100") # Still Stage 1 (narrow)
df3 = df2.groupBy("category").count() # Stage 2 (wide, shuffle)
df3.write.parquet("out/") # Stage 3: File write
📊 Monitoring Spark Execution
- Driver UI (port 4040): Stages, DAG, Storage, Executors
- Databricks: "View" → "Job/Task Graph"
- Use `.explain()` on DataFrames to inspect the physical plan.
⚠️ Spark Optimization Tips
| Task | Recommendation |
|---|---|
| Shuffle joins | Use broadcast() for small table |
| Partition skew | Use salting or repartition() |
| Memory management | Avoid .collect() on large datasets |
| Resource allocation | Tune executor memory & cores |
| Caching reused datasets | Use .cache() or .persist() |
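For the shuffle-join row, a broadcast join ships the small table to every executor so the large table never has to be shuffled; a minimal sketch with hypothetical table and column names:

```python
from pyspark.sql.functions import broadcast

orders = spark.read.parquet("hdfs:///warehouse/processed/orders/")
countries = spark.read.csv("country_codes.csv", header=True)  # small lookup table

# broadcast() hints Spark to replicate the small side to all executors,
# turning the shuffle join into a broadcast hash join.
enriched = orders.join(broadcast(countries), on="country_code", how="left")
enriched.explain()  # the plan should show BroadcastHashJoin
```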