Here’s a quick and clear guide to writing data in Spark with the DataFrame API, covering the main formats, write modes, and use cases.


βœ… Basic Syntax

df.write.format("format_type").option("key", "value").save("path_or_table")

πŸ’Ύ Common Data Write Formats

| Format | .format() string | Example path |
|--------|------------------|--------------|
| Parquet | "parquet" | "s3://bucket/folder" or "/tmp/output" |
| CSV | "csv" | "file:/tmp/csv_output" |
| JSON | "json" | "dbfs:/data/json" |
| Delta | "delta" | "dbfs:/delta/events" |
| ORC | "orc" | "hdfs:///output" |
| Table | .saveAsTable() | Hive or metastore table |

✍️ Write Modes

| Mode | Description |
|------|-------------|
| "overwrite" | Replace existing data |
| "append" | Add to existing data |
| "ignore" | Skip the write if data exists |
| "error" / "errorifexists" | Default; fails if the path/table exists |
df.write.mode("overwrite").format("parquet").save("/path/output")

πŸ”Ή Examples

1. Write as Parquet

df.write.mode("overwrite").parquet("/path/to/parquet_output")

2. Write as CSV with Header

df.write.option("header", True).csv("/path/to/csv_output")

3. Write as Delta Table

df.write.format("delta").mode("overwrite").save("/delta/events")

4. Save as Hive Table (using metastore)

df.write.mode("overwrite").saveAsTable("db_name.table_name")

5. Partitioned Write

df.write.partitionBy("country", "year").mode("overwrite").parquet("/path/partitioned_output")

πŸš€ Advanced Options

βœ… Compression

df.write.option("compression", "gzip").csv("/path")

βœ… Save to JDBC

df.write \
  .format("jdbc") \
  .option("url", "jdbc:mysql://host/db") \
  .option("dbtable", "table_name") \
  .option("user", "username") \
  .option("password", "pass") \
  .save()

🧠 Best Practices

| Tip | Why it’s useful |
|-----|-----------------|
| Use Parquet or Delta | Efficient; supports schema and compression |
| Use partitionBy on large data | Helps pruning and performance |
| Use coalesce/repartition before write | Controls the number of output files |
| Avoid small files | Group partitions or use .repartition(n) |
df.repartition(4).write.mode("overwrite").parquet("/optimized/output")

You can and should optimize writes in both vanilla PySpark and Databricks. Below are the main write optimization techniques, split into general (vanilla) PySpark approaches and Databricks-specific ones:


πŸš€ WRITE OPTIMIZATIONS IN VANILLA PYSPARK

1. πŸ”„ Repartition Before Write

  • Prevents too many small files
  • Example:
df.repartition(10).write.parquet("/output/path")

2. 🧩 Coalesce to Reduce Output Files

  • Use .coalesce(n) when you want fewer output partitions (it avoids a full shuffle)
  • Ideal for appends or final writes
df.coalesce(1).write.mode("overwrite").csv("/output/single_file")

3. 🧱 Partition By Columns

  • Use .partitionBy() on low-cardinality columns (e.g. year, country) for large tables; partitioning by high-cardinality columns creates a huge number of tiny files
df.write.partitionBy("year", "country").parquet("/partitioned/output")

4. πŸ—ƒοΈ Use Efficient Formats

  • Prefer Parquet, ORC, or Delta over CSV/JSON
df.write.format("parquet").save("/parquet_path")

5. πŸ”’ Avoid Small Files Problem

  • Write with repartition() or coalesce() to avoid a file explosion (see the sketch below)
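A minimal sketch of both approaches, assuming an example df and illustrative paths and thresholds (maxRecordsPerFile is a standard DataFrameWriter option that caps rows per output file):

# Option A: fix the number of output files explicitly
df.repartition(8).write.mode("overwrite").parquet("/output/consolidated")

# Option B: cap the rows per file and let Spark split files as needed
df.write \
  .option("maxRecordsPerFile", 1000000) \
  .mode("overwrite") \
  .parquet("/output/capped")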

⚑ ADDITIONAL WRITE OPTIMIZATIONS IN DATABRICKS

Databricks offers Delta Lake and platform-level features:


1. πŸ” optimize + ZORDER (Delta Lake only)

  • Compacts small files and co-locates related data, so queries scan fewer files
OPTIMIZE delta.`/delta/events` ZORDER BY (event_date, user_id)
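On Databricks, the same operation can also be run from a Python cell through the Delta Lake table API; a minimal sketch, assuming the Delta table at /delta/events already exists:

from delta.tables import DeltaTable

# Compact the files and Z-Order by the columns most often used in filters
DeltaTable.forPath(spark, "/delta/events") \
    .optimize() \
    .executeZOrderBy("event_date", "user_id")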

2. 🧹 Auto Compaction + Optimized Writes

  • Enable in Delta Lake to reduce small files
SET spark.databricks.delta.autoCompact.enabled = true;
SET spark.databricks.delta.optimizeWrite.enabled = true;
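These settings can also be pinned to a specific table via table properties, so every writer picks them up regardless of session configs; a sketch using an example table name:

# Enable optimized writes and auto compaction for one table
spark.sql("""
    ALTER TABLE sales_db.sales_delta SET TBLPROPERTIES (
        'delta.autoOptimize.optimizeWrite' = 'true',
        'delta.autoOptimize.autoCompact' = 'true'
    )
""")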

3. 🧯 Schema Evolution

  • Enable auto schema merge (only Delta supports it)
df.write.option("mergeSchema", "true").format("delta").mode("overwrite").save("/path")

4. βœ… Use Databricks Runtime File Writers

  • Faster than open-source Spark for writes
  • Databricks Runtime uses Photon, vectorized IO, and write optimizations under the hood

πŸ“‹ SUMMARY TABLE

| Technique | Vanilla PySpark | Databricks | Format |
|-----------|-----------------|------------|--------|
| repartition(n) / coalesce(n) | ✅ | ✅ | All |
| partitionBy() | ✅ | ✅ | All |
| OPTIMIZE + ZORDER | ❌ | ✅ | Delta |
| Auto-compaction | ❌ | ✅ | Delta |
| Optimized write | ❌ | ✅ | Delta |
| Schema evolution | Limited | ✅ | Delta |
| Format: Parquet/Delta | ✅ | ✅ | All |

Yes, absolutely! βœ… Compression is one of the most effective and often overlooked write optimizations in both vanilla PySpark and Databricks.


πŸ—œοΈ COMPRESSION OPTIONS IN PYSPARK

You can specify compression using .option("compression", "type") when writing data.


πŸ”Ή Compression Types by Format

| Format | Supported compression | Default compression |
|--------|-----------------------|---------------------|
| Parquet | snappy, gzip, brotli, lz4, zstd, uncompressed | snappy |
| CSV | gzip, bzip2, lz4, deflate, none | none |
| JSON | Same as CSV | none |
| ORC | zlib, snappy, none | snappy (the Hive ORC writer defaults to zlib) |
| Delta | Uses Parquet compression | snappy |

βœ… Example: Write CSV with GZIP

df.write.option("compression", "gzip").csv("/path/csv_gzip")

βœ… Example: Write Parquet with ZSTD

df.write.option("compression", "zstd").parquet("/path/parquet_zstd")

βœ… Set Global Compression via Spark Config

spark.conf.set("spark.sql.parquet.compression.codec", "zstd")
spark.conf.set("spark.sql.orc.compression.codec", "snappy")

πŸ” When to Use Which Compression?

| Compression | Best for | Trade-offs |
|-------------|----------|------------|
| snappy | Balanced speed and size | Fastest; slightly larger files |
| gzip | Maximum compression | Slower to write/read |
| zstd | High compression and speed | Newer; often the best overall |
| brotli | Text-heavy data | Slower; best for web/text data |
| none | Testing or raw I/O speed | Large files; not optimized |
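A quick way to check these trade-offs on your own data is to time the same write with different codecs and compare output sizes; a rough sketch, assuming an existing df and a scratch path:

import time

for codec in ["snappy", "gzip", "zstd"]:
    path = f"/tmp/codec_test/{codec}"
    start = time.time()
    df.write.mode("overwrite").option("compression", codec).parquet(path)
    print(f"{codec}: wrote in {time.time() - start:.1f}s")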

πŸ“˜ TIP: Delta Format

When using Delta, compression is handled via Parquet, so you can still configure it globally:

spark.conf.set("spark.sql.parquet.compression.codec", "zstd")

df.write.format("delta").save("/delta/path")

πŸ“¦ Real-World Best Practices

  1. βœ… Use snappy or zstd for Parquet/Delta in most workloads.
  2. πŸ§ͺ Use gzip if file size is more important than speed.
  3. βœ… Set compression globally in Spark configs for consistency.
  4. 🚫 Don’t write uncompressed data (compression set to "none") in production unless it’s specifically required.

# Databricks-Ready Notebook: Delta Table Write with OPTIMIZE and ZORDER

# -----------------------------------
# Step 1: Setup (One-time)
# -----------------------------------
# Set paths and table names
delta_path = "/mnt/delta/sales_data"
delta_table = "sales_db.sales_delta"

# Sample data
data = [
    (1, "2024-01-01", "US", 100),
    (2, "2024-01-02", "IN", 150),
    (3, "2024-01-01", "US", 200),
    (4, "2024-01-03", "UK", 50),
    (5, "2024-01-02", "IN", 75)
]

schema = ["id", "sale_date", "country", "amount"]
df = spark.createDataFrame(data, schema)

# -----------------------------------
# Step 2: Write as Delta Table
# -----------------------------------
# Delta ignores a per-write "compression" option; set the Parquet codec globally instead
spark.conf.set("spark.sql.parquet.compression.codec", "zstd")

df.write \
  .format("delta") \
  .mode("overwrite") \
  .partitionBy("country") \
  .save(delta_path)

# Register as an external table (spark.sql runs one statement per call)
spark.sql("CREATE DATABASE IF NOT EXISTS sales_db")
spark.sql(f"DROP TABLE IF EXISTS {delta_table}")
spark.sql(f"""
    CREATE TABLE {delta_table}
    USING DELTA
    LOCATION '{delta_path}'
""")

# -----------------------------------
# Step 3: View Table
# -----------------------------------
spark.sql(f"SELECT * FROM {delta_table} LIMIT 10").show()

# -----------------------------------
# Step 4: Run OPTIMIZE + ZORDER (Delta Lake Only)
# -----------------------------------
# Note: ZORDER BY cannot include partition columns (country is a partition column here)
spark.sql(f"""
    OPTIMIZE {delta_table}
    ZORDER BY (sale_date)
""")

# -----------------------------------
# Step 5: Enable Auto Optimizations (Optional)
# -----------------------------------
spark.conf.set("spark.databricks.delta.optimizeWrite.enabled", True)
spark.conf.set("spark.databricks.delta.autoCompact.enabled", True)

# -----------------------------------
# Step 6: Re-read Optimized Data (Optional Validation)
# -----------------------------------
optimized_df = spark.read.format("delta").load(delta_path)
optimized_df.show()

# βœ… Notebook Summary:
# - Created a Delta table with partitioning and compression
# - Ran Z-Ordering to optimize read performance
# - Enabled auto-compaction and optimized writes
