Here’s the full code from the Databricks notebook, followed by a handy Join Optimization Cheatsheet.


📓 Azure Databricks PySpark Notebook Code

🔗 Broadcast Join vs Sort-Merge Join + Partitioning vs Bucketing

# ⬛ 1. Setup
from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast, rand

spark = SparkSession.builder.appName("JoinOptimization").getOrCreate()

# 🔹 2. Create Sample DataFrames

# Large DataFrame (10 million rows)
large_df = spark.range(0, 10_000_000).withColumn("value", rand())

# Small DataFrame (100 rows)
small_df = spark.createDataFrame([(i, f"cat_{i}") for i in range(100)], ["id", "category"])

# Rename columns for join consistency
large_df = large_df.withColumnRenamed("id", "user_id")
small_df = small_df.withColumnRenamed("id", "user_id")

# 🚀 3. Broadcast Join (Force it)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)  # Disable auto broadcast; the explicit broadcast() hint below still forces it
broadcast_join = large_df.join(broadcast(small_df), "user_id")

broadcast_join.explain(True)
broadcast_join.count()

# 🌀 4. Sort-Merge Join (No broadcast)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)  # Ensure broadcast is off
sort_merge_join = large_df.join(small_df, "user_id")

sort_merge_join.explain(True)
sort_merge_join.count()

# 🧱 5. Join Optimization: Partitioning

# Save as partitioned Parquet
# Note: partitionBy() creates one folder per distinct value, so in practice choose a
# low-cardinality column; user_id is used here only to keep the example consistent.
large_df.write.mode("overwrite").partitionBy("user_id").parquet("/tmp/large_partitioned")
small_df.write.mode("overwrite").partitionBy("user_id").parquet("/tmp/small_partitioned")

# Read back partitioned data
df_large_p = spark.read.parquet("/tmp/large_partitioned")
df_small_p = spark.read.parquet("/tmp/small_partitioned")

# Partitioned join
# Note: disk partitioning mainly helps partition pruning on filters;
# this join still shuffles both sides (look for Exchange in the plan).
partitioned_join = df_large_p.join(df_small_p, "user_id")
partitioned_join.explain(True)
partitioned_join.count()

# 📦 6. Join Optimization: Bucketing (Hive-style)

# Drop if exists
spark.sql("DROP TABLE IF EXISTS large_bucketed")
spark.sql("DROP TABLE IF EXISTS small_bucketed")

# Save as bucketed tables
large_df.write.bucketBy(8, "user_id").sortBy("user_id").saveAsTable("large_bucketed")
small_df.write.bucketBy(8, "user_id").sortBy("user_id").saveAsTable("small_bucketed")

# Read back
df_large_b = spark.table("large_bucketed")
df_small_b = spark.table("small_bucketed")

# Bucketed join
bucketed_join = df_large_b.join(df_small_b, "user_id")
bucketed_join.explain(True)
bucketed_join.count()

✅ Notes on Optimization

Partitioning:
- Good for filtering (e.g., reading only a single partition; see the sketch below).
- Stored as separate folders on disk, one per distinct value of the partition column.
- Best for "read" optimization.

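To see the filtering benefit in a plan, here is a minimal sketch reusing the /tmp/large_partitioned path written in step 5 (the value 42 is just an arbitrary example key): a filter on the partition column appears under PartitionFilters on the Parquet scan, so only the matching folder is read.

# Partition pruning demo: filter on the partition column
from pyspark.sql.functions import col

pruned = spark.read.parquet("/tmp/large_partitioned").filter(col("user_id") == 42)
pruned.explain(True)   # the Parquet scan should list the filter under PartitionFilters
pruned.count()         # only the user_id=42 folder is scanned
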
Bucketing:
- Optimizes joins by pre-sorting data and dividing it into hash buckets.
- Avoids the shuffle when both sides are bucketed on the same column with the same number of buckets (see the sketch below).
- Best for "join" optimization.

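To confirm that bucketing is actually being picked up, a minimal sketch against the tables created in step 6 (assuming the default bucketing settings):

# Bucketed scans only avoid the shuffle while bucketing is enabled (true by default)
print(spark.conf.get("spark.sql.sources.bucketing.enabled"))

# The metastore records the bucket spec: look for "Num Buckets" and "Bucket Columns"
spark.sql("DESCRIBE EXTENDED large_bucketed").show(50, truncate=False)

# In bucketed_join.explain(True) above, the SortMergeJoin should show no Exchange on either side
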
📘 Join Optimization Cheatsheet

| Aspect | Broadcast Join | Sort-Merge Join | Partitioning | Bucketing |
| --- | --- | --- | --- | --- |
| Trigger | Small table < threshold (10 MB default) | Default fallback | User-defined | User-defined |
| Best For | Joining small → large tables | Large tables on an equality key | Efficient scans | Efficient joins |
| Shuffle | ❌ No | ✅ Yes | ❌ No (for filters) | ❌ No (if buckets match) |
| Write Method | Regular DataFrame | Regular DataFrame | .write.partitionBy() | .write.bucketBy().sortBy().saveAsTable() |
| Use in PySpark | broadcast(df) | Auto | Parquet-based | Hive table required |
| Pros | Fastest for small lookups | Handles large datasets | Reduces I/O | Joins without shuffle |
| Cons | OOM risk if the broadcast table is too big | Expensive shuffle | Can create many small files | Setup complexity, limited in PySpark |

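A closing sketch tied to the "Use in PySpark" row: on Spark 3.x the strategy can also be requested per join with DataFrame hints instead of changing the global threshold (the default autoBroadcastJoinThreshold is 10 MB):

# Restore automatic broadcasting after the experiments above
spark.conf.unset("spark.sql.autoBroadcastJoinThreshold")

# Request a broadcast of the small side for this join only
large_df.join(small_df.hint("broadcast"), "user_id").explain()   # expect BroadcastHashJoin

# Request a sort-merge join even if the small side is under the threshold
large_df.join(small_df.hint("merge"), "user_id").explain()       # expect SortMergeJoin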