You’re ready for the next-level, enterprise-grade ETL framework. Let’s extend the PySpark OOP-based metadata ETL framework with:

  • Delta Lake Writer
  • ETL Logging with start_time, end_time, and status
  • Target Table Versioning (soft updates)

✅ All code inline – fully printed


🔥 Advanced PySpark Metadata-Driven ETL Framework (Delta + Logging + Versioning)


✅ 1. Add Delta Lake Support with a DeltaWriter

from delta.tables import DeltaTable
from pyspark.sql.functions import current_timestamp

class DeltaWriter:
    def __init__(self, spark, mode="overwrite"):
        self.spark = spark
        self.mode = mode

    def write_delta(self, df, table, versioning=False):
        if versioning:
            df = df.withColumn("updated_at", current_timestamp())
            if self._table_exists(table):
                delta_tbl = DeltaTable.forName(self.spark, table)
                delta_tbl.alias("t").merge(
                    df.alias("s"),
                    "t.id = s.id"  # assuming 'id' is primary key
                ).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()
            else:
                df.write.format("delta").mode("overwrite").saveAsTable(table)
        else:
            df.write.format("delta").mode(self.mode).saveAsTable(table)

    def _table_exists(self, table):
        # Public Catalog API (Spark 3.3+) instead of reaching into the private JVM session
        return self.spark.catalog.tableExists(table)
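
A quick usage sketch, assuming a Delta-enabled Spark session and that the target schemas already exist (both table names and the sample DataFrame are illustrative; the MERGE condition above joins on an id column):

writer = DeltaWriter(spark)
df = spark.createDataFrame([(1, "Alice"), (2, "Bob")], ["id", "name"])
writer.write_delta(df, "silver.customer", versioning=True)   # MERGE when the table exists
writer.write_delta(df, "bronze.customer_raw")                # plain overwrite (default mode)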

✅ 2. Add JobLogger Class to Log ETL Steps

from pyspark.sql import Row
from datetime import datetime
import traceback

class JobLogger:
    def __init__(self, spark):
        self.spark = spark
        self.logs = []

    def log_step(self, step_id, table, status, start_time, end_time, error=None):
        self.logs.append(Row(
            step_id=step_id,
            table_name=table,
            status=status,
            start_time=start_time,
            end_time=end_time,
            error=str(error) if error else None
        ))

    def write_log(self, table_name="etl_logs"):
        if not self.logs:
            return  # nothing to persist
        log_df = self.spark.createDataFrame(self.logs)
        log_df.write.mode("append").saveAsTable(table_name)
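
A short usage sketch, assuming an existing spark session (the step values are purely illustrative):

from datetime import datetime

job_logger = JobLogger(spark)
start = datetime.now()
# ... run a step here ...
job_logger.log_step(step_id=1, table="silver.customer", status="SUCCESS",
                    start_time=start, end_time=datetime.now())
job_logger.write_log("etl_logs")   # appends one row to the etl_logs table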

✅ 3. Update ETLStep to Use DeltaWriter + Logging + Versioning

from datetime import datetime
import traceback

class ETLStep(LoggerMixin):
    def __init__(self, spark, metadata, logger):
        self.spark = spark
        self.meta = metadata
        self.logger = logger
        self.reader = CSVReader(spark)
        self.transformer = SQLTransformer(spark)
        self.writer = DeltaWriter(spark)

    def run(self):
        start_time = datetime.now()
        status = "SUCCESS"
        error = None
        try:
            self.log(f"Step {self.meta['step_id']} - START")
            df = self.reader.read(self.meta["source_path"])
            df = self.transformer.apply_sql(df, self.meta["sql_transformation"])
            self.writer.write_delta(df, self.meta["target_table"], versioning=True)
            self.log(f"Step {self.meta['step_id']} - DONE")
        except Exception as e:
            status = "FAILED"
            error = traceback.format_exc()
            self.log(f"Step {self.meta['step_id']} - ERROR")
        finally:
            end_time = datetime.now()
            self.logger.log_step(
                step_id=self.meta["step_id"],
                table=self.meta["target_table"],
                status=status,
                start_time=start_time,
                end_time=end_time,
                error=error
            )

✅ 4. Update PipelineRunner to Pass Logger and Write Log After Execution

class ETLPipelineRunner(LoggerMixin):
    def __init__(self, spark, metadata_table):
        self.spark = spark
        self.metadata_table = metadata_table
        self.executed = set()
        self.step_map = {meta['step_id']: meta for meta in metadata_table}
        self.logger = JobLogger(spark)
        self.graph = self._build_graph()

    def _build_graph(self):
        from collections import defaultdict
        graph = defaultdict(list)
        for meta in self.metadata_table:
            for dep in meta['dependencies']:
                graph[dep].append(meta['step_id'])
        return graph

    def _runnable_steps(self):
        return [step_id for step_id, meta in self.step_map.items()
                if step_id not in self.executed and all(dep in self.executed for dep in meta['dependencies'])]

    def run(self):
        from concurrent.futures import ThreadPoolExecutor
        while len(self.executed) < len(self.step_map):
            runnable = self._runnable_steps()
            if not runnable:
                raise Exception("Circular dependency or misconfigured DAG")

            self.log(f"Running: {runnable}")

            with ThreadPoolExecutor(max_workers=4) as executor:
                futures = []
                for step_id in runnable:
                    job = ETLStep(self.spark, self.step_map[step_id], self.logger)
                    futures.append(executor.submit(job.run))
                    self.executed.add(step_id)
                for f in futures:
                    f.result()

        self.logger.write_log("etl_logs")
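
To make the scheduling loop concrete, here is a tiny standalone illustration (plain Python, no Spark) of how _runnable_steps resolves the two-step sample metadata used in main.py below:

deps = {1: [], 2: [1]}          # step 2 depends on step 1
executed = set()
while len(executed) < len(deps):
    wave = [s for s, d in deps.items()
            if s not in executed and all(x in executed for x in d)]
    print("wave:", wave)        # prints [1], then [2]
    executed.update(wave)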

✅ 5. Run from main.py

from pyspark.sql import SparkSession

if __name__ == "__main__":
    spark = SparkSession.builder \
        .appName("AdvancedMetadataETL") \
        .enableHiveSupport() \
        .getOrCreate()

    metadata_table = [
        {
            "step_id": 1,
            "source_path": "/data/customers.csv",
            "sql_transformation": "SELECT id, UPPER(name) AS name FROM input",
            "target_table": "silver.customer",
            "dependencies": []
        },
        {
            "step_id": 2,
            "source_path": "/data/active.csv",
            "sql_transformation": "SELECT * FROM input WHERE status = 'ACTIVE'",
            "target_table": "silver.active_customers",
            "dependencies": [1]
        }
    ]

    runner = ETLPipelineRunner(spark, metadata_table)
    runner.run()
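
Note: where Delta Lake is not preconfigured (for example outside Databricks), the session builder above also needs the Delta SQL extension and catalog settings. A minimal sketch, assuming the delta-spark package is on the classpath:

spark = (SparkSession.builder
         .appName("AdvancedMetadataETL")
         .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
         .config("spark.sql.catalog.spark_catalog",
                 "org.apache.spark.sql.delta.catalog.DeltaSparkSessionCatalog")
         .enableHiveSupport()
         .getOrCreate())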

✅ What’s Added in This Version

  • DeltaWriter: Writes data using Delta Lake (merge or overwrite)
  • Soft Updates: Uses MERGE for versioned insert/update
  • Logging: etl_logs table tracks start_time, end_time, status, error
  • Parallelism: Steps with no dependencies run concurrently
  • OOP Design: Fully modular classes: Reader, Transformer, Writer, Logger

🧠 Next-Level Enhancements (Optional)

  • 🧪 Unit Test Framework: Add pytest-based tests for each class
  • 🛠 ParamTable Integration: Read transformations from a Hive or JSON config table
  • 🔁 Retry Logic: Add retry per step with max attempts
  • 🚨 Alerting: Send email/Slack if a step fails
  • 📊 Audit Framework: Track row counts, schema, file size in metadata
  • 🌪 Streaming Support: Extend to use Spark Structured Streaming

Perfect. Let’s now extend our PySpark ETL Framework with the requested production-grade features, all inline and organized for clarity:


✅ Extended Feature Set for PySpark OOP-Based Metadata ETL

  • 🧪 Unit Tests: pytest-style tests for class logic
  • 🛠 ParamTable: Load metadata from Hive or a JSON config
  • 🔁 Retry Logic: Decorator to retry failing steps
  • 🚨 Alerting: Email or Slack alerts on failure
  • 📊 Audit Framework: Row counts, schema, file size logging

🛠 BASE SETUP

We’ll assume:

  • Spark session is already initialized
  • Delta Lake is enabled
  • Hive tables are available for metadata, audit log, and alerts

✅ 1. 🔁 Retry Decorator (retry_utils.py)

import time
import functools

def retry(times=3, delay=2):
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(1, times + 1):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    print(f"[Retry {attempt}/{times}] Failed: {e}")
                    if attempt == times:
                        raise  # re-raise the original error once attempts are exhausted
                    time.sleep(delay)
        return wrapper
    return decorator
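
A quick usage sketch of the decorator (the flaky function and its call counter are purely illustrative):

calls = {"n": 0}

@retry(times=3, delay=1)
def flaky_fetch():
    calls["n"] += 1
    if calls["n"] < 3:
        raise ValueError("transient failure")
    return "ok"

print(flaky_fetch())   # two retry messages, then "ok"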

✅ 2. 🧪 Unit Test Example (test_csv_reader.py)

import pytest
from base.reader import CSVReader

def test_csv_reader_succeeds(spark_session):
    reader = CSVReader(spark_session)
    df = reader.read("tests/test_data.csv")
    assert df.count() > 0
    assert "name" in df.columns

Use conftest.py to define spark_session as a pytest fixture using SparkSession.builder.
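
A minimal sketch of that fixture (the local master, app name, and session scope are assumptions):

# conftest.py
import pytest
from pyspark.sql import SparkSession

@pytest.fixture(scope="session")
def spark_session():
    spark = (SparkSession.builder
             .master("local[2]")
             .appName("etl-framework-tests")
             .getOrCreate())
    yield spark
    spark.stop()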


✅ 3. 🛠 ParamTable Integration (metadata_loader.py)

class MetadataLoader:
    def __init__(self, spark):
        self.spark = spark

    def load_from_hive(self, table="etl_metadata"):
        df = self.spark.table(table)
        return [row.asDict() for row in df.collect()]

    def load_from_json(self, path):
        df = self.spark.read.option("multiline", True).json(path)
        return [row.asDict() for row in df.collect()]
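
A usage sketch, assuming a JSON config file at a hypothetical path whose entries mirror the etl_metadata layout shown later:

# /configs/etl_steps.json (hypothetical path) would contain entries like:
# [{"step_id": 1, "source_path": "/data/customers.csv",
#   "sql_transformation": "SELECT id, UPPER(name) AS name FROM input",
#   "target_table": "silver.customer", "dependencies": []}, ...]
loader = MetadataLoader(spark)
metadata_table = loader.load_from_json("/configs/etl_steps.json")
# or, once the Hive table exists:
metadata_table = loader.load_from_hive("etl_metadata")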

✅ 4. 🚨 Alerting (notifier.py)

import smtplib
from email.message import EmailMessage

class AlertNotifier:
    def __init__(self, email_to):
        self.email_to = email_to

    def send_email(self, subject, body):
        msg = EmailMessage()
        msg["Subject"] = subject
        msg["From"] = "noreply@etlframework.com"
        msg["To"] = self.email_to
        msg.set_content(body)

        try:
            with smtplib.SMTP("localhost") as server:
                server.send_message(msg)
        except Exception as e:
            print(f"Email alert failed: {e}")

✅ 5. 📊 Audit Logger (audit_logger.py)

from pyspark.sql import Row

class AuditLogger:
    def __init__(self, spark):
        self.spark = spark
        self.rows = []

    def log(self, df, step_id, table):
        schema = df.schema.simpleString()   # e.g. struct<id:int,name:string>
        count = df.count()
        self.rows.append(Row(
            step_id=step_id,
            target_table=table,
            row_count=count,
            schema=schema
        ))

    def write(self, table="audit_logs"):
        if not self.rows:
            return  # nothing to audit
        self.spark.createDataFrame(self.rows).write.mode("append").saveAsTable(table)

✅ 6. ETLStep Updated with Retry, Alerting, Audit

from retry_utils import retry
from notifier import AlertNotifier
from audit_logger import AuditLogger

class ETLStep(LoggerMixin):
    def __init__(self, spark, meta, job_logger, audit_logger, notifier):
        self.spark = spark
        self.meta = meta
        self.reader = CSVReader(spark)
        self.transformer = SQLTransformer(spark)
        self.writer = DeltaWriter(spark)
        self.logger = job_logger
        self.auditor = audit_logger
        self.notifier = notifier

    @retry(times=2, delay=5)
    def _execute(self):
        # Core extract-transform-load; exceptions propagate so @retry can re-run it
        df = self.reader.read(self.meta["source_path"])
        df = self.transformer.apply_sql(df, self.meta["sql_transformation"])
        self.writer.write_delta(df, self.meta["target_table"], versioning=True)
        self.auditor.log(df, self.meta["step_id"], self.meta["target_table"])

    def run(self):
        from datetime import datetime
        import traceback

        start_time = datetime.now()
        status = "SUCCESS"
        error = None

        try:
            self.log(f"Step {self.meta['step_id']} - START")
            self._execute()
            self.log(f"Step {self.meta['step_id']} - DONE")
        except Exception:
            # Only alert and mark FAILED after retries are exhausted
            status = "FAILED"
            error = traceback.format_exc()
            self.notifier.send_email(
                f"ETL Failed - Step {self.meta['step_id']}",
                f"Error:\n{error}"
            )
            self.log(f"Step {self.meta['step_id']} - ERROR")
        finally:
            end_time = datetime.now()
            self.logger.log_step(
                step_id=self.meta["step_id"],
                table=self.meta["target_table"],
                status=status,
                start_time=start_time,
                end_time=end_time,
                error=error
            )

✅ 7. Runner Integration

from metadata_loader import MetadataLoader
from notifier import AlertNotifier
from audit_logger import AuditLogger

class ETLPipelineRunner(LoggerMixin):
    def __init__(self, spark):
        self.spark = spark
        self.logger = JobLogger(spark)
        self.auditor = AuditLogger(spark)
        self.notifier = AlertNotifier("admin@company.com")
        self.loader = MetadataLoader(spark)
        self.step_map = {}
        self.executed = set()

    def run(self):
        metadata_table = self.loader.load_from_hive()
        self.step_map = {m["step_id"]: m for m in metadata_table}
        graph = {m["step_id"]: m["dependencies"] for m in metadata_table}

        from concurrent.futures import ThreadPoolExecutor

        while len(self.executed) < len(self.step_map):
            runnable = [sid for sid, deps in graph.items()
                        if sid not in self.executed and all(d in self.executed for d in deps)]
            if not runnable:
                raise Exception("Circular dependency or misconfigured DAG")

            with ThreadPoolExecutor(max_workers=4) as executor:
                futures = []
                for step_id in runnable:
                    job = ETLStep(
                        self.spark,
                        self.step_map[step_id],
                        self.logger,
                        self.auditor,
                        self.notifier
                    )
                    futures.append(executor.submit(job.run))
                    self.executed.add(step_id)
                for f in futures:
                    f.result()

        self.logger.write_log()
        self.auditor.write()

✅ 8. Hive Tables to Create

-- Metadata table
CREATE TABLE etl_metadata (
  step_id INT,
  source_path STRING,
  sql_transformation STRING,
  target_table STRING,
  dependencies ARRAY<INT>
);

-- ETL log
CREATE TABLE etl_logs (
  step_id INT,
  table_name STRING,
  status STRING,
  start_time TIMESTAMP,
  end_time TIMESTAMP,
  error STRING
);

-- Audit log
CREATE TABLE audit_logs (
  step_id INT,
  target_table STRING,
  row_count INT,
  schema STRING
);
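
A sketch of seeding etl_metadata from Python so MetadataLoader.load_from_hive() can pick it up (the two rows mirror the sample steps used earlier; the table must already exist as defined above):

from pyspark.sql.types import (StructType, StructField, IntegerType,
                               StringType, ArrayType)

schema = StructType([
    StructField("step_id", IntegerType()),
    StructField("source_path", StringType()),
    StructField("sql_transformation", StringType()),
    StructField("target_table", StringType()),
    StructField("dependencies", ArrayType(IntegerType())),
])

rows = [
    (1, "/data/customers.csv",
     "SELECT id, UPPER(name) AS name FROM input", "silver.customer", []),
    (2, "/data/active.csv",
     "SELECT * FROM input WHERE status = 'ACTIVE'", "silver.active_customers", [1]),
]

spark.createDataFrame(rows, schema).write.mode("append").saveAsTable("etl_metadata")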

✅ Summary Table of Features

  • Retry Logic: @retry decorator
  • ParamTable: Read Hive table or JSON
  • Unit Testing: pytest + Spark fixture
  • Alerting: Email (extend to Slack/webhook)
  • Audit Logging: Row count, schema, saved in Hive
  • Dependency Exec: ThreadPool + DAG check
