repartition() and coalesce() can help resolve OOM (Out of Memory) errors in Spark, but how you use them depends on the root cause of the memory pressure.
Let's break it down clearly.
First: What causes OOM in Spark?
OOM (OutOfMemoryError) in Spark usually happens when:
- A task processes too much data (input partition is too large)
- There’s data skew: one partition has most of the data
- Too many small tasks and high task scheduling overhead
- A shuffle stage pulls in too much intermediate data
- Caching large datasets without enough executor memory
How repartition() and coalesce() help
Function | Purpose | When to Use |
---|---|---|
repartition(n) | Increases number of partitions (shuffles data) | When partitions are too large and causing tasks to run out of memory |
coalesce(n) | Decreases number of partitions (avoids shuffle) | When you have too many small partitions (inefficient) |
1. repartition() helps avoid OOM due to large partitions
If you’re doing something like:
large_df = spark.read.parquet("huge_data")
result = large_df.groupBy("country").agg(...)
Symptoms of OOM:
- Only a few tasks doing all the work
- Spark UI shows huge partition sizes (like 500MB, 1GB)
Fix:
# Break data into more partitions before a wide operation
large_df = large_df.repartition(200) # or based on a column
This spreads the data across more tasks, reducing per-task memory pressure.
2. coalesce() helps avoid OOM from too many tasks
If your data has many small files (common in S3/HDFS) and you do:
df = spark.read.json("s3://many-small-files/")
df.count()
You may end up with 1000s of partitions, causing:
- High scheduling overhead
- Memory issues due to many simultaneous tasks
Fix:
df = df.coalesce(50) # Reduce to fewer partitions without shuffle
This makes the job more manageable and reduces memory strain from task overhead.
⚠️ Important Tips
Situation | Use |
---|---|
Data skew / large partitions | repartition(n) or repartition(col) |
Too many small files/partitions | coalesce(n) |
Writing to disk (reduce output files) | Use coalesce() before .write() |
Before a wide shuffle (like groupBy, join) | Use repartition() to avoid shuffle spill |
Example: Fixing OOM in Practice
# You're reading a 50GB dataset
df = spark.read.parquet("/data/big")
# BEFORE: Only 8 partitions (default), each 6+ GB -> causes OOM
df.rdd.getNumPartitions()
# >>> 8
# FIX: Increase partitions before aggregation
df = df.repartition(100) # Now each partition ~500MB
# Proceed with join or groupBy safely
result = df.groupBy("user_id").agg(...)
Combine both for optimal memory control
df = df.repartition(200) # For better memory use during shuffle
... # run heavy operations
df = df.coalesce(50) # Reduce output files during write
df.write.parquet("final_output/")
Conclusion
Problem | Fix |
---|---|
OOM due to large partitions | repartition(n) to split them |
OOM due to task overhead from too many partitions | coalesce(n) to combine them |
Shuffle-heavy stages | Use repartition(col) to distribute load evenly |
Now let's compare how data is distributed in:
- repartition(100)
- repartition(40, 'col')
- coalesce()
- bucketing
Each has very different distribution logic. Here's a clear and complete explanation.
1. repartition(100): Random & Even Distribution (with shuffle)
- Shuffles data randomly across 100 partitions.
- Spark reshuffles all rows with a round-robin-like strategy.
- This gives even-sized partitions, assuming no data skew.
df = df.repartition(100)
Use when:
- You want to increase partitions
- Fix large partition size causing OOM
- Preparing for wide transformations (join, groupBy)
Good for memory load balancing across executors.
2. repartition(40, 'col'): Hash Partitioning by Column (with shuffle)
- Spark hashes the col values and assigns rows to 40 partitions.
- Ensures all rows with the same key (col) go to the same partition.
df = df.repartition(40, "country")
Use when:
- You're preparing for a join or groupBy on col
- You want co-location of keys for better performance
⚠️ Even distribution only if the values in col are not skewed.
- If one value appears much more often than others, you'll get data skew (the sketch below shows one way to check).
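To verify the distribution, you can count rows per physical partition after repartitioning. A minimal sketch, reusing the df and country column from the example above (spark_partition_id is a built-in function):

from pyspark.sql import functions as F

# Count rows per partition to spot skew after hash repartitioning.
df2 = df.repartition(40, "country")
(df2
 .groupBy(F.spark_partition_id().alias("partition_id"))
 .count()
 .orderBy(F.desc("count"))
 .show(10))
# If one partition's count dwarfs the others, the key is skewed.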
⚠️ 3. coalesce(20): Skewed, No Shuffle, Merge-Only
- No shuffle.
- It combines existing partitions by reducing their number.
- Often results in uneven partition sizes.
df = df.coalesce(20)
Use when:
- You're reducing the number of partitions after filtering
- Before writing, to avoid too many small output files
⚠️ Does NOT distribute evenly. Example:
- If you had 100 partitions with 1MB each
- Coalescing to 5: the first few partitions may be 10-20MB, the last few 1MB
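The shuffle difference is visible in the physical plan. A minimal sketch (the exact plan wording varies by Spark version):

# repartition() adds an Exchange (shuffle) node to the physical plan.
df.repartition(20).explain()

# coalesce() only merges existing partitions: the plan shows a Coalesce node
# and no new Exchange.
df.coalesce(20).explain()

# Both report the requested partition count (coalesce can only reduce it).
print(df.repartition(20).rdd.getNumPartitions())  # 20
print(df.coalesce(20).rdd.getNumPartitions())     # 20, or fewer if df had fewer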
4. Bucketing (bucketBy): Even by Hash, but Only at Write Time
df.write.bucketBy(8, "id").sortBy("id").saveAsTable("users")
- Spark hashes on id and distributes data evenly across 8 buckets
- Applied at write time only (needs saveAsTable)
- Later joins on the bucketed column are more efficient
Use when:
- You write Hive/metastore-managed tables
- You want to optimize read performance (especially for joins)
⚠️ You need to:
- Save as a table (saveAsTable)
- Have bucketing enabled (on by default via spark.sql.sources.bucketing.enabled)
- Sometimes read the table explicitly with spark.sql("SELECT ...") or spark.table(...)
Summary Table
Method | Shuffle? | Even Distribution? | Partitioning Logic | Use Case |
---|---|---|---|---|
repartition(100) | Yes | Yes (random) | Round-robin/random | Balance data load across tasks |
repartition(40, 'col') | Yes | Depends on skew | Hash-based on column | Join, groupBy optimizations |
coalesce(20) | No | No (often skewed) | Merge existing partitions | Reduce partitions before .write() |
bucketBy(8, 'col') | No (hashed per task at write time) | Yes (if key values are uniform) | Hash on column during write | Table-level optimization for joins/filters |
Rule of Thumb
If you need to… | Use |
---|---|
Balance data load before a heavy op | repartition(n) |
Partition smartly by a key | repartition(n, col) |
Write fewer output files | coalesce(n) |
Optimize future reads & joins | bucketBy() |
Here's a short and clear solution for each common OOM cause in Spark:
1. Large Partition → Task processes too much data
- Fix: Use repartition(n) to increase the number of partitions and reduce per-task data size.
2. Data Skew → One partition has most of the data
- Fix: Use salting, a skew hint, or repartition by key with pre-processing.
- Ex: Add a random prefix to the skewed key before the join (see the sketch below).
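A minimal salting sketch under assumed names (large_df, small_df, join column key); the salt factor is illustrative and should match the degree of skew:

from pyspark.sql import functions as F

SALT_BUCKETS = 10  # illustrative: spread each hot key over 10 sub-keys

# Large (skewed) side: append a random salt to the join key.
large_salted = large_df.withColumn(
    "salted_key",
    F.concat_ws("_", F.col("key"), (F.rand() * SALT_BUCKETS).cast("int").cast("string"))
)

# Small side: replicate each row once per salt value so every salted key matches.
salts = spark.range(SALT_BUCKETS).withColumnRenamed("id", "salt")
small_salted = small_df.crossJoin(salts).withColumn(
    "salted_key", F.concat_ws("_", F.col("key"), F.col("salt").cast("string"))
)

# The hot key is now distributed across SALT_BUCKETS partitions instead of one.
joined = large_salted.join(small_salted, "salted_key")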
3. Too Many Small Tasks → High overhead
- Fix: Use coalesce(n) to reduce the number of small partitions.
4. Shuffle Overload → Too much intermediate data in memory
- Fix:
- Use a broadcast join for small tables (see the sketch below)
- Tune spark.sql.shuffle.partitions
- Increase spark.memory.fraction if needed
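A minimal sketch of the first two fixes, assuming a large fact DataFrame large_df and a small dimension table small_dim_df joined on country:

from pyspark.sql import functions as F

# Broadcast the small table so the big side is never shuffled for this join.
result = large_df.join(F.broadcast(small_dim_df), "country")

# Raise shuffle parallelism for the remaining wide operations
# (default is 200; the right value depends on data volume).
spark.conf.set("spark.sql.shuffle.partitions", "400")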
5. Caching Large Dataset → Insufficient memory
- Fix:
- Use persist(StorageLevel.DISK_ONLY) or selective caching (see the sketch below)
- Unpersist unused RDDs/DataFrames with .unpersist()
- Avoid caching large intermediate joins
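A minimal sketch of disk-backed, selective caching; the filter condition is illustrative:

from pyspark import StorageLevel

# Keep only what you reuse, and spill it to disk instead of holding it in RAM.
df_clean = df.filter("event_date >= '2024-01-01'")   # illustrative filter
df_clean.persist(StorageLevel.DISK_ONLY)

df_clean.count()        # materializes the cache
# ... reuse df_clean in several downstream actions ...

df_clean.unpersist()    # release the storage as soon as it is no longer needed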
Here's your Spark Configuration Tuning Cheat Sheet, organized by problem area, with optimal settings, defaults, and tips for solving OOM errors, slow jobs, skew, and shuffle issues.
GENERAL PERFORMANCE
Setting | Description | Recommendation |
---|---|---|
spark.executor.memory | Memory per executor | Tune based on available cluster memory (e.g., 4g , 8g ) |
spark.executor.cores | Cores per executor | 2-5 is typical (avoid 1 core/executor unless special case) |
spark.executor.instances | Number of executors | Scale based on data size and parallelism |
spark.driver.memory | Driver memory | E.g., 2g, increase for large aggregations in the driver |
spark.default.parallelism | RDD ops (not SQL) parallelism | Set to 2x total cores or more |
spark.sql.shuffle.partitions | Shuffle parallelism | Default is 200; reduce for small data or increase for large jobs (e.g., 400-1000) |
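A minimal sketch of applying these settings when building a session. The values are illustrative, and executor memory/cores/instances generally have to be set at launch time (e.g. via spark-submit) rather than changed inside a running application:

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("tuned-job")
    .config("spark.executor.memory", "8g")          # per-executor heap
    .config("spark.executor.cores", "4")
    .config("spark.executor.instances", "10")
    .config("spark.driver.memory", "4g")
    .config("spark.default.parallelism", "160")     # ~2x total cores (illustrative)
    .config("spark.sql.shuffle.partitions", "400")
    .getOrCreate()
)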
OOM ERROR RESOLUTION
Problem | Config/Action | Fix |
---|---|---|
Large tasks | repartition(n) | Break data into more tasks |
Many small tasks | coalesce(n) | Reduce overhead |
Shuffle too large | spark.shuffle.compress=true, lower spark.reducer.maxSizeInFlight | Reduce shuffle load |
Memory spill | spark.memory.fraction = 0.6 (default), try 0.7 | More memory for execution |
Cached data too large | lower spark.memory.storageFraction (e.g., 0.4) | Less caching pressure |
MEMORY & STORAGE
Setting | Description | Tips |
---|---|---|
spark.memory.fraction | Fraction for execution + storage | Default is 0.6 |
spark.memory.storageFraction | How much of above is for storage | Default 0.5 of memory.fraction |
Storage level (via .persist()) | Caching level | Use MEMORY_AND_DISK or DISK_ONLY to prevent OOM |
spark.memory.offHeap.enabled | Off-heap memory | Enable (true ) for Tungsten, only when needed |
spark.memory.offHeap.size | Off-heap size | Must be set if above is enabled (e.g., 1g ) |
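If off-heap memory is enabled, both settings above must be set together; a minimal sketch with an illustrative size:

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("offheap-example")
    .config("spark.memory.offHeap.enabled", "true")
    .config("spark.memory.offHeap.size", "1g")   # required once off-heap is enabled
    .getOrCreate()
)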
SHUFFLE OPTIMIZATION
Setting | Description | Tuning Tips |
---|---|---|
spark.sql.shuffle.partitions | # of partitions post-shuffle | Increase for big joins/aggregations |
spark.shuffle.compress | Compress shuffle data | true (default); keep it on |
spark.shuffle.file.buffer | Buffer size for shuffle | Default: 32k; can increase to 64k |
spark.reducer.maxSizeInFlight | Max data in transit per reducer | Default 48m; reduce if memory is tight |
spark.shuffle.spill.compress | Compress spilled data | true; keep it on |
JOIN OPTIMIZATION
Problem | Config | Solution |
---|---|---|
Small table join | broadcast() or spark.sql.autoBroadcastJoinThreshold | Default: 10MB. Increase (e.g., 20MB ) for small lookup joins |
Skewed key join | salting , skew hint , or custom repartition | Handle hot keys manually |
SKEW HANDLING
Scenario | Fix |
---|---|
One key dominates join/group | Salt keys with random prefix |
Broadcast small side of join | Use broadcast(df) |
Monitor partition size | Inspect with .rdd.glom().map(len).collect(), or cap input partition size via spark.sql.files.maxPartitionBytes |
DEBUGGING & DIAGNOSTICS
Setting | Description |
---|---|
spark.eventLog.enabled = true | Enables Spark History Server logs |
spark.sql.adaptive.enabled = true | Enables AQE (Adaptive Query Execution) |
spark.sql.adaptive.skewJoin.enabled = true | Automatic skew handling (>= Spark 3.x) |
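A minimal sketch of turning these on at runtime (AQE and its skew-join handling are available from Spark 3.0):

# Adaptive Query Execution: coalesces shuffle partitions and splits skewed
# partitions at runtime.
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")

# Event logging for the History Server is usually set at submit time, e.g.:
# spark-submit --conf spark.eventLog.enabled=true --conf spark.eventLog.dir=<log dir> app.py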
⚠️ Bonus: Safe Memory Limits
Cluster Size | Recommended Executor Memory | Executors |
---|---|---|
32 GB node | ~4g-5g per executor | 5-6 per node |
64 GB node | ~8g per executor | 6-8 per node |