You’re ready for the next-level enterprise-grade ETL framework. Let’s extend your PySpark OOP-based metadata ETL framework with:
✅ Delta Lake Writer
✅ ETL Logging with `start_time`, `end_time`, `status`
✅ Target Table Versioning (Soft Updates)
✅ All inline code – fully printed
🔥 Advanced PySpark Metadata-Driven ETL Framework (Delta + Logging + Versioning)
✅ 1. Add Delta Lake Support in DataWriter
```python
from delta.tables import DeltaTable
from pyspark.sql.functions import current_timestamp


class DeltaWriter:
    def __init__(self, spark, mode="overwrite"):
        self.spark = spark
        self.mode = mode

    def write_delta(self, df, table, versioning=False):
        if versioning:
            # Stamp every row so soft updates carry a last-modified timestamp
            df = df.withColumn("updated_at", current_timestamp())
            if self._table_exists(table):
                delta_tbl = DeltaTable.forName(self.spark, table)
                delta_tbl.alias("t").merge(
                    df.alias("s"),
                    "t.id = s.id"  # assuming 'id' is the primary key
                ).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()
            else:
                # First load: create the Delta table
                df.write.format("delta").mode("overwrite").saveAsTable(table)
        else:
            df.write.format("delta").mode(self.mode).saveAsTable(table)

    def _table_exists(self, table):
        # Public API in Spark 3.3+; on older versions fall back to
        # self.spark._jsparkSession.catalog().tableExists(table)
        return self.spark.catalog.tableExists(table)
```
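Usage is a single call per target table. Note that the MERGE condition above assumes every target table has an `id` primary-key column; adjust the join condition if your tables use a different key. A quick sketch (`customers_df` and the snapshot table name are illustrative):

```python
writer = DeltaWriter(spark)

# Soft update: MERGE into silver.customer if it exists, otherwise create it
writer.write_delta(customers_df, "silver.customer", versioning=True)

# Plain Delta write honoring the writer's default mode ("overwrite")
writer.write_delta(customers_df, "silver.customer_snapshot")
```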
✅ 2. Add a JobLogger Class to Log ETL Steps
```python
from pyspark.sql import Row
from datetime import datetime
import traceback


class JobLogger:
    def __init__(self, spark):
        self.spark = spark
        self.logs = []

    def log_step(self, step_id, table, status, start_time, end_time, error=None):
        self.logs.append(Row(
            step_id=step_id,
            table_name=table,
            status=status,
            start_time=start_time,
            end_time=end_time,
            error=str(error) if error else None
        ))

    def write_log(self, table_name="etl_logs"):
        if not self.logs:
            return  # nothing to persist
        log_df = self.spark.createDataFrame(self.logs)
        log_df.write.mode("append").saveAsTable(table_name)
```
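A quick standalone sketch of the logger in use (the values are illustrative; in the framework, ETLStep makes these calls for you):

```python
from datetime import datetime

job_logger = JobLogger(spark)

start = datetime.now()
# ... run a step here ...
job_logger.log_step(step_id=1, table="silver.customer", status="SUCCESS",
                    start_time=start, end_time=datetime.now())

job_logger.write_log("etl_logs")  # appends the buffered rows to the Hive table
```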
✅ 3. Update ETLStep to Use DeltaWriter + Logging + Versioning
```python
from datetime import datetime
import traceback


class ETLStep(LoggerMixin):
    def __init__(self, spark, metadata, logger):
        self.spark = spark
        self.meta = metadata
        self.logger = logger
        self.reader = CSVReader(spark)
        self.transformer = SQLTransformer(spark)
        self.writer = DeltaWriter(spark)

    def run(self):
        start_time = datetime.now()
        status = "SUCCESS"
        error = None
        try:
            self.log(f"Step {self.meta['step_id']} - START")
            df = self.reader.read(self.meta["source_path"])
            df = self.transformer.apply_sql(df, self.meta["sql_transformation"])
            self.writer.write_delta(df, self.meta["target_table"], versioning=True)
            self.log(f"Step {self.meta['step_id']} - DONE")
        except Exception:
            status = "FAILED"
            error = traceback.format_exc()
            self.log(f"Step {self.meta['step_id']} - ERROR")
        finally:
            end_time = datetime.now()
            self.logger.log_step(
                step_id=self.meta["step_id"],
                table=self.meta["target_table"],
                status=status,
                start_time=start_time,
                end_time=end_time,
                error=error
            )
```
✅ 4. Update PipelineRunner to Pass Logger and Write Log After Execution
```python
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor


class ETLPipelineRunner(LoggerMixin):
    def __init__(self, spark, metadata_table):
        self.spark = spark
        self.metadata_table = metadata_table
        self.executed = set()
        self.step_map = {meta["step_id"]: meta for meta in metadata_table}
        self.logger = JobLogger(spark)
        self.graph = self._build_graph()

    def _build_graph(self):
        graph = defaultdict(list)
        for meta in self.metadata_table:
            for dep in meta["dependencies"]:
                graph[dep].append(meta["step_id"])
        return graph

    def _runnable_steps(self):
        # A step is runnable if it hasn't run yet and all its dependencies have
        return [step_id for step_id, meta in self.step_map.items()
                if step_id not in self.executed
                and all(dep in self.executed for dep in meta["dependencies"])]

    def run(self):
        while len(self.executed) < len(self.step_map):
            runnable = self._runnable_steps()
            if not runnable:
                raise Exception("Circular dependency or misconfigured DAG")
            self.log(f"Running: {runnable}")
            with ThreadPoolExecutor(max_workers=4) as executor:
                futures = []
                for step_id in runnable:
                    job = ETLStep(self.spark, self.step_map[step_id], self.logger)
                    futures.append(executor.submit(job.run))
                    self.executed.add(step_id)  # mark as scheduled so the next wave can be computed
                for f in futures:
                    f.result()  # wait for the wave to finish (run() handles its own errors)
        self.logger.write_log("etl_logs")
```
✅ 5. Run from main.py
```python
from pyspark.sql import SparkSession

if __name__ == "__main__":
    spark = SparkSession.builder \
        .appName("AdvancedMetadataETL") \
        .enableHiveSupport() \
        .getOrCreate()

    metadata_table = [
        {
            "step_id": 1,
            "source_path": "/data/customers.csv",
            "sql_transformation": "SELECT id, UPPER(name) AS name FROM input",
            "target_table": "silver.customer",
            "dependencies": []
        },
        {
            "step_id": 2,
            "source_path": "/data/active.csv",
            "sql_transformation": "SELECT * FROM input WHERE status = 'ACTIVE'",
            "target_table": "silver.active_customers",
            "dependencies": [1]
        }
    ]

    runner = ETLPipelineRunner(spark, metadata_table)
    runner.run()
```
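After a run, you can inspect the log table and the Delta history that backs the soft updates. A quick sketch (table names follow the example above; `DESCRIBE HISTORY` requires the Delta SQL extension):

```python
# Rows written by JobLogger.write_log()
spark.table("etl_logs").orderBy("start_time").show(truncate=False)

# Delta Lake records every MERGE/overwrite as a new table version
spark.sql("DESCRIBE HISTORY silver.customer") \
    .select("version", "timestamp", "operation").show(truncate=False)
```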
✅ What’s Added in This Version
Feature | Description |
---|---|
✅ DeltaWriter | Writes data using Delta Lake (merge or overwrite) |
✅ Soft Updates | Uses MERGE for versioned insert/update |
✅ Logging | `etl_logs` table tracks `start_time`, `end_time`, `status`, `error` |
✅ Parallelism | Steps with no dependencies run concurrently |
✅ OOP Design | Fully modular classes: `Reader`, `Transformer`, `Writer`, `Logger` |
🧠 Next-Level Enhancements (Optional)
Feature | Idea |
---|---|
🧪 Unit Test Framework | Add pytest-based tests for each class |
🛠 ParamTable Integration | Read transformations from Hive JSON or config table |
🔁 Retry Logic | Add retry per step with max attempts |
🚨 Alerting | Send email/Slack if a step fails |
📊 Audit Framework | Track row counts, schema, file size in metadata |
🌪 Streaming Support | Extend to use Spark Structured Streaming |
Perfect. Let’s now extend our PySpark ETL Framework with the requested production-grade features, all inline and organized for clarity:
✅ Extended Feature Set for PySpark OOP-Based Metadata ETL
Feature | Implementation |
---|---|
🧪 Unit Tests | Using pytest style for class logic |
🛠 ParamTable | Load metadata from Hive or JSON config |
🔁 Retry Logic | Decorator to retry failing steps |
🚨 Alerting | Email or Slack alerts on failure |
📊 Audit Framework | Row counts, schema, file size logging |
🛠 BASE SETUP
We’ll assume:
- A Spark session is already initialized
- Delta Lake is enabled (see the session sketch below)
- Hive tables are available for metadata, audit logs, and alerts
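Outside Databricks, "Delta Lake is enabled" usually means wiring the Delta extension and catalog into the session builder. A minimal sketch (submit with the `delta-spark` package that matches your Spark version; it was named `delta-core` before Delta 3.0):

```python
from pyspark.sql import SparkSession

# Minimal Delta-enabled session for open-source Spark
# (e.g. spark-submit --packages io.delta:delta-spark_2.12:<version> ...)
spark = (SparkSession.builder
         .appName("AdvancedMetadataETL")
         .enableHiveSupport()
         .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
         .config("spark.sql.catalog.spark_catalog",
                 "org.apache.spark.sql.delta.catalog.DeltaCatalog")
         .getOrCreate())
```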
✅ 1. 🔁 Retry Decorator (`retry_utils.py`)
```python
import time
import functools


def retry(times=3, delay=2):
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(1, times + 1):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    print(f"[Retry {attempt}] Failed: {e}")
                    if attempt == times:
                        raise  # out of attempts: surface the original error
                    time.sleep(delay)
        return wrapper
    return decorator
```
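Any callable can be wrapped, not just `ETLStep.run`. A quick sketch of the decorator in use (the flaky function is purely illustrative):

```python
from retry_utils import retry
import random


@retry(times=3, delay=1)
def flaky_extract():
    # Simulates a transient failure such as a network hiccup
    if random.random() < 0.5:
        raise ConnectionError("temporary source outage")
    return "rows extracted"


print(flaky_extract())  # retried up to 3 times before the error propagates
```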
✅ 2. 🧪 Unit Test Example (`test_csv_reader.py`)
```python
import pytest
from base.reader import CSVReader


def test_csv_reader_succeeds(spark_session):
    reader = CSVReader(spark_session)
    df = reader.read("tests/test_data.csv")
    assert df.count() > 0
    assert "name" in df.columns
```
Use `conftest.py` to define `spark_session` as a pytest fixture built with `SparkSession.builder`.
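A minimal sketch of such a fixture (local-mode settings and the app name are placeholders):

```python
# conftest.py
import pytest
from pyspark.sql import SparkSession


@pytest.fixture(scope="session")
def spark_session():
    spark = (SparkSession.builder
             .appName("etl-framework-tests")
             .master("local[2]")
             .getOrCreate())
    yield spark
    spark.stop()
```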
✅ 3. 🛠 ParamTable Integration (`metadata_loader.py`)
```python
class MetadataLoader:
    def __init__(self, spark):
        self.spark = spark

    def load_from_hive(self, table="etl_metadata"):
        df = self.spark.table(table)
        return [row.asDict() for row in df.collect()]

    def load_from_json(self, path):
        df = self.spark.read.option("multiline", True).json(path)
        return [row.asDict() for row in df.collect()]
```
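A JSON config consumed by `load_from_json` would mirror the metadata dictionaries used earlier. A sample file (the paths and table names are simply the example values from above):

```json
[
  {
    "step_id": 1,
    "source_path": "/data/customers.csv",
    "sql_transformation": "SELECT id, UPPER(name) AS name FROM input",
    "target_table": "silver.customer",
    "dependencies": []
  },
  {
    "step_id": 2,
    "source_path": "/data/active.csv",
    "sql_transformation": "SELECT * FROM input WHERE status = 'ACTIVE'",
    "target_table": "silver.active_customers",
    "dependencies": [1]
  }
]
```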
✅ 4. 🚨 Alerting (`notifier.py`)
```python
import smtplib
from email.message import EmailMessage


class AlertNotifier:
    def __init__(self, email_to):
        self.email_to = email_to

    def send_email(self, subject, body):
        msg = EmailMessage()
        msg["Subject"] = subject
        msg["From"] = "noreply@etlframework.com"
        msg["To"] = self.email_to
        msg.set_content(body)
        try:
            with smtplib.SMTP("localhost") as server:
                server.send_message(msg)
        except Exception as e:
            print(f"Email alert failed: {e}")
```
✅ 5. 📊 Audit Logger (`audit_logger.py`)
```python
from pyspark.sql import Row


class AuditLogger:
    def __init__(self, spark):
        self.spark = spark
        self.rows = []

    def log(self, df, step_id, table):
        schema = df.schema.simpleString()
        count = df.count()  # note: triggers a Spark action on the DataFrame
        self.rows.append(Row(
            step_id=step_id,
            target_table=table,
            row_count=count,
            schema=schema
        ))

    def write(self, table="audit_logs"):
        if not self.rows:
            return  # nothing to persist
        self.spark.createDataFrame(self.rows).write.mode("append").saveAsTable(table)
```
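Because `log()` calls `df.count()`, each audited step triggers one extra Spark action. Caching the DataFrame before auditing and writing avoids recomputing the plan twice; a sketch under assumed names (`transformed_df` and `writer` are not part of the classes above):

```python
df = transformed_df.cache()  # keep the result in memory so count() and the write reuse it

auditor = AuditLogger(spark)
auditor.log(df, step_id=1, table="silver.customer")
writer.write_delta(df, "silver.customer", versioning=True)
auditor.write("audit_logs")

df.unpersist()
```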
✅ 6. ETLStep Updated with Retry, Alerting, Audit
```python
from datetime import datetime
import traceback

from retry_utils import retry
from notifier import AlertNotifier
from audit_logger import AuditLogger


class ETLStep(LoggerMixin):
    def __init__(self, spark, meta, job_logger, audit_logger, notifier):
        self.spark = spark
        self.meta = meta
        self.reader = CSVReader(spark)
        self.transformer = SQLTransformer(spark)
        self.writer = DeltaWriter(spark)
        self.logger = job_logger
        self.auditor = audit_logger
        self.notifier = notifier

    @retry(times=2, delay=5)
    def run(self):
        start_time = datetime.now()
        status = "SUCCESS"
        error = None
        try:
            self.log(f"Step {self.meta['step_id']} - START")
            df = self.reader.read(self.meta["source_path"])
            df = self.transformer.apply_sql(df, self.meta["sql_transformation"])
            self.writer.write_delta(df, self.meta["target_table"], versioning=True)
            self.auditor.log(df, self.meta["step_id"], self.meta["target_table"])
            self.log(f"Step {self.meta['step_id']} - DONE")
        except Exception:
            status = "FAILED"
            error = traceback.format_exc()
            self.notifier.send_email(
                f"ETL Failed - Step {self.meta['step_id']}",
                f"Error:\n{error}"
            )
            self.log(f"Step {self.meta['step_id']} - ERROR")
            raise  # re-raise so @retry can re-run the step; each attempt is logged and alerted
        finally:
            end_time = datetime.now()
            self.logger.log_step(
                step_id=self.meta["step_id"],
                table=self.meta["target_table"],
                status=status,
                start_time=start_time,
                end_time=end_time,
                error=error
            )
```
✅ 7. Runner Integration
```python
from concurrent.futures import ThreadPoolExecutor

from metadata_loader import MetadataLoader
from notifier import AlertNotifier
from audit_logger import AuditLogger


class ETLPipelineRunner(LoggerMixin):
    def __init__(self, spark):
        self.spark = spark
        self.logger = JobLogger(spark)
        self.auditor = AuditLogger(spark)
        self.notifier = AlertNotifier("admin@company.com")
        self.loader = MetadataLoader(spark)
        self.step_map = {}
        self.executed = set()

    def run(self):
        metadata_table = self.loader.load_from_hive()
        self.step_map = {m["step_id"]: m for m in metadata_table}
        graph = {m["step_id"]: m["dependencies"] for m in metadata_table}

        while len(self.executed) < len(self.step_map):
            runnable = [sid for sid, deps in graph.items()
                        if sid not in self.executed and all(d in self.executed for d in deps)]
            if not runnable:
                raise Exception("Circular dependency or misconfigured DAG")
            self.log(f"Running: {runnable}")

            with ThreadPoolExecutor(max_workers=4) as executor:
                futures = []
                for step_id in runnable:
                    job = ETLStep(
                        self.spark,
                        self.step_map[step_id],
                        self.logger,
                        self.auditor,
                        self.notifier
                    )
                    futures.append(executor.submit(job.run))
                    self.executed.add(step_id)  # mark as scheduled so the DAG loop advances
                for f in futures:
                    try:
                        f.result()
                    except Exception as exc:
                        # The step has already logged and alerted its failure; keep the pipeline going
                        self.log(f"Step failed after retries: {exc}")

        self.logger.write_log()
        self.auditor.write()
```
✅ 8. Hive Tables to Create
```sql
-- Metadata table
CREATE TABLE etl_metadata (
    step_id INT,
    source_path STRING,
    sql_transformation STRING,
    target_table STRING,
    dependencies ARRAY<INT>
);

-- ETL log
CREATE TABLE etl_logs (
    step_id INT,
    table_name STRING,
    status STRING,
    start_time TIMESTAMP,
    end_time TIMESTAMP,
    error STRING
);

-- Audit log
CREATE TABLE audit_logs (
    step_id INT,
    target_table STRING,
    row_count INT,
    schema STRING
);
```
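One way to seed `etl_metadata` with the two example steps from `main.py` is from PySpark itself. A sketch (the values are just the example metadata repeated; the schema string matches the DDL above):

```python
# Seed etl_metadata with the two example steps (values are illustrative)
rows = [
    (1, "/data/customers.csv",
     "SELECT id, UPPER(name) AS name FROM input",
     "silver.customer", []),
    (2, "/data/active.csv",
     "SELECT * FROM input WHERE status = 'ACTIVE'",
     "silver.active_customers", [1]),
]
schema = ("step_id INT, source_path STRING, sql_transformation STRING, "
          "target_table STRING, dependencies ARRAY<INT>")
spark.createDataFrame(rows, schema).write.mode("append").saveAsTable("etl_metadata")
```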
✅ Summary Table of Features
Feature | Tool/Logic Used |
---|---|
Retry Logic | @retry decorator |
ParamTable | Read Hive table or JSON |
Unit Testing | pytest + Spark fixture |
Alerting | Email (extend to Slack/webhook) |
Audit Logging | Row count, schema, saved in Hive |
Dependency Exec | ThreadPool + DAG check |