Here’s the full code from the Databricks notebook, followed by a handy Join Optimization Cheatsheet.
Azure Databricks PySpark Notebook Code
Broadcast Join vs Sort-Merge Join + Partitioning vs Bucketing
# 1. Setup
from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast, rand
spark = SparkSession.builder.appName("JoinOptimization").getOrCreate()
# 2. Create Sample DataFrames
# Large DataFrame (10 million rows)
large_df = spark.range(0, 10_000_000).withColumn("value", rand())
# Small DataFrame (100 rows)
small_df = spark.createDataFrame([(i, f"cat_{i}") for i in range(100)], ["id", "category"])
# Rename columns for join consistency
large_df = large_df.withColumnRenamed("id", "user_id")
small_df = small_df.withColumnRenamed("id", "user_id")
# 3. Broadcast Join (forced via an explicit hint)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)  # Disable automatic broadcasting; only the explicit broadcast() hint below forces it
broadcast_join = large_df.join(broadcast(small_df), "user_id")
broadcast_join.explain(True)
broadcast_join.count()
# 4. Sort-Merge Join (no broadcast)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1) # Ensure broadcast is off
sort_merge_join = large_df.join(small_df, "user_id")
sort_merge_join.explain(True)
sort_merge_join.count()
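To confirm which strategy Spark actually chose, you can capture what explain() prints and search the physical plan text for the join node. Below is a minimal sketch, assuming explain() prints through Python's stdout (the case in recent PySpark releases) so redirect_stdout can capture it; the physical_plan() helper name is just illustrative.
# Helper: capture the plan printed by explain() so it can be inspected as a string
import io, contextlib

def physical_plan(df):
    buf = io.StringIO()
    with contextlib.redirect_stdout(buf):
        df.explain()
    return buf.getvalue()

print("BroadcastHashJoin" in physical_plan(broadcast_join))  # expected: True
print("SortMergeJoin" in physical_plan(sort_merge_join))     # expected: True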
# 5. Join Optimization: Partitioning
# Save as partitioned Parquet
# NOTE: partitioning by user_id is for illustration only. It is a high-cardinality key
# (up to 10M distinct values here), so partitionBy creates one folder per value.
# In practice, partition by a low-cardinality column such as a date or region.
large_df.write.mode("overwrite").partitionBy("user_id").parquet("/tmp/large_partitioned")
small_df.write.mode("overwrite").partitionBy("user_id").parquet("/tmp/small_partitioned")
# Read back partitioned data
df_large_p = spark.read.parquet("/tmp/large_partitioned")
df_small_p = spark.read.parquet("/tmp/small_partitioned")
# Partitioned join
partitioned_join = df_large_p.join(df_small_p, "user_id")
partitioned_join.explain(True)
partitioned_join.count()
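Worth noting: directory partitioning does not give Spark a co-partitioning guarantee it can use at join time, so the join above still shuffles both sides. A quick check, reusing the illustrative physical_plan() helper sketched in section 4:
# The partitioned join still contains Exchange (shuffle) nodes; partitionBy speeds up
# pruned reads, it does not make joins shuffle-free.
print("Exchange" in physical_plan(partitioned_join))  # expected: True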
# 6. Join Optimization: Bucketing (Hive-style)
# Drop if exists
spark.sql("DROP TABLE IF EXISTS large_bucketed")
spark.sql("DROP TABLE IF EXISTS small_bucketed")
# Save as bucketed tables
large_df.write.bucketBy(8, "user_id").sortBy("user_id").saveAsTable("large_bucketed")
small_df.write.bucketBy(8, "user_id").sortBy("user_id").saveAsTable("small_bucketed")
# Read back
df_large_b = spark.table("large_bucketed")
df_small_b = spark.table("small_bucketed")
# Bucketed join
bucketed_join = df_large_b.join(df_small_b, "user_id")
bucketed_join.explain(True)
bucketed_join.count()
Notes on Optimization
Partitioning:
- Good for filtering (e.g., read only a single partition).
- Stored as separate folders on disk.
- Best for "read" optimization (see the partition-pruning sketch below).
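To see the "read" benefit in practice, filter on the partition column and Spark prunes the scan to the matching folder. A minimal sketch against the partitioned path written in section 5; the value 42 is arbitrary.
# Partition pruning: only the folder user_id=42 under /tmp/large_partitioned is scanned
pruned = spark.read.parquet("/tmp/large_partitioned").filter("user_id = 42")
pruned.explain(True)   # the FileScan node should list the predicate under PartitionFilters
pruned.count()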
Bucketing:
- Optimizes joins by hashing rows into a fixed number of buckets and sorting within each bucket.
- Avoids the shuffle when both sides are bucketed on the same column with the same bucket count (see the plan check below).
- Best for "join" optimization.
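That shuffle-avoidance claim can be checked against the bucketed join from section 6: its physical plan should contain no Exchange (shuffle) node, unlike the plain sort-merge join. A minimal sketch, again reusing the illustrative physical_plan() helper from section 4:
plan = physical_plan(bucketed_join)
print("SortMergeJoin" in plan)  # expected: True, it is still a sort-merge join...
print("Exchange" in plan)       # expected: False, ...but without a shuffle, thanks to matching buckets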
Join Optimization Cheatsheet
Aspect | Broadcast Join | Sort-Merge Join | Partitioning | Bucketing
---|---|---|---|---
Trigger | Small table below spark.sql.autoBroadcastJoinThreshold (10MB default; see snippet below) | Default fallback | User-defined | User-defined
Best For | Joining a small table to a large table | Large tables on an equality key | Efficient scans | Efficient joins
Shuffle | No | Yes | No (for filters) | No (if buckets match)
Write Method | Regular DataFrame | Regular DataFrame | .write.partitionBy() | .write.bucketBy().sortBy().saveAsTable()
Use in PySpark | broadcast(df) | Automatic | Parquet-based | Hive table required
Pros | Fastest for small lookups | Handles large datasets | Reduces I/O | Joins without shuffle
Cons | OOM risk if the broadcast table is too big | Expensive shuffle | Can create many small files | Setup complexity; requires saveAsTable in PySpark
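The 10MB in the Trigger row is the default of spark.sql.autoBroadcastJoinThreshold. A minimal sketch for inspecting and tuning it; the 50MB value is an illustrative choice, not a recommendation.
# Inspect, then raise, the automatic-broadcast threshold; -1 disables auto-broadcast entirely
print(spark.conf.get("spark.sql.autoBroadcastJoinThreshold"))
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 50 * 1024 * 1024)  # 50MB, illustrative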