PySpark Wholesome Tutorial – Links to refer, PDFs

Let's break down two important performance optimization concepts in Apache Spark:


🔥 1. Speculative Execution in Spark

✅ What is it?

Speculative Execution helps handle straggler tasks: tasks that are unusually slow due to hardware issues, data skew, etc.

Spark launches backup copies of slow tasks on other executors. Whichever attempt finishes first wins; Spark keeps that result and kills the slower copy.


✅ Why is it needed?

In a cluster, one node might be slower due to:

  • CPU/network bottlenecks
  • Disk I/O problems
  • Data skew (some partitions have way more data)

Rather than wait for that node, Spark uses speculative execution to mitigate delays.


✅ How to enable it?

spark.conf.set("spark.speculation", "true")                    # Enable it
spark.conf.set("spark.speculation.multiplier", "1.5")          # 1.5Γ— average task time triggers speculation
spark.conf.set("spark.speculation.quantile", "0.75")           # Only consider slowest 25% for retry
spark.conf.set("spark.speculation.interval", "100ms")          # How often to check

You can also set it in spark-defaults.conf or at job level.
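Because speculation is a scheduler setting, the safest place to set it is before the application starts, e.g. in spark-defaults.conf or on the SparkSession builder. A minimal sketch, assuming a hypothetical app name:

from pyspark.sql import SparkSession

# Speculation settings applied at session creation so the scheduler picks them up
spark = (
    SparkSession.builder
    .appName("speculation-demo")                 # hypothetical app name
    .config("spark.speculation", "true")
    .config("spark.speculation.multiplier", "1.5")
    .config("spark.speculation.quantile", "0.75")
    .config("spark.speculation.interval", "100ms")
    .getOrCreate()
)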


🧪 Example Use Case:

You're running a Spark job processing log files across 200 tasks.
One executor has a disk issue, and its task takes 3× longer.
→ Spark runs a speculative copy of that task on a healthy executor.
→ The fast one finishes first, and its result is used.


πŸ” Monitoring:

You can track speculative tasks in the Spark UI under the "Tasks" tab (look for duplicated attempts).


βš™οΈ 2. Resource Allocation Methods in Spark

✅ What are we allocating?

Spark dynamically or statically allocates:

  • Executors (JVMs that run tasks)
  • Cores per executor
  • Memory per executor

🚦 Allocation Modes

  • Static Allocation: you define the number of executors, cores, and memory manually.
  • Dynamic Allocation: Spark adds and removes executors automatically based on workload.

✅ Static Allocation (Manual)

You specify everything upfront:

--num-executors 4
--executor-cores 4
--executor-memory 8G

Total cores requested = 4 executors × 4 cores = 16 cores

Useful when:

  • You know job behavior
  • Fixed-size data
  • Cluster has limited flexibility
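
The same static sizing can also be expressed as config keys on the session builder instead of CLI flags. A minimal sketch using the values above (spark.executor.instances is honoured on YARN/Kubernetes):

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("static-allocation-demo")          # hypothetical app name
    .config("spark.executor.instances", "4")    # same as --num-executors 4
    .config("spark.executor.cores", "4")        # same as --executor-cores 4
    .config("spark.executor.memory", "8g")      # same as --executor-memory 8G
    .getOrCreate()
)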

✅ Dynamic Resource Allocation

Spark scales up/down executors automatically at runtime.

Enable in config:

spark.conf.set("spark.dynamicAllocation.enabled", "true")
spark.conf.set("spark.shuffle.service.enabled", "true")   # Needed to keep shuffle data
spark.conf.set("spark.dynamicAllocation.minExecutors", "2")
spark.conf.set("spark.dynamicAllocation.maxExecutors", "10")
spark.conf.set("spark.dynamicAllocation.initialExecutors", "3")

Useful for:

  • Jobs with uneven stages
  • Clusters with shared resources
  • Streaming jobs with bursts
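
As with speculation, dynamic-allocation settings are easiest to apply at session startup. A sketch of the same configuration on the builder (the idle timeout is an illustrative addition):

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("dynamic-allocation-demo")                             # hypothetical app name
    .config("spark.dynamicAllocation.enabled", "true")
    .config("spark.shuffle.service.enabled", "true")                # shuffle data survives executor removal
    .config("spark.dynamicAllocation.minExecutors", "2")
    .config("spark.dynamicAllocation.maxExecutors", "10")
    .config("spark.dynamicAllocation.executorIdleTimeout", "60s")   # release executors idle for ~1 minute
    .getOrCreate()
)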

🧠 Best Practices:

  • Large jobs, heavy skew: enable speculative execution
  • Shared cluster, bursty load: enable dynamic allocation
  • Stable job, consistent load: use static allocation

🎯 Real-Life Analogy:

  • Speculative Execution = sending another person to do the job when the first is taking too long.
  • Dynamic Allocation = hiring more workers when workload increases, releasing them when done.

✅ Example with spark-submit

spark-submit \
  --conf spark.speculation=true \
  --conf spark.dynamicAllocation.enabled=true \
  --conf spark.dynamicAllocation.minExecutors=2 \
  --conf spark.dynamicAllocation.maxExecutors=8 \
  --num-executors 4 \
  --executor-memory 4G \
  --executor-cores 2 \
  my_job.py

Here's a practical guide to using Spark DataFrame Hints in Databricks, with real examples, and how to inspect their effects using .explain() and the Spark UI (Query Plan).


✅ Setup: Sample Tables

Let's say you have the following:

large_df = spark.range(1, 1_000_000).withColumnRenamed("id", "user_id")
small_df = spark.createDataFrame([(1, "Alice"), (2, "Bob")], ["user_id", "name"])

🧠 1. Broadcast Hint

📌 Use Case: Small lookup/dimension table join

🔧 Code:

from pyspark.sql.functions import broadcast

result_df = large_df.join(broadcast(small_df), "user_id")
result_df.explain(True)

🧪 Output (EXPLAIN PLAN includes):

... BroadcastHashJoin ...
... BroadcastExchange ...

✅ BroadcastExchange confirms Spark sent the small table to all executors.
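
The same hint can also be written in SQL syntax. A sketch, assuming the temp-view names below are registered purely for illustration:

large_df.createOrReplaceTempView("large_tbl")   # hypothetical view names
small_df.createOrReplaceTempView("small_tbl")

result_df = spark.sql("""
    SELECT /*+ BROADCAST(s) */ l.user_id, s.name
    FROM large_tbl l
    JOIN small_tbl s ON l.user_id = s.user_id
""")
result_df.explain(True)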


🧠 2. Coalesce Hint

📌 Use Case: You want to reduce output shuffle partitions after aggregation

df = large_df.groupBy("user_id").count().hint("coalesce", 4)
df.explain(True)

🧪 EXPLAIN Output:

Check for this in physical plan:

... CoalescedShuffleReader ...

✅ It tells Spark to reduce post-shuffle partitions to 4.
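
For comparison, the plain DataFrame API equivalent is coalesce(), which also narrows partitions without a full shuffle; a quick sketch:

df = large_df.groupBy("user_id").count().coalesce(4)
print(df.rdd.getNumPartitions())   # expect 4
df.explain(True)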


🧠 3. Repartition Hint

📌 Use Case: Force repartitioning during a join or aggregation

df = large_df.join(small_df.hint("repartition", 8), "user_id")
df.explain(True)

🧪 EXPLAIN Output:

Repartition ...

✅ Spark performs an extra repartition stage to match the hint.
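
If you want the repartitioning keyed on the join column rather than just a partition count, the non-hint DataFrame API makes that explicit; a sketch:

repartitioned_small = small_df.repartition(8, "user_id")   # 8 partitions, hashed on user_id
df = large_df.join(repartitioned_small, "user_id")
df.explain(True)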


🧠 4. Shuffle Hash Join

📌 Use Case: You want to override sort-merge and force a hash join

df = large_df.hint("shuffle_hash").join(small_df, "user_id")
df.explain(True)

🧪 EXPLAIN Output:

... ShuffledHashJoin ...

✅ Confirms Spark used a hash join even if sort-merge was eligible.
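
Conversely, the merge hint (Spark 3.0+) nudges Spark back toward a sort-merge join; a sketch, keeping in mind Spark can still pick a cheaper plan:

df = large_df.hint("merge").join(small_df, "user_id")
df.explain(True)   # look for SortMergeJoin in the physical plan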


🧠 5. Shuffle Replicate NL Join (advanced)

📌 Use Case: Star schema or non-equi join where one table is small

df = large_df.hint("shuffle_replicate_nl").join(small_df, large_df.user_id > small_df.user_id)
df.explain(True)

🧪 EXPLAIN Output:

CartesianProduct ...

⚠️ Confirms Spark replicated the small table and used nested loop (very expensive).


🧠 How to Read .explain(True) Output

  • Parsed Logical Plan: Your raw SQL/dataframe
  • Analyzed Logical Plan: Column types and structure
  • Optimized Logical Plan: After Catalyst transformations
  • Physical Plan: Actual execution strategy
    • Look for BroadcastHashJoin, SortMergeJoin, ShuffledHashJoin, CartesianProduct, etc.
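
On Spark 3.x, explain() also accepts a mode instead of the extended flag; "formatted" is usually the easiest to scan. A quick sketch using the earlier broadcast-join result:

# Modes: "simple", "extended", "codegen", "cost", "formatted"
result_df.explain(mode="formatted")   # numbered operators plus per-operator details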

🧠 How to View in Databricks UI

  1. Run your notebook cell (with or without .explain()).
  2. Click on the "View" → "Query" link just under the result.
  3. Go to Query Details → DAG → Physical Plan tab.
  4. You'll see the join strategy and whether broadcast or shuffle happened.

🚦 Pro Tip: When Hints Are Ignored

Spark may ignore hints when:

  • The broadcast side is too large (automatic broadcasts are capped by spark.sql.autoBroadcastJoinThreshold, and even a hinted broadcast can fail if the table does not fit in memory)
  • Partition count is too low/high
  • Another plan is significantly cheaper

You'll know hints were respected if you see them in the .explain() output or the Spark UI DAG.
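
For the broadcast case specifically, you can inspect and adjust the automatic-broadcast threshold; a sketch (10 MB is the usual default, and the 50 MB value below is just illustrative):

print(spark.conf.get("spark.sql.autoBroadcastJoinThreshold"))                    # size in bytes; -1 disables auto broadcast
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", str(50 * 1024 * 1024))    # raise to ~50 MB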


✅ Summary Table of Real Usage

  • broadcast: join(broadcast(df2), "key") → BroadcastHashJoin
  • coalesce: .hint("coalesce", 4) → CoalescedShuffleReader
  • repartition: .hint("repartition", 8) → Repartition
  • shuffle_hash: .hint("shuffle_hash") → ShuffledHashJoin
  • shuffle_replicate_nl: .hint("shuffle_replicate_nl") + non-equi join → CartesianProduct
