Let’s break down two important performance optimization concepts in Apache Spark:
🔥 1. Speculative Execution in Spark
✅ What is it?
Speculative execution helps handle straggler tasks: tasks that are unusually slow due to hardware issues, data skew, etc.
Spark launches backup copies of slow tasks on other executors. Whichever finishes first, Spark keeps that result and kills the slower one.
✅ Why is it needed?
In a cluster, one node might be slower due to:
- CPU/network bottlenecks
- Disk I/O problems
- Data skew (some partitions have way more data)
Rather than wait for that node, Spark uses speculative execution to mitigate delays.
✅ How to enable it?
spark.conf.set("spark.speculation", "true")            # Enable speculative execution
spark.conf.set("spark.speculation.multiplier", "1.5")  # Tasks 1.5× slower than the median become candidates
spark.conf.set("spark.speculation.quantile", "0.75")   # 75% of a stage's tasks must finish before speculation starts
spark.conf.set("spark.speculation.interval", "100ms")  # How often to check for stragglers
You can also set these in spark-defaults.conf or at the job level.
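For reference, a spark-defaults.conf sketch with the same values used above (illustrative values, not tuned recommendations):
spark.speculation            true
spark.speculation.multiplier 1.5
spark.speculation.quantile   0.75
spark.speculation.interval   100ms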
🧪 Example Use Case:
You're running a Spark job that processes log files across 200 tasks.
One executor has a disk issue, and its task takes 3× longer.
✅ Spark runs a speculative copy of that task on a healthy executor.
✅ The faster copy finishes, and its result is used.
📊 Monitoring:
You can track speculative tasks in the Spark UI under the “Tasks” tab (look for duplicated attempts).
⚙️ 2. Resource Allocation Methods in Spark
✅ What are we allocating?
Spark dynamically or statically allocates:
- Executors (JVMs that run tasks)
- Cores per executor
- Memory per executor
📦 Allocation Modes
| Mode | Description |
|---|---|
| Static Allocation | You define the number of executors, cores, and memory manually. |
| Dynamic Allocation | Spark adds/removes executors automatically based on workload. |
✅ Static Allocation (Manual)
You specify everything upfront:
--num-executors 4
--executor-cores 4
--executor-memory 8G
Total requested capacity = 4 executors × 4 cores = 16 cores, and 4 × 8 GB = 32 GB of executor memory.
Useful when:
- You know job behavior
- Fixed-size data
- Cluster has limited flexibility
✅ Dynamic Resource Allocation
Spark scales up/down executors automatically at runtime.
Enable in config:
spark.conf.set("spark.dynamicAllocation.enabled", "true")
spark.conf.set("spark.shuffle.service.enabled", "true") # Needed to keep shuffle data
spark.conf.set("spark.dynamicAllocation.minExecutors", "2")
spark.conf.set("spark.dynamicAllocation.maxExecutors", "10")
spark.conf.set("spark.dynamicAllocation.initialExecutors", "3")
Useful for:
- Jobs with uneven stages
- Clusters with shared resources
- Streaming jobs with bursts
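Note: on Spark 3.x (including Databricks), shuffle tracking can stand in for the external shuffle service when enabling dynamic allocation. A minimal sketch, assuming Spark 3.0+:
spark.conf.set("spark.dynamicAllocation.enabled", "true")
spark.conf.set("spark.dynamicAllocation.shuffleTracking.enabled", "true")  # Track shuffle files on executors instead of relying on the external shuffle service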
🧠 Best Practices:
| Scenario | Recommendation |
|---|---|
| Large jobs, heavy skew | Enable speculative execution |
| Shared cluster, bursty load | Enable dynamic allocation |
| Stable job, consistent load | Use static allocation |
🎯 Real-Life Analogy:
- Speculative Execution = sending another person to do the job when the first is taking too long.
- Dynamic Allocation = hiring more workers when workload increases, releasing them when done.
✅ Example in spark-submit
spark-submit \
--conf spark.speculation=true \
--conf spark.dynamicAllocation.enabled=true \
--conf spark.dynamicAllocation.minExecutors=2 \
--conf spark.dynamicAllocation.maxExecutors=8 \
--num-executors 4 \
--executor-memory 4G \
--executor-cores 2 \
my_job.py
Here's a practical guide to using Spark DataFrame hints in Databricks, with real examples, and how to inspect their effects using .explain() and the Spark UI (query plan).
✅ Setup: Sample Tables
Let's say you have the following:
large_df = spark.range(1, 1_000_000).withColumnRenamed("id", "user_id")
small_df = spark.createDataFrame([(1, "Alice"), (2, "Bob")], ["user_id", "name"])
🔧 1. Broadcast Hint
📍 Use Case: Small lookup/dimension table join
🔧 Code:
from pyspark.sql.functions import broadcast
result_df = large_df.join(broadcast(small_df), "user_id")
result_df.explain(True)
🧪 Output (the EXPLAIN plan includes):
... BroadcastHashJoin ...
... BroadcastExchange ...
✅ BroadcastExchange confirms Spark sent the small table to all executors.
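The same hint can also be written with .hint("broadcast") or as a SQL hint; a quick sketch, where the temp view names "large" and "small" are ones you register yourself:
result_df = large_df.join(small_df.hint("broadcast"), "user_id")  # DataFrame-API equivalent
large_df.createOrReplaceTempView("large")
small_df.createOrReplaceTempView("small")
sql_df = spark.sql("SELECT /*+ BROADCAST(small) */ * FROM large JOIN small USING (user_id)")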
🔧 2. Coalesce Hint
📍 Use Case: Reduce the number of shuffle partitions after an aggregation
df = large_df.groupBy("user_id").count().hint("coalesce", 4)
df.explain(True)
🧪 EXPLAIN Output:
Check the physical plan for a coalesced shuffle read (the exact operator name varies by Spark version):
... CoalescedShuffleReader ...
✅ It tells Spark to reduce post-shuffle partitions to 4.
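The same hint exists in SQL (Spark 3.0+); a sketch assuming large_df is registered as a temp view named "large":
large_df.createOrReplaceTempView("large")
df = spark.sql("SELECT /*+ COALESCE(4) */ user_id, count(*) AS cnt FROM large GROUP BY user_id")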
🔧 3. Repartition Hint
📍 Use Case: Force repartitioning during a join or aggregation
df = large_df.join(small_df.hint("repartition", 8), "user_id")
df.explain(True)
🧪 EXPLAIN Output:
Repartition ...
✅ Spark performs an extra repartition stage to match the hint.
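The SQL form can also target columns (Spark 3.0+); a sketch assuming the "large" temp view from earlier:
df = spark.sql("SELECT /*+ REPARTITION(8, user_id) */ * FROM large")  # 8 partitions, hashed on user_id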
🔧 4. Shuffle Hash Join
📍 Use Case: Override sort-merge and force a hash join
df = large_df.hint("shuffle_hash").join(small_df, "user_id")
df.explain(True)
🧪 EXPLAIN Output:
... ShuffledHashJoin ...
✅ Confirms Spark used a hash join even when sort-merge was eligible.
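For completeness, the counterpart hint "merge" forces a sort-merge join; a minimal sketch:
df = large_df.hint("merge").join(small_df, "user_id")
df.explain(True)  # look for SortMergeJoin in the physical plan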
🔧 5. Shuffle Replicate NL Join (advanced)
📍 Use Case: Star schema or non-equi join where one table is small
df = large_df.hint("shuffle_replicate_nl").join(small_df, large_df.user_id > small_df.user_id)
df.explain(True)
🧪 EXPLAIN Output:
CartesianProduct ...
⚠️ Confirms Spark replicated the small table and used a nested-loop join (very expensive).
🧠 How to Read .explain(True) Output
- Parsed Logical Plan: your raw SQL/DataFrame expression
- Analyzed Logical Plan: column types and structure resolved
- Optimized Logical Plan: after Catalyst transformations
- Physical Plan: the actual execution strategy
- Look for BroadcastHashJoin, SortMergeJoin, ShuffledHashJoin, CartesianProduct, etc.
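Since Spark 3.0, .explain() also accepts a mode argument for a more readable layout; for example:
result_df.explain(mode="formatted")  # compact physical plan followed by per-node details
# Other modes: "simple" (default), "extended" (same as explain(True)), "codegen", "cost"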
🧠 How to View in the Databricks UI
- Run your notebook cell (with or without .explain()).
- Click the “View” → “Query” link just under the result.
- Go to Query Details → DAG → Physical Plan tab.
- You’ll see the join strategy and whether a broadcast or shuffle happened.
📦 Pro Tip: When Hints Are Ignored
Spark may ignore hints when:
- The broadcast side is too large (automatic broadcasts are capped by spark.sql.autoBroadcastJoinThreshold)
- The requested partition count is too low/high
- Another plan is significantly cheaper, or the join type doesn't support the requested strategy
You'll know hints were respected if you see them reflected in the .explain() output or the Spark UI DAG.
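To check why an expected automatic broadcast didn't happen, you can inspect or adjust the threshold directly (the 50 MB value below is just an example):
print(spark.conf.get("spark.sql.autoBroadcastJoinThreshold"))  # default is typically 10485760 (10 MB)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", str(50 * 1024 * 1024))  # raise to 50 MB; "-1" disables auto-broadcast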
✅ Summary Table of Real Usage
| Hint Name | Code Example | Plan Keyword |
|---|---|---|
| broadcast | join(broadcast(df2), "key") | BroadcastHashJoin |
| coalesce | .hint("coalesce", 4) | CoalescedShuffleReader |
| repartition | .hint("repartition", 8) | Repartition |
| shuffle_hash | .hint("shuffle_hash") | ShuffledHashJoin |
| shuffle_replicate_nl | .hint("shuffle_replicate_nl") + non-equi join | CartesianProduct |