Spark Configuration, Monitoring, and Tuning

Here's a comprehensive guide to Spark configuration, monitoring, and tuning, covering theory plus code examples. It's especially helpful for Data Engineers working on performance optimization or preparing for interviews.


βš™οΈ Spark Configuration, Monitoring, and Tuning


πŸ”§ 1. Understand Components of the Spark Cluster

A Spark cluster consists of the following core components:

ComponentDescription
Driver ProgramRuns the main function of the Spark application; manages job execution
Cluster ManagerAllocates resources; can be YARN, Standalone, Mesos, or Kubernetes
Worker Nodes (Executors)Execute the tasks and return results to the driver
ExecutorsRun on worker nodes to process data and store it in memory/disk
TasksUnits of work sent by the driver to executors

Hierarchy: Application β†’ Jobs β†’ Stages β†’ Tasks


βš™οΈ 2. Configure Spark Properties

➀ You can configure Spark using:

  • spark-submit options
  • spark-defaults.conf
  • Code (SparkConf, SparkSession.builder)
  • Environment variables
  • log4j.properties for logging
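
✅ Example: spark-defaults.conf entries

spark-defaults.conf takes whitespace-separated key/value pairs; a minimal illustrative file might look like the following (the values are placeholders, not recommendations):

spark.master                     yarn
spark.executor.memory            4g
spark.executor.cores             2
spark.sql.shuffle.partitions     100
spark.eventLog.enabled           true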

✅ Example: Setting properties in code

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("MyApp") \
    .config("spark.executor.memory", "2g") \
    .config("spark.executor.cores", "2") \
    .config("spark.sql.shuffle.partitions", "100") \
    .getOrCreate()

✅ Example: spark-submit config

spark-submit \
  --executor-memory 4G \
  --executor-cores 2 \
  --conf "spark.sql.shuffle.partitions=100" \
  app.py

🌐 3. Monitoring Spark

✅ A. Spark Web UI (default: port 4040)

  • Stages tab: Time taken, input size, shuffle reads/writes
  • Executors tab: Memory usage, GC time, task time
  • SQL tab: Physical/execution plan and optimizations

For YARN: Use http://<resourcemanager>:8088 to track your app

✅ B. Metrics and Logs

  • Metrics via JMX, Prometheus, Ganglia, or Graphite
  • Logs in stdout, stderr, or centralized (e.g., ELK stack)
  • Configurable in log4j.properties
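
If you just need to dial logging down from application code rather than editing log4j.properties, the SparkContext exposes a log-level setter; a minimal sketch, assuming an active SparkSession named spark:

# Reduce console log verbosity for this application only
spark.sparkContext.setLogLevel("WARN")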

✅ C. Spark History Server (post-application monitoring)

spark.eventLog.enabled = true
spark.eventLog.dir = hdfs://eventlog_dir

🚀 4. Performance Tuning Considerations

✅ A. Partitioning

  • Too few partitions: underutilized cores
  • Too many partitions: task-scheduling overhead

df.repartition(100)       # Rebalance partition count (full shuffle)
df.coalesce(10)           # Reduce number of partitions (no full shuffle)
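
Before deciding whether to repartition or coalesce, it helps to check the current partition count; a quick sketch, assuming an existing DataFrame df:

# Inspect how many partitions the DataFrame currently has
print(df.rdd.getNumPartitions())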

✅ B. Shuffle Optimization

  • Set spark.sql.shuffle.partitions appropriately
  • Use broadcast joins for small dimension tables:

from pyspark.sql.functions import broadcast
df.join(broadcast(dim_df), "id")

✅ C. Caching / Persistence

df.cache()       # Stores in memory
df.persist()     # Customize memory/disk behavior
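
For finer control you can pass an explicit storage level; a minimal sketch, assuming a DataFrame df that is reused by several downstream actions:

from pyspark import StorageLevel

df.persist(StorageLevel.MEMORY_AND_DISK)   # Spill to disk when memory is tight
df.count()                                 # First action materializes the cache
# ... reuse df in further transformations/actions ...
df.unpersist()                             # Release the cached data when done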

✅ D. Memory Settings

  • spark.executor.memory
  • spark.driver.memory
  • spark.memory.fraction (default 0.6)

✅ E. Parallelism

  • spark.default.parallelism (for RDD)
  • spark.sql.shuffle.partitions (for DataFrames)
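
To see the values that are actually in effect for a running session, both can be read back at runtime; a small sketch, assuming an active SparkSession named spark:

print(spark.sparkContext.defaultParallelism)           # parallelism used for RDD operations
print(spark.conf.get("spark.sql.shuffle.partitions"))  # partition count after DataFrame shuffles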

📊 Spark Tuning Summary Table

| Parameter | Purpose | Default | Tuning Tip |
| --- | --- | --- | --- |
| spark.executor.memory | Memory per executor | 1g | Increase for large data |
| spark.executor.cores | Cores per executor | 1 | 2–5 for best balance |
| spark.sql.shuffle.partitions | Number of partitions after a shuffle | 200 | Reduce for small jobs |
| spark.sql.autoBroadcastJoinThreshold | Max table size for broadcast joins | 10 MB | Increase for bigger lookup tables |
| spark.memory.fraction | Fraction of heap for execution and storage | 0.6 | Use 0.8 if no caching |
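
As a hedged, end-to-end sketch, the table's parameters can be combined in one session builder (the values are illustrative and should be right-sized to your data and cluster):

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("TunedApp")
    .config("spark.executor.memory", "4g")
    .config("spark.executor.cores", "4")
    .config("spark.sql.shuffle.partitions", "100")
    .config("spark.sql.autoBroadcastJoinThreshold", str(50 * 1024 * 1024))  # 50 MB
    .getOrCreate()
)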

These questions come up very often in Data Engineering interviews, especially for PySpark / Big Data roles. Here's how you can structure realistic, confident, and technically grounded answers:


🧠 How to Answer: Data Size, Pipeline, Optimization, Cluster


✅ 1. "What is the size of data you've worked on?"

🗣️ Sample Answer (based on your profile & resume):

"In my recent projects at Axis Bank BIU and Capgemini, I've worked with datasets ranging from 50 GB to over 2 TB per day. For customer-level analytics and cross-sell campaigns, the largest pipeline processed about 5–8 TB of historical data across 100+ partitions using PySpark with Hive and Delta Lake."

📌 TIP: Avoid saying "petabytes" unless you're 100% sure and have worked on distributed storage (like S3 or HDFS clusters) with multi-TB daily volumes.


✅ 2. "Describe your data pipeline."

🗣️ Sample Answer:

"Our pipeline follows a metadata-driven ETL architecture built on PySpark + Hive. It fetches data from Oracle, Hive, and flat files like CSVs, transforms it in Spark (handling nulls, deduplication, and joins), writes to Delta Lake tables, and registers the outputs for downstream BI use.

We use Airflow or CI/CD pipelines (via Jenkins) to orchestrate the job sequence, handle failure retries, and trigger alerts. Processing time is optimized using broadcast joins, partitioning logic, and Spark SQL tuning."


✅ 3. "How do you optimize memory usage in Spark?"

🗣️ Sample Answer:

"We start by tuning memory properties like spark.executor.memory, spark.memory.fraction, and spark.sql.shuffle.partitions. I usually:

  • Use persist() with MEMORY_AND_DISK when caching transformed DataFrames
  • Reduce shuffle overhead with broadcast joins for small dimension tables
  • Use coalesce() or repartition() to manage partition skew
  • Disable automatic caching in unnecessary stages and use unpersist() when data is no longer needed

We also fine-tune GC behavior and monitor the storage vs. execution memory ratio using the Spark UI and logs."


✅ 4. "What was the size and configuration of your cluster?"

🗣️ Sample Answer:

"Our typical Spark cluster on Azure Databricks used:

  • 8–10 worker nodes
  • Each with 16–32 GB memory, 4–8 cores
  • Driver with 32 GB and 4–6 cores

We used autoscaling clusters for variable loads, and kept jobs in cluster mode for better resilience."

📌 Alternate (for on-prem Hadoop):

"We had a Cloudera-managed on-prem cluster with YARN as the resource manager. Each node had 64 GB RAM, and Spark used 4–6 cores per executor with 2–4 executors per node."


📋 Bonus: Ready Interview Snippet Summary Table

| Question | Crisp Answer (Customize for You) |
| --- | --- |
| Data Size | 50 GB to 2 TB daily; 5–8 TB historical |
| Pipeline Type | Metadata-driven ETL in PySpark + Hive + Delta |
| Optimizations | Broadcast joins, partitioning, persist/unpersist, shuffle tuning |
| Cluster Config | 8–10 nodes, 32 GB RAM each, 4–8 cores, Databricks autoscaling |

Excellent observation. Let's break it down and clarify what "automatic caching" refers to in Spark, and how/when to disable caching or use unpersist().


πŸ” What Is β€œAutomatic Caching” in Spark?

Strictly speaking, Spark does not β€œautomatically” cache every DataFrame or RDD. However, there are two cases where caching may appear β€œautomatic”:

✅ 1. Spark SQL Tables / Views in Interactive Notebooks (e.g., Databricks)

In interactive environments like Databricks or Zeppelin, when you create a temporary view and repeatedly query it:

df.createOrReplaceTempView("my_table")
spark.sql("SELECT * FROM my_table WHERE age > 25").show()

Spark might cache the result behind the scenes if:

  • You're using Databricks interactive clusters
  • The query is reused in multiple cells
  • Adaptive Query Execution (AQE) decides it's beneficial

👉 This is not standard Spark behavior in spark-submit batch jobs.


✅ 2. Caching via UI Option or Delta Table Optimization

Databricks sometimes enables caching of Delta tables for faster UI queries or dashboard interactions.

CACHE TABLE sales_data

This caches the table in memory implicitly across jobs.
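
The same explicit control is available from PySpark through the catalog API; a small sketch, reusing the table name from the SQL example above:

spark.catalog.cacheTable("sales_data")      # Equivalent of CACHE TABLE
print(spark.catalog.isCached("sales_data")) # Check the cache status
spark.catalog.uncacheTable("sales_data")    # Release it when no longer needed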


❌ What You Should NOT Do: Keep DataFrames Cached Unnecessarily

If you use:

df.cache()

Then don't forget to unpersist when done:

df.unpersist()

Otherwise, you waste memory, leading to:

  • Out-of-memory errors
  • Excessive garbage collection (GC)
  • Slower job performance

✅ Best Practice: When to Use .unpersist()

| Scenario | Action |
| --- | --- |
| DataFrame is used across multiple transformations | df.cache() or df.persist() |
| DataFrame is no longer needed | df.unpersist() |
| Intermediate stage cached only for debugging | Remove .cache() once done |
| Memory is constrained | Prefer persist(StorageLevel.MEMORY_AND_DISK); note that RDD cache() is MEMORY_ONLY, while DataFrame cache() already defaults to MEMORY_AND_DISK |

πŸ” Quick Code Example

df = spark.read.parquet("big_data.parquet")
df.cache()  # Store in memory

# Several expensive ops
df_filtered = df.filter(df["value"] > 100)
df_agg = df_filtered.groupBy("category").count()

# Done with df
df.unpersist()  # Free up memory

❗ Summary

| Myth | Reality |
| --- | --- |
| "Spark automatically caches DataFrames" | ❌ Not unless explicitly told, or in notebook environments with AQE or Delta caching |
| "No need to unpersist" | ❌ You must unpersist() once you are done with cached data |
| "Cache everything" | ❌ Only cache reused, expensive intermediate results |

Excellent – this is a top-tier Spark interview question. Let's build a clear, end-to-end breakdown that will:

✅ Impress interviewers
✅ Show real understanding (not just buzzwords)
✅ Help you visualize and explain Spark internals with a sample 3-step pipeline


βš™οΈ What is Spark Architecture (Big Picture)

πŸ“Œ Spark Architecture Components

ComponentRole
DriverYour main app (Python/Scala) that coordinates everything
Cluster ManagerAllocates resources (YARN, Standalone, Kubernetes)
ExecutorsRun tasks on worker nodes, store intermediate data
TasksAtomic units of work (e.g., a map/filter on a partition)
JobsCreated for every action() like .collect(), .count()
StagesGroup of tasks that can run without shuffle
ShuffleData movement across partitions between stages

🧪 Let's Take a Sample 3-Step Spark Pipeline

df = spark.read.csv("data.csv", header=True)
df_filtered = df.filter(df["price"] > 100)
df_result = df_filtered.groupBy("category").count()
df_result.show()

πŸ” What Happens Behind the Scenes?

✅ Step-by-Step Breakdown


🧠 Step 1: Reading the File

df = spark.read.csv("data.csv", header=True)

| Behind the scenes | Details |
| --- | --- |
| Job triggered? | ❌ No job yet – lazy execution |
| Partitions? | Depends on the file – say the file is 1 GB with a default block size of 128 MB → ~8 partitions |
| Executor action? | None – just builds a logical plan |
| Stored as? | Logical plan → DataFrame metadata, not data |
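
To confirm that only a plan exists at this point, you can print it; explain() inspects the plan without running a job (a quick sketch using the df defined above):

# Shows the parsed/analyzed/optimized logical plans and the physical plan;
# no data is read because explain() does not trigger an action
df.explain(True)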

✅ Step 2: Transformation

df_filtered = df.filter(df["price"] > 100)

| Behind the scenes | Details |
| --- | --- |
| Job triggered? | ❌ Still no – transformations are lazy |
| Spark plan? | Adds a filter node to the logical plan (Catalyst optimizer) |
| Execution? | No action yet |

✅ Step 3: Action

df_result = df_filtered.groupBy("category").count()
df_result.show()

| Behind the scenes | Details |
| --- | --- |
| Job triggered? | ✅ YES – .show() is an action |
| Jobs? | 1 job triggered (each action = 1 job) |
| Stages? | Usually 2 stages: Stage 1 (read + filter, narrow transformations) and Stage 2 (groupBy, which causes a shuffle) |
| Tasks? | 1 task per partition. With 8 input partitions: Stage 1 → 8 tasks; Stage 2 → as many tasks as spark.sql.shuffle.partitions (8 in this example, 200 by default) |
| Shuffle? | ✅ YES – groupBy triggers a shuffle |
| DAG? | Directed Acyclic Graph connecting these stages |
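
One way to see the post-shuffle parallelism is to look at the result's partition count; a hedged sketch using df_result from above (note that touching the underlying RDD forces the plan to be built):

# With default settings this reflects spark.sql.shuffle.partitions (200),
# or whatever AQE coalesced it to at runtime
print(df_result.rdd.getNumPartitions())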

🔄 Spark DAG Visualization (Simple)

Job 1
 ├── Stage 1 (narrow) → read CSV + filter
 │     └── Tasks: 8 (1 per input partition)
 └── Stage 2 (wide) → groupBy, shuffle
       └── Tasks: 8 (post-shuffle; assumes spark.sql.shuffle.partitions = 8)

📦 Partitioning Summary

| Operation | Partition Behavior |
| --- | --- |
| read.csv() | Based on input splits (HDFS/CSV size) |
| filter() | Narrow transformation – no shuffle |
| groupBy() | Wide transformation – triggers a shuffle |
| show() | Triggers execution, collects results to the driver |

🧠 Final Interview-Ready Answer

“When we run a 3-step pipeline like read β†’ filter β†’ groupBy, Spark builds a logical plan first. No jobs are triggered until an action like .show() is called.

At that point, Spark breaks the DAG into stages, where narrow transformations (like filter) stay in the same stage, and wide transformations (like groupBy) cause a shuffle, splitting into multiple stages. Each stage is made of tasks equal to the number of partitions.

For a 1GB file with 128MB blocks, say we get 8 partitions β†’ Stage 1: 8 filter tasks, Stage 2: 8 shuffle tasks. Total: 1 job, 2 stages, 16 tasks.”


💡 BONUS: How to Inspect This in the Spark UI?

  • Jobs tab: shows 1 job triggered by .show()
  • Stages tab: 2 stages – one before the shuffle, one after
  • Stage detail page: number of partitions = number of tasks per stage

Absolutely – let's break down your realistic data size claim (50 GB to 2 TB per day, 5–8 TB historical) and explain how partitions are decided, both automatically by Spark and tuned manually by you, especially when using spark-submit.


🧩 Realistic Statement (Broken Down)

“I’ve worked with pipelines that process 50 GB to over 2 TB per day, and our largest campaign-level pipeline handled 5–8 TB historical data using PySpark with Hive and Delta Lake, typically across 100+ partitions.

Let’s now unpack:


βš™οΈ 1. How Spark Decides Partitions by Default

✅ For File-Based Reads (CSV, Parquet, Delta):

  • Spark creates 1 partition per file block (usually 128 MB or 256 MB).
  • Example: a 1 TB file with 128 MB blocks → ~8,000 partitions
  • The relevant setting:

spark.conf.get("spark.sql.files.maxPartitionBytes")  # default = 134217728 (128 MB)

✅ For Hive Tables (through HiveContext):

  • Spark honors underlying HDFS block splits and partition columns.
  • If Hive table has partition columns (like region, date), Spark will prune partitions based on query filter.
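
As an illustration of partition pruning (the table name sales_partitioned and partition column dt are hypothetical), filtering on the partition column lets Spark scan only the matching partitions:

# Only the dt='2024-07-01' partition directories are scanned
pruned_df = spark.sql("SELECT * FROM sales_partitioned WHERE dt = '2024-07-01'")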

πŸ” 2. How You Can Control or Tune Partitions

✅ A. While Reading

df = spark.read.csv("big.csv")  # Spark decides based on file size and block size

df = df.repartition(200)        # Explicitly repartition to 200 partitions
df = df.coalesce(50)            # Reduce partitions when fewer cores are used

🔥 You decide based on:

  • Cluster size (executors, cores)
  • Data size
  • Expected parallelism

✅ B. In spark-submit or SparkSession

You control key parameters that influence partitioning:

❗ Critical Spark Parameters

| Config | Meaning | Typical Value |
| --- | --- | --- |
| spark.sql.shuffle.partitions | Number of partitions after shuffle ops (e.g., groupBy, join) | default = 200 |
| spark.default.parallelism | For RDDs (non-DataFrame) | usually 2–3 × total cores |
| spark.sql.files.maxPartitionBytes | Max bytes per partition when reading files | default = 128 MB |

✳️ How to Set in spark-submit:

# 268435456 bytes = 256 MB per input partition
spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --executor-cores 4 \
  --executor-memory 8G \
  --num-executors 10 \
  --conf spark.sql.shuffle.partitions=300 \
  --conf spark.sql.files.maxPartitionBytes=268435456 \
  my_pipeline.py

🧠 Partition Planning Example:

Let's say:

  • Input = 2 TB from Hive or Delta
  • Block size = 256 MB
  • Effective partitions = 2 TB ÷ 256 MB ≈ 8,000 partitions
  • Cluster = 10 nodes × 4 cores = 40 tasks in parallel

→ That many partitions cause excessive overhead (task scheduling, small files), so you may reduce them:

df = df.repartition(500)  # A more reasonable number

📊 How You Can Explain It in Interviews

"Spark by default creates partitions based on file block size – for example, 2 TB of data on 128 MB blocks gives us 16,000 partitions, which could be excessive. We typically repartition to around 300–600 depending on our executor count, memory, and job complexity.

For heavy transformations like groupBy or join, we tune spark.sql.shuffle.partitions to control the post-shuffle load.

On large-scale historical pipelines (5–8 TB), we used dynamic partition pruning in Hive and partition filters to reduce the scan footprint."


✅ Summary Table: Partition Tuning at Scale

| Control Point | What It Does | When to Tune |
| --- | --- | --- |
| repartition(n) | Redistribute the DataFrame into n partitions | When you know the cluster's core count |
| coalesce(n) | Reduce partitions with minimal shuffle | Before writing |
| spark.sql.shuffle.partitions | Set shuffle partitions (e.g., after a join) | For grouped/joined data |
| spark.sql.files.maxPartitionBytes | Controls input partition size | When reading large files |
| Hive partitions | Physical partitions like dt=20240701 | Leverage partition pruning |
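
To produce Hive-style physical partitions when writing (the output path and the dt column here are illustrative), partitionBy lays out one directory per partition value:

# Writes directories like .../dt=20240701/ that downstream queries can prune
df.write.mode("overwrite").partitionBy("dt").parquet("/data/output/sales")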

Excellent question – this dives deep into how partitioning before vs. after a shuffle works in Spark, and how to avoid manually tuning every DataFrame by leveraging Spark's automatic features like dynamic partition pruning and smart defaults.


πŸ” Question Recap:

  • Original: 16,000 partitions (from 2 TB data)
  • Manually repartitioned to ~300–600
  • Then a shuffle operation (like groupBy, join) uses spark.sql.shuffle.partitions = 200
  • How do they work together?
  • Can we avoid manually checking each DataFrame?
  • How does dynamic partitioning/pruning work here?

🧠 Let's Break It Down Step-by-Step


✅ 1. Original Read Partitions (16,000)

Spark reads data from HDFS/S3/Delta → splits it into partitions based on file/block size:

  • 2 TB / 128 MB = 16,000 partitions
  • Used in Stage 1: input transformations (e.g., filter, select)

🟢 If left untouched, every task will process ~128 MB.
❌ But 16,000 tasks = overhead (task scheduling, metadata, memory pressure)


✅ 2. Manual Repartition (300–600)

We reduce the partition count to better fit the cluster:

df = df.repartition(500)

  • Improves performance for wide transformations, which benefit from fewer but larger partitions
  • Also helps with writing fewer output files

✅ 3. Shuffle Transformations Use spark.sql.shuffle.partitions

Now comes a wide transformation, like:

df.groupBy("region").agg(...)

This triggers a shuffle, and Spark will:

  • Discard existing partitions
  • Redistribute data across spark.sql.shuffle.partitions partitions

💡 Even if the original df had 500 partitions → the shuffle results in 200 partitions by default.

📌 You can change it:

spark.conf.set("spark.sql.shuffle.partitions", 300)

📌 ❓ So, how do repartition() and shuffle partitions interact?

| Step | Partitions |
| --- | --- |
| read() from 2 TB | 16,000 |
| repartition(500) | 500 |
| groupBy() (shuffle) | 200 ← uses spark.sql.shuffle.partitions |

✅ repartition() affects the pre-shuffle data distribution
✅ spark.sql.shuffle.partitions controls post-shuffle parallelism
➡️ They serve different stages of the DAG


😩 "But I don't want to manually check every DataFrame…"

You're absolutely right.


✅ 4. How to Handle Partitioning Dynamically

🔹 A. Use Adaptive Query Execution (AQE)

🔧 Automatically optimizes:

  • Shuffle partition counts
  • Join strategies (broadcast vs. shuffle)
  • Skew handling

spark.conf.set("spark.sql.adaptive.enabled", True)
spark.conf.set("spark.sql.adaptive.shuffle.targetPostShuffleInputSize", "64MB")

💥 With AQE on, Spark no longer relies on a fixed spark.sql.shuffle.partitions value; it adjusts the post-shuffle partitioning based on runtime statistics.
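
Note that spark.sql.adaptive.shuffle.targetPostShuffleInputSize is the older Spark 2.x name; in Spark 3.x the commonly used switches are the following (a hedged sketch, values illustrative):

spark.conf.set("spark.sql.adaptive.enabled", True)
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", True)      # merge small post-shuffle partitions
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "64MB")  # target partition size after coalescing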


🔹 B. Use Dynamic Partition Pruning (for Hive/Delta)

🚀 Optimizes queries like:

fact_df.join(dim_df, "region")  # where dim_df.region is small and filtered

✅ Spark prunes partitions dynamically at runtime instead of scanning all partitions.

spark.conf.set("spark.sql.optimizer.dynamicPartitionPruning.enabled", True)

πŸ” End-to-End Example: Partition Logic in Action

# Read 2 TB → initially 16,000 partitions
df = spark.read.parquet("2tb_table")

# Reduce input overhead
df = df.repartition(500)

# Enable AQE + DPP
spark.conf.set("spark.sql.adaptive.enabled", True)
spark.conf.set("spark.sql.optimizer.dynamicPartitionPruning.enabled", True)

# Perform shuffle (AQE will tune the shuffle partition count)
result = df.join(dim, "region").groupBy("region").count()

✅ Summary Table

| Concept | What It Controls | When It Applies |
| --- | --- | --- |
| repartition(n) | Pre-shuffle input layout | Before shuffle ops |
| coalesce(n) | Shrink partitions (e.g., post-filter) | Before a write |
| spark.sql.shuffle.partitions | Post-shuffle stage partition count | When AQE is off |
| AQE (spark.sql.adaptive.enabled) | Automatically adjusts shuffle partitions | At runtime |
| DPP (spark.sql.optimizer.dynamicPartitionPruning.enabled) | Reads only the necessary partitions (Hive/Delta) | At runtime, based on join filters |

💬 Interview-Ready Answer Snippet:

"We usually start with large data inputs, say 2 TB, resulting in ~16,000 partitions. Before transformations like join or groupBy, we explicitly repartition to ~300–600 to balance the load. But for shuffle stages, Spark uses spark.sql.shuffle.partitions, typically set to 200.

To avoid manually tuning every DataFrame, we enable Adaptive Query Execution, which dynamically adjusts shuffle partition sizes based on runtime stats, and Dynamic Partition Pruning, which skips irrelevant partitions in fact tables during joins."


Fantastic wrap-up question! Here's a clear answer that every Spark engineer must know – and it's a frequent interview point:


🧾 Q: “We are writing our final DataFrame to disk (Parquet/Delta/CSV). If we want to reduce the number of output files/partitions, should we use coalesce() or repartition()?”


✅ TL;DR:

| Use | When |
| --- | --- |
| coalesce(n) ✅ | When decreasing the number of partitions – especially just before writing output |
| repartition(n) ❌ | Not ideal for a final write when reducing – it causes a full shuffle |
| Why? | coalesce() avoids a full shuffle and merges partitions efficiently |

πŸ” Detailed Explanation:

βœ… coalesce(n): Merge partitions WITHOUT full shuffle

  • Used when reducing partitions
  • Spark tries to merge adjacent partitions β€” lightweight
  • Best when writing final output with fewer files
  • Preserves existing data ordering as much as possible

✳️ Example:

df = df.coalesce(10)
df.write.mode("overwrite").parquet("output/")

💡 Output = 10 files


❌ repartition(n): Full Shuffle

  • Triggers full data shuffle
  • Used when increasing partitions or changing partition key
  • Expensive for large DataFrames
  • Should be used earlier in pipeline, not just before write

🔻 Not recommended before a write:

df = df.repartition(10)  # Full shuffle!
df.write.parquet("output/")

🔥 Example Scenario: Writing Final Output

df = spark.read.parquet("input/")      # Assume 1000 partitions
df = df.filter(df["active"] == True)   # Maybe now ~200 partitions

df = df.coalesce(10)                   # Reduce output to 10 files (no shuffle)
df.write.mode("overwrite").parquet("final_output/")

🧠 Interview Tip: Mention coalesce before .write() to optimize output file count.


🧠 Interview-Ready Answer:

“Before writing data to disk, I use coalesce() to reduce the number of output partitions/files β€” say from 200 to 10. It’s more efficient than repartition() since it avoids a full shuffle and is optimized for scenarios where we’re only merging partitions. I use repartition() earlier in the pipeline when I need to increase parallelism or redistribute data.”

