Welcome to Post 5: Medallion Architecture with Delta Lake, the heart of scalable Lakehouse pipelines. This post walks through the pattern in tutorial style, with clear steps, a diagram, and hands-on examples for Databricks.


🪙 Post 5: Medallion Architecture with Delta Lake (Bronze → Silver → Gold)

The Medallion Architecture is a layered data engineering design that organizes your data into 3 zones:

  • Bronze: Raw ingestion
  • Silver: Cleaned and enriched
  • Gold: Business-level aggregations and analytics

This approach is modular, testable, and production-ready, making it a natural fit for modern Lakehouse pipelines.


🏗️ Why Medallion Architecture?

Layer  | Purpose                       | Characteristics
Bronze | Ingest raw data               | Append-only, minimal transformation
Silver | Clean and join data           | Schema-aligned, deduplicated, enriched
Gold   | Aggregate and serve for BI/ML | Optimized for queries, dashboards

Benefits:

  • Clear lineage and modularity
  • Versioning via Delta Lake
  • Easy to maintain and test

📁 Sample Dataset

Let’s work with:

  • customers.csv
  • orders.csv
  • products.csv

All files are stored under /FileStore/tables/... or arrive from cloud storage such as S3/ADLS.
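If the same files live in cloud storage rather than DBFS, only the path changes. A minimal sketch, assuming a hypothetical mount point /mnt/raw (the mount name, bucket, and storage account below are placeholders):

# Hypothetical cloud paths -- replace the mount/bucket/account names with your own
customers_path = "/mnt/raw/customers.csv"   # e.g. a mounted S3 bucket or ADLS container
# Direct ADLS Gen2 access (credentials must already be configured on the cluster):
# customers_path = "abfss://raw@<storage_account>.dfs.core.windows.net/customers.csv"

customers_df = spark.read.option("header", True).csv(customers_path)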


⚙️ Step 1: Ingest Raw Data → Bronze Layer

Load raw data and write directly to Delta (no cleaning).

customers_df = spark.read.option("header", True).csv("/FileStore/tables/customers.csv")
orders_df    = spark.read.option("header", True).csv("/FileStore/tables/orders.csv")
products_df  = spark.read.option("header", True).csv("/FileStore/tables/products.csv")

customers_df.write.format("delta").mode("overwrite").save("/delta/bronze/customers")
orders_df.write.format("delta").mode("overwrite").save("/delta/bronze/orders")
products_df.write.format("delta").mode("overwrite").save("/delta/bronze/products")

💡 Tip: No deduplication or transformation here.
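A common optional refinement that still keeps Bronze "raw" is to stamp each row with ingestion metadata, so you can trace when and from which file a record arrived. A minimal sketch (the _ingest_ts / _source_file column names are just a convention, and append mode matches the append-only nature of Bronze):

from pyspark.sql.functions import current_timestamp, input_file_name

orders_with_meta = (orders_df
    .withColumn("_ingest_ts", current_timestamp())     # when the row was ingested
    .withColumn("_source_file", input_file_name()))    # which file it came from

orders_with_meta.write.format("delta").mode("append").save("/delta/bronze/orders")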


🧽 Step 2: Clean & Join → Silver Layer

This layer applies cleaning, typecasting, filtering, deduplication, etc.

from pyspark.sql.functions import col

# Read from Bronze
bronze_customers = spark.read.format("delta").load("/delta/bronze/customers")
bronze_orders    = spark.read.format("delta").load("/delta/bronze/orders")

# Cleaning and transformation
silver_customers = bronze_customers.dropDuplicates(["customer_id"]) \
    .filter(col("customer_id").isNotNull())

silver_orders = bronze_orders \
    .withColumn("amount", col("amount").cast("double")) \
    .dropDuplicates(["order_id"]) \
    .filter(col("order_status") == "COMPLETE")

# Save to Silver
silver_customers.write.format("delta").mode("overwrite").save("/delta/silver/customers")
silver_orders.write.format("delta").mode("overwrite").save("/delta/silver/orders")

📊 Step 3: Aggregate → Gold Layer

Join silver tables and perform business logic.

🔹 Example Use Case: Total Spending per Customer

silver_orders    = spark.read.format("delta").load("/delta/silver/orders")
silver_customers = spark.read.format("delta").load("/delta/silver/customers")

gold_spending = silver_orders.join(silver_customers, "customer_id") \
    .groupBy("customer_id", "first_name", "last_name") \
    .agg({"amount": "sum"}) \
    .withColumnRenamed("sum(amount)", "total_spent")

# Save as Gold Layer
gold_spending.write.format("delta").mode("overwrite").save("/delta/gold/customer_spending")

The gold/customer_spending table is now ready to serve dashboards, reporting, and ML.
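As a quick sanity check, read the Gold table back and peek at the top spenders (purely illustrative):

gold_df = spark.read.format("delta").load("/delta/gold/customer_spending")
gold_df.orderBy(gold_df.total_spent.desc()).show(10, truncate=False)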


🧭 Visual Summary (Diagram)

            +-----------------+
            |  Raw CSV/JSON   |
            +--------+--------+
                     |
                   Bronze
            +--------v--------+
            |  Raw Ingested   |
            |  Delta Tables   |
            +--------+--------+
                     |
                   Silver
            +--------v--------+
            | Cleaned & Joined|
            |  Delta Tables   |
            +--------+--------+
                     |
                   Gold
            +--------v--------+
            | BI-Ready Views  |
            | Aggregated Data |
            +-----------------+

📘 Use Case Examples for Each Layer

Layer  | Example Use Case
Bronze | Store real-time streaming data as-is (see the sketch below)
Silver | Filter valid events, join customer data
Gold   | Daily revenue report, product insights
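The Bronze use case above mentions streaming ingestion. A minimal sketch using Databricks Auto Loader (the input path, schema location, and checkpoint location are placeholders; Post 6 covers this in depth):

raw_events = (spark.readStream
    .format("cloudFiles")                                    # Databricks Auto Loader
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", "/delta/bronze/_schemas/events")
    .load("/mnt/raw/events/"))

(raw_events.writeStream
    .format("delta")
    .option("checkpointLocation", "/delta/bronze/_checkpoints/events")
    .outputMode("append")
    .start("/delta/bronze/events"))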

🧪 Optional: Register Tables in Metastore

-- Bronze
CREATE TABLE bronze_customers USING DELTA LOCATION '/delta/bronze/customers';

-- Silver
CREATE TABLE silver_customers USING DELTA LOCATION '/delta/silver/customers';

-- Gold
CREATE TABLE gold_customer_spending USING DELTA LOCATION '/delta/gold/customer_spending';

Now accessible from BI tools or notebooks via SQL!
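For example, any notebook (or a BI tool pointing at the same metastore) can now query the Gold table directly:

top_spenders = spark.sql("""
    SELECT customer_id, first_name, last_name, total_spent
    FROM gold_customer_spending
    ORDER BY total_spent DESC
    LIMIT 10
""")
top_spenders.show()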


🔄 Integrate with Delta Features

  • Use MERGE INTO when loading Bronze to Silver for incremental loads (sketch below)
  • Use Time Travel for reproducibility
  • Use ZORDER & OPTIMIZE for Gold queries
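
Minimal sketches of all three, reusing the paths and table names from this post (new_orders_df below is a hypothetical incremental batch, not something defined earlier):

from delta.tables import DeltaTable

# 1) MERGE INTO: upsert an incremental batch of orders into the Silver table
silver_orders_tbl = DeltaTable.forPath(spark, "/delta/silver/orders")

(silver_orders_tbl.alias("t")
    .merge(new_orders_df.alias("s"), "t.order_id = s.order_id")   # new_orders_df: hypothetical incremental batch
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute())

# 2) Time Travel: re-read the Silver table as of an earlier version for reproducibility
orders_v0 = spark.read.format("delta").option("versionAsOf", 0).load("/delta/silver/orders")

# 3) OPTIMIZE + ZORDER the Gold table to speed up lookups by customer
spark.sql("OPTIMIZE gold_customer_spending ZORDER BY (customer_id)")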

✅ Summary

Layer  | Output
Bronze | Raw Delta tables
Silver | Cleaned, deduplicated data
Gold   | Business logic: revenue, trends, etc.

Medallion Architecture + Delta Lake gives you:

  • Modular pipeline
  • Reproducibility
  • Analytics-ready data

⏭️ What’s Next?

Post 6: Building a Mini Project with Jobs, Triggers & Auto Load (Streaming + Batch)

We’ll schedule:

  • Bronze loads from cloud storage
  • Auto-processing Silver & Gold
  • Dashboard-ready outputs

🧰 Bonus Notebook: Read JSON + Complex Transformations + Delta Write

# Databricks Notebook: read nested JSON, flatten it, transform, and write a partitioned Delta table

# -------------------------------
# Step 1: Read JSON Data (Nested, Multiline)
# -------------------------------
input_path = "/mnt/data/events.json"

raw_df = spark.read \
    .option("multiLine", True) \
    .option("mode", "PERMISSIVE") \
    .option("inferSchema", True) \
    .json(input_path)

raw_df.printSchema()
raw_df.show(truncate=False)

# -------------------------------
# Step 2: Flatten Nested JSON
# -------------------------------
from pyspark.sql.functions import col, explode

flattened_df = raw_df.select(
    col("event_id"),
    col("event_time"),
    col("user.id").alias("user_id"),
    col("user.name").alias("user_name"),
    explode(col("event_details"))  # explode array
)

flattened_df.show()

# -------------------------------
# Step 3: Complex Transformations
# -------------------------------
from pyspark.sql.functions import year, month, to_date, struct

transformed_df = flattened_df.withColumn("event_date", to_date("event_time")) \
    .withColumn("year", year("event_date")) \
    .withColumn("month", month("event_date")) \
    .withColumn("user_info", struct("user_id", "user_name"))

transformed_df.printSchema()

# -------------------------------
# Step 4: Write as Partitioned Delta Table
# -------------------------------
delta_path = "/mnt/delta/events_transformed"
delta_table = "analytics.events_json_transformed"

transformed_df.write \
    .format("delta") \
    .mode("overwrite") \
    .partitionBy("year", "month") \
    .option("compression", "zstd") \
    .save(delta_path)

spark.sql(f"""
    CREATE DATABASE IF NOT EXISTS analytics;
    DROP TABLE IF EXISTS {delta_table};
    CREATE TABLE {delta_table} USING DELTA LOCATION '{delta_path}';
""")

# -------------------------------
# Step 5: Databricks-Specific: ZORDER and Display
# -------------------------------
spark.sql(f"""
    OPTIMIZE {delta_table} ZORDER BY (user_id, event_date)
""")

# -------------------------------
# Step 6: Enable Auto Optimizations (Optional)
# -------------------------------
spark.conf.set("spark.databricks.delta.optimizeWrite.enabled", True)
spark.conf.set("spark.databricks.delta.autoCompact.enabled", True)

# -------------------------------
# Step 7: Use Databricks Widgets (Optional UI Filter)
# -------------------------------
dbx = dbutils.widgets

dbx.text("country", "US")
country_filter = dbx.get("country")

# Use the widget value in a filter (assumes the dataset has a country column)
spark.read.format("delta").load(delta_path).filter(f"country = '{country_filter}'").display()

# -------------------------------
# Step 8: File Utilities & Preview (Databricks-Specific)
# -------------------------------
dbutils.fs.ls("/mnt/delta/events_transformed")
dbutils.fs.head("/mnt/delta/events_transformed/_delta_log/00000000000000000000.json")
