Here's a quick and clear guide on how to write data in Spark using the DataFrame API, covering the different formats, modes, and common use cases.
Basic Syntax
df.write.format("format_type").option("key", "value").save("path_or_table")
Common Data Write Formats
Format | .format() string | Example Path |
---|---|---|
Parquet | "parquet" | "s3://bucket/folder" or "/tmp/output" |
CSV | "csv" | "file:/tmp/csv_output" |
JSON | "json" | "dbfs:/data/json" |
Delta | "delta" | "dbfs:/delta/events" |
ORC | "orc" | "hdfs:///output" |
Table | .saveAsTable() | Hive or Metastore table |
Write Modes
Mode | Description |
---|---|
"overwrite" | Replace existing data |
"append" | Add to existing data |
"ignore" | Skip write if data exists |
"error" / "errorifexists" | Default; fails if path/table exists |
df.write.mode("overwrite").format("parquet").save("/path/output")
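A quick sketch of how these modes behave against a path that already holds data (the path below is illustrative):
# First write creates the data; the later writes show each mode's behaviour
df.write.mode("overwrite").parquet("/tmp/modes_demo")   # replaces anything already at the path
df.write.mode("append").parquet("/tmp/modes_demo")      # adds new files next to the existing ones
df.write.mode("ignore").parquet("/tmp/modes_demo")      # does nothing, because data already exists
# df.write.mode("error").parquet("/tmp/modes_demo")     # would fail here, since the path exists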
Examples
1. Write as Parquet
df.write.mode("overwrite").parquet("/path/to/parquet_output")
2. Write as CSV with Header
df.write.option("header", True).csv("/path/to/csv_output")
3. Write as Delta Table
df.write.format("delta").mode("overwrite").save("/delta/events")
4. Save as Hive Table (using metastore)
df.write.mode("overwrite").saveAsTable("db_name.table_name")
5. Partitioned Write
df.write.partitionBy("country", "year").mode("overwrite").parquet("/path/partitioned_output")
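6. Write as JSON or ORC
The JSON and ORC writers from the formats table above follow the same pattern; the paths here are illustrative:
df.write.mode("overwrite").json("/path/to/json_output")
df.write.mode("overwrite").orc("/path/to/orc_output")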
Advanced Options
Compression
df.write.option("compression", "gzip").csv("/path")
Save to JDBC
df.write \
.format("jdbc") \
.option("url", "jdbc:mysql://host/db") \
.option("dbtable", "table_name") \
.option("user", "username") \
.option("password", "pass") \
.save()
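For larger JDBC loads, a few standard writer options can help with throughput and parallelism. The values and the driver class below are illustrative; adjust them for your database:
df.write \
    .format("jdbc") \
    .option("url", "jdbc:mysql://host/db") \
    .option("dbtable", "table_name") \
    .option("user", "username") \
    .option("password", "pass") \
    .option("driver", "com.mysql.cj.jdbc.Driver") \
    .option("batchsize", 10000) \
    .option("numPartitions", 8) \
    .mode("append") \
    .save()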
Best Practices
Tip | Why It’s Useful |
---|---|
Use Parquet or Delta | Efficient, supports schema, compression |
Use partitionBy on large data | Helps pruning & performance |
Use coalesce/repartition before write | Control number of output files |
Avoid small files | Group partitions or use .repartition(n) |
df.repartition(4).write.mode("overwrite").parquet("/optimized/output")
You can and should optimize writes in both vanilla PySpark and Databricks. Below are write optimization techniques, split into general (vanilla) PySpark and Databricks-specific approaches:
WRITE OPTIMIZATIONS IN VANILLA PYSPARK
1. Repartition Before Write
- Prevents too many small files
- Example:
df.repartition(10).write.parquet("/output/path")
2. Coalesce to Reduce Output Files
- Use .coalesce(n) when you need fewer output partitions
- Ideal for appends or final writes
df.coalesce(1).write.mode("overwrite").csv("/output/single_file")
3. Partition By Columns
- Use .partitionBy() on low-cardinality columns you frequently filter on (such as year or country); partitioning by high-cardinality columns produces huge numbers of tiny files
df.write.partitionBy("year", "country").parquet("/partitioned/output")
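Partitioning pays off mostly at read time: a filter on the partition columns lets Spark skip entire directories instead of scanning the whole dataset (path and filter values are illustrative):
# Only the year=2024 / country=US directories are read
pruned = spark.read.parquet("/partitioned/output").where("year = 2024 AND country = 'US'")
pruned.show()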
4. Use Efficient Formats
- Prefer Parquet, ORC, or Delta over CSV/JSON
df.write.format("parquet").save("/parquet_path")
5. Avoid the Small Files Problem
- Write with repartition() or coalesce() to avoid a file explosion; a sizing sketch follows below
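A minimal sizing sketch: the numbers below are assumptions you would replace with your own estimates, aiming for output files somewhere around 128 MB to 1 GB each:
# Illustrative numbers only: estimate the dataset size, then derive a file count
target_file_mb = 128                      # desired size per output file (assumption)
estimated_total_mb = 2048                 # rough size of df on disk (assumption)
num_files = max(1, estimated_total_mb // target_file_mb)

df.repartition(num_files).write.mode("overwrite").parquet("/output/right_sized")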
ADDITIONAL WRITE OPTIMIZATIONS IN DATABRICKS
Databricks offers Delta Lake and platform-level features:
1. OPTIMIZE + ZORDER (Delta Lake only)
- Optimizes the file layout by compacting small files and co-locating related values
OPTIMIZE delta.`/delta/events` ZORDER BY (event_date, user_id)
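If you prefer the Python API over SQL, recent Delta Lake releases (2.0+) expose the same operation through DeltaTable; availability depends on your Delta/Runtime version, and the path is the one used above:
from delta.tables import DeltaTable

events_table = DeltaTable.forPath(spark, "/delta/events")
events_table.optimize().executeZOrderBy("event_date", "user_id")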
2. Auto Compaction + Optimized Writes
- Enable in Delta Lake to reduce small files
SET spark.databricks.delta.autoCompact.enabled = true;
SET spark.databricks.delta.optimizeWrite.enabled = true;
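These can also be pinned to an individual table via Delta table properties rather than session-level settings; the table name here is illustrative:
spark.sql("""
    ALTER TABLE sales_db.sales_delta SET TBLPROPERTIES (
        'delta.autoOptimize.optimizeWrite' = 'true',
        'delta.autoOptimize.autoCompact'   = 'true'
    )
""")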
3. Schema Evolution
- Enable automatic schema merging on write (only Delta supports it)
df.write.option("mergeSchema", "true").format("delta").mode("overwrite").save("/path")
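A small illustration of what mergeSchema enables: appending rows that carry an extra column adds that column to the Delta table's schema instead of failing. The column name is made up for the example:
from pyspark.sql.functions import lit

# df with one extra column the target table does not have yet
df_with_new_col = df.withColumn("channel", lit("web"))

df_with_new_col.write \
    .format("delta") \
    .mode("append") \
    .option("mergeSchema", "true") \
    .save("/path")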
4. Use Databricks Runtime File Writers
- Faster than open-source Spark for writes
- Databricks Runtime uses Photon, vectorized IO, and write optimizations under the hood
SUMMARY TABLE
Technique | Vanilla PySpark | Databricks | Format |
---|---|---|---|
repartition(n) / coalesce(n) | Yes | Yes | All |
partitionBy() | Yes | Yes | All |
OPTIMIZE + ZORDER | No | Yes | Delta |
Auto-compaction | No | Yes | Delta |
Optimized Write | No | Yes | Delta |
Schema Evolution | Limited | Yes | Delta |
Parquet/Delta format | Yes | Yes | All |
Compression is one of the most effective and often overlooked write optimizations in both vanilla PySpark and Databricks.
COMPRESSION OPTIONS IN PYSPARK
You can specify compression using .option("compression", "type") when writing data.
Compression Types by Format
Format | Supported Compression | Default Compression |
---|---|---|
Parquet | snappy, gzip, brotli, lz4, zstd, uncompressed | snappy |
CSV | gzip, bzip2, lz4, deflate, none | none |
JSON | Same as CSV | none |
ORC | zlib, snappy, none | zlib |
Delta | Uses Parquet compression (snappy by default) | snappy |
Example: Write CSV with GZIP
df.write.option("compression", "gzip").csv("/path/csv_gzip")
Example: Write Parquet with ZSTD
df.write.option("compression", "zstd").parquet("/path/parquet_zstd")
Set Global Compression via Spark Config
spark.conf.set("spark.sql.parquet.compression.codec", "zstd")
spark.conf.set("spark.sql.orc.compression.codec", "snappy")
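The per-write option generally takes precedence over the session default, so you can still override the codec for an individual job (paths are illustrative):
spark.conf.set("spark.sql.parquet.compression.codec", "zstd")                   # session-wide default
df.write.option("compression", "gzip").parquet("/path/parquet_gzip_override")   # this write uses gzip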
When to Use Which Compression?
Compression | Best For | Trade-offs |
---|---|---|
snappy | Balanced: speed + size | Fastest; slightly larger files |
gzip | Maximum compression | Slower to write/read |
zstd | High compression + speed | Newer; often best overall |
brotli | Text-heavy data | Slower; best for web/text data |
none | Testing or full I/O speed | Large files; not optimized |
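When in doubt, measure on your own data. This rough sketch writes the same DataFrame with several codecs so you can compare the resulting folder sizes; the output paths are illustrative:
for codec in ["snappy", "gzip", "zstd"]:
    df.write.mode("overwrite").option("compression", codec).parquet(f"/tmp/codec_test/{codec}")
# Compare folder sizes afterwards with your storage tooling
# (e.g. du -sh on a local path, or dbutils.fs.ls on DBFS in Databricks)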
TIP: Delta Format
When using Delta, compression is handled via Parquet, so you can still configure it globally:
spark.conf.set("spark.sql.parquet.compression.codec", "zstd")
df.write.format("delta").save("/delta/path")
Real-World Best Practices
- Use snappy or zstd for Parquet/Delta in most workloads.
- Use gzip if file size matters more than speed.
- Set compression globally in Spark configs for consistency.
- Don't disable compression (compression = "none") in production unless required.
# Databricks-Ready Notebook: Delta Table Write with OPTIMIZE and ZORDER
# -----------------------------------
# Step 1: Setup (One-time)
# -----------------------------------
# Set paths and table names
delta_path = "/mnt/delta/sales_data"
delta_table = "sales_db.sales_delta"
# Sample data
data = [
(1, "2024-01-01", "US", 100),
(2, "2024-01-02", "IN", 150),
(3, "2024-01-01", "US", 200),
(4, "2024-01-03", "UK", 50),
(5, "2024-01-02", "IN", 75)
]
schema = ["id", "sale_date", "country", "amount"]
df = spark.createDataFrame(data, schema)
# -----------------------------------
# Step 2: Write as Delta Table
# -----------------------------------
# Delta writes Parquet files under the hood, so set the Parquet codec globally
# (per the compression tip above) instead of a per-write "compression" option
spark.conf.set("spark.sql.parquet.compression.codec", "zstd")

df.write \
    .format("delta") \
    .mode("overwrite") \
    .partitionBy("country") \
    .save(delta_path)
# Register as a managed or external table
# spark.sql() runs one statement at a time, so issue them separately
spark.sql("CREATE DATABASE IF NOT EXISTS sales_db")
spark.sql(f"DROP TABLE IF EXISTS {delta_table}")
spark.sql(f"""
    CREATE TABLE {delta_table}
    USING DELTA
    LOCATION '{delta_path}'
""")
# -----------------------------------
# Step 3: View Table
# -----------------------------------
spark.sql(f"SELECT * FROM {delta_table} LIMIT 10").show()
# -----------------------------------
# Step 4: Run OPTIMIZE + ZORDER (Delta Lake Only)
# -----------------------------------
spark.sql(f"""
OPTIMIZE {delta_table}
ZORDER BY (sale_date, country)
""")
# -----------------------------------
# Step 5: Enable Auto Optimizations (Optional)
# -----------------------------------
spark.conf.set("spark.databricks.delta.optimizeWrite.enabled", True)
spark.conf.set("spark.databricks.delta.autoCompact.enabled", True)
# -----------------------------------
# Step 6: Re-read Optimized Data (Optional Validation)
# -----------------------------------
optimized_df = spark.read.format("delta").load(delta_path)
optimized_df.show()
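# -----------------------------------
# Step 7 (Optional): Inspect Table History
# -----------------------------------
# DESCRIBE HISTORY lists the table's operations (WRITE, OPTIMIZE, ...),
# which is a quick way to confirm the OPTIMIZE step above actually ran
spark.sql(f"DESCRIBE HISTORY {delta_table}").show(truncate=False)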
# Notebook Summary:
# - Created a Delta table with partitioning and compression
# - Ran Z-Ordering to optimize read performance
# - Enabled auto-compaction and optimized writes