PySpark Architecture & Execution Engine: Complete Guide
1. Spark Evolution Recap
- 2009: AMPLab (UC Berkeley)
- Goal: Overcome Hadoop MapReduce slowness
- 2013: Donated to Apache Foundation
- Built using Scala, runs on JVM, supports Python (via PySpark)
2. Spark vs Hadoop (Core Comparison)
Feature | Hadoop MapReduce | Apache Spark |
---|---|---|
Engine | Disk-based | In-memory |
Languages | Primarily Java | Scala, Python, R, SQL |
Iterative Support | Poor (writes to disk) | Native (in-memory) |
Speed | Slow (I/O bound) | Fast (RAM usage) |
Ecosystem | Limited | Unified stack |
3. Spark Design Goals
- Speed: up to 100x faster than Hadoop MapReduce for in-memory workloads
- Unified Engine: Batch, Streaming, MLlib, GraphX
- Language APIs: Python (PySpark), Scala, Java, SQL
- Fault-Tolerant: Lineage tracking, RDDs
- Scalability: Tested with 8000+ nodes
4. Why PySpark? (Python + Spark Bridge)
PySpark is the Python API to the Spark engine, which is natively written in Scala/JVM.
How Python Code Reaches the JVM:
[Python Code]
↓
[Python Driver Process]
↓ py4j Gateway
[JVM Driver (SparkContext)]
↓
[Executors (JVM Workers)]
What is py4j?
- A Java library embedded inside Spark that exposes JVM objects to Python.
- Python calls Java methods as if they were native (SparkContext, RDDs, etc.).
One-liner: PySpark uses py4j to route Python commands to the JVM-based Spark engine.
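To make the bridge concrete, here is a minimal sketch that touches the JVM through the py4j gateway (the _jvm attribute is an internal handle, used here purely for illustration):
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("py4j-demo").getOrCreate()
# Every PySpark API call ultimately travels through this py4j gateway
jvm = spark.sparkContext._jvm  # internal handle to the JVM (illustration only)
print(jvm.java.lang.System.getProperty("java.version"))  # a plain Java call made from Python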
5. Master-Slave Architecture: Driver & Executors
Core Components:
Component | Role |
---|---|
Driver | Orchestrates everything, holds the SparkContext |
SparkContext | Entry point; connects to Cluster Manager |
Cluster Manager | Allocates Executors (YARN, K8s, Standalone) |
Executor(s) | JVM processes on worker nodes; perform tasks + hold data |
Task | Atomic unit of execution (mapped to a partition) |
Stage | A set of tasks with no shuffle boundary |
Job | Triggered by an action (e.g., .show(), .count()) |
Diagram: Simplified Flow
Python Code
↓
SparkSession → SparkContext
↓
DAG Scheduler → Stage Plan
↓
Task Scheduler → Tasks
↓
Executors (on Worker Nodes)
↓
Reads/Writes Data → Returns Result
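A minimal driver program exercising this flow might look like the sketch below (app name, master URL, and row count are placeholders):
from pyspark.sql import SparkSession
# Building the SparkSession starts the driver-side SparkContext
spark = (SparkSession.builder
         .appName("arch-demo")      # placeholder app name
         .master("local[4]")        # local mode with 4 threads; use yarn/k8s URLs on a cluster
         .getOrCreate())
df = spark.range(1_000_000)         # a simple distributed DataFrame
print(df.count())                   # action: job -> stages -> tasks on executors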
6. DAG Scheduler & Task Lifecycle
What Happens When You Run .show()?
- Logical Plan → built from the DataFrame transformations
- Optimized Plan → Catalyst applies pushdowns, projection pruning, etc.
- Physical Plan → translated into stages
- Stages → split at shuffle boundaries: narrow transformations (e.g., map) stay in one stage, wide ones (e.g., join) start a new stage
- Tasks → 1 per partition, assigned to executors
Lazy Evaluation: transformations only build the plan; actions trigger execution
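You can inspect these plans yourself with explain(); a minimal sketch, assuming an existing DataFrame df with an id column:
filtered = df.filter(df.id > 100).select("id")  # transformations: nothing executes yet
filtered.explain(True)                          # prints parsed, analyzed, optimized, and physical plans
filtered.show(5)                                # action: this is what actually triggers a job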
7. py4j + JVM Bridge Deep Dive
JVM Roles
- PySpark driver runs in Python
- Real execution happens on the JVM via py4j
- Each RDD/DataFrame operation is wrapped and sent over
py4j Functions
Role | Example |
---|---|
JVM → Python callbacks | To access Python lambdas |
Python → JVM proxy | All Spark API calls |
Serialization Layer | Converts arguments and results between Python and the JVM |
8. Cluster Managers: Modes of Deployment
Mode | Use Case | Description |
---|---|---|
Local | Dev/Test | Run Spark on 1 machine |
Standalone | Small Prod | Spark's built-in manager |
YARN | Hadoop Cluster | Industry-standard scheduler |
Kubernetes | Cloud-Native | Elastic containers, scalable, cloud-friendly |
Examples:
spark-submit --master yarn --deploy-mode cluster script.py
spark-submit --master k8s://host --conf spark.kubernetes...
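A fuller submission with explicit executor sizing might look like this (all resource values are illustrative, not recommendations):
spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --num-executors 4 \
  --executor-cores 2 \
  --executor-memory 4g \
  --driver-memory 2g \
  my_job.py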
9. Spark Memory Hierarchy
- Storage Memory: For caching RDDs/DataFrames
- Execution Memory: Used while shuffles/sorts/joins happen
- Unified Memory Model: Shared space in executor
- Broadcast Memory: For small datasets to distribute across nodes
Tune using:
--executor-memory 4g
--driver-memory 2g
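The same knobs can be set when the session is built; a minimal sketch with illustrative values (spark.executor.memory and spark.memory.fraction are standard Spark settings; driver memory is usually set on the spark-submit command line instead):
from pyspark.sql import SparkSession
spark = (SparkSession.builder
         .appName("memory-demo")
         .config("spark.executor.memory", "4g")    # per-executor heap (example value)
         .config("spark.memory.fraction", "0.6")   # share of heap for unified execution + storage memory
         .getOrCreate())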
10. Monitoring via Spark UI
Tab | Insight Provided |
---|---|
Jobs | Each action triggers a job |
Stages | Shows stage DAGs, parallelism/shuffles |
SQL | Logical and physical plans |
Executors | Memory usage, task failure, GC time |
Storage | Cached RDDs/DataFrames |
Access via:
http://localhost:4040
- Or Spark History Server (for cluster jobs)
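For the History Server to show completed jobs, event logging must be enabled; a minimal sketch (the log directory is a placeholder and must be readable by the History Server):
from pyspark.sql import SparkSession
spark = (SparkSession.builder
         .appName("ui-demo")
         .config("spark.eventLog.enabled", "true")            # write event logs
         .config("spark.eventLog.dir", "hdfs:///spark-logs")  # placeholder log directory
         .getOrCreate())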
11. Delta Lake + Databricks Essentials
Delta Lake runs on top of Spark + Parquet and adds:
Feature | Purpose |
---|---|
ACID Transactions | Safe multi-write concurrency |
Schema Evolution | Add/modify columns automatically |
MERGE INTO | SCD1/SCD2 Upserts |
Time Travel | Read past versions using version/timestamp |
Example:
df.write.format("delta").save("/mnt/delta_table")
# Upsert
from delta.tables import DeltaTable
DeltaTable.forPath(spark, "/mnt/delta_table").alias("t") \
    .merge(df.alias("s"), "t.id = s.id") \
    .whenMatchedUpdateAll() \
    .whenNotMatchedInsertAll() \
    .execute()
12. Complete Execution Flow (Real Job)
# PySpark Job
df = spark.read.csv("hdfs://cluster/sales.csv", header=True, inferSchema=True)  # header/schema so columns are named
df_filtered = df.filter("region = 'Asia'")
df_grouped = df_filtered.groupBy("product").sum("amount")
df_grouped.write.parquet("hdfs://results")
Internal Execution:
1. SparkSession → SparkContext initialized
2. DAG created (3 transformations)
3. Catalyst optimizes it
4. DAG → Stages (filter stage, shuffle stage for groupBy)
5. Task Scheduler assigns tasks
6. YARN allocates executors
7. Executors run tasks per partition
8. Shuffle for groupBy
9. Result written to HDFS
13. Interview Cheat Sheet
Terminology to Memorize:
Concept | Explanation |
---|---|
RDD | Immutable, fault-tolerant collection |
DataFrame | Distributed table with schema |
Stage | Set of tasks before/after shuffle |
Task | Executes per partition |
Shuffle | Data movement across nodes |
Broadcast Join | Small-to-large table join optimization |
Catalyst | Query optimizer |
Tungsten | Memory + code generation engine |
py4j | Python ↔ JVM communication |
Cluster Manager | Allocates compute resources |
One-Line Summary
PySpark wraps Python logic and routes it to Spark's JVM backend using py4j. The Driver constructs the job, builds a DAG of transformations, breaks it into stages, and assigns tasks to Executors across the cluster; the Executors run them in parallel, read/write data from HDFS, and return results via the SparkContext, all monitored using the Spark UI.
Interview-Ready Cheat Sheet
Core Concepts
- RDD is the base abstraction: fault-tolerant, distributed
- DataFrame is a distributed table with schema
- Wide transformations (e.g., join) cause a shuffle
- Actions trigger execution (count(), show())
PySpark Interview One-Liners
- SparkSession is the unified entry point since Spark ≥ 2.0
- Broadcast joins are optimal for small lookup tables
- Executor memory is not the same as driver memory
- Shuffle is expensive; avoid it unless necessary
- persist() vs cache(): cache() uses the default storage level, persist() lets you choose a custom storage level
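A minimal sketch of the difference, assuming an existing DataFrame df:
from pyspark import StorageLevel
# cache(): shorthand for the default storage level
df.cache()
df.count()       # an action materializes the cache
df.unpersist()   # release it when done
# persist(): pick the storage level explicitly
df.persist(StorageLevel.MEMORY_AND_DISK)
df.count()
df.unpersist()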
One-Liner Architecture Summary
Apache Spark uses a Driver-Executor model, where the Driver acts as the master to coordinate work, Executors run distributed tasks, and a Cluster Manager allocates resources. Spark builds a DAG of transformations, splits it into stages and tasks, and executes them in parallel on executors, using memory/disk/shuffle/broadcast optimizations. Storage can be HDFS, S3, Delta Lake, or JDBC, with monitoring via the Spark UI and event logs.
Bonus: Real Job Flow Trace (Condensed for Interview)
- Spark code is submitted → Driver starts
- SparkContext connects to cluster
- Transformations → DAG → Optimized Plan
- Actions trigger DAG Scheduler
- Cluster Manager allocates Executors
- Tasks are assigned and run
- Data read/shuffled/written by Executors
- Results sent to Driver or written to output
- Progress tracked in Web UI or History Server
Below is a complete Spark Architecture mind map (text outline format) followed by a mock interview round focused entirely on PySpark architecture and internals.
Mind Map: Spark Architecture (Text Outline Style)
Apache Spark Architecture
│
├── 1. High-Level Abstractions
│   ├── RDD: Resilient Distributed Dataset
│   ├── DataFrame: Schema-aware RDD
│   └── Dataset (Scala/Java only)
│
├── 2. Language Bindings
│   ├── Scala (Native)
│   ├── Java
│   ├── Python (PySpark via py4j)
│   └── R
│
├── 3. SparkSession
│   └── Entry point for DataFrame/SQL API
│
├── 4. Execution Components
│   ├── Driver
│   │   ├── Runs user code
│   │   ├── Hosts SparkContext
│   │   └── Builds DAG, manages job lifecycle
│   ├── Executors
│   │   ├── JVM processes on worker nodes
│   │   ├── Execute tasks
│   │   └── Cache data & shuffle blocks
│   └── Cluster Manager
│       ├── Standalone
│       ├── YARN
│       └── Kubernetes
│
├── 5. Internal Engines
│   ├── Catalyst Optimizer (logical/physical plan)
│   └── Tungsten (memory management & codegen)
│
├── 6. Execution Flow
│   ├── Transformations (lazy)
│   ├── Actions (trigger execution)
│   ├── Logical Plan → Optimized Plan → Physical Plan
│   ├── DAG Scheduler → Stages
│   └── Task Scheduler → Tasks → Executors
│
├── 7. Memory Management
│   ├── Execution Memory (shuffles, joins)
│   ├── Storage Memory (caching)
│   └── Reserved Memory
│
├── 8. Storage & IO
│   ├── HDFS / S3 / Azure / GCS
│   ├── JDBC / Hive Metastore
│   └── Delta Lake
│
├── 9. Monitoring Tools
│   ├── Spark UI (4040)
│   ├── History Server
│   └── Event Logs
│
├── 10. PySpark Internals
│   ├── Python API via py4j
│   ├── Python ↔ JVM bridging
│   └── Serialization overhead considerations
│
└── 11. Optimizations
    ├── Broadcast joins
    ├── Caching / Persistence
    ├── Repartitioning / Coalesce
    └── AQE (Adaptive Query Execution)
You can visualize this as a circular or radial mind map with Apache Spark in the center.
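As a quick illustration of the Optimizations branch, here is how the AQE and broadcast-join knobs are commonly set (standard Spark configuration keys; the threshold value is only an example):
# Adaptive Query Execution: re-optimizes shuffles/joins at runtime
spark.conf.set("spark.sql.adaptive.enabled", "true")
# Tables smaller than this threshold are auto-broadcast in joins (example: 10 MB)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 10 * 1024 * 1024)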
Mock Interview: PySpark Architecture & Internals
Level: Mid to Senior
Format: Q&A + Suggested Ideal Answers
Q1. What is the role of the Driver in Spark?
Answer:
The Driver is the master process that runs the user’s main function and coordinates Spark jobs. It contains the SparkContext, builds the DAG, converts it into stages and tasks, and sends tasks to executors.
Q2. What happens when you call an action like .count() on a DataFrame?
Answer:
- A logical plan is built based on prior transformations
- Catalyst optimizes it into an optimized and physical plan
- DAG Scheduler splits it into stages
- Task Scheduler assigns tasks to executors
- Results are aggregated and returned to the driver
Q3. How does PySpark communicate with the Spark engine?
Answer:
PySpark uses a library called py4j to send Python commands to the JVM backend, which executes them using Spark's Scala-based engine. The results are passed back via py4j.
Q4. What is the difference between an Executor and a Task?
Answer:
An Executor is a JVM process launched on worker nodes responsible for executing code and storing data.
A Task is the smallest unit of work sent to an executor, and corresponds to one data partition.
Q5. What are narrow and wide transformations?
Answer:
- Narrow transformations (e.g., map, filter) do not require shuffling; data stays on the same node.
- Wide transformations (e.g., groupBy, join) require a shuffle, moving data across the network.
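A minimal sketch, assuming a DataFrame df with region and amount columns; the filter is narrow, while the groupBy forces a shuffle:
from pyspark.sql import functions as F
narrow = df.filter(F.col("amount") > 0)                # narrow: each partition processed independently
wide = narrow.groupBy("region").agg(F.sum("amount"))   # wide: same-region rows must be co-located -> shuffle
wide.explain()                                         # the physical plan shows an Exchange (shuffle) node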
Q6. How is memory managed in Spark Executors?
Answer:
Memory in an executor is split into:
- Execution memory: Used for operations like joins/sorts.
- Storage memory: For caching RDD/DataFrame.
- Reserved memory: For internal Spark needs.
Unified Memory Model (default since Spark 1.6) shares space between execution and storage.
Q7. What is Catalyst Optimizer?
Answer:
Catalyst is Spark's query optimization engine for transforming logical plans into efficient physical plans. It supports constant folding, predicate pushdown, projection pruning, etc.
Q8. How do you handle small lookup tables during joins?
Answer:
Use a Broadcast Join: the small DataFrame is broadcast to all executors to avoid a shuffle.
from pyspark.sql.functions import broadcast
df.join(broadcast(df_small), "key").show()
Q9. What is the role of py4j in PySpark?
Answer:
py4j is the gateway that allows Python code to invoke JVM methods. It bridges the Python driver with Spark's Scala backend.
Q10. Explain the complete life of a Spark job.
Answer:
- SparkSession starts; SparkContext initialized
- Lazy transformations build a logical plan
3. Catalyst optimizes it into a physical plan
- DAG scheduler breaks it into stages
- Task scheduler schedules tasks per partition
- Executors run tasks and return results
- Spark UI shows status of all components
Bonus: Common Pitfalls
Mistake | Solution |
---|---|
Too many small files (S3/HDFS) | Use .coalesce() or Delta OPTIMIZE |
Shuffles on large datasets | Use broadcast joins or repartitioning |
Overuse of UDFs | Prefer Spark SQL functions |
Caching everything | Cache only reused data |
Executor OOMs | Tune memory and partition sizes |
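Two of these fixes in sketch form (the column name, path, and partition count are illustrative):
from pyspark.sql import functions as F
# Fewer, larger output files: reduce partitions before writing
df.coalesce(8).write.mode("overwrite").parquet("/tmp/sales_compacted")  # placeholder path
# Prefer built-in functions over Python UDFs so work stays in the JVM/Catalyst
df = df.withColumn("region_upper", F.upper(F.col("region")))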
The next sections go deeper into Delta Lake internals, the Lakehouse layering pattern, and a second mock interview round.
DELTA LAKE ARCHITECTURE & TIME TRAVEL: INTERNALS
What is Delta Lake?
Delta Lake is a storage layer on top of Parquet that brings ACID transactions, schema enforcement, time travel, and merge operations to Apache Spark.
Delta Lake Key Features
Feature | Description |
---|---|
ACID Transactions | Atomicity, Consistency, Isolation, Durability across Spark jobs |
Time Travel | Query data as of previous versions (via version or timestamp) |
Schema Enforcement | Enforce column names/types on write |
Schema Evolution | Automatically handle schema changes when enabled |
Merge (Upserts) | Handle SCD Type 1 & Type 2 use cases |
OPTIMIZE/ZORDER | Improve scan speed via file compaction and indexing |
VACUUM | Clean obsolete files after retention period |
Internals: How Delta Lake Works
- Each write creates a new version (JSON commit + Parquet data files).
- The transaction log is stored in the _delta_log directory.
- Read operations refer to the latest version (or a past one if versionAsOf is used).
- All changes are append-only; existing data files are never overwritten.
- Delta maintains a commit log (00000000.json) plus checkpointing for fast access.
Delta Time Travel
# Read latest
df = spark.read.format("delta").load("/mnt/sales")
# Read version 5
df_v5 = spark.read.format("delta").option("versionAsOf", 5).load("/mnt/sales")
# Read as of timestamp
df_time = spark.read.format("delta").option("timestampAsOf", "2024-12-01 12:00:00").load("/mnt/sales")
Merge (Upsert) Example
from delta.tables import DeltaTable
delta_table = DeltaTable.forPath(spark, "/mnt/sales")
delta_table.alias("t").merge(
updates_df.alias("u"),
"t.id = u.id"
).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()
Optimize & Vacuum
spark.sql("OPTIMIZE delta.`/mnt/sales` ZORDER BY (region, product)")
spark.sql("VACUUM delta.`/mnt/sales` RETAIN 168 HOURS")
RAW → BRONZE → SILVER → GOLD: LAKEHOUSE PIPELINE DESIGN
1. Raw Layer (Landing)
- Source: CSVs, JSON, Kafka, APIs, RDBMS
- Storage: No schema enforcement, audit only
- Examples:
/datalake/raw/sales/
raw_df = spark.read.option("multiline", True).json("/raw/sales/")
2. Bronze Layer (Cleaned & Ingested)
- Validated, minimally cleaned
- Add metadata: _ingest_date, _source
- Write to a Delta table
- Partitioned by a key like date or region
from pyspark.sql.functions import current_date
bronze_df = raw_df.withColumn("_ingest_date", current_date())
bronze_df.write.format("delta").mode("overwrite").save("/bronze/sales/")
3. Silver Layer (Validated + Transformed)
- Business-ready format
- Joins with lookup tables
- Conform dimensions (e.g., DateDim, ProductDim)
- Column renaming, derived metrics
from pyspark.sql.functions import col
silver_df = bronze_df \
.filter("amount > 0") \
.join(region_dim, "region_id") \
.withColumn("profit", col("amount") - col("cost"))
silver_df.write.format("delta").mode("overwrite").save("/silver/sales/")
4. Gold Layer (Aggregated/Reporting)
- Dashboard-ready format
- KPI aggregations, rollups
- Materialized views
from pyspark.sql.functions import sum, count
gold_df = silver_df.groupBy("region", "month").agg(
sum("profit").alias("monthly_profit"),
count("*").alias("txn_count")
)
gold_df.write.format("delta").mode("overwrite").save("/gold/sales_summary/")
BONUS: Real-Time Trigger (Auto Loader + Streaming)
from pyspark.sql.functions import current_timestamp
raw_stream = spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "json") \
.load("/raw/sales")
bronze_stream = raw_stream.withColumn("_ingest_time", current_timestamp())
bronze_stream.writeStream.format("delta") \
.option("checkpointLocation", "/checkpoints/bronze/") \
.start("/bronze/sales")
MOCK INTERVIEW ROUND 2: Delta Lake, Lakehouse, and Pipelines
Q1. How does Delta Lake achieve ACID transactions in Spark?
Answer:
Delta uses a transaction log (_delta_log) that records every commit as a JSON file, similar to a WAL (write-ahead log). Operations are atomic, and concurrent writes are serialized.
Q2. What is the difference between Delta Lake and regular Parquet files?
Answer:
Feature | Delta Lake | Plain Parquet |
---|---|---|
ACID support | Yes | No |
Time Travel | Yes | No |
Schema evolution | Yes (with control) | No |
Merge/Update/Delete | Yes | No |
Performance tuning | OPTIMIZE, ZORDER | Manual file management |
Reliability | Higher | Lower |
Q3. How would you implement SCD Type 2 using Delta Merge?
Answer:
Use merge() with whenMatchedUpdate and whenNotMatchedInsert, and maintain current_flag, start_date, and end_date columns to track changes over time.
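A simplified sketch of that pattern (the table path, id/attr columns, and flag/date columns are illustrative; a complete SCD2 flow typically also re-inserts the new version of changed rows in a second pass):
from delta.tables import DeltaTable
dim = DeltaTable.forPath(spark, "/mnt/dim_customer")  # placeholder path
(dim.alias("t")
 .merge(updates_df.alias("s"), "t.id = s.id AND t.current_flag = true")
 .whenMatchedUpdate(
     condition="t.attr <> s.attr",                                   # the tracked attribute changed
     set={"current_flag": "false", "end_date": "current_date()"})    # expire the old version
 .whenNotMatchedInsert(
     values={"id": "s.id", "attr": "s.attr",
             "current_flag": "true",
             "start_date": "current_date()", "end_date": "null"})
 .execute())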
Q4. What is the purpose of Bronze/Silver/Gold architecture?
Answer:
- Bronze: Raw cleaned data, audit trails
- Silver: Business logic applied, conform dimensions
- Gold: Aggregated, report-ready data
It enforces data quality, version control, and modularity.
Q5. How is Time Travel internally maintained in Delta Lake?
Answer:
Each write creates a new snapshot/version in _delta_log. The log stores metadata and the exact set of files for that version. versionAsOf or timestampAsOf fetches that version during a read.
Q6. How do you prevent VACUUM from deleting old data prematurely?
Answer:
Delta has a default 7-day retention (168 hours). Set lower manually only if absolutely safe:
SET spark.databricks.delta.retentionDurationCheck.enabled = false;
VACUUM ... RETAIN 1 HOURS
Q7. How do you ensure schema consistency in the Bronze → Silver flow?
Answer:
Use mergeSchema=True when writing, or set strict schema enforcement to avoid corrupting downstream pipelines.
df.write.option("mergeSchema", True).format("delta")...