In PySpark, optimizing transformations is crucial for performance, especially when working with large datasets. Here's a breakdown of best practices for broadcasting, caching, partitioning, and related Spark operations, with an emphasis on correct ordering and the reasoning behind it:
Broadcast vs Cache: Which First?
Best Practice: Broadcast Before Cache
- Reason: Apply the broadcast hint before caching, because Spark caches the query plan as it exists at the moment you call cache().
- If you cache first and add the broadcast hint afterwards, Spark will not re-optimize the cached plan with the hint unless the DataFrame is recomputed.
# Correct
small_df = spark.read.parquet("small_table").hint("broadcast")
small_df.cache()
# Incorrect
small_df = spark.read.parquet("small_table").cache()
small_df = small_df.hint("broadcast") # Hint might not be effective now
Best Practices for Optimizing PySpark Code
1. Use broadcast() for small lookup tables
- Avoids large shuffles in joins
- Ideal for dimension tables (roughly 10MB–100MB or smaller)
from pyspark.sql.functions import broadcast
df_joined = large_df.join(broadcast(small_df), "key")
2. Persist/Cache only when reused
- Use .cache() or .persist() when a DataFrame is reused across multiple stages.
- Don't cache everything: Spark stores cached data in memory, which can cause eviction or spills.
df.cache() # Use only if df is used multiple times
3. Filter Early (Predicate Pushdown)
- Apply .filter() or .where() as early as possible to reduce data size.
- Especially effective when the filter can be pushed down to the source (e.g., Parquet, JDBC); see the sketch below.
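A minimal sketch of filtering right after the read, assuming a hypothetical events dataset stored as Parquet with year and status columns:
# Filtering immediately after the read lets Spark push the predicate
# down into the Parquet scan instead of reading everything first.
events = (
    spark.read.parquet("/data/events")            # hypothetical path
         .where("year = 2024 AND status = 'active'")
         .select("user_id", "event_type", "ts")   # prune columns early too
)
events.explain(True)  # pushed filters show up in the Parquet scan node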
4. Avoid UDFs where possible
- UDFs are black boxes: Spark can't optimize them.
- Prefer built-in functions (pyspark.sql.functions) or SQL expressions; see the comparison below.
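A small illustrative comparison, assuming a hypothetical DataFrame df with a string column name:
from pyspark.sql import functions as F
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# UDF version: rows are serialized to Python one by one, opaque to the optimizer
upper_udf = udf(lambda s: s.upper() if s is not None else None, StringType())
df_with_udf = df.withColumn("name_upper", upper_udf("name"))

# Built-in version: stays in the JVM and benefits from Catalyst optimizations
df_with_builtin = df.withColumn("name_upper", F.upper("name"))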
5. Repartition Intelligently
- Avoid a full .repartition() unless necessary; it triggers a full shuffle.
- Use .coalesce(n) to reduce the number of partitions without a shuffle.
- Use .repartition("key") before wide transformations like joins; a short sketch follows.
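A minimal sketch of the three patterns above (df and the output path are hypothetical):
# Reduce partition count before a final write without a full shuffle
df.coalesce(8).write.mode("overwrite").parquet("/output/final")

# Repartition by the join key so matching keys land in the same partition
df_by_key = df.repartition("key")

# Full repartition to a fixed count: only when an even reshuffle is really needed
df_even = df.repartition(200)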
6. Use Column Pruning
- Select only required columns early:
df.select("col1", "col2")
- Reduces serialization, shuffle, and memory usage.
7. Avoid Narrow-to-Wide Transformations
- Avoid code like:
df1.join(df2, "key").groupBy("key").agg(...)
- Instead, pre-aggregate before the join if possible, as sketched below.
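A hedged sketch of pre-aggregating before a join, assuming df2 holds many detail rows per key and only a per-key total is needed (column names are hypothetical):
from pyspark.sql import functions as F

# Aggregate the detail table down to one row per key first...
df2_agg = df2.groupBy("key").agg(F.sum("amount").alias("total_amount"))

# ...then join the much smaller aggregate, shrinking the shuffle
result = df1.join(df2_agg, "key")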
8. Use Bucketing and Partitioning for Hive/Delta Tables
- Helps Spark skip unnecessary data.
- Use .bucketBy() or .partitionBy() when writing large tables; see the sketch below.
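A minimal sketch, assuming a hypothetical sales DataFrame written as managed tables:
# Partition by a low-cardinality column so Spark can skip whole directories
(sales.write
      .partitionBy("year")
      .mode("overwrite")
      .format("parquet")
      .saveAsTable("sales_partitioned"))

# Bucket by the join key; bucketBy() requires saveAsTable (a managed table)
(sales.write
      .bucketBy(16, "customer_id")
      .sortBy("customer_id")
      .mode("overwrite")
      .format("parquet")
      .saveAsTable("sales_bucketed"))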
9. Materialize Intermediate Steps
- For complex pipelines, materialize intermediate results with cache(), persist(), .checkpoint(), or by writing to disk with .write.format("delta"); a sketch follows.
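One way to materialize a heavy intermediate result to storage and read it back (names and the path are hypothetical):
intermediate_path = "/tmp/pipeline/stage1"  # hypothetical location

# Persist the expensive intermediate result...
stage1_df.write.mode("overwrite").format("parquet").save(intermediate_path)

# ...and continue the pipeline from the materialized copy,
# which cuts the lineage of everything upstream
stage1_df = spark.read.parquet(intermediate_path)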
10. Avoid collect()/count() on Large Data
- collect() pulls the entire dataset to the driver and can cause OOM errors; count() triggers a full scan of the data.
- Use take(n) instead of collect() for sampling; see the example below.
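A quick illustration of driver-safe alternatives (df is hypothetical):
rows = df.take(10)           # brings only 10 rows to the driver
small = df.limit(1000)       # stays distributed; collect later only if small
df.show(20, truncate=False)  # inspect a few rows without collecting everything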
Bonus Tip: Use explain() to Analyze the DAG
df.explain(True)
- Use it to verify whether a broadcast join is actually applied
- Understand the physical plan and detect unnecessary shuffles; see the note below
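On Spark 3.0+, explain() also accepts a mode string, which is often easier to read than the full extended output:
df.explain(mode="formatted")  # sectioned physical plan; look for BroadcastHashJoin
df.explain(mode="cost")       # includes statistics when available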
Here's a clean, concise mini reference table comparing cache vs persist vs broadcast vs checkpoint in PySpark:
PySpark Optimization: Comparison Table
Feature | Purpose | Storage Level | Triggers Execution? | Use Case |
---|---|---|---|---|
cache() | Store DataFrame in memory (default) | MEMORY_AND_DISK | No (lazy; needs an action) | Reuse the same DataFrame multiple times when it fits mostly in memory |
persist() | Custom storage levels (e.g., disk-only) | Customizable | No (lazy; needs an action) | When memory is insufficient or a specific storage level is needed |
broadcast() | Send a small DataFrame to all executors | N/A | Only when the join executes | Speed up joins with small dimension tables |
checkpoint() | Break lineage, save to reliable storage | HDFS (or configured dir) | Yes (eager by default) | Long DAGs, avoiding recomputation, recovering from faults |
Detailed Notes
cache()
- Shortcut for .persist(StorageLevel.MEMORY_AND_DISK)
- Good default when using the DataFrame 2+ times
- Use after filtering or transforming to avoid redoing work
persist(StorageLevel)
from pyspark import StorageLevel
df.persist(StorageLevel.DISK_ONLY)
- Choose memory, disk, or memory+disk+serialization
- Use when memory is limited
broadcast()
from pyspark.sql.functions import broadcast
df_large.join(broadcast(df_small), "key")
- Efficient for joining a large DataFrame with a small one
- Avoids shuffle
- Should be used before caching
checkpoint()
spark.sparkContext.setCheckpointDir("/tmp/checkpoints")
df.checkpoint()
- Used to truncate long lineage chains
- Helps with stability and performance in iterative algorithms (e.g., graph processing, ML)
- Requires HDFS or a distributed file system
When to Use What
Scenario | Use |
---|---|
Reusing a DataFrame many times | cache() or persist() |
Small lookup table in a join | broadcast() |
Running into stage failure/lineage issues | checkpoint() |
Memory-constrained environment | persist(StorageLevel.DISK_ONLY) |
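Putting the table above together, a hedged end-to-end sketch (all table names, paths, and columns are hypothetical):
from pyspark.sql.functions import broadcast

spark.sparkContext.setCheckpointDir("/tmp/checkpoints")

dim_df = spark.read.parquet("/data/dim_country")           # small lookup table
fact_df = spark.read.parquet("/data/fact_sales")           # large fact table

enriched = fact_df.join(broadcast(dim_df), "country_id")   # broadcast the small side
enriched.cache()                                           # reused by both aggregations below

daily = enriched.groupBy("date").count()
by_country = enriched.groupBy("country_id").count()

stable = enriched.checkpoint()  # truncate lineage if the DAG grows too deep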
Yes, absolutely! You can and should optimize writes in both vanilla PySpark and Databricks. Below are write optimization techniques, split into general (vanilla) PySpark and Databricks-specific approaches:
WRITE OPTIMIZATIONS IN VANILLA PYSPARK
1. Repartition Before Write
- Prevents too many small files
- Example:
df.repartition(10).write.parquet("/output/path")
2. Coalesce to Reduce Output Files
- Use .coalesce(n) when you want fewer output partitions/files
- Ideal for appends or final writes
df.coalesce(1).write.mode("overwrite").csv("/output/single_file")
3. Partition By Columns
- Use .partitionBy() on low-cardinality columns that you frequently filter on; partitioning on high-cardinality columns creates an explosion of tiny files
df.write.partitionBy("year", "country").parquet("/partitioned/output")
4. Use Efficient Formats
- Prefer Parquet, ORC, or Delta over CSV/JSON
df.write.format("parquet").save("/parquet_path")
5. Avoid the Small-Files Problem
- Write with repartition() or coalesce() so each task produces reasonably sized files instead of thousands of tiny ones; a config-based sketch follows
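Another lever, available since Spark 2.2: cap the rows written per file so one huge partition does not become a single giant file (the threshold below is just an example value):
spark.conf.set("spark.sql.files.maxRecordsPerFile", 1_000_000)

(df.repartition("year")
   .write.mode("overwrite")
   .partitionBy("year")
   .parquet("/output/right_sized"))   # hypothetical path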
ADDITIONAL WRITE OPTIMIZATIONS IN DATABRICKS
Databricks offers Delta Lake and platform-level features:
1. OPTIMIZE + ZORDER (Delta Lake only)
- Compacts small files and co-locates related data for faster reads
OPTIMIZE delta.`/delta/events` ZORDER BY (event_date, user_id)
2. Auto Compaction + Optimized Writes
- Enable in Delta Lake to reduce small files
SET spark.databricks.delta.autoCompact.enabled = true;
SET spark.databricks.delta.optimizeWrite.enabled = true;
3. Schema Evolution
- Enable auto schema merge (only Delta supports it)
df.write.option("mergeSchema", "true").format("delta").mode("overwrite").save("/path")
4. Use Databricks Runtime File Writers
- Faster than open-source Spark for writes
- Databricks Runtime uses Photon, vectorized IO, and write optimizations under the hood
SUMMARY TABLE
Technique | Vanilla PySpark | Databricks | Format |
---|---|---|---|
repartition(n) / coalesce(n) | ✅ | ✅ | All |
partitionBy() | ✅ | ✅ | All |
OPTIMIZE + ZORDER | ❌ | ✅ | Delta |
Auto-compaction | ❌ | ✅ | Delta |
Optimized Write | ❌ | ✅ | Delta |
Schema Evolution | Limited | ✅ | Delta |
Format: Parquet/Delta | ✅ | ✅ | All |
Yes, absolutely! Compression is one of the most effective and most often overlooked write optimizations in both vanilla PySpark and Databricks.
COMPRESSION OPTIONS IN PYSPARK
You can specify compression using .option("compression", "<codec>") when writing data.
Compression Types by Format
Format | Supported Compression | Default Compression |
---|---|---|
Parquet | snappy, gzip, brotli, lz4, zstd, uncompressed | snappy |
CSV | gzip, bzip2, lz4, deflate, none | none |
JSON | Same as CSV | none |
ORC | zlib, snappy, none | zlib |
Delta | Uses Parquet compression (snappy by default) | snappy |
Example: Write CSV with GZIP
df.write.option("compression", "gzip").csv("/path/csv_gzip")
Example: Write Parquet with ZSTD
df.write.option("compression", "zstd").parquet("/path/parquet_zstd")
Set Global Compression via Spark Config
spark.conf.set("spark.sql.parquet.compression.codec", "zstd")
spark.conf.set("spark.sql.orc.compression.codec", "snappy")
When to Use Which Compression?
Compression | Best For | Trade-offs |
---|---|---|
snappy | Balanced: speed + size | Fastest; slightly larger files |
gzip | Maximum compression | Slower to write/read |
zstd | High compression + speed | Newer; often best overall |
brotli | Text-heavy data | Slower; best for web/text data |
none | Testing or full I/O speed | Large files; not optimized |
TIP: Delta Format
When using Delta, compression is handled via Parquet, so you can still configure it globally:
spark.conf.set("spark.sql.parquet.compression.codec", "zstd")
df.write.format("delta").save("/delta/path")
Real-World Best Practices
- Use snappy or zstd for Parquet/Delta in most workloads.
- Use gzip if file size matters more than speed.
- Set compression globally in Spark configs for consistency.
- Don't use compression "none" in production unless required.