Great question — PySpark joins are a core interview topic, and understanding how they work, how to optimize them, and which join strategy is used by default shows your depth as a Spark developer.


✅ 1. Join Methods in PySpark

PySpark provides the following join types:

| Join Type | Description |
|---|---|
| `inner` | Only matching rows from both DataFrames |
| `left` / `left_outer` | All rows from left + matching rows from right |
| `right` / `right_outer` | All rows from right + matching rows from left |
| `full` / `outer` | All rows from both sides, with NULLs for missing matches |
| `left_semi` | Only rows from left that match in right (like SQL `EXISTS`) |
| `left_anti` | Only rows from left that don’t match in right (like SQL `NOT EXISTS`) |
| `cross` | Cartesian product (A × B), dangerous on large data |
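The semi/anti semantics trip people up most often, so here is a toy plain-Python illustration of what `left_semi` and `left_anti` keep (hypothetical data; in PySpark you would write `df_left.join(df_right, "id", "left_semi")`):

```python
# Toy illustration of left_semi vs left_anti semantics in plain Python.
left = [{"id": 1, "name": "a"}, {"id": 2, "name": "b"}, {"id": 3, "name": "c"}]
right_keys = {1, 3}  # join keys present in the right DataFrame

# left_semi: keep left rows whose key exists on the right (like SQL EXISTS);
# no columns from the right side appear in the output
semi = [r for r in left if r["id"] in right_keys]

# left_anti: keep left rows whose key does NOT exist on the right (NOT EXISTS)
anti = [r for r in left if r["id"] not in right_keys]

print([r["id"] for r in semi])  # [1, 3]
print([r["id"] for r in anti])  # [2]
```

Note that both variants return only left-side columns, which is exactly why they are cheaper than an `inner` join followed by a `dropDuplicates`.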

✅ 2. How Joins Are Implemented in Spark (Under the Hood)

🔁 Three Primary Physical Join Strategies:

| Join Strategy | When Used |
|---|---|
| Broadcast Hash Join (BHJ) | When one side is small enough to broadcast |
| Shuffle Hash Join (SHJ) | When sort-merge is not preferred and one side can be hashed per partition |
| Sort-Merge Join (SMJ) | Default for equi-joins between large tables |

🧠 Execution Workflow:

  • Logical Plan → Optimized → Physical Plan
  • Spark picks join strategy based on table size & config settings

✅ 3. Default Join Strategy in PySpark

🔹 By default, Spark uses Sort-Merge Join for equi-joins between large tables (spark.sql.join.preferSortMergeJoin is true by default). A Broadcast Hash Join is chosen instead whenever one side falls under the broadcast threshold.


✅ 4. Optimizing Joins in PySpark

🔹 A. Broadcast Join (most powerful optimization)

If one table is small (below the broadcast threshold, 10 MB by default), broadcast it explicitly:

from pyspark.sql.functions import broadcast

df_large.join(broadcast(df_small), "key")

💡 Benefits:

  • No shuffle
  • Scales linearly with the large side
  • Only the small table needs to be held in memory

📌 Config:

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 10 * 1024 * 1024)  # 10MB

🔹 B. Skew Handling

If one join key carries a disproportionate share of the rows, use:

  • Salting: append a random suffix to hot keys (and replicate the matching keys on the other side) to spread data
  • Enable AQE skew handling:
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", True)
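The salting idea can be sketched in plain Python (a toy model of the key transformation only; in PySpark you would add a `rand()`-based salt column on the fact side and explode the dimension side, and the names below are hypothetical):

```python
import random

# Sketch of salting: one hot key is split into N_SALTS variants so its rows
# spread across N_SALTS partitions instead of all landing in one.
N_SALTS = 4

def salted_key(key, rng):
    # Fact side: append a random salt suffix to the join key
    return f"{key}_{rng.randrange(N_SALTS)}"

def replicated_keys(key):
    # Dimension side: replicate each key once per possible salt value,
    # so every salted fact key still finds its match
    return [f"{key}_{i}" for i in range(N_SALTS)]

rng = random.Random(42)
hot_rows = [salted_key("APAC", rng) for _ in range(1000)]

# Every salted fact row still joins against the replicated dim keys
assert set(hot_rows) <= set(replicated_keys("APAC"))
```

The trade-off: the dimension side grows by a factor of N_SALTS, so salting pays off only when the skew is severe.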

🔹 C. Reduce Join Size with Filtering First

# Instead of joining full tables
dim = dim.filter("region = 'APAC'")
fact.join(dim, "region")

🔹 D. Partitioning Strategy

If both sides are large:

df1 = df1.repartition("join_key")
df2 = df2.repartition("join_key")
df1.join(df2, "join_key")

Repartitioning both sides on the same join key ensures co-partitioning, so the join itself triggers less shuffle.


🔹 E. Use Proper Join Type

  • Use left_semi if you only need to know existence.
  • Avoid crossJoin() unless intentional — it multiplies data.

✅ 5. Summary Table: Join Strategy in Spark

| Strategy | Triggered When | Notes |
|---|---|---|
| Broadcast Hash Join | One side < `autoBroadcastJoinThreshold` | Fastest; no shuffle |
| Shuffle Hash Join | SMJ not preferred and one side hashable per partition | Causes a full shuffle |
| Sort-Merge Join | Default for large equi-joins | Needs sorting; CPU-heavy |
| Skew Join (AQE) | Enabled via config; skew detected at runtime | Splits oversized partitions |

📌 Interview-Ready Statement:

“By default, Spark uses Sort-Merge Join when joining two large DataFrames on equality keys. But whenever possible, I try to force a Broadcast Hash Join for small lookup tables to avoid the shuffle. I also enable AQE so Spark can handle join skew at runtime, and if both sides are large, I ensure they’re repartitioned on the join key for co-location. Finally, I pick join types carefully, such as left_semi for existence checks.”


Excellent question — this goes beyond the surface and dives into how Spark internally decides join strategies, how each works at the physical level, and how optimization is applied via Catalyst and AQE.

Let’s break it down like a pro:


🧠 How Join Strategy Internally Works in Spark


✅ Step 1: Logical Plan → Optimized Logical Plan

You write:

df1.join(df2, "id")

This creates a logical plan.


✅ Step 2: Catalyst Optimizer comes in

Catalyst applies:

  • Column pruning
  • Predicate pushdown (filters pushed below joins where possible)
  • Join reordering to reduce shuffle
  • Join selection (based on hints, stats, configs)

✅ Step 3: Physical Plan Generation

Spark chooses one of three physical join strategies based on:

  • Data size estimates (via spark.sql.statistics.size.autoUpdate.enabled)
  • Config thresholds
  • Join key types (equi vs non-equi)

⚙️ Join Strategies (Internally)


🔁 1. Broadcast Hash Join (BHJ)

📦 When: One side is small (less than spark.sql.autoBroadcastJoinThreshold)

📈 Internally:

  • Small table is collected and broadcast to all executors
  • Each executor does in-memory hash join with its partition of the big table

🧠 Zero shuffle
💡 Best for: small dimension tables (lookup/join)


🔄 2. Shuffle Hash Join (SHJ)

📦 When: Broadcasting isn’t possible, sort-merge is not preferred (spark.sql.join.preferSortMergeJoin = false), and one side’s partitions fit in memory

📈 Internally:

  • Both tables are shuffled by join key
  • Each executor builds a hash map for one side
  • Probes the other side

🧠 Shuffling cost + hash memory usage
🛠️ Use when: tables are unsorted and mid-sized


🔀 3. Sort Merge Join (SMJ)

📦 When: Both tables are large; this is Spark’s default for equi-joins (Spark sorts both sides if they aren’t already sorted)

📈 Internally:

  • Spark sorts both tables by join key
  • Executes merge-join by iterating over sorted records

🧠 Higher CPU cost for sorting
💡 Best for: large equi-joins with sorted input


🔥 4. Skew Join (via AQE)

📦 When: One join key is heavily skewed

📈 Internally:

  • AQE detects large partitions at runtime
  • Spark splits skewed partitions into multiple subtasks
  • Uses partial joins to balance the load

🧠 Enabled via:

spark.conf.set("spark.sql.adaptive.skewJoin.enabled", True)

🛠️ How Spark Optimizes Join Internally

| Optimization | Mechanism |
|---|---|
| Catalyst | Logical optimization: reorders filters, joins |
| Cost-Based Optimization (CBO) | Uses size/row stats to pick a better join order/strategy |
| Broadcast hint | Force broadcast: `df1.join(broadcast(df2), "id")` |
| AQE (Adaptive Query Execution) | Changes join strategy at runtime (e.g., shuffle → broadcast) |
| Join reordering | With multiple joins, Spark rearranges them to reduce cost |

✅ How AQE Improves Joins

spark.conf.set("spark.sql.adaptive.enabled", True)

🎯 AQE uses runtime statistics to:

  • Change join strategy (e.g., shuffle → broadcast)
  • Adjust number of shuffle partitions
  • Handle skew joins by splitting heavy partitions
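A typical AQE configuration block covering these three behaviors might look like this (property names are the standard Spark 3.x ones; the skew-tuning values shown are the defaults):

```python
# Enable Adaptive Query Execution and its join-related features
spark.conf.set("spark.sql.adaptive.enabled", True)
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", True)  # merge tiny shuffle partitions
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", True)            # split oversized partitions

# Optional: tune what counts as a "skewed" partition (defaults shown)
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", 5)
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256MB")
```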

🔍 Real Example: Decision Flow

You write:

df_big.join(df_small, "id")

What happens:

  1. Spark checks: Is df_small below the broadcast threshold (10 MB by default)?
    • ✅ Yes → broadcast hash join
  2. ❌ No → Is it an equi-join with sort-merge preferred (the default)?
    • ✅ Yes → sort-merge join
  3. ❌ No → Fall back to shuffle hash join (or a broadcast nested loop join for non-equi conditions)
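The decision flow above can be sketched as a plain-Python function (a hypothetical simplification of Spark's real planner, which also weighs statistics, hints, and join-condition details):

```python
def choose_join_strategy(small_side_bytes, equi_join,
                         broadcast_threshold=10 * 1024 * 1024,
                         prefer_sort_merge=True):
    """Simplified sketch of Spark's physical join selection."""
    if equi_join and small_side_bytes <= broadcast_threshold:
        return "broadcast_hash_join"      # small side fits under the threshold
    if equi_join and prefer_sort_merge:
        return "sort_merge_join"          # the default for large equi-joins
    if equi_join:
        return "shuffle_hash_join"        # only when SMJ is not preferred
    return "broadcast_nested_loop_join"   # non-equi joins fall back here

print(choose_join_strategy(5 * 1024 * 1024, True))   # broadcast_hash_join
print(choose_join_strategy(1 << 30, True))           # sort_merge_join
```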

🧠 Interview-Ready Summary

“Spark uses Catalyst and CBO to decide join strategies at the physical plan stage. If one table is small, Spark uses a broadcast join. Otherwise it defaults to sort-merge join for large equi-joins; shuffle hash join is chosen only when sort-merge is not preferred and one side is small enough to hash per partition. With AQE enabled, Spark can dynamically switch join types at runtime based on actual data, including handling skew by splitting heavy partitions. We optimize joins by broadcasting lookup tables, filtering early, partitioning properly, and enabling AQE.”


Great — let’s deep-dive into how hash join works internally in Spark, especially in the context of Broadcast Hash Join and Shuffle Hash Join. You’re asking exactly the kind of question senior engineers and interviewers appreciate: “How does the executor actually process the join step-by-step?”


🔍 What is a Hash Join in Spark?

A hash join is a physical join strategy where one side (called the build side) is loaded into memory as a hash map, and the other side (called the probe side) is scanned to find matches.

There are two common hash join variants in Spark:

| Join Type | Hash Map Built On | How Probe Happens |
|---|---|---|
| Broadcast Hash Join | Small table (broadcast) | Each executor probes it with its partitions of the big table |
| Shuffle Hash Join | The smaller shuffled side | Each executor builds and probes locally, per partition |

🧠 Internal Step-by-Step: How Hash Join Works

Let’s say you’re doing:

df_large.join(broadcast(df_small), "id")

Assume:

  • df_large: 1 billion rows
  • df_small: 10,000 rows

⚙️ Step-by-Step: Broadcast Hash Join (BHJ)

  1. Driver Side:
    • Collects df_small into memory (as local list of rows)
    • Broadcasts it to all executors
  2. Executor Side (parallel for each partition of df_large):
    • Receives the broadcasted small table
    • Builds a hash map from small table: broadcast_map = { "key1": row1, "key2": row2, ... }
    • Iterates over its own partition of df_large
      • For each row in df_large:
        • Look up matching id in broadcast_map
        • If match: emit joined row
  3. Result:
    • All executors emit joined rows in parallel, no shuffle involved

🧠 It’s super-fast, but only if the broadcasted table fits in memory (~10MB by default).
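The build/probe steps above can be mimicked in plain Python (a toy single-partition simulation with made-up data, not Spark's actual code):

```python
# Toy simulation of a broadcast hash join: build a hash map from the small
# side, then stream the large side's partition through it.
small = [(1, "US"), (2, "IN"), (3, "UK")]             # (id, country): "broadcast" side
large = [(1, 100.0), (2, 50.0), (2, 75.0), (9, 5.0)]  # (id, amount): probe side

# Build phase: one hash map keyed on the join column
broadcast_map = {key: val for key, val in small}

# Probe phase: a single dict lookup per large-side row;
# unmatched rows (id=9) are dropped, as in an inner join
joined = [(k, amt, broadcast_map[k]) for k, amt in large if k in broadcast_map]

print(joined)  # [(1, 100.0, 'US'), (2, 50.0, 'IN'), (2, 75.0, 'IN')]
```

This is why BHJ is O(probe side) with no shuffle: each large-side row costs one in-memory lookup.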


⚙️ Step-by-Step: Shuffle Hash Join (SHJ)

Used when broadcasting is not possible and sort-merge is not preferred (spark.sql.join.preferSortMergeJoin = false)

  1. Shuffle Phase:
    • Both df1 and df2 are shuffled by the join key
    • Ensures same keys from both sides land in the same partition
  2. Executor Side (each gets one partition of df1 and df2):
    • Spark picks the smaller side (by size estimate) to build the hash map
    • Scans left side to probe the hash map
    • For each row on the probe side:
      • Looks up key in hash map
      • If match: emit joined row
  3. Build Phase:

     hash_map = {}
     for row in df2_partition:
         hash_map[row['id']] = row

  4. Probe Phase:

     for row in df1_partition:
         if row['id'] in hash_map:
             yield joined_row

🧠 It’s scalable but causes shuffle overhead (disk & network IO).


🎯 Build Side vs Probe Side — Key Decision

| Role | Size Criteria | Why Important |
|---|---|---|
| Build side | Smaller table | Must fit in memory as a hash map |
| Probe side | Larger table | Streamed and compared row by row |

💡 You can control build side with join hints:

df1.join(df2.hint("broadcast"), "id")

⚠️ What If Hash Map is Too Large?

  • For broadcast joins: Out-of-memory error (cannot broadcast)
  • For shuffle hash join: Spark may fallback to sort-merge join, or spill to disk (expensive)
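If you deliberately want the planner to consider shuffle hash join over sort-merge join (e.g., for mid-size unsorted tables), the relevant switch is:

```python
# Let the planner pick shuffle hash join when one side's partitions fit in memory
spark.conf.set("spark.sql.join.preferSortMergeJoin", False)
```

In Spark 3.x you can also request it per-join via the SHUFFLE_HASH hint.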

✅ Summary Table

| Join Type | Build Side | Memory Use | Shuffle? | Best For |
|---|---|---|---|---|
| Broadcast Hash Join | Broadcast table (small) | In-memory hash map | No | Lookup/dim tables |
| Shuffle Hash Join | Smaller shuffled side | Local hash map per partition | Yes | Mid-size tables |
| Sort-Merge Join | Neither (both sorted) | Sorted iterators | Yes | Large equi-joins |

💬 Interview-Ready Answer:

“In a hash join, Spark picks one side (usually the smaller) as the build side and loads it into memory as a hash map. The other side is the probe side that’s scanned row-by-row to find matching keys. In broadcast hash joins, this hash map is broadcast to all executors. In shuffle hash joins, Spark shuffles both sides and builds local hash maps on each executor for that partition. This is efficient, but if the build side is too big, Spark either spills to disk or switches to sort-merge join.”

