repartition() and coalesce() can help resolve OOM (Out of Memory) errors in Spark, but how you use them depends on the root cause of the memory pressure.

Let's break it down clearly.


🧠 First: What causes OOM in Spark?

OOM (OutOfMemoryError) in Spark usually happens when:

  • A task processes too much data (input partition is too large)
  • There's data skew: one partition has most of the data
  • Too many small tasks and high task scheduling overhead
  • A shuffle stage pulls in too much intermediate data
  • Caching large datasets without enough executor memory

✅ How repartition() and coalesce() help

| Function | Purpose | When to Use |
|---|---|---|
| repartition(n) | Increases the number of partitions (shuffles data) | When partitions are too large and causing tasks to run out of memory |
| coalesce(n) | Decreases the number of partitions (avoids a shuffle) | When you have too many small partitions (inefficient) |

✅ 1. repartition() helps avoid OOM due to large partitions

If you're doing something like:

large_df = spark.read.parquet("huge_data")
result = large_df.groupBy("country").agg(...)

Symptoms of OOM:

  • Only a few tasks doing all the work
  • Spark UI shows huge partition sizes (like 500MB, 1GB)

✅ Fix:

# Break data into more partitions before a wide operation
large_df = large_df.repartition(200)  # or based on a column

This spreads the data across more tasks, reducing per-task memory pressure.


✅ 2. coalesce() helps avoid OOM from too many tasks

If your data has many small files (common in S3/HDFS) and you do:

df = spark.read.json("s3://many-small-files/")
df.count()

You may end up with 1000s of partitions, causing:

  • High scheduling overhead
  • Memory issues due to many simultaneous tasks

✅ Fix:

df = df.coalesce(50)  # Reduce to fewer partitions without shuffle

This makes the job more manageable and reduces memory strain from task overhead.


⚠️ Important Tips

| Situation | Use |
|---|---|
| Data skew / large partitions | repartition(n) or repartition(col) |
| Too many small files/partitions | coalesce(n) |
| Writing to disk (reduce output files) | Use coalesce() before .write() |
| Before a wide shuffle (like groupBy, join) | Use repartition() to avoid shuffle spill |

🔧 Example: Fixing OOM in Practice

# You're reading a 50GB dataset
df = spark.read.parquet("/data/big")

# BEFORE: Only 8 partitions (default), each 6+ GB, which causes OOM
df.rdd.getNumPartitions()
# >>> 8

# FIX: Increase partitions before aggregation
df = df.repartition(100)  # Now each partition ~500MB

# Proceed with join or groupBy safely
result = df.groupBy("user_id").agg(...)

🔄 Combine both for optimal memory control

df = df.repartition(200)  # For better memory use during shuffle
... # run heavy operations
df = df.coalesce(50)      # Reduce output files during write
df.write.parquet("final_output/")

✅ Conclusion

| Problem | Fix |
|---|---|
| OOM due to large partitions | repartition(n) to split them |
| OOM due to task overhead from too many partitions | coalesce(n) to combine them |
| Shuffle-heavy stages | Use repartition(col) to distribute load evenly |

A natural follow-up: let's compare how data is distributed in:

  • repartition(100)
  • repartition(40, 'col')
  • coalesce()
  • bucketing

Each has very different distribution logic; here's a clear and complete explanation 👇


πŸ” 1. repartition(100): Random & Even Distribution (with shuffle)

  • βœ… Shuffles data randomly across 100 partitions.
  • πŸ” Spark reshuffles all rows with a round-robin-like strategy.
  • βœ… This gives even-sized partitions, assuming no data skew.
df = df.repartition(100)

📌 Use when:

  • You want to increase partitions
  • Fix large partition size causing OOM
  • Preparing for wide transformations (join, groupBy)

💡 Good for memory load balancing across executors.


🎯 2. repartition(40, 'col'): Hash Partitioning by Column (with shuffle)

  • ✅ Spark hashes the col values and assigns rows to 40 partitions (note: the partition count comes first in the API).
  • ✅ Ensures all rows with the same key (col) go to the same partition.
df = df.repartition(40, "country")

📌 Use when:

  • You're preparing for a join or groupBy on col
  • You want co-location of keys for better performance

⚠️ Even distribution only if the values in col are not skewed.

  • If one value appears much more than others → you'll get data skew.

⚠️ 3. coalesce(20): Skewed, No Shuffle, Merge-only

  • ❌ No shuffle.
  • 🔁 It merges existing partitions to reduce their number.
  • Often results in uneven partition sizes.
df = df.coalesce(20)

📌 Use when:

  • You're reducing the number of partitions after filtering
  • Before writing, to avoid too many small output files

⚠️ Does NOT distribute evenly. Example (see the sketch below):

  • Suppose you have 100 partitions of uneven size (say, after a selective filter)
  • Coalescing to 5 simply merges neighbouring partitions without rebalancing them, so some of the 5 output partitions can end up many times larger than the others
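
A quick way to check this on your own data is to count the rows in each partition before and after the coalesce. This is only a diagnostic sketch (the path and filter are illustrative, and glom() materialises each partition as a list on the executors, so use it on small or sampled data):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = spark.read.parquet("/data/events")            # illustrative input
filtered = df.filter("status = 'ACTIVE'")          # leaves partitions unevenly filled

print(filtered.rdd.glom().map(len).collect())               # rows per partition before
print(filtered.coalesce(5).rdd.glom().map(len).collect())   # rows per partition after the merge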

📦 4. Bucketing (bucketBy): Even by Hash, but only at Write-Time

df.write.bucketBy(8, "id").sortBy("id").saveAsTable("users")

  • ✅ Spark hashes on id, distributes data evenly across 8 buckets
  • 🔁 Applied at write time only (needs saveAsTable)
  • Later joins on the bucketed column are more efficient

📌 Use when:

  • You write persistent Hive-style tables (via saveAsTable)
  • You want to optimize read performance (especially for joins)

⚠️ You need to:

  • Save as table
  • Enable bucketing
  • Sometimes manually read with spark.sql("SELECT ...")
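
Putting those requirements together, here is a minimal end-to-end sketch (table names, column names, and paths are illustrative; both sides should use the same number of buckets and be joined on their bucketed columns for Spark to skip the shuffle):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

users = spark.read.parquet("/data/users")          # illustrative inputs
orders = spark.read.parquet("/data/orders")

# Bucket and sort both sides on their join keys at write time
users.write.bucketBy(8, "id").sortBy("id").saveAsTable("users_bucketed")
orders.write.bucketBy(8, "user_id").sortBy("user_id").saveAsTable("orders_bucketed")

# Later reads can join on the bucketed columns with little or no shuffle
u = spark.table("users_bucketed")
o = spark.table("orders_bucketed")
joined = u.join(o, u.id == o.user_id)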

πŸ” Summary Table

MethodShuffle?Even Distribution?Partitioning LogicUse Case
repartition(100)βœ… Yesβœ… Yes (random)Round-robin/randomBalance data load across tasks
repartition('col', 40)βœ… Yes⚠️ Depends on skewHash-based on columnJoin, groupBy optimizations
coalesce(20)❌ No❌ SkewedMerge existing partitionsReduce partitions before .write()
bucketBy(8, 'col')βœ… Yesβœ… Yes (if uniform)Hash on column during writeTable-level optimization for joins/filters

🔧 Rule of Thumb

| If you need to… | Use |
|---|---|
| Balance data load before a heavy op | repartition(n) |
| Partition smartly by a key | repartition(n, col) |
| Write fewer output files | coalesce(n) |
| Optimize future reads & joins | bucketBy() |

Here's a short and clear solution for each common OOM cause in Spark:


✅ 1. Large Partition → Task processes too much data

  • Fix: Use repartition(n) to increase number of partitions and reduce per-task data size.

✅ 2. Data Skew → One partition has most of the data

  • Fix: Use salting, a skew hint, or repartition by key with pre-processing.
  • Ex: Add a random prefix to the skewed key before the join (see the salting sketch below).
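
A minimal salting sketch, assuming a large table skewed on customer_id being joined to a smaller table keyed by the same column (all paths, table and column names are illustrative): the skewed side gets a random salt per row, the other side is replicated once per salt value, and the join key becomes (key, salt), so the hot key is spread across several partitions.

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()
SALT_BUCKETS = 10                                      # tune to the degree of skew

facts = spark.read.parquet("/data/facts")              # large, skewed on customer_id
dims = spark.read.parquet("/data/dims")                # smaller side, also has customer_id

# Skewed side: add a random salt per row
facts_salted = facts.withColumn("salt", (F.rand() * SALT_BUCKETS).cast("int"))

# Other side: replicate each row once per salt value
dims_salted = dims.withColumn(
    "salt", F.explode(F.array([F.lit(i) for i in range(SALT_BUCKETS)]))
)

# Join on the original key plus the salt, then drop the helper column
joined = facts_salted.join(dims_salted, ["customer_id", "salt"]).drop("salt")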

✅ 3. Too Many Small Tasks → High overhead

  • Fix: Use coalesce(n) to reduce number of small partitions.

✅ 4. Shuffle Overload → Too much intermediate data in memory

  • Fix (see the sketch below):
    • Use a broadcast join for small tables
    • Tune spark.sql.shuffle.partitions
    • Increase spark.memory.fraction if needed
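
A minimal sketch of the first two fixes, with illustrative paths and column names (spark.memory.fraction is a startup setting, so it is not shown here):

from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast

spark = SparkSession.builder.getOrCreate()

# More (smaller) shuffle partitions means less intermediate data per task
spark.conf.set("spark.sql.shuffle.partitions", "800")   # default is 200

big = spark.read.parquet("/data/events")                 # large table
lookup = spark.read.parquet("/data/countries")           # small table that fits in memory

# Broadcasting the small side avoids shuffling the big table at all
result = big.join(broadcast(lookup), "country_code")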

✅ 5. Caching Large Dataset → Insufficient memory

  • Fix (see the sketch below):
    • Use persist(StorageLevel.DISK_ONLY) or selective caching
    • Unpersist unused RDDs/DataFrames with .unpersist()
    • Avoid caching large intermediate joins
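
A small sketch of the caching fixes (the input path is illustrative):

from pyspark import StorageLevel
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.read.parquet("/data/big")     # illustrative input

# Keep cached blocks on disk instead of filling executor memory
df.persist(StorageLevel.DISK_ONLY)
df.count()                               # materialise the cache

# ... reuse df across several actions ...

# Release the cached blocks as soon as they are no longer needed
df.unpersist()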

Here's your Spark Configuration Tuning Cheat Sheet 🧠⚙️
Organized by problem area, with optimal settings, defaults, and tips for solving OOM errors, slow jobs, skew, and shuffle issues.


🚀 GENERAL PERFORMANCE

| Setting | Description | Recommendation |
|---|---|---|
| spark.executor.memory | Memory per executor | Tune based on available cluster memory (e.g., 4g, 8g) |
| spark.executor.cores | Cores per executor | 2–5 is typical (avoid 1 core/executor unless it's a special case) |
| spark.executor.instances | Number of executors | Scale based on data size and parallelism |
| spark.driver.memory | Driver memory | E.g., 2g; increase for large aggregations in the driver |
| spark.default.parallelism | RDD ops (not SQL) parallelism | Set to 2× total cores or more |
| spark.sql.shuffle.partitions | Shuffle parallelism | Default is 200; reduce for small data or increase for large jobs (e.g., 400–1000) |
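
As a rough illustration, these are usually set when the session (and its executors) is first created; the numbers below are placeholders to size against your own cluster, and they only take effect if no SparkContext is already running:

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("tuned-job")
    .config("spark.executor.memory", "8g")
    .config("spark.executor.cores", "4")
    .config("spark.executor.instances", "10")
    .config("spark.driver.memory", "4g")
    .config("spark.default.parallelism", "160")      # roughly 2x total executor cores
    .config("spark.sql.shuffle.partitions", "400")
    .getOrCreate()
)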

💥 OOM ERROR RESOLUTION

| Problem | Config/Action | Fix |
|---|---|---|
| Large tasks | repartition(n) | Break data into more tasks |
| Many small tasks | coalesce(n) | Reduce overhead |
| Shuffle too large | spark.shuffle.compress = true, spark.reducer.maxSizeInFlight | Reduce shuffle load |
| Memory spill | spark.memory.fraction = 0.6 (default), try 0.7 | More memory for execution |
| Cached data too large | Lower spark.memory.storageFraction (default 0.5) or persist to disk | Less caching pressure |

📦 MEMORY & STORAGE

| Setting | Description | Tips |
|---|---|---|
| spark.memory.fraction | Fraction of heap for execution + storage | Default is 0.6 |
| spark.memory.storageFraction | How much of the above is reserved for storage | Default is 0.5 of spark.memory.fraction |
| Storage level (set via persist()) | Caching level | Use MEMORY_AND_DISK or DISK_ONLY to prevent OOM |
| spark.memory.offHeap.enabled | Off-heap memory | Enable (true) for Tungsten, only when needed |
| spark.memory.offHeap.size | Off-heap size | Must be set if the above is enabled (e.g., 1g) |
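
The off-heap settings are startup configs and must be set as a pair; a small sketch when building the session (the size is a placeholder):

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .config("spark.memory.offHeap.enabled", "true")
    .config("spark.memory.offHeap.size", "1g")   # required whenever off-heap is enabled
    .getOrCreate()
)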

πŸ” SHUFFLE OPTIMIZATION

SettingDescriptionTuning Tips
spark.sql.shuffle.partitions# of partitions post-shuffleIncrease for big joins/aggregations
spark.shuffle.compressCompress shuffle datatrue (default) β€” keep it on
spark.shuffle.file.bufferBuffer size for shuffleDefault: 32k, can increase to 64k
spark.reducer.maxSizeInFlightMax data in transit per reducerDefault 48MB, reduce if memory issue
spark.shuffle.spill.compressCompress spilled datatrue β€” keep it on

📊 JOIN OPTIMIZATION

| Problem | Config | Solution |
|---|---|---|
| Small table join | broadcast() or spark.sql.autoBroadcastJoinThreshold | Default: 10MB. Increase (e.g., 20MB) for small lookup joins |
| Skewed key join | Salting, skew hint, or custom repartition | Handle hot keys manually |
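
A quick sketch of raising the auto-broadcast threshold at runtime (the 20MB figure is only an example; the explicit broadcast() hint is shown in the shuffle-overload sketch earlier):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Auto-broadcast any table smaller than ~20MB (the default threshold is 10MB)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", str(20 * 1024 * 1024))

# Setting it to -1 disables auto-broadcast if a "small" table turns out to be large
# spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")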

🔥 SKEW HANDLING

| Scenario | Fix |
|---|---|
| One key dominates join/group | Salt keys with a random prefix |
| Broadcast small side of join | Use broadcast(df) |
| Monitor partition size | Use .rdd.glom().map(len).collect() or spark.sql.files.maxPartitionBytes |
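
Another cheap diagnostic is to count rows per partition with spark_partition_id(), which avoids materialising whole partitions on the executors (a sketch; the input path is illustrative):

from pyspark.sql import SparkSession
from pyspark.sql.functions import spark_partition_id, count

spark = SparkSession.builder.getOrCreate()
df = spark.read.parquet("/data/big")     # illustrative input

# Rows per partition; a few very large counts indicate skew
(df.withColumn("pid", spark_partition_id())
   .groupBy("pid")
   .agg(count("*").alias("rows"))
   .orderBy("rows", ascending=False)
   .show(20))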

🧪 DEBUGGING & DIAGNOSTICS

| Setting | Description |
|---|---|
| spark.eventLog.enabled = true | Enables Spark History Server logs |
| spark.sql.adaptive.enabled = true | Enables AQE (Adaptive Query Execution) |
| spark.sql.adaptive.skewJoin.enabled = true | Automatic skew handling (Spark 3.x and later) |
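
On Spark 3.x the AQE settings can be toggled per session (AQE is already on by default from Spark 3.2); a short sketch:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

spark.conf.set("spark.sql.adaptive.enabled", "true")                      # AQE
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")             # split skewed shuffle partitions at join time
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")   # merge tiny post-shuffle partitions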

⚠️ Bonus: Safe Memory Limits

| Cluster Size | Recommended Executor Memory | Executors |
|---|---|---|
| 32 GB node | ~4g–5g per executor | 5–6 per node |
| 64 GB node | ~8g per executor | 6–8 per node |
