In PySpark, optimizing transformations is crucial for performance, especially when working with large datasets. Here's a breakdown of best practices for broadcasting, caching, partitioning, and related Spark operations, with emphasis on correct ordering and the reasoning behind it.


πŸ” Broadcast vs Cache: Which First?

✅ Best Practice: Broadcast Before Cache

  • Reason: Apply the broadcast hint before caching, because Spark captures the DataFrame's plan as it exists at the time of caching.
  • If you cache first and add the broadcast hint afterwards, Spark will not re-optimize the cached plan with the hint unless the DataFrame is recomputed.
# ✅ Correct
small_df = spark.read.parquet("small_table").hint("broadcast")
small_df.cache()
# ❌ Incorrect
small_df = spark.read.parquet("small_table").cache()
small_df = small_df.hint("broadcast")  # Hint may not be effective now

⚡ Best Practices for Optimizing PySpark Code

🔹 1. Use broadcast() for small lookup tables

  • Avoids large shuffles in joins
  • Ideal for small dimension tables (roughly 10 MB to 100 MB; the default auto-broadcast threshold is 10 MB)
from pyspark.sql.functions import broadcast
df_joined = large_df.join(broadcast(small_df), "key")

🔹 2. Persist/Cache only when reused

  • Use .cache() or .persist() when a DataFrame is reused across multiple stages or actions.
  • Don't cache everything: cached data occupies executor memory and can lead to eviction or disk spills.
df.cache()  # Use only if df is used multiple times

🔹 3. Filter Early (Predicate Pushdown)

  • Apply .filter() or .where() as early as possible to reduce data size.
  • Especially effective if the filter can be pushed down to the source (e.g., Parquet, JDBC).
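A minimal sketch of early filtering against a Parquet source (the path and column names are made-up placeholders):

# Filters on plain Parquet columns can be pushed down to the scan,
# so only matching row groups/files are read
events = spark.read.parquet("/data/events")
recent = events.filter("event_date >= '2024-01-01'").select("user_id", "event_type")
recent.explain(True)  # check for PushedFilters in the file scan node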

🔹 4. Avoid UDFs where possible

  • UDFs are black boxes to the optimizer: Spark can't push them down or optimize them.
  • Prefer built-in functions (pyspark.sql.functions) or SQL expressions.
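As an illustration, here is one common substitution, assuming a hypothetical df with a string column "name":

from pyspark.sql import functions as F

# Instead of a Python UDF such as udf(lambda s: s.strip().upper()),
# use built-in column functions, which the Catalyst optimizer understands
df_clean = df.withColumn("name_clean", F.upper(F.trim(F.col("name"))))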

🔹 5. Repartition Intelligently

  • Avoid a full .repartition() unless necessary; it triggers a full shuffle.
  • Use .coalesce(n) to reduce the number of partitions without a full shuffle.
  • Use .repartition("key") before wide transformations like joins or aggregations on that key.
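A short sketch contrasting the two (the partition count, column name, and other_df are illustrative assumptions):

# coalesce: narrow operation, merges existing partitions (no full shuffle)
final_output = df.coalesce(8)

# repartition by key: full shuffle, but rows sharing "key" end up in the
# same partition, which helps the join that follows
df_by_key = df.repartition("key")
joined = df_by_key.join(other_df, "key")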

🔹 6. Use Column Pruning

  • Select only the required columns early: df.select("col1", "col2")
  • Reduces serialization, shuffle, and memory usage.
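For example (the paths, columns, and customers_df are placeholders); with a columnar format like Parquet, unselected columns are never read from disk:

# Prune columns before the join so less data is read, serialized, and shuffled
orders_slim = spark.read.parquet("/data/orders").select("order_id", "customer_id", "amount")
joined = orders_slim.join(customers_df, "customer_id")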

🔹 7. Avoid Narrow-to-Wide Transformations

  • Avoid code like:
df1.join(df2, "key").groupBy("key").agg(...)
  • Instead, pre-aggregate before the join if possible.
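A hedged sketch of pre-aggregating before the join (df1, df2, and the column names are assumed from the snippet above):

from pyspark.sql import functions as F

# Aggregate first so the join shuffles one row per key instead of every raw row
df1_agg = df1.groupBy("key").agg(F.count("*").alias("row_count"))
result = df1_agg.join(df2, "key")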

🔹 8. Use Bucketing and Partitioning for Hive/Delta Tables

  • Helps Spark skip unnecessary data.
  • Use .bucketBy() or .partitionBy() when writing large tables.
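A minimal sketch (the table name, bucket count, and columns are illustrative; note that .bucketBy() requires .saveAsTable() rather than .save()):

(df.write
   .partitionBy("event_date")        # enables partition pruning on date filters
   .bucketBy(64, "user_id")          # co-locates rows by user_id for bucketed joins
   .sortBy("user_id")
   .format("parquet")
   .mode("overwrite")
   .saveAsTable("analytics.events_bucketed"))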

🔹 9. Materialize Intermediate Steps

  • For complex pipelines, materialize intermediate results with cache(), persist(), .checkpoint(), or by writing them out (e.g., .write.format("delta")).
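One possible pattern is to write an expensive intermediate result out and read it back, which both materializes it and truncates the downstream lineage (big_df, dim_df, and the path are placeholders):

# Materialize an expensive intermediate result, then continue from it
intermediate = big_df.join(dim_df, "key").filter("amount > 0")
intermediate.write.mode("overwrite").parquet("/tmp/pipeline/stage1")

stage1 = spark.read.parquet("/tmp/pipeline/stage1")  # downstream steps start here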

🔹 10. Avoid Collect/Count on Large Data

  • These actions pull data to the driver and may cause OOM errors.
  • Use take(n) instead of collect() for sampling.
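A small sketch of safer alternatives (the row counts are arbitrary):

preview_rows = df.take(10)      # brings only 10 rows to the driver
sample_df = df.limit(1000)      # stays distributed; nothing is pulled to the driver
# df.collect()                  # avoid on large data: pulls every row to the driver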

Bonus Tip: 🧠 Use explain() to Analyze the Query Plan

df.explain(True)
  • Use it to verify whether a broadcast join is actually applied (look for BroadcastHashJoin in the physical plan)
  • Understand the physical plan and detect unnecessary shuffles (Exchange nodes)

Here's a clean, concise mini reference table comparing cache vs persist vs broadcast vs checkpoint in PySpark:


✅ PySpark Optimization: Comparison Table

| Feature | Purpose | Storage Level | Triggers Execution? | Use Case |
| --- | --- | --- | --- | --- |
| cache() | Store DataFrame in memory (default) | MEMORY_AND_DISK | ❌ No (needs an action) | Reuse the same DataFrame multiple times when it fits mostly in memory |
| persist() | Custom storage levels (e.g., disk-only) | Customizable | ❌ No (needs an action) | When memory is insufficient or a specific storage level is needed |
| broadcast() | Send a small DataFrame to all executors | N/A | ❌ No (lazy; applied when the join runs) | Speed up joins with small dimension tables |
| checkpoint() | Break lineage, save to reliable storage | Checkpoint directory (typically HDFS) | ✅ Yes (eager by default) | Long DAGs, avoiding recomputation, fault recovery |

πŸ” Detailed Notes

🟢 cache()

  • Shortcut for .persist(StorageLevel.MEMORY_AND_DISK)
  • Good default when using the DataFrame 2+ times
  • Use after filtering or transforming to avoid redoing work
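As a small illustration (the filter condition and column names are placeholders):

active = df.filter("status = 'active'").cache()  # cache the narrowed data, not the raw input
active.count()                                   # first action materializes the cache
by_country = active.groupBy("country").count()   # served from the cache
totals = active.agg({"amount": "sum"})           # served from the cache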

🟡 persist(StorageLevel)

from pyspark import StorageLevel
df.persist(StorageLevel.DISK_ONLY)
  • Choose memory, disk, or memory+disk, with or without serialization
  • Use when memory is limited

🔵 broadcast()

from pyspark.sql.functions import broadcast
df_large.join(broadcast(df_small), "key")
  • Efficient for joining a large DataFrame with a small one
  • Avoids shuffling the large DataFrame
  • Should be applied before caching

🔴 checkpoint()

spark.sparkContext.setCheckpointDir("/tmp/checkpoints")
df.checkpoint()
  • Used to truncate long lineage chains
  • Helps with stability and performance in iterative algorithms (e.g., graph processing, ML)
  • Requires a checkpoint directory on HDFS or another distributed file system

✅ When to Use What

| Scenario | Use |
| --- | --- |
| Reusing a DataFrame many times | cache() or persist() |
| Small lookup table in a join | broadcast() |
| Stage failures or long-lineage issues | checkpoint() |
| Memory-constrained environment | persist(StorageLevel.DISK_ONLY) |

You can, and should, optimize writes in both vanilla PySpark and Databricks. Below are write optimization techniques, split into general (vanilla) PySpark approaches and Databricks-specific ones:


🚀 WRITE OPTIMIZATIONS IN VANILLA PYSPARK

1. 🔄 Repartition Before Write

  • Prevents too many small files
  • Example:
df.repartition(10).write.parquet("/output/path")

2. 🧩 Coalesce to Reduce Output Files

  • Use .coalesce(n) when writing fewer partitions
  • Ideal for appends or final writes
df.coalesce(1).write.mode("overwrite").csv("/output/single_file")

3. 🧱 Partition By Columns

  • Use .partitionBy() on low-cardinality columns (e.g., year, country) for large tables
df.write.partitionBy("year", "country").parquet("/partitioned/output")

4. 🗃️ Use Efficient Formats

  • Prefer Parquet, ORC, or Delta over CSV/JSON
df.write.format("parquet").save("/parquet_path")

5. 🔒 Avoid the Small Files Problem

  • Write with repartition() or coalesce() to avoid file explosion

⚡ ADDITIONAL WRITE OPTIMIZATIONS IN DATABRICKS

Databricks offers Delta Lake and platform-level features:


1. πŸ” optimize + ZORDER (Delta Lake only)

  • Optimizes file layout
OPTIMIZE delta.`/delta/events` ZORDER BY (event_date, user_id)

2. 🧹 Auto Compaction + Optimized Writes

  • Enable in Delta Lake to reduce small files
SET spark.databricks.delta.autoCompact.enabled = true;
SET spark.databricks.delta.optimizeWrite.enabled = true;

3. 🧯 Schema Evolution

  • Enable auto schema merge (only Delta supports it)
df.write.option("mergeSchema", "true").format("delta").mode("overwrite").save("/path")

4. ✅ Use Databricks Runtime File Writers

  • Faster than open-source Spark for writes
  • Databricks Runtime uses Photon, vectorized I/O, and write optimizations under the hood

📋 SUMMARY TABLE

| Technique | Vanilla PySpark | Databricks | Format |
| --- | --- | --- | --- |
| repartition(n) / coalesce(n) | ✅ | ✅ | All |
| partitionBy() | ✅ | ✅ | All |
| OPTIMIZE + ZORDER | ❌ | ✅ | Delta |
| Auto compaction | ❌ | ✅ | Delta |
| Optimized writes | ❌ | ✅ | Delta |
| Schema evolution | Limited | ✅ | Delta |
| Efficient formats (Parquet/Delta) | ✅ | ✅ | All |

Compression is one of the most effective and most often overlooked write optimizations in both vanilla PySpark and Databricks.


🗜️ COMPRESSION OPTIONS IN PYSPARK

You can specify compression using .option("compression", "type") when writing data.


🔹 Compression Types by Format

| Format | Supported Compression | Default Compression |
| --- | --- | --- |
| Parquet | snappy, gzip, brotli, lz4, zstd, uncompressed | snappy |
| CSV | gzip, bzip2, lz4, deflate, none | none |
| JSON | Same as CSV | none |
| ORC | zlib, snappy, none | zlib |
| Delta | Uses Parquet compression | snappy |

✅ Example: Write CSV with GZIP

df.write.option("compression", "gzip").csv("/path/csv_gzip")

✅ Example: Write Parquet with ZSTD

df.write.option("compression", "zstd").parquet("/path/parquet_zstd")

✅ Set Global Compression via Spark Config

spark.conf.set("spark.sql.parquet.compression.codec", "zstd")
spark.conf.set("spark.sql.orc.compression.codec", "snappy")

πŸ” When to Use Which Compression?

| Compression | Best For | Trade-offs |
| --- | --- | --- |
| snappy | Balanced speed and size | Fastest; slightly larger files |
| gzip | Maximum compression | Slower to write and read |
| zstd | High compression and speed | Newer; often the best overall |
| brotli | Text-heavy data | Slower; best for web/text data |
| none | Testing or raw I/O speed | Large files; not optimized |

📘 TIP: Delta Format

When using Delta, compression is handled via Parquet, so you can still configure it globally:

spark.conf.set("spark.sql.parquet.compression.codec", "zstd")

df.write.format("delta").save("/delta/path")

📦 Real-World Best Practices

  1. ✅ Use snappy or zstd for Parquet/Delta in most workloads.
  2. 🧪 Use gzip if file size matters more than speed.
  3. ✅ Set compression globally in Spark configs for consistency.
  4. 🚫 Don't write uncompressed output (compression "none") in production unless required.
