PySpark Wholesome Tutorial - Links to refer, PDFs


✅ Step 8: Final Result Collection or Write

  • If .collect(): data is pulled to the driver
  • If .write(): data is written from executors to storage (HDFS, S3, DB)

Here's a deep dive into Step 8: Final Result Collection or Write in Apache Spark, covering:

  • What happens when results are collected or written
  • Differences in Databricks vs. on-prem BDL (Big Data Lake) Spark
  • Memory, network, disk behavior
  • Failure scenarios and performance tips

Operation  | Description
.collect() | Brings result to Driver node memory
.write()   | Writes result from Executors to external sink (e.g. file, DB)

⚠️ .collect() – Result Collection to Driver

🔍 What happens:

  1. Spark triggers a job.
  2. Tasks run on executors.
  3. Results are pulled back to the driver node via network.
  4. Returned as a list/array in local driver memory.
result = df.filter(...).collect()

🔥 Internals:

  • Uses network (not HDFS) to transfer data from executors.
  • If the result size exceeds spark.driver.memory → 💥 OutOfMemoryError
  • Can overwhelm the driver if the result is more than a few MB.

✅ Use when:

  • You need small datasets (like < 100K rows).
  • For debugging, testing, or local export.

❌ Avoid when:

  • Result is large (millions of rows or >1GB).
  • You are working in a distributed compute flow.
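
If you only need a bounded sample on the driver, prefer take() or limit().collect(); to stream rows without materializing everything at once, toLocalIterator() also works. A minimal sketch (the filter condition and the process() helper are hypothetical):

# Pull at most 100 rows back to the driver
preview = df.filter("amount > 1000").take(100)

# Stream rows to the driver one partition at a time instead of all at once
for row in df.filter("amount > 1000").toLocalIterator():
    process(row)  # process() is a placeholder for your own row handling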

βš™οΈ Common Configs:

Config Name                | Description
spark.driver.memory        | Memory available on the driver to hold .collect() results
spark.driver.maxResultSize | Max result size allowed (default: 1g)
--conf spark.driver.maxResultSize=2g
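
The same limit can also be set when the session is built. A minimal sketch (the value is illustrative; spark.driver.memory itself generally has to be set before the driver JVM starts, e.g. via spark-submit or cluster config):

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("CollectLimits")
    .config("spark.driver.maxResultSize", "2g")  # cap on the total size of results returned to the driver
    .getOrCreate()
)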

✅ .write() – Write Results to External Storage

This is the most production-safe way to handle large outputs.

df.write.mode("overwrite").parquet("s3://my-bucket/output/")

πŸ” What happens:

  1. Data is partitioned across executors.
  2. Each task writes its portion of data:
    • HDFS / DBFS / S3 / ADLS
    • JDBC / Hive / Delta Lake
  3. Metadata (schema, partition info) is optionally written.
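
A partitioned write, for example, adds one directory level per distinct value of the partition column. A minimal sketch (the path and column name are hypothetical):

# One directory per country value; each task writes its own part files in parallel
(df.write
   .mode("overwrite")
   .partitionBy("country")
   .option("compression", "snappy")
   .parquet("s3://my-bucket/output/partitioned/"))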

✳️ Output Paths and Formats

Format     | File Types     | Parallel Write | Splittable | Typical Use
Parquet    | .parquet       | ✅ | ✅ | Analytics, ML
ORC        | .orc           | ✅ | ✅ | Hive + compression
Delta Lake | .parquet + txn | ✅ | ✅ | ACID tables in Databricks
CSV        | .csv           | ✅ | ❌ | Raw exports
JDBC       | -              | ❌ | ❌ | One row at a time
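
The same DataFrame can be sent to any of these sinks. A minimal sketch (paths and JDBC connection details are hypothetical, and a JDBC write assumes the driver jar is on the classpath):

df.write.mode("overwrite").parquet("hdfs:///out/parquet/")
df.write.mode("overwrite").orc("hdfs:///out/orc/")
df.write.mode("overwrite").option("header", True).csv("hdfs:///out/csv/")

# JDBC: rows are inserted into the target table via batched INSERT statements
(df.write
   .format("jdbc")
   .option("url", "jdbc:postgresql://dbhost:5432/sales")
   .option("dbtable", "public.orders_out")
   .option("user", "etl")
   .option("password", "****")
   .mode("append")
   .save())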

✅ Behavior in Databricks vs. On-Prem BDL

🟦 In Databricks

  • Uses DBFS or ADLS/S3 paths: df.write.format("delta").save("/mnt/datalake/output/")
  • Uses Delta Lake format by default for tables
  • Can write to:
    • Managed/External Delta Tables
    • Unity Catalog tables
    • SQL Warehouses

Example:

df.write.format("delta").mode("overwrite").saveAsTable("sales_summary")

Benefits:

  • Auto-commits via Delta Transaction Log
  • Can use VACUUM, OPTIMIZE, ZORDER post-write
  • Fully scalable + trackable via job UI
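
Typical post-write maintenance on a Delta table looks like the following sketch (table name taken from the example above; VACUUM's default retention is 7 days):

# Remove data files no longer referenced by the Delta transaction log
spark.sql("VACUUM sales_summary")

# Inspect the commit history recorded by the transaction log
spark.sql("DESCRIBE HISTORY sales_summary").show(truncate=False)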

🟩 In On-Prem BDL or Hadoop Cluster

  • Uses HDFS or NFS-mounted output paths: df.write.format("orc").save("/data/warehouse/output/")
  • Needs correct permissions + HDFS quotas
  • More manual cleanup and checkpointing

🧠 Behind the Scenes: Write Path

  1. Executor tasks write files in parallel (1 file per partition).
  2. Temporary files written in _temporary directory.
  3. Upon success:
    • _SUCCESS marker file is added
    • Temp files renamed and committed
  4. Driver finalizes metadata (for Hive/Delta)
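
On object stores the final rename/commit step can be slow; the Hadoop output committer behavior can be tuned at session start. A hedged sketch (this is a Hadoop-side setting passed through Spark, shown as an assumption rather than a blanket recommendation):

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("CommitTuning")
    # algorithm v2 commits task output directly to the destination (fewer renames,
    # but partially written output can remain visible if the job fails mid-write)
    .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")
    .getOrCreate()
)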

⚠️ Failure Points

Point of Failure            | What Happens                  | Fix
Driver crash during collect | Job fails                     | Avoid .collect() on large data
Network timeout             | File write stalls             | Use retries or increase spark.network.timeout
Partition skew              | Some partitions take too long | Use repartition() or salting
Too many small files        | Metadata overhead in storage  | Use coalesce() or repartition()
Misused overwrite           | Data loss                     | Use mode("append") for incremental writes
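
A minimal sketch of the last two fixes (the paths and the new_rows DataFrame are hypothetical):

# Fewer, larger output files: coalesce reduces the partition count without a full shuffle
df.coalesce(8).write.mode("overwrite").parquet("hdfs:///warehouse/daily_snapshot/")

# Incremental loads: append preserves what was written earlier
new_rows.write.mode("append").parquet("hdfs:///warehouse/daily_snapshot/")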

✅ Best Practices

Goal                        | Recommendation
Small sample to local       | df.limit(100).collect()
Full export to disk/S3/HDFS | .write.format(...).save(...)
Reduce number of files      | df.coalesce(1) or repartition(n)
Save with schema & ACID     | Use Delta Lake format in Databricks
SQL insert                  | df.write.insertInto("table") or saveAsTable
Avoid driver memory issues  | NEVER .collect() large DataFrames

🧪 Example: Complete Script in Databricks

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("WriteExample").getOrCreate()

df = spark.read.option("header", True).csv("/mnt/raw/sales.csv")

df_clean = df.filter("amount > 1000").withColumnRenamed("amount", "total")

# Write as a Delta Table
df_clean.write.format("delta").mode("overwrite").saveAsTable("sales_summary")

# Optional: Optimize
spark.sql("OPTIMIZE sales_summary ZORDER BY (region)")

🧪 Example: BDL On-Prem Script (HDFS Output)

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("OnPremWrite") \
    .getOrCreate()

df = spark.read.json("hdfs:///data/in/orders")

df2 = df.select("order_id", "user_id", "amount")

# Repartition to reduce small files
df2.repartition(4).write \
    .mode("overwrite") \
    .parquet("hdfs:///warehouse/processed/orders/")


✅ Step 9: Job/Stage/Task Completion

Spark shows the progress in:

  • Spark UI (Driver web UI) – available at http://<driver-host>:4040
  • Databricks: Task DAG + Job metrics
  • Includes: Stages, Tasks, Shuffle Reads/Writes, GC Time, Input Size, etc.
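
From a running session you can find the UI address directly:

# uiWebUrl points at the driver web UI (typically port 4040, or 4041+ if 4040 is taken)
print(spark.sparkContext.uiWebUrl)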

✅ Step 10: Cleanup

  • Driver process exits
  • Executors shut down
  • Cluster resources are released
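
In a standalone script this is triggered explicitly:

# Stops the SparkContext, shuts down executors and releases cluster resources
spark.stop()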

🧠 Diagram: Spark Architecture (Simplified)

+---------------------+     submits job to     +----------------------+
| Your Spark Program  |----------------------->| Spark Driver         |
+---------------------+                        +----------------------+
                                                      |
              +---------------------------------------+
              |
              v
     +--------------------+
     |  DAG Scheduler     |    → divides into stages
     +--------------------+
              |
              v
     +--------------------+     → tasks created per partition
     | Task Scheduler     |
     +--------------------+
              |
              v
+--------------------+  +--------------------+   +--------------------+
|  Executor on Node1 |  | Executor on Node2  |   | Executor on NodeN  |
+--------------------+  +--------------------+   +--------------------+

✅ Example: Real Job Flow

df = spark.read.csv("data.csv")          # Stage 1: File read
df2 = df.filter("value > 100")           # Still Stage 1 (narrow)
df3 = df2.groupBy("category").count()    # Stage 2 (wide, shuffle)
df3.write.parquet("out/")                # Stage 3: File write

πŸ” Monitoring Spark Execution

  • Driver UI (port 4040): Stages, DAG, Storage, Executors
  • Databricks: "View" → "Job/Task Graph"
  • Use .explain() on DataFrames to inspect the physical plan.
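
For example, on the aggregation from the job-flow snippet above:

# Prints the physical plan; mode="formatted" (Spark 3+) gives a more readable layout
df3.explain(mode="formatted")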

⚠️ Spark Optimization Tips

Task                    | Recommendation
Shuffle joins           | Use broadcast() for small table
Partition skew          | Use salting or repartition()
Memory management       | Avoid .collect() on large datasets
Resource allocation     | Tune executor memory & cores
Caching reused datasets | Use .cache() or .persist()
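
A minimal sketch of the broadcast-join and caching tips (DataFrame and column names are hypothetical):

from pyspark.sql.functions import broadcast

# Broadcast the small dimension table so the join avoids shuffling the large side
joined = orders_df.join(broadcast(country_dim_df), on="country_code", how="left")

# Cache a DataFrame that several downstream actions will reuse
joined.cache()
joined.count()  # first action materializes the cache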

