Post 5 covers the Medallion Architecture with Delta Lake, the heart of scalable Lakehouse pipelines. It is written as a hands-on tutorial, with clear steps, a diagram, and runnable examples for Databricks.
🪙 Post 5: Medallion Architecture with Delta Lake (Bronze → Silver → Gold)
The Medallion Architecture is a layered data engineering design that organizes your data into 3 zones:
- Bronze: Raw ingestion
- Silver: Cleaned and enriched
- Gold: Business-level aggregations and analytics
This approach is modular, testable, and production-ready, making it a natural fit for modern Lakehouse pipelines.
🏗️ Why Medallion Architecture?
| Layer | Purpose | Characteristics |
|---|---|---|
| Bronze | Ingest raw data | Append-only, minimal transformation |
| Silver | Clean and join data | Schema-aligned, deduplicated, enriched |
| Gold | Aggregate and serve for BI/ML | Optimized for queries, dashboards |
Benefits:
- Clear lineage and modularity
- Versioning via Delta Lake
- Easy to maintain and test
📁 Sample Dataset
Let’s work with:
- customers.csv
- orders.csv
- products.csv
All stored at /FileStore/tables/... or coming from S3/ADLS.
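If the files live in S3 or ADLS Gen2 rather than DBFS, only the path changes. A minimal sketch, assuming credentials are already configured on the cluster; the bucket, container, and storage-account names below are placeholders:

```python
# Same reader API; only the path scheme differs.
# DBFS:
customers_df = spark.read.option("header", True).csv("/FileStore/tables/customers.csv")

# ADLS Gen2 (abfss) -- 'raw' container and 'mystorage' account are placeholders:
customers_adls_df = spark.read.option("header", True).csv(
    "abfss://raw@mystorage.dfs.core.windows.net/customers.csv"
)

# S3 -- 'my-raw-bucket' is a placeholder bucket name:
customers_s3_df = spark.read.option("header", True).csv("s3://my-raw-bucket/customers.csv")
```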
⚙️ Step 1: Ingest Raw Data → Bronze Layer
Load raw data and write directly to Delta (no cleaning).
```python
customers_df = spark.read.option("header", True).csv("/FileStore/tables/customers.csv")
orders_df = spark.read.option("header", True).csv("/FileStore/tables/orders.csv")
products_df = spark.read.option("header", True).csv("/FileStore/tables/products.csv")

# Overwrite is used here for a simple first full load; incremental loads would append
customers_df.write.format("delta").mode("overwrite").save("/delta/bronze/customers")
orders_df.write.format("delta").mode("overwrite").save("/delta/bronze/orders")
products_df.write.format("delta").mode("overwrite").save("/delta/bronze/products")
```
💡 Tip: No deduplication or transformation here.
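One optional refinement that still respects the raw-as-is rule is to stamp each Bronze row with ingestion metadata. A minimal sketch; the _ingest_ts and _source_file column names are ones I'm adding for illustration:

```python
from pyspark.sql.functions import current_timestamp, input_file_name

# Add ingestion metadata without touching the source values
bronze_orders_df = (
    orders_df
    .withColumn("_ingest_ts", current_timestamp())
    .withColumn("_source_file", input_file_name())
)
bronze_orders_df.write.format("delta").mode("overwrite").save("/delta/bronze/orders")
```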
🧽 Step 2: Clean & Join → Silver Layer
This layer applies cleaning, typecasting, filtering, deduplication, etc.
```python
from pyspark.sql.functions import col

# Read from Bronze
bronze_customers = spark.read.format("delta").load("/delta/bronze/customers")
bronze_orders = spark.read.format("delta").load("/delta/bronze/orders")

# Cleaning and transformation
silver_customers = bronze_customers.dropDuplicates(["customer_id"]) \
    .filter(col("customer_id").isNotNull())

silver_orders = bronze_orders \
    .withColumn("amount", col("amount").cast("double")) \
    .dropDuplicates(["order_id"]) \
    .filter(col("order_status") == "COMPLETE")

# Save to Silver
silver_customers.write.format("delta").mode("overwrite").save("/delta/silver/customers")
silver_orders.write.format("delta").mode("overwrite").save("/delta/silver/orders")
```
📊 Step 3: Aggregate → Gold Layer
Join silver tables and perform business logic.
🔹 Example Use Case: Total Spending per Customer
```python
silver_orders = spark.read.format("delta").load("/delta/silver/orders")
silver_customers = spark.read.format("delta").load("/delta/silver/customers")

gold_spending = silver_orders.join(silver_customers, "customer_id") \
    .groupBy("customer_id", "first_name", "last_name") \
    .agg({"amount": "sum"}) \
    .withColumnRenamed("sum(amount)", "total_spent")

# Save as Gold Layer
gold_spending.write.format("delta").mode("overwrite").save("/delta/gold/customer_spending")
```
The gold/customer_spending table is now ready to power dashboards, reporting, and ML.
🧭 Visual Summary (Diagram)
```text
+-------------------+
|   Raw CSV/JSON    |
+---------+---------+
          |
        Bronze
+---------v---------+
|   Raw Ingested    |
|   Delta Tables    |
+---------+---------+
          |
        Silver
+---------v---------+
| Cleaned & Joined  |
|   Delta Tables    |
+---------+---------+
          |
         Gold
+---------v---------+
|  BI-Ready Views   |
| Aggregated Deltas |
+-------------------+
```
📘 Use Case Examples for Each Layer
| Layer | Example Use Case |
|---|---|
| Bronze | Store real-time streaming data as-is (sketched below) |
| Silver | Filter valid events, join customer data |
| Gold | Daily revenue report, product insights |
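For the streaming Bronze use case, Databricks Auto Loader can land files as they arrive. A minimal sketch; the input, schema, and checkpoint paths are placeholders, and Post 6 covers this properly:

```python
# Incrementally ingest new files into Bronze as they land (Auto Loader)
bronze_stream = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .option("header", True)
    .option("cloudFiles.schemaLocation", "/delta/bronze/_schemas/orders")  # placeholder
    .load("s3://my-raw-bucket/orders/")                                    # placeholder
)

(bronze_stream.writeStream
    .format("delta")
    .option("checkpointLocation", "/delta/bronze/_checkpoints/orders")     # placeholder
    .outputMode("append")
    .start("/delta/bronze/orders_stream"))
```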
🧪 Optional: Register Tables in Metastore
```sql
-- Bronze
CREATE TABLE bronze_customers USING DELTA LOCATION '/delta/bronze/customers';

-- Silver
CREATE TABLE silver_customers USING DELTA LOCATION '/delta/silver/customers';

-- Gold
CREATE TABLE gold_customer_spending USING DELTA LOCATION '/delta/gold/customer_spending';
```
Now accessible from BI tools or notebooks via SQL!
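For example, the Gold table can now be queried by name from any notebook (a quick sanity check using the columns produced in Step 3; BI tools hit the same table over SQL):

```python
# Query the registered Gold table by name
top_spenders = spark.sql("""
    SELECT customer_id, first_name, last_name, total_spent
    FROM gold_customer_spending
    ORDER BY total_spent DESC
    LIMIT 10
""")
top_spenders.show()
```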
🔄 Integrate with Delta Features
- Use MERGE INTO when loading from Bronze to Silver for incremental upserts
- Use Time Travel to reproduce any earlier version of a table
- Use OPTIMIZE with ZORDER BY to speed up selective Gold queries (all three are sketched below)
✅ Summary
| Layer | Output |
|---|---|
| Bronze | Raw Delta tables |
| Silver | Cleaned, deduplicated data |
| Gold | Business-level aggregates: revenue, trends, etc. |
Medallion Architecture + Delta Lake gives you:
- Modular pipeline
- Reproducibility
- Analytics-ready data
⏭️ What’s Next?
Post 6: Building a Mini Project with Jobs, Triggers & Auto Loader (Streaming + Batch)
We’ll schedule:
- Bronze loads from cloud storage
- Auto-processing Silver & Gold
- Dashboard-ready outputs
🧾 Bonus Notebook: Read JSON + Complex Transformations + Delta Write
As a complete end-to-end example, the notebook below applies the same Delta patterns to a nested JSON source: read, flatten, transform, write a partitioned Delta table, then optimize it.
```python
# Databricks Notebook: Read JSON + Complex Transformations + Delta Write

# -------------------------------
# Step 1: Read JSON Data (Nested, Multiline)
# -------------------------------
input_path = "/mnt/data/events.json"

raw_df = spark.read \
    .option("multiLine", True) \
    .option("mode", "PERMISSIVE") \
    .option("inferSchema", True) \
    .json(input_path)

raw_df.printSchema()
raw_df.show(truncate=False)
# -------------------------------
# Step 2: Flatten Nested JSON
# -------------------------------
from pyspark.sql.functions import col, explode
flattened_df = raw_df.select(
    col("event_id"),
    col("event_time"),
    col("user.id").alias("user_id"),
    col("user.name").alias("user_name"),
    explode(col("event_details")).alias("event_detail")  # one row per array element
)
flattened_df.show()
# -------------------------------
# Step 3: Complex Transformations
# -------------------------------
from pyspark.sql.functions import year, month, to_date, struct
transformed_df = flattened_df.withColumn("event_date", to_date("event_time")) \
    .withColumn("year", year("event_date")) \
    .withColumn("month", month("event_date")) \
    .withColumn("user_info", struct("user_id", "user_name"))
transformed_df.printSchema()
# -------------------------------
# Step 4: Write as Partitioned Delta Table
# -------------------------------
delta_path = "/mnt/delta/events_transformed"
delta_table = "analytics.events_json_transformed"
transformed_df.write \
    .format("delta") \
    .mode("overwrite") \
    .partitionBy("year", "month") \
    .option("compression", "zstd") \
    .save(delta_path)

# spark.sql() executes a single statement per call, so register the table in separate calls
spark.sql("CREATE DATABASE IF NOT EXISTS analytics")
spark.sql(f"DROP TABLE IF EXISTS {delta_table}")
spark.sql(f"CREATE TABLE {delta_table} USING DELTA LOCATION '{delta_path}'")
# -------------------------------
# Step 5: Databricks-Specific: OPTIMIZE with ZORDER
# -------------------------------
spark.sql(f"""
OPTIMIZE {delta_table} ZORDER BY (user_id, event_date)
""")
# -------------------------------
# Step 6: Enable Auto Optimizations (Optional)
# -------------------------------
spark.conf.set("spark.databricks.delta.optimizeWrite.enabled", True)
spark.conf.set("spark.databricks.delta.autoCompact.enabled", True)
# -------------------------------
# Step 7: Use Databricks Widgets (Optional UI Filter)
# -------------------------------
dbx = dbutils.widgets
dbx.text("country", "US")
country_filter = dbx.get("country")
# Use widget value in a filter (assumes the events data includes a 'country' column)
spark.read.format("delta").load(delta_path).filter(f"country = '{country_filter}'").display()
# -------------------------------
# Step 8: File Utilities & Preview (Databricks-Specific)
# -------------------------------
dbutils.fs.ls("/mnt/delta/events_transformed")
dbutils.fs.head("/mnt/delta/events_transformed/_delta_log/00000000000000000000.json")
```