Here's a structured, detailed set of PySpark + Databricks notebooks showing how traditional database features (ACID, SCD, schema evolution, etc.) are (or are not) supported in:
- Traditional RDBMS (Oracle/Postgres/SQL Server)
- Vanilla PySpark (with Parquet/ORC)
- Delta Lake (Databricks Delta Table)
✅ Notebook Set: RDBMS vs PySpark vs Delta Lake Feature Comparison
🔹 1. Atomicity: Transaction Commit/Rollback
🧪 RDBMS:
BEGIN TRANSACTION;
INSERT INTO orders VALUES (1, 'A');
INSERT INTO orders VALUES (2, 'B');
ROLLBACK; -- Nothing saved
-- COMMIT; would save both rows
❌ Vanilla PySpark:
# No atomic rollback
df1.write.mode("append").parquet("/tmp/orders") # If this fails halfway, partial writes exist
✅ Delta Lake:
# Delta has no BEGIN/ROLLBACK; instead, every write or MERGE is its own atomic transaction.
# If the job fails mid-write, no partial data becomes visible to readers.
spark.sql("INSERT INTO delta_orders VALUES (1, 'A'), (2, 'B')")
# To undo an already-committed write, restore the table to an earlier version:
spark.sql("RESTORE TABLE delta_orders TO VERSION AS OF 0")
🔹 2. SCD Type 1 and 2
🧪 RDBMS (SCD2):
-- End current row
UPDATE customers
SET end_date = CURRENT_DATE
WHERE cust_id = 101 AND end_date IS NULL;
-- Insert new row
INSERT INTO customers VALUES (101, 'NewName', CURRENT_DATE, NULL);
❌ PySpark with Parquet (manual logic):
from pyspark.sql import functions as F
existing = spark.read.parquet("/path")
keys = incoming.select("cust_id").distinct()
# Expire only the current rows whose key appears in the incoming batch
current_hit = existing.filter("end_date IS NULL").join(keys, "cust_id", "left_semi")
expired = current_hit.withColumn("end_date", F.current_date())
untouched = existing.exceptAll(current_hit)
# Add the new versions and rewrite everything
new = incoming.withColumn("start_date", F.current_date()) \
              .withColumn("end_date", F.lit(None).cast("date"))
final_df = untouched.unionByName(expired).unionByName(new)
final_df.write.mode("overwrite").parquet("/path")  # Risky: overwrites the path that was just read
✅ Delta Lake (MERGE):
from delta.tables import DeltaTable
delta_table = DeltaTable.forPath(spark, "/delta/customers")
# Stage the source twice (the classic SCD2 MERGE trick):
#  - rows keyed by id expire the current version, or insert brand-new customers
#  - rows with a NULL merge_key never match, so they insert the new version for existing customers
existing_keys = delta_table.toDF().filter("end_date IS NULL").select("id")
staged = (incoming.selectExpr("id AS merge_key", "*")
          .unionByName(incoming.join(existing_keys, "id", "left_semi")
                               .selectExpr("NULL AS merge_key", "*")))
delta_table.alias("target").merge(
    source=staged.alias("source"),
    condition="target.id = source.merge_key AND target.end_date IS NULL"
).whenMatchedUpdate(set={
    "end_date": "current_date()"
}).whenNotMatchedInsert(values={
    "id": "source.id",
    "name": "source.name",
    "start_date": "current_date()",
    "end_date": "null"
}).execute()
🔹 3. Schema Evolution
🧪 RDBMS:
ALTER TABLE employees ADD department VARCHAR(100);
❌ Vanilla PySpark:
df_new.write.mode("append").parquet("/data/employees")  # Writes files with a mismatched schema; reads then fail or silently drop the new columns unless mergeSchema is set at read time
✅ Delta Lake:
df_new.write.option("mergeSchema", "true").format("delta").mode("append").save("/delta/employees")
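Delta also supports an explicit column addition that mirrors the RDBMS ALTER TABLE above; the table name delta_employees below is illustrative.
spark.sql("ALTER TABLE delta_employees ADD COLUMNS (department STRING)")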
🔹 4. Time Travel
🧪 RDBMS:
Supported in some engines via flashback or log-based features (e.g., Oracle Flashback Query, PostgreSQL point-in-time recovery from WAL).
❌ Vanilla PySpark:
Not supported natively; the closest workaround is managing dated snapshots yourself, as in the sketch below.
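If some form of point-in-time access is still needed with plain Parquet, a common workaround is keeping dated snapshot folders and reading the one you want; this is only a manual sketch with illustrative paths and dates, not real time travel.
# Write each load into its own dated snapshot folder (path/date are illustrative)
df.write.mode("overwrite").parquet("/data/orders/snapshot_date=2024-06-01")
# "Time travel" then means explicitly reading an older snapshot folder
df_old = spark.read.parquet("/data/orders/snapshot_date=2024-05-01")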
✅ Delta Lake:
df_current = spark.read.format("delta").load("/delta/orders")
df_old = spark.read.format("delta").option("versionAsOf", 5).load("/delta/orders")
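Time travel also works by timestamp; the timestamp below is illustrative.
df_asof = spark.read.format("delta").option("timestampAsOf", "2024-06-01").load("/delta/orders")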
🔹 5. MERGE / UPSERT
🧪 RDBMS:
MERGE INTO orders USING new_orders
ON orders.id = new_orders.id
WHEN MATCHED THEN UPDATE SET ...
WHEN NOT MATCHED THEN INSERT ...
❌ Vanilla PySpark:
No MERGE statement; an upsert requires reading the existing data, joining it against the new batch, and rewriting the result (see the sketch below).
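A minimal sketch of that manual pattern, assuming an orders dataset keyed by id and an incoming DataFrame new_orders_df (names and paths are illustrative):
existing = spark.read.parquet("/data/orders")
# Keep existing rows whose key is not in the new batch, then add the new/updated rows
untouched = existing.join(new_orders_df.select("id"), on="id", how="left_anti")
upserted = untouched.unionByName(new_orders_df)
# Write to a new location; overwriting the path that was just read is unsafe
upserted.write.mode("overwrite").parquet("/data/orders_upserted")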
✅ Delta Lake:
deltaTable.alias("target").merge(source_df.alias("source"), "target.id = source.id") \
    .whenMatchedUpdate(set={...}) \
    .whenNotMatchedInsert(values={...}) \
    .execute()
🔹 6. Deletes / Updates
🧪 RDBMS:
DELETE FROM orders WHERE status = 'cancelled';
UPDATE orders SET amount = amount * 1.1;
❌ PySpark:
No native DELETE or UPDATE; the whole dataset has to be filtered and rewritten:
df = spark.read.parquet("/path").filter("status != 'cancelled'")
df.write.mode("overwrite").parquet("/path")  # Rewrites everything; overwriting the path being read is risky
✅ Delta Lake:
spark.sql("DELETE FROM orders WHERE status = 'cancelled'")
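Updates work the same way through SQL, matching the RDBMS example above (this assumes orders is a Delta table):
spark.sql("UPDATE orders SET amount = amount * 1.1")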
🔹 7. Audit Logs / Change Tracking
🧪 RDBMS:
Via triggers or system logs.
❌ PySpark:
Not supported; any audit trail has to be written manually (see the sketch below).
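A rough sketch of what that manual logging could look like; the job name and log path are hypothetical:
from pyspark.sql import functions as F
# Append one audit row per batch: job name, row count, and a timestamp
audit = (spark.createDataFrame([("orders_load", df.count())], ["job_name", "row_count"])
         .withColumn("logged_at", F.current_timestamp()))
audit.write.mode("append").parquet("/data/_audit_log")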
✅ Delta Lake:
Every commit is recorded automatically in the table's _delta_log/ folder. You can inspect the commit history:
spark.sql("DESCRIBE HISTORY delta_table")
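The same history is also available through the Python API; the path below is illustrative:
from delta.tables import DeltaTable
DeltaTable.forPath(spark, "/delta/orders").history().show(truncate=False)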
🔹 8. Indexing
🧪 RDBMS:
CREATE INDEX idx_name ON employees(name);
❌ PySpark:
No indexing in Parquet/ORC.
✅ Delta Lake + Z-Ordering:
spark.sql("OPTIMIZE delta_table ZORDER BY (customer_id)")
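Recent Delta Lake releases expose the same operation through the Python API; the path below is illustrative:
from delta.tables import DeltaTable
DeltaTable.forPath(spark, "/delta/orders").optimize().executeZOrderBy("customer_id")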
🔹 9. Access Control
🧪 RDBMS:
Role/User/Grant-based access.
❌ PySpark:
No built-in access control.
✅ Databricks + Unity Catalog:
Fine-grained row/column-level access control, plus data lineage.
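A minimal sketch of table-level access control through Unity Catalog SQL; the catalog, schema, and group names are hypothetical:
spark.sql("GRANT SELECT ON TABLE main.sales.orders TO `data_analysts`")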
✅ BONUS Tips for PySpark Optimization (ETL Pipelines)
| Optimization Technique | When to Use |
|---|---|
| .checkpoint() | Break long lineages and safely materialize an intermediate result |
| .persist(StorageLevel.DISK_ONLY) | Heavy data; avoid memory overflow by spilling to disk |
| broadcast() (see the sketch below) | Small lookup tables (e.g., dimension tables) in joins |
| .repartition(n) | Control the number of output files / avoid skew |
| .coalesce(n) | Reduce the number of partitions before a write |
| OPTIMIZE + ZORDER (Delta) | Efficient reads by clustering related data |
| mergeSchema (Delta) | Handle evolving data structures cleanly |
| .foreachPartition() | Avoid driver overload in custom writes |
| RDD .checkpoint() | The same idea for RDD-based pipelines |
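A small sketch combining two of these tips, a broadcast join followed by coalescing before the write; the DataFrames fact_df and dim_df and the output path are illustrative:
from pyspark.sql.functions import broadcast
# Broadcast the small dimension table so the join avoids shuffling the large fact table
enriched = fact_df.join(broadcast(dim_df), on="customer_id", how="left")
# Reduce the output file count before writing (coalesce avoids a full shuffle)
enriched.coalesce(8).write.mode("overwrite").parquet("/data/enriched")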
📁 Notebook Folder Structure (Recommended)
/dbfs/FileStore/notebooks/
│
├── 01_acid_transactions_rdbms_vs_delta.ipynb
├── 02_scd_type1_type2_comparisons.ipynb
├── 03_schema_evolution_delta.ipynb
├── 04_merge_vs_upsert.ipynb
├── 05_delta_time_travel.ipynb
├── 06_delete_update_comparisons.ipynb
├── 07_delta_logs_audit.ipynb
├── 08_zorder_indexing_example.ipynb
├── 09_optimization_tips.ipynb
└── 10_delta_vs_parquet_feature_matrix.ipynb