PySpark Wholesome Tutorial – Links to refer, PDFs

Here is a detailed, step-by-step walkthrough of how a Spark job is initiated, executed, and managed in a cluster, including how components interact, how programs are invoked, how memory is allocated, and how executors are assigned.


🔥 Apache Spark Job Execution – Complete Lifecycle & Architecture


📌 Step 0: Your Program – Spark Application Code

You write a PySpark or Scala/Java application like:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("MyJob").getOrCreate()
df = spark.read.csv("input.csv")
result = df.filter(...).groupBy(...).agg(...)
result.write.parquet("output/")

This script constitutes a Spark Application.


✅ Step 1: Driver Process Launch

  • The driver is the master JVM process that creates the SparkContext and orchestrates the entire job.
  • This is where the SparkSession lives.
  • Driver responsibilities:
    • Reads your code
    • Builds DAG of transformations
    • Converts to physical plan
    • Schedules and coordinates executors

✅ Step 2: Cluster Manager Interaction

When you call .getOrCreate(), Spark communicates with a cluster manager:

| Cluster Manager | Description |
| --- | --- |
| Standalone | Built-in Spark cluster manager |
| YARN (Hadoop) | External cluster manager (very common) |
| Kubernetes | Container-based cluster manager |
| Mesos (rare now) | Another general-purpose manager |

The cluster manager allocates:

  • 1 driver container/process
  • N executor containers/processes

โš™๏ธ Resource Allocation Parameters

--num-executors 4
--executor-memory 4G
--executor-cores 2
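The same resources can also be requested programmatically when building the session; a minimal sketch (the app name is a placeholder, and the config keys mirror the flags above):

```python
from pyspark.sql import SparkSession

# Equivalent of the spark-submit flags above, set at session build time.
# Note: on YARN/K8s these must be set before containers are requested,
# which is why spark-submit flags are the more common route.
spark = (
    SparkSession.builder
    .appName("MyJob")
    .config("spark.executor.instances", "4")  # --num-executors 4
    .config("spark.executor.memory", "4g")    # --executor-memory 4G
    .config("spark.executor.cores", "2")      # --executor-cores 2
    .getOrCreate()
)
```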

✅ Step 3: SparkContext + DAG Creation

Logical Plan

  • Spark creates a DAG (Directed Acyclic Graph) of transformations (e.g., map, filter, groupBy).
  • This is the logical plan: it represents what to do, not how to do it.

✅ Step 4: DAG Scheduler → Stage Creation

  • Spark divides the DAG into stages based on shuffle boundaries.
    • Narrow transformations (map, filter): stay in same stage.
    • Wide transformations (groupBy, join): new stage, causes a shuffle.
  • Each stage is broken into tasks: one task per partition.

Example:

df = spark.read.csv("...").filter(...).groupBy(...).count()

Stages:

  • Stage 1: Read & Filter
  • Stage 2: GroupBy (shuffle required)
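You can see this stage split for yourself with explain(); a sketch assuming a live spark session and a hypothetical CSV with qty and category columns:

```python
# The Exchange operator in the physical plan marks the shuffle boundary
# between Stage 1 (read + filter) and Stage 2 (groupBy + count).
df = (
    spark.read.csv("data.csv", header=True, inferSchema=True)  # hypothetical input
    .filter("qty > 0")              # narrow: stays in Stage 1
    .groupBy("category").count()    # wide: shuffle -> Stage 2
)
df.explain()  # look for "Exchange hashpartitioning(category, ...)"
```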

✅ Step 5: TaskScheduler → Task Execution

The TaskScheduler sends tasks to executors.

🔹 Executors

  • JVM processes launched on worker nodes.
  • Run tasks assigned by the driver.
  • Store data in memory/disk using block manager.
  • Cache/persist RDDs/DataFrames.
  • Communicate results back to driver.

Each executor has:

  • N cores (executor-cores)
  • M memory (executor-memory)



✅ Step 6: Execution Flow

  1. Driver sends tasks to executors.
  2. Executors run tasks in parallel (one per core).
  3. Intermediate data is:
    • Held in memory (if caching)
    • Written to disk (if shuffling)
  4. Shuffled data is pulled by downstream stages.
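The flow above can be watched end to end with a small RDD job; a sketch assuming an existing spark session:

```python
# 8 partitions -> 8 parallel tasks in the first stage.
rdd = spark.sparkContext.parallelize(range(1000), numSlices=8)

pairs = rdd.map(lambda x: (x % 4, 1))            # narrow: same stage
counts = pairs.reduceByKey(lambda a, b: a + b)   # wide: shuffle data is written
                                                 # out, then pulled by stage 2
print(sorted(counts.collect()))                  # results return to the driver
# -> [(0, 250), (1, 250), (2, 250), (3, 250)]
```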

✅ Step 7: Memory Management

Spark divides executor memory:

| Memory Pool | Purpose |
| --- | --- |
| Execution Memory | Shuffles, joins, aggregations |
| Storage Memory | Caching/persisting data |
| User Memory | User variables and data structures |
| Reserved Memory | Always reserved internally |

Since Spark 1.6 (the Unified Memory Manager), execution and storage share memory dynamically.

Let's now take a deeper dive into Step 7, memory allocation per executor, including how memory is calculated, which config values matter, and how dynamic memory sharing works in Spark's Unified Memory Manager.


📦 Step 7: Memory Allocation per Executor (in Spark 3.x+)


🔥 What Is Executor Memory?

Each Spark executor runs on a worker node and has a JVM with its own memory space.

This memory is divided into:

| Memory Category | Purpose |
| --- | --- |
| Reserved Memory | Safety buffer Spark always keeps (not user configurable) |
| User Memory | User variables, closures, data structures (1 − spark.memory.fraction ≈ 40% of usable heap) |
| Spark Memory | Split into two pools (combined as Unified Memory) |
| – Execution Memory | Joins, aggregations, sorting, shuffles |
| – Storage Memory | Caching (df.cache(), persist()) |

✅ Key Configuration Parameters

| Parameter | Description | Default |
| --- | --- | --- |
| spark.executor.memory | Total heap memory allocated per executor | (e.g., 4g) |
| spark.memory.fraction | Fraction of usable heap for Spark (execution + storage) | 0.6 |
| spark.memory.storageFraction | Portion of Spark memory protected for storage | 0.5 of Spark mem |
| spark.executor.memoryOverhead | Extra off-heap memory (shuffle buffers, Python workers) | max(384 MB, 0.1 × executor memory) |
| spark.driver.memory | Memory allocated to the driver process | (e.g., 4g) |

🧮 How Spark Memory Is Calculated

Let's assume:

spark.executor.memory = 4g

🔢 Breakdown:

| Type | Formula | Amount (approx.) |
| --- | --- | --- |
| Reserved | Fixed (~300 MB) | ~300 MB |
| Usable JVM heap | 4g − reserved | ~3.7 GB |
| Unified Spark memory | spark.memory.fraction × usable | 0.6 × 3.7 GB ≈ 2.2 GB |
| → Execution memory | 50% of Unified Memory | ~1.1 GB |
| → Storage memory | 50% of Unified Memory | ~1.1 GB |
| User memory | usable × (1 − fraction) | ~1.5 GB |

And, outside the JVM heap:

| Non-heap overhead | spark.executor.memoryOverhead | ~384 MB (default) |
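The arithmetic above can be checked with a few lines of plain Python (the 300 MB reserved figure is Spark's hard-coded default; the function name is ours, not a Spark API):

```python
def executor_memory_breakdown(executor_memory_mb,
                              memory_fraction=0.6,
                              storage_fraction=0.5):
    """Approximate heap split under Spark's Unified Memory Manager."""
    reserved_mb = 300                              # fixed safety buffer
    usable_mb = executor_memory_mb - reserved_mb   # heap Spark can manage
    unified_mb = usable_mb * memory_fraction       # execution + storage pool
    storage_mb = unified_mb * storage_fraction     # soft boundary, not a hard cap
    execution_mb = unified_mb - storage_mb
    user_mb = usable_mb - unified_mb               # user objects, closures
    return {"reserved": reserved_mb, "unified": unified_mb,
            "execution": execution_mb, "storage": storage_mb, "user": user_mb}

b = executor_memory_breakdown(4096)  # spark.executor.memory = 4g
print({k: round(v) for k, v in b.items()})
# -> {'reserved': 300, 'unified': 2278, 'execution': 1139, 'storage': 1139, 'user': 1518}
```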


✅ Unified Memory Manager (Spark 1.6+)

Execution and storage memory are not fixed:

  • If your job does no caching, storage memory is released to execution.
  • If execution needs memory and storage is underused → it borrows it.
  • If caching is needed and there’s space โ†’ storage can grow.
  • Eviction happens if execution needs more and storage has cached data.

🔀 Dynamic Memory Sharing

Scenario A: Execution-intensive stage

  • Sorts, joins, shuffles → use most of Unified Memory
  • Spark evicts cached data if needed (LRU policy)

Scenario B: Heavy caching

  • Many .cache() or .persist() calls
  • Execution will fail if it can't get enough memory (OOM)

✅ Memory Tuning Tips

| Issue | Fix / Action |
| --- | --- |
| Frequent spills | Increase spark.executor.memory or spark.memory.fraction |
| OOM during joins/sorts | Tune spark.sql.shuffle.partitions; avoid data skew |
| Cached data eviction | Use .persist(StorageLevel.DISK_ONLY) or increase executor memory |
| Python jobs (PySpark) | Increase spark.executor.memoryOverhead |
| YARN/K8s container OOM | Sum of memory + overhead must fit inside the container limit |

🧠 Spark UI & Monitoring

  • Executor tab: shows memory used for storage, execution, overhead.
  • Storage tab: shows cached RDD/DataFrame sizes.
  • Task tab: shows spills to disk (bad sign = not enough memory).

🧪 Example:

spark-submit \
  --executor-memory 4G \
  --executor-cores 4 \
  --conf spark.memory.fraction=0.6 \
  --conf spark.memory.storageFraction=0.5 \
  --conf spark.executor.memoryOverhead=512M \
  app.py

Memory layout:

4 GB executor heap
− 300 MB reserved
= ~3.7 GB usable

0.6 × 3.7 GB ≈ 2.2 GB Unified Memory
→ 1.1 GB Execution
→ 1.1 GB Storage
~1.5 GB User Memory
+ 512 MB memoryOverhead (off-heap)

🔚 Summary

| Component | Description |
| --- | --- |
| Reserved Memory | System overhead (~300 MB) |
| User Memory | Non-Spark JVM usage (user objects, closures) |
| Unified Memory | Execution + Storage |
| Execution Memory | Joins, aggregates, shuffles |
| Storage Memory | Caching and broadcast variables |
| Memory Overhead | Off-heap: shuffle buffers, Python workers |
