PySpark Wholesome Tutorial – Links to Refer, PDFs

Here is a detailed, step-by-step walkthrough of how a Spark job is initiated, executed, and managed on a cluster, including how the components interact, how programs are invoked, how memory is allocated, and how executors are assigned.


🔥 Apache Spark Job Execution – Complete Lifecycle & Architecture


📌 Step 0: Your Program – Spark Application Code

You write a PySpark or Scala/Java application like:

spark = SparkSession.builder.appName("MyJob").getOrCreate()
df = spark.read.csv("input.csv")
result = df.filter(...).groupBy(...).agg(...)
result.write.parquet("output/")

This script constitutes a Spark Application.


✅ Step 1: Driver Process Launch

  • The driver is the master JVM process that initiates SparkContext and orchestrates the entire job.
  • This is where the SparkSession lives.
  • Driver responsibilities:
    • Reads your code
    • Builds DAG of transformations
    • Converts to physical plan
    • Schedules and coordinates executors
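As a quick illustration, the SparkContext that lives inside the driver can be inspected directly; the attributes below are standard PySpark, and the printed values depend on your cluster:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("MyJob").getOrCreate()

# The SparkContext lives inside the driver process
sc = spark.sparkContext
print(sc.applicationId)       # ID assigned when the driver registers with the cluster manager
print(sc.defaultParallelism)  # total task slots (cores) currently available
print(sc.uiWebUrl)            # driver web UI, typically http://<driver-host>:4040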

✅ Step 2: Cluster Manager Interaction

When you call .getOrCreate(), Spark communicates with a cluster manager:

| Cluster Manager | Description |
| --- | --- |
| Standalone | Built-in Spark cluster manager |
| YARN (Hadoop) | External cluster manager (very common) |
| Kubernetes | Container-based cluster manager |
| Mesos (rare now) | Another general-purpose manager |

The cluster manager allocates:

  • 1 driver container/process
  • N executor containers/processes

โš™๏ธ Resource Allocation Parameters

--num-executors 4
--executor-memory 4G
--executor-cores 2
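The same resources can be requested programmatically when building the SparkSession; a sketch of the equivalent settings (executor sizing generally has to be fixed before the application starts, so spark-submit or cluster defaults remain the usual place for it):

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("MyJob")
    .config("spark.executor.instances", "4")   # equivalent of --num-executors 4
    .config("spark.executor.memory", "4g")     # equivalent of --executor-memory 4G
    .config("spark.executor.cores", "2")       # equivalent of --executor-cores 2
    .getOrCreate()
)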

✅ Step 3: SparkContext + DAG Creation

Logical Plan

  • Spark creates a DAG (Directed Acyclic Graph) of transformations (e.g., map, filter, groupBy).
  • This is the logical plan: it represents what to do, not how to do it.
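You can inspect both the logical and physical plans from the driver with explain(); a small sketch (the column _c0 is just the default name Spark gives a header-less CSV column):

df = spark.read.csv("input.csv")
result = df.filter(df["_c0"].isNotNull()).groupBy("_c0").count()

# extended=True prints the parsed, analyzed, and optimized logical plans,
# followed by the physical plan that will actually run.
result.explain(True)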

✅ Step 4: DAG Scheduler → Stage Creation

  • Spark divides the DAG into stages based on shuffle boundaries.
    • Narrow transformations (map, filter): stay in same stage.
    • Wide transformations (groupBy, join): new stage, causes a shuffle.
  • Each stage is broken into tasks – one task per partition (see the partition-count check after the example below).

Example:

df = spark.read.csv("...").filter(...).groupBy(...).count()

Stages:

  • Stage 1: Read & Filter
  • Stage 2: GroupBy (shuffle required)
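Since each task maps to one partition, the partition count tells you how many tasks a stage will launch. A quick check, with values that depend on the input size and cluster defaults:

df = spark.read.csv("data.csv")
print(df.rdd.getNumPartitions())                        # tasks in Stage 1 (read & filter)

# Wide transformations shuffle into spark.sql.shuffle.partitions partitions
print(spark.conf.get("spark.sql.shuffle.partitions"))   # 200 by default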

✅ Step 5: TaskScheduler → Task Execution

The TaskScheduler sends tasks to executors.

🔹 Executors

  • JVM processes launched on worker nodes.
  • Run tasks assigned by the driver.
  • Store data in memory/disk using block manager.
  • Cache/persist RDDs/DataFrames.
  • Communicate results back to driver.

Each executor has:

  • N cores (executor-cores)
  • M memory (executor-memory)
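These values come from the submit-time configuration. As a small sketch, they can be read back from a running session (the second argument to get() is only a fallback used if the key was never set explicitly):

conf = spark.sparkContext.getConf()
print(conf.get("spark.executor.cores", "1"))    # N cores per executor
print(conf.get("spark.executor.memory", "1g"))  # M memory per executor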



✅ Step 6: Execution Flow

  1. Driver sends tasks to executors.
  2. Executors run tasks in parallel (one per core).
  3. Intermediate data is:
    • Held in memory (if caching)
    • Written to disk (if shuffling)
  4. Shuffled data is pulled by downstream stages.
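A small sketch of the caching path mentioned in step 3 above (the file path and the value/category columns are illustrative, matching the later examples):

df = spark.read.option("header", True).csv("data.csv")   # illustrative input

filtered = df.filter("value > 100").cache()    # mark the intermediate result for in-memory storage
filtered.count()                               # first action materializes the cache on the executors
filtered.groupBy("category").count().show()    # later actions reuse the cached partitions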

✅ Step 7: Memory Management

Spark divides executor memory:

| Memory Pool | Purpose |
| --- | --- |
| Execution Memory | For shuffles, joins, aggregations |
| Storage Memory | For caching/persisting data |
| User Memory | For user variables |
| Reserved Memory | Always reserved internally |

Under the Unified Memory Manager (Spark 2.0 and later), execution and storage share memory dynamically.

Let's now take a deeper dive into Step 7: memory allocation per executor in Spark, including how memory is calculated, which config values matter, and how dynamic memory sharing works in Spark's Unified Memory Manager.


📦 Step 7: Memory Allocation per Executor (in Spark 3.x+)


🔥 What Is Executor Memory?

Each Spark executor runs on a worker node and has a JVM with its own memory space.

This memory is divided into:

| Memory Category | Purpose |
| --- | --- |
| Reserved Memory | Safety buffer Spark always keeps (not user-configurable) |
| User Memory | User variables, closures, data structures (the 1 - spark.memory.fraction share of the usable heap, ~40% by default) |
| Spark Memory | Split into two pools (combined as Unified Memory) |
| – Execution Memory | Used for joins, aggregations, sorting, shuffles |
| – Storage Memory | Used for caching (df.cache(), persist()) |

✅ Key Configuration Parameters

| Parameter | Description | Default |
| --- | --- | --- |
| spark.executor.memory | Total memory allocated per executor | (e.g., 4g) |
| spark.memory.fraction | Fraction of usable memory for Spark (execution + storage) | 0.6 |
| spark.memory.storageFraction | Portion of Spark memory reserved for storage | 0.5 of Spark memory |
| spark.executor.memoryOverhead | Extra memory for non-JVM needs (shuffle buffers, Python workers) | auto: max(384 MB, 0.1 × executor memory) |
| spark.driver.memory | Memory allocated to the driver process | (e.g., 4g) |

🧮 How Spark Memory Is Calculated

Let's assume:

spark.executor.memory = 4g

🔢 Breakdown:

| Type | Formula | Amount (approx.) |
| --- | --- | --- |
| Reserved | Fixed (~300 MB) | ~300 MB |
| Usable JVM heap | 4g - reserved | ~3.7 GB |
| Unified Spark Memory | spark.memory.fraction × usable | 0.6 × 3.7 GB ≈ 2.2 GB |
| → Execution Memory | 50% of Unified Memory | ~1.1 GB |
| → Storage Memory | 50% of Unified Memory | ~1.1 GB |
| User Memory | Remaining JVM heap (1 - fraction) | ~1.5 GB |
| Non-heap overhead | spark.executor.memoryOverhead | ~384 MB (default) |
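The same arithmetic as a back-of-the-envelope sketch (approximate; the exact split is computed internally by Spark's UnifiedMemoryManager):

executor_memory_mb = 4 * 1024        # spark.executor.memory = 4g
reserved_mb        = 300             # fixed reserved memory
memory_fraction    = 0.6             # spark.memory.fraction
storage_fraction   = 0.5             # spark.memory.storageFraction

usable_mb    = executor_memory_mb - reserved_mb     # ~3.7 GB
unified_mb   = usable_mb * memory_fraction          # ~2.2 GB
storage_mb   = unified_mb * storage_fraction        # ~1.1 GB (soft boundary)
execution_mb = unified_mb - storage_mb              # ~1.1 GB
user_mb      = usable_mb - unified_mb               # ~1.5 GB

print(f"unified={unified_mb:.0f} MB, execution={execution_mb:.0f} MB, "
      f"storage={storage_mb:.0f} MB, user={user_mb:.0f} MB")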


✅ Unified Memory Manager (Spark 2.0+)

Execution and storage memory are not fixed:

  • If your job does no caching, storage memory is released to execution.
  • If execution needs memory and storage is underused → it borrows it.
  • If caching is needed and there's space → storage can grow.
  • Eviction happens if execution needs more and storage has cached data.

🔀 Dynamic Memory Sharing

Scenario A: Execution-intensive stage

  • Sorts, joins, shuffles → use most of Unified Memory
  • Spark evicts cached data if needed (LRU policy)

Scenario B: Heavy caching

  • Many .cache() or .persist() calls
  • Execution will fail if it can't get enough memory (OOM)

✅ Memory Tuning Tips

| Issue | Fix / Action |
| --- | --- |
| Frequent spills | Increase spark.executor.memory or spark.memory.fraction |
| OOM during joins/sorts | Tune spark.sql.shuffle.partitions, avoid skew |
| Cached data eviction | Use .persist(DISK_ONLY) or increase executor memory |
| Python jobs (PySpark) | Increase spark.executor.memoryOverhead |
| YARN/K8s container OOM | Sum of memory + overhead must fit inside the container limit |

🧠 Spark UI & Monitoring

  • Executor tab: shows memory used for storage, execution, overhead.
  • Storage tab: shows cached RDD/DataFrame sizes.
  • Task tab: shows spills to disk (bad sign = not enough memory).

🧪 Example:

spark-submit \
  --executor-memory 4G \
  --executor-cores 4 \
  --conf spark.memory.fraction=0.6 \
  --conf spark.memory.storageFraction=0.5 \
  --conf spark.executor.memoryOverhead=512M \
  app.py

Memory layout:

4 GB executor heap
- 300 MB reserved
= 3.7 GB usable

0.6 × 3.7 GB ≈ 2.2 GB Unified Memory
→ 1.1 GB Execution
→ 1.1 GB Storage
~ 1.5 GB User Memory
+ 512 MB memoryOverhead (off-heap)

🔚 Summary

| Component | Description |
| --- | --- |
| Reserved Memory | System overhead (~300 MB) |
| User Memory | Non-Spark JVM usage |
| Unified Memory | Execution + Storage |
| Execution Memory | Joins, aggregates, shuffles |
| Storage Memory | Caching and broadcast variables |
| Memory Overhead | Off-heap: shuffle buffers, Python workers |


✅ Step 8: Final Result Collection or Write

  • If .collect(): data is pulled to the driver
  • If .write(): data is written from executors to storage (HDFS, S3, DB)

Here’s a deep dive into Step 8: Final Result Collection or Write in Apache Spark, covering:

  • What happens when results are collected or written
  • Differences in Databricks vs. on-prem BDL (Big Data Lake) Spark
  • Memory, network, disk behavior
  • Failure scenarios and performance tips

| Operation | Description |
| --- | --- |
| .collect() | Brings the result to driver-node memory |
| .write() | Writes the result from executors to an external sink (e.g., file, DB) |

โš ๏ธ .collect() โ€“ Result Collection to Driver

๐Ÿ” What happens:

  1. Spark triggers a job.
  2. Tasks run on executors.
  3. Results are pulled back to the driver node via network.
  4. Returned as a list/array in local driver memory.

result = df.filter(...).collect()

🔥 Internals:

  • Uses network (not HDFS) to transfer data from executors.
  • If the result size exceeds spark.driver.memory → 💥 OutOfMemoryError
  • Can overwhelm the driver if the data is larger than a few MB.

✅ Use when:

  • You need small datasets (like < 100K rows).
  • For debugging, testing, or local export.

โŒ Avoid when:

  • Result is large (millions of rows or >1GB).
  • You are working in a distributed compute flow.

โš™๏ธ Common Configs:

| Config Name | Description |
| --- | --- |
| spark.driver.memory | Driver memory available to hold .collect() results |
| spark.driver.maxResultSize | Max total result size allowed (default: 1g) |

--conf spark.driver.maxResultSize=2g
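A sketch of raising the cap before the session is created and keeping collected results bounded (spark.driver.memory itself normally has to be set at submit time, because the driver JVM is already running when this Python code executes):

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("CollectExample")
    .config("spark.driver.maxResultSize", "2g")   # cap on the total size of serialized results
    .getOrCreate()
)

df = spark.read.option("header", True).csv("input.csv")    # illustrative input
rows = df.filter("amount > 1000").limit(1000).collect()    # bounded, driver-safe sample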

✅ .write() – Write Results to External Storage

This is the most production-safe way to handle large outputs.

df.write.mode("overwrite").parquet("s3://my-bucket/output/")

๐Ÿ” What happens:

  1. Data is partitioned across executors.
  2. Each task writes its portion of data:
    • HDFS / DBFS / S3 / ADLS
    • JDBC / Hive / Delta Lake
  3. Metadata (schema, partition info) is optionally written.
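As a sketch of this parallel write path (the path and the region column are illustrative), partitionBy adds one output directory per distinct value while each task still writes its own files:

df = spark.read.option("header", True).csv("/mnt/raw/sales.csv")

(df.write
   .mode("overwrite")
   .partitionBy("region")                 # one sub-directory per region value
   .parquet("s3://my-bucket/output/"))    # each task writes the files for its own partitions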

โœณ๏ธ Output Paths and Formats

| Format | File Types | Parallel Write | Splittable | Typical Use |
| --- | --- | --- | --- | --- |
| Parquet | .parquet | ✅ | ✅ | Analytics, ML |
| ORC | .orc | ✅ | ✅ | Hive + compression |
| Delta Lake | .parquet + txn log | ✅ | ✅ | ACID tables in Databricks |
| CSV | .csv | ✅ | ❌ | Raw exports |
| JDBC | – | ❌ | ❌ | One row at a time |

✅ Behavior in Databricks vs. On-Prem BDL

🟦 In Databricks

  • Uses DBFS or ADLS/S3 paths: df.write.format("delta").save("/mnt/datalake/output/")
  • Uses Delta Lake format by default for tables
  • Can write to:
    • Managed/External Delta Tables
    • Unity Catalog tables
    • SQL Warehouses

Example:

df.write.format("delta").mode("overwrite").saveAsTable("sales_summary")

Benefits:

  • Auto-commits via Delta Transaction Log
  • Can use VACUUM, OPTIMIZE, ZORDER post-write
  • Fully scalable + trackable via job UI

🟩 In On-Prem BDL or Hadoop Cluster

  • Uses HDFS or NFS-mounted output paths: df.write.format("orc").save("/data/warehouse/output/")
  • Needs correct permissions + HDFS quotas
  • More manual cleanup and checkpointing

🧠 Behind the Scenes: Write Path

  1. Executor tasks write files in parallel (1 file per partition).
  2. Temporary files written in _temporary directory.
  3. Upon success:
    • _SUCCESS marker file is added
    • Temp files renamed and committed
  4. Driver finalizes metadata (for Hive/Delta)

โš ๏ธ Failure Points

| Point of Failure | What Happens | Fix |
| --- | --- | --- |
| Driver crash during collect | Job fails | Avoid .collect() on large data |
| Network timeout | File writes stall | Use retries or increase spark.network.timeout |
| Partition skew | Some partitions take too long | Use repartition() or salting |
| Too many small files | Metadata overhead in storage | Use coalesce() or repartition() |
| Overwrite in use | Data loss if misused | Use mode("append") for incremental writes |
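For the partition-skew row above, a minimal salting sketch (the salt count N and the column names are assumptions; the pattern works for additive aggregations such as counts and sums):

from pyspark.sql import functions as F

df = spark.read.option("header", True).csv("data.csv")   # illustrative input
N = 8                                                    # number of salt buckets; tune to the observed skew

salted  = df.withColumn("salt", (F.rand() * N).cast("int"))                         # spread hot keys over N buckets
partial = salted.groupBy("category", "salt").agg(F.count(F.lit(1)).alias("cnt"))    # smaller, balanced shuffle
final   = partial.groupBy("category").agg(F.sum("cnt").alias("cnt"))                # drop the salt in a second pass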

✅ Best Practices

| Goal | Recommendation |
| --- | --- |
| Small sample to local | df.limit(100).collect() |
| Full export to disk/S3/HDFS | .write.format(...).save(...) |
| Reduce number of files | df.coalesce(1) or repartition(n) |
| Save with schema & ACID | Use the Delta Lake format in Databricks |
| SQL insert | df.write.insertInto("table") or saveAsTable |
| Avoid driver memory issues | Never .collect() large DataFrames |

🧪 Example: Complete Script in Databricks

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("WriteExample").getOrCreate()

df = spark.read.option("header", True).csv("/mnt/raw/sales.csv")

df_clean = df.filter("amount > 1000").withColumnRenamed("amount", "total")

# Write as a Delta Table
df_clean.write.format("delta").mode("overwrite").saveAsTable("sales_summary")

# Optional: Optimize
spark.sql("OPTIMIZE sales_summary ZORDER BY (region)")

🧪 Example: BDL On-Prem Script (HDFS Output)

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("OnPremWrite") \
    .getOrCreate()

df = spark.read.json("hdfs:///data/in/orders")

df2 = df.select("order_id", "user_id", "amount")

# Repartition to reduce small files
df2.repartition(4).write \
    .mode("overwrite") \
    .parquet("hdfs:///warehouse/processed/orders/")


✅ Step 9: Job/Stage/Task Completion

Spark shows the progress in:

  • Spark UI (Driver web UI) – available at http://<driver-host>:4040
  • Databricks: Task DAG + Job metrics
  • Includes: Stages, Tasks, Shuffle Reads/Writes, GC Time, Input Size, etc.

✅ Step 10: Cleanup

  • Driver process exits
  • Executors shut down
  • Cluster resources are released

🧠 Diagram: Spark Architecture (Simplified)

+---------------------+     submits job to     +----------------------+
| Your Spark Program  |----------------------->| Spark Driver         |
+---------------------+                        +----------------------+
                                                      |
              +---------------------------------------+
              |
              v
     +--------------------+
     |  DAG Scheduler     |    → divides into stages
     +--------------------+
              |
              v
     +--------------------+     → tasks created per partition
     | Task Scheduler     |
     +--------------------+
              |
              v
+--------------------+  +--------------------+   +--------------------+
|  Executor on Node1 |  | Executor on Node2  |   | Executor on NodeN  |
+--------------------+  +--------------------+   +--------------------+

✅ Example: Real Job Flow

df = spark.read.csv("data.csv")          # Stage 1: File read
df2 = df.filter("value > 100")           # Still Stage 1 (narrow)
df3 = df2.groupBy("category").count()    # Stage 2 (wide, shuffle)
df3.write.parquet("out/")                # Stage 3: File write

๐Ÿ” Monitoring Spark Execution

  • Driver UI (port 4040): Stages, DAG, Storage, Executors
  • Databricks: "View" → "Job/Task Graph"
  • Use .explain() on DataFrames to inspect the physical plan.

โš ๏ธ Spark Optimization Tips

| Task | Recommendation |
| --- | --- |
| Shuffle joins | Use broadcast() for the small table |
| Partition skew | Use salting or repartition() |
| Memory management | Avoid .collect() on large datasets |
| Resource allocation | Tune executor memory & cores |
| Caching reused datasets | Use .cache() or .persist() |
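For the broadcast-join tip above, a sketch (the paths, columns, and small lookup table are hypothetical):

from pyspark.sql.functions import broadcast

facts = spark.read.parquet("hdfs:///warehouse/processed/orders/")       # large fact table
dims  = spark.read.option("header", True).csv("/mnt/raw/regions.csv")   # small lookup table

# broadcast() ships the small table to every executor, so the large table is not shuffled
joined = facts.join(broadcast(dims), on="region", how="left")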

