Here’s a structured, detailed set of PySpark + Databricks notebooks showing how traditional database features (ACID, SCD, schema evolution, etc.) are (or are not) supported in:

  1. Traditional RDBMS (Oracle/Postgres/SQL Server)
  2. Vanilla PySpark (with Parquet/ORC)
  3. Delta Lake (Databricks Delta Table)

βœ… Notebook Set: RDBMS vs PySpark vs Delta Lake Feature Comparison


πŸ”Ή 1. Atomicity: Transaction Commit/Rollback

πŸ§ͺ RDBMS:

BEGIN TRANSACTION;
INSERT INTO orders VALUES (1, 'A');
INSERT INTO orders VALUES (2, 'B');
ROLLBACK;  -- Nothing saved
-- COMMIT; would save both rows

❌ Vanilla PySpark:

# No atomic rollback
df1.write.mode("append").parquet("/tmp/orders")  # If this fails halfway, partial writes exist

βœ… Delta Lake:

spark.sql("BEGIN")
spark.sql("INSERT INTO delta_orders VALUES (1, 'A')")
spark.sql("INSERT INTO delta_orders VALUES (2, 'B')")
spark.sql("ROLLBACK")  # or COMMIT

πŸ”Ή 2. SCD Type 1 and 2

πŸ§ͺ RDBMS (SCD2):

-- End current row
UPDATE customers
SET end_date = CURRENT_DATE
WHERE cust_id = 101 AND end_date IS NULL;

-- Insert new row
INSERT INTO customers VALUES (101, 'NewName', CURRENT_DATE, NULL);

❌ PySpark with Parquet (manual logic):

from pyspark.sql import functions as F

# Read old data and expire only the rows that have an incoming replacement
existing = spark.read.parquet("/path")
keys = incoming.select("cust_id")
expired = existing.join(keys, "cust_id", "left_semi").withColumn("end_date", F.current_date())
untouched = existing.join(keys, "cust_id", "left_anti")

# Append the new version of each changed customer
new = incoming.withColumn("start_date", F.current_date()) \
              .withColumn("end_date", F.lit(None).cast("date"))  # None alone is not a valid column value

final_df = untouched.unionByName(expired).unionByName(new)
final_df.write.mode("overwrite").parquet("/path")  # Risky! Overwriting the path you just read is not atomic

βœ… Delta Lake (MERGE):

from delta.tables import DeltaTable

delta_table = DeltaTable.forPath(spark, "/delta/customers")

delta_table.alias("target").merge(
    source=incoming.alias("source"),
    condition="target.id = source.id AND target.end_date IS NULL"
).whenMatchedUpdate(set={
    "end_date": "current_date()"
}).whenNotMatchedInsert(values={
    "id": "source.id",
    "name": "source.name",
    "start_date": "current_date()",
    "end_date": "null"
}).execute()

πŸ”Ή 3. Schema Evolution

πŸ§ͺ RDBMS:

ALTER TABLE employees ADD department VARCHAR(100);

❌ Vanilla PySpark:

df_new.write.mode("append").parquet("/data/employees")
# The append itself usually succeeds even when the schema changed; the problem surfaces
# at read time, where new columns are silently dropped or conflicting types raise errors
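
A partial read-side workaround (not true schema evolution) is to ask Spark to reconcile the differing Parquet file schemas; a minimal sketch:

# Merge the schemas of all Parquet files under the path at read time
df_all = spark.read.option("mergeSchema", "true").parquet("/data/employees")
df_all.printSchema()  # columns added by later appends now show up (null for older files)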

βœ… Delta Lake:

df_new.write.option("mergeSchema", "true").format("delta").mode("append").save("/delta/employees")
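
Delta also accepts explicit DDL, mirroring the RDBMS ALTER TABLE; a minimal sketch (table path and column name are illustrative):

spark.sql("ALTER TABLE delta.`/delta/employees` ADD COLUMNS (department STRING)")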

πŸ”Ή 4. Time Travel

πŸ§ͺ RDBMS:

Partially supported, typically via vendor features such as Oracle Flashback queries or point-in-time recovery from PostgreSQL WAL archives.

❌ Vanilla PySpark:

Not supported.

βœ… Delta Lake:

df_current = spark.read.format("delta").load("/delta/orders")
df_old = spark.read.format("delta").option("versionAsOf", 5).load("/delta/orders")
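
Time travel also works by timestamp, and a table can be rolled back in place; a minimal sketch (the timestamp and version number are illustrative):

df_yesterday = spark.read.format("delta") \
    .option("timestampAsOf", "2024-01-01 00:00:00") \
    .load("/delta/orders")

# Roll the live table back to an earlier version
spark.sql("RESTORE TABLE delta.`/delta/orders` TO VERSION AS OF 5")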

πŸ”Ή 5. MERGE / UPSERT

πŸ§ͺ RDBMS:

MERGE INTO orders USING new_orders
ON orders.id = new_orders.id
WHEN MATCHED THEN UPDATE SET ...
WHEN NOT MATCHED THEN INSERT ...

❌ Vanilla PySpark:

Requires multiple read, join, filter, and overwrite steps.
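
A minimal hand-rolled upsert sketch over Parquet (paths and the key column are illustrative, and the final overwrite is not atomic):

# Incoming batch and current data
new_orders = spark.read.parquet("/data/new_orders")
existing = spark.read.parquet("/data/orders")

# Keep existing rows whose id has no incoming replacement, then union in the new rows
kept = existing.join(new_orders.select("id"), "id", "left_anti")
upserted = kept.unionByName(new_orders)

# Write to a separate location; overwriting the path still being read is unsafe
upserted.write.mode("overwrite").parquet("/data/orders_upserted")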

βœ… Delta Lake:

deltaTable.alias("target").merge(source_df.alias("source"), "target.id = source.id") \
  .whenMatchedUpdate(set={...}) \
  .whenNotMatchedInsert(values={...}) \
  .execute()
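
The same upsert can be written in SQL against a Delta table; a minimal sketch (table and column names are illustrative):

source_df.createOrReplaceTempView("new_orders")
spark.sql("""
  MERGE INTO orders AS target
  USING new_orders AS source
  ON target.id = source.id
  WHEN MATCHED THEN UPDATE SET target.amount = source.amount
  WHEN NOT MATCHED THEN INSERT (id, amount) VALUES (source.id, source.amount)
""")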

πŸ”Ή 6. Deletes / Updates

πŸ§ͺ RDBMS:

DELETE FROM orders WHERE status = 'cancelled';
UPDATE orders SET amount = amount * 1.1;

❌ PySpark:

No native delete support; Parquet files are immutable, so a "delete" means rewriting the data without the unwanted rows:

df = spark.read.parquet("/path").filter("status != 'cancelled'")
# Write to a new location; overwriting the path still being read is unsafe
df.write.mode("overwrite").parquet("/path_rewritten")

βœ… Delta Lake:

spark.sql("DELETE FROM orders WHERE status = 'cancelled'")

πŸ”Ή 7. Audit Logs / Change Tracking

πŸ§ͺ RDBMS:

Via triggers or system logs.

❌ PySpark:

Not supported. Manual logging required.

βœ… Delta Lake:

Automatically stored in the _delta_log/ folder. You can inspect the commit history:

spark.sql("DESCRIBE HISTORY delta_table")

πŸ”Ή 8. Indexing

πŸ§ͺ RDBMS:

CREATE INDEX idx_name ON employees(name);

❌ PySpark:

No indexing in Parquet/ORC.
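
The closest substitutes in plain PySpark are partition pruning and bucketing, which skip whole files rather than index rows; a minimal sketch (paths and columns are illustrative):

df = spark.read.parquet("/data/employees")

# Partitioned layout: queries filtering on 'country' read only the matching directories
df.write.mode("overwrite").partitionBy("country").parquet("/data/employees_partitioned")

# Bucketing (requires saveAsTable) pre-shuffles rows by key, speeding up joins on that key
df.write.mode("overwrite").bucketBy(16, "customer_id").sortBy("customer_id") \
  .saveAsTable("employees_bucketed")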

βœ… Delta Lake + Z-Ordering:

spark.sql("OPTIMIZE delta_table ZORDER BY (customer_id)")

πŸ”Ή 9. Access Control

πŸ§ͺ RDBMS:

Role/User/Grant-based access.

❌ PySpark:

No built-in access control.

βœ… Databricks + Unity Catalog:

Fine-grained row- and column-level access control, plus data lineage.
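
Access is granted with standard SQL on Unity Catalog objects; a minimal sketch (catalog, schema, table, and group names are illustrative):

spark.sql("GRANT SELECT ON TABLE main.sales.orders TO `data_analysts`")
spark.sql("REVOKE SELECT ON TABLE main.sales.orders FROM `data_analysts`")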


βœ… BONUS Tips for PySpark Optimization (ETL Pipelines)

Optimization Technique               | When to Use
-------------------------------------|-----------------------------------------------------------
.checkpoint()                        | Break a long lineage; cache an intermediate result safely
.persist(StorageLevel.DISK_ONLY)     | For heavy data; avoid memory overflow
broadcast() (pyspark.sql.functions)  | For small lookup (dimension) tables in joins
.repartition(n)                      | Optimize the number of files / avoid skew
.coalesce(n)                         | Reduce the number of partitions before a write
OPTIMIZE + ZORDER (Delta)            | Efficient reads by clustering related rows together
mergeSchema in Delta                 | Handle evolving data structures cleanly
.foreachPartition()                  | Avoid driver overload in custom writes
.rdd.checkpoint()                    | The same idea for RDD-based pipelines
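
A minimal sketch combining a few of these techniques in one small ETL step (paths and column names are illustrative):

from pyspark.sql.functions import broadcast

facts = spark.read.parquet("/data/fact_orders")     # large fact table
dims = spark.read.parquet("/data/dim_customers")    # small dimension table

# Broadcast the small dimension so the join avoids shuffling the large fact table
enriched = facts.join(broadcast(dims), "customer_id")

# Control the output file count before writing: coalesce shrinks partitions cheaply
enriched.coalesce(8).write.mode("overwrite").parquet("/data/enriched_orders")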

πŸ“ Notebook Folder Structure (Recommended)

/dbfs/FileStore/notebooks/
β”‚
β”œβ”€β”€ 01_acid_transactions_rdbms_vs_delta.ipynb
β”œβ”€β”€ 02_scd_type1_type2_comparisons.ipynb
β”œβ”€β”€ 03_schema_evolution_delta.ipynb
β”œβ”€β”€ 04_merge_vs_upsert.ipynb
β”œβ”€β”€ 05_delta_time_travel.ipynb
β”œβ”€β”€ 06_delete_update_comparisons.ipynb
β”œβ”€β”€ 07_delta_logs_audit.ipynb
β”œβ”€β”€ 08_zorder_indexing_example.ipynb
β”œβ”€β”€ 09_optimization_tips.ipynb
└── 10_delta_vs_parquet_feature_matrix.ipynb
