repartition() and coalesce() can help resolve OOM (Out of Memory) errors in Spark, but how you use them depends on the root cause of the memory pressure.
Let's break it down clearly.
First: What causes OOM in Spark?
OOM (OutOfMemoryError) in Spark usually happens when:
- A task processes too much data (input partition is too large)
- There’s data skew: one partition has most of the data
- Too many small tasks and high task scheduling overhead
- A shuffle stage pulls in too much intermediate data
- Caching large datasets without enough executor memory
How repartition() and coalesce() help
| Function | Purpose | When to Use |
|---|---|---|
| repartition(n) | Increases the number of partitions (shuffles data) | When partitions are too large and tasks run out of memory |
| coalesce(n) | Decreases the number of partitions (avoids a full shuffle) | When you have too many small partitions (inefficient) |
1. repartition() helps avoid OOM due to large partitions
If you’re doing something like:
large_df = spark.read.parquet("huge_data")
result = large_df.groupBy("country").agg(...)
Symptoms of OOM:
- Only a few tasks doing all the work
- Spark UI shows huge partition sizes (like 500MB, 1GB)
Fix:
# Break data into more partitions before a wide operation
large_df = large_df.repartition(200) # or based on a column
This spreads the data across more tasks, reducing per-task memory pressure.
2. coalesce() helps avoid OOM from too many tasks
If your data has many small files (common in S3/HDFS) and you do:
df = spark.read.json("s3://many-small-files/")
df.count()
You may end up with 1000s of partitions, causing:
- High scheduling overhead
- Memory issues due to many simultaneous tasks
Fix:
df = df.coalesce(50) # Reduce to fewer partitions without shuffle
This makes the job more manageable and reduces memory strain from task overhead.
Important Tips
| Situation | Use |
|---|---|
| Data skew / large partitions | repartition(n) or repartition(col) |
| Too many small files/partitions | coalesce(n) |
| Writing to disk (reduce output files) | coalesce() before .write() |
| Before a wide shuffle (groupBy, join) | repartition() to avoid shuffle spill |
Example: Fixing OOM in Practice
# You're reading a 50GB dataset
df = spark.read.parquet("/data/big")
# BEFORE: Only 8 partitions (default), each 6+ GB β causes OOM
df.rdd.getNumPartitions()
# >>> 8
# FIX: Increase partitions before aggregation
df = df.repartition(100) # Now each partition ~500MB
# Proceed with join or groupBy safely
result = df.groupBy("user_id").agg(...)
Combine both for optimal memory control
df = df.repartition(200) # For better memory use during shuffle
... # run heavy operations
df = df.coalesce(50) # Reduce output files during write
df.write.parquet("final_output/")
Conclusion
| Problem | Fix |
|---|---|
| OOM due to large partitions | repartition(n) to split them |
| OOM due to task overhead from too many partitions | coalesce(n) to combine them |
| Shuffle-heavy stages | Use repartition(col) to distribute load evenly |
Next, let's compare how data is distributed by:
- repartition(100)
- repartition(40, 'col')
- coalesce()
- bucketing
Each has very different distribution logic.
1. repartition(100): Random & Even Distribution (with shuffle)
- Shuffles data randomly across 100 partitions.
- Spark reshuffles all rows with a round-robin-like strategy.
- This gives roughly even-sized partitions (rows are spread by count, not by key).
df = df.repartition(100)
Use when:
- You want to increase partitions
- Fix large partition size causing OOM
- Preparing for wide transformations (join, groupBy)
Good for memory load balancing across executors.
2. repartition(40, 'col'): Hash Partitioning by Column (with shuffle)
- Spark hashes the col values and assigns rows to 40 partitions.
- Ensures all rows with the same key (col) go to the same partition.
df = df.repartition(40, "country")
Use when:
- You're preparing for a join or groupBy on col
- You want co-location of keys for better performance
Even distribution only happens if the values in col are not skewed.
- If one value appears much more often than others, you'll get data skew.
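As a quick illustration, here is a minimal sketch of hash-repartitioning both sides of a join on the key column before joining. The orders and customers DataFrames and their paths are hypothetical; only the country column matches the example above.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("repartition-before-join").getOrCreate()

# Hypothetical inputs; substitute your own paths and schemas
orders = spark.read.parquet("/data/orders")        # large fact table
customers = spark.read.parquet("/data/customers")  # smaller table, also keyed by "country"

# Hash-partition both sides on the join key so rows with the same key
# are co-located, which the subsequent join can take advantage of
orders = orders.repartition(40, "country")
customers = customers.repartition(40, "country")

joined = orders.join(customers, on="country", how="inner")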
3. coalesce(20): Possibly Skewed, No Shuffle, Merge-only
- No shuffle.
- It combines existing partitions into fewer, larger ones.
- Often results in uneven partition sizes.
df = df.coalesce(20)
Use when:
- You're reducing the number of partitions after filtering
- Before writing to avoid too many small output files
Does NOT distribute evenly. Example:
- If you had 100 partitions of varying sizes
- Coalescing to 5 simply merges groups of existing partitions without rebalancing rows, so some resulting partitions can end up several times larger than others
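One way to check this is to count the rows in each partition before and after coalescing. This is a minimal inspection sketch (df stands for whatever DataFrame you are looking at); it scans the full dataset, so avoid it on very large inputs.
# Rows per partition before and after coalesce
before = df.rdd.glom().map(len).collect()
after = df.coalesce(5).rdd.glom().map(len).collect()
print("rows per partition before:", before)
print("rows per partition after: ", after)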
4. Bucketing (bucketBy): Even by Hash, but only at Write Time
df.write.bucketBy(8, "id").sortBy("id").saveAsTable("users")
- Spark hashes on id and distributes data evenly across 8 buckets
- Applied at write time only (needs saveAsTable)
- Later joins on the bucketed column are more efficient
Use when:
- You write Hive/managed tables (note that not all formats support bucketBy)
- Want to optimize read performance (especially for joins)
You need to:
- Save as a table
- Enable bucketing
- Sometimes read it explicitly with spark.sql("SELECT ...")
Summary Table
| Method | Shuffle? | Even Distribution? | Partitioning Logic | Use Case |
|---|---|---|---|---|
| repartition(100) | Yes | Yes (random) | Round-robin/random | Balance data load across tasks |
| repartition(40, 'col') | Yes | Depends on skew | Hash-based on column | Join, groupBy optimizations |
| coalesce(20) | No | Can be skewed | Merge existing partitions | Reduce partitions before .write() |
| bucketBy(8, 'col') | Yes | Yes (if keys are uniform) | Hash on column during write | Table-level optimization for joins/filters |
Rule of Thumb
| If you need to… | Use |
|---|---|
| Balance data load before a heavy op | repartition(n) |
| Partition smartly by a key | repartition(n, col) |
| Write fewer output files | coalesce(n) |
| Optimize future reads & joins | bucketBy() |
Here's a short and clear solution for each common OOM cause in Spark:
1. Large Partition → Task processes too much data
- Fix: Use repartition(n) to increase the number of partitions and reduce per-task data size.
2. Data Skew → One partition has most of the data
- Fix: Use salting, a skew hint, or repartition by key with pre-processing.
- Ex: Add a random prefix to the skewed key before the join (see the sketch below).
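A minimal salting sketch, under stated assumptions: facts is the large skewed side, dims is the small side, key is the join column, and the salt factor of 8 is arbitrary. The small side is replicated once per salt value so every salted key still finds a match.
from pyspark.sql import functions as F

SALT_BUCKETS = 8  # hypothetical salt factor; tune to the severity of the skew

# Large, skewed side: append a random salt suffix to the key
facts_salted = facts.withColumn(
    "salted_key",
    F.concat_ws("_", F.col("key"), (F.rand() * SALT_BUCKETS).cast("int").cast("string"))
)

# Small side: replicate each row once per salt value so all salted keys match
salts = spark.range(SALT_BUCKETS).withColumnRenamed("id", "salt")
dims_salted = dims.crossJoin(salts).withColumn(
    "salted_key", F.concat_ws("_", F.col("key"), F.col("salt").cast("string"))
)

# Join on the salted key; the hot key is now spread over SALT_BUCKETS partitions
joined = facts_salted.join(dims_salted, on="salted_key", how="inner")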
3. Too Many Small Tasks → High overhead
- Fix: Use coalesce(n) to reduce the number of small partitions.
4. Shuffle Overload → Too much intermediate data in memory
- Fix:
- Use a broadcast join for small tables (see the sketch below)
- Tune spark.sql.shuffle.partitions
- Increase spark.memory.fraction if needed
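A broadcast-join sketch: broadcast() is the standard PySpark hint, while large_df, lookup_df, and the country join key are hypothetical names used only for illustration.
from pyspark.sql.functions import broadcast

# Ship the small lookup table to every executor instead of shuffling both sides;
# appropriate when the small side comfortably fits in executor memory
result = large_df.join(broadcast(lookup_df), on="country", how="left")

# Shuffle parallelism can also be tuned on the session
spark.conf.set("spark.sql.shuffle.partitions", "400")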
5. Caching Large Dataset → Insufficient memory
- Fix:
- Use persist(StorageLevel.DISK_ONLY) or selective caching (see the sketch below)
- Unpersist unused RDDs/DataFrames with .unpersist()
- Avoid caching large intermediate joins
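A short sketch of selective, disk-backed caching; the df name and input path are placeholders.
from pyspark import StorageLevel

df = spark.read.parquet("/data/big")

# Keep cached blocks on disk instead of holding everything on-heap
df.persist(StorageLevel.DISK_ONLY)
df.count()        # materialize the cache
# ... reuse df across several downstream queries ...
df.unpersist()    # free the cached blocks once they are no longer needed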
Here's a Spark Configuration Tuning Cheat Sheet.
Organized by problem area, with recommended settings, defaults, and tips for solving OOM errors, slow jobs, skew, and shuffle issues.
GENERAL PERFORMANCE
| Setting | Description | Recommendation |
|---|---|---|
| spark.executor.memory | Memory per executor | Tune based on available cluster memory (e.g., 4g, 8g) |
| spark.executor.cores | Cores per executor | 2–5 is typical (avoid 1 core per executor unless there is a special case) |
| spark.executor.instances | Number of executors | Scale based on data size and parallelism |
| spark.driver.memory | Driver memory | E.g., 2g; increase for large aggregations in the driver |
| spark.default.parallelism | Parallelism for RDD ops (not SQL) | Set to 2x total cores or more |
| spark.sql.shuffle.partitions | Shuffle parallelism | Default is 200; reduce for small data or increase for large jobs (e.g., 400–1000) |
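A sketch of applying these settings through the SparkSession builder; the values below are placeholders, not tuned recommendations for any particular cluster.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("tuned-job")
    .config("spark.executor.memory", "8g")
    .config("spark.executor.cores", "4")
    .config("spark.executor.instances", "10")
    .config("spark.default.parallelism", "160")
    .config("spark.sql.shuffle.partitions", "400")
    .getOrCreate()
)
# Note: spark.driver.memory usually has to be passed via spark-submit,
# because the driver JVM is already running by the time this code executes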
OOM ERROR RESOLUTION
| Problem | Config/Action | Fix |
|---|---|---|
| Large tasks | repartition(n) | Break data into more tasks |
| Many small tasks | coalesce(n) | Reduce overhead |
| Shuffle too large | spark.shuffle.compress = true; lower spark.reducer.maxSizeInFlight | Reduce shuffle load |
| Memory spill | spark.memory.fraction = 0.6 (default), try 0.7 | More memory for execution |
| Cached data too large | Lower spark.memory.storageFraction (spark.storage.memoryFraction is the legacy equivalent) | Less caching pressure |
MEMORY & STORAGE
| Setting | Description | Tips |
|---|---|---|
| spark.memory.fraction | Fraction of heap for execution + storage | Default is 0.6 |
| spark.memory.storageFraction | How much of the above is reserved for storage | Default 0.5 of spark.memory.fraction |
| Storage level (set via persist()) | Caching level | Use MEMORY_AND_DISK or DISK_ONLY to prevent OOM |
| spark.memory.offHeap.enabled | Off-heap memory | Enable (true) for Tungsten off-heap, only when needed |
| spark.memory.offHeap.size | Off-heap size | Must be set if the above is enabled (e.g., 1g) |
SHUFFLE OPTIMIZATION
| Setting | Description | Tuning Tips |
|---|---|---|
| spark.sql.shuffle.partitions | Number of partitions post-shuffle | Increase for big joins/aggregations |
| spark.shuffle.compress | Compress shuffle data | true (default); keep it on |
| spark.shuffle.file.buffer | Buffer size for shuffle writes | Default: 32k; can increase to 64k |
| spark.reducer.maxSizeInFlight | Max data in transit per reducer | Default 48m; reduce if memory is tight |
| spark.shuffle.spill.compress | Compress spilled data | true; keep it on |
JOIN OPTIMIZATION
| Problem | Config | Solution |
|---|---|---|
| Small table join | broadcast() or spark.sql.autoBroadcastJoinThreshold | Default: 10MB. Increase (e.g., 20MB) for small lookup joins |
| Skewed key join | salting, skew hint, or custom repartition | Handle hot keys manually |
SKEW HANDLING
| Scenario | Fix |
|---|---|
| One key dominates join/group | Salt keys with random prefix |
| Broadcast small side of join | Use broadcast(df) |
| Monitor partition size | Inspect with .rdd.glom().map(len).collect(); cap input split size with spark.sql.files.maxPartitionBytes |
DEBUGGING & DIAGNOSTICS
| Setting | Description |
|---|---|
| spark.eventLog.enabled = true | Enables Spark History Server logs |
| spark.sql.adaptive.enabled = true | Enables AQE (Adaptive Query Execution) |
| spark.sql.adaptive.skewJoin.enabled = true | Automatic skew handling (Spark 3.x+) |
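A minimal sketch of enabling AQE and its skew-join handling on an existing session; these are runtime SQL settings, while event logging normally has to be configured at application start (e.g., in spark-defaults.conf).
# Enable Adaptive Query Execution and automatic skew-join splitting (Spark 3.x)
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")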
Bonus: Safe Memory Limits
| Cluster Size | Recommended Executor Memory | Executors per Node |
|---|---|---|
| 32 GB node | ~4g–5g per executor | 5–6 |
| 64 GB node | ~8g per executor | 6–8 |
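As a rough worked example (assumptions, not a rule): on a 64 GB node, reserving about 1 GB for the OS and roughly 10% per executor for memory overhead, 6 executors at 8g each need about 6 × (8 GB + 0.8 GB) ≈ 53 GB, which fits comfortably and matches the range above.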