You’re asking for a full PySpark OOP-based ETL framework covering:
✅ A full working PySpark project template (inline)
✅ SQL transformations driven by a metadata table
✅ Parallel file loading and dependency handling
Let’s build this step by step, all inline and complete.
🔥 PROJECT: OOP-Based Metadata-Driven ETL Framework (PySpark)
🎯 Goal:
- Read CSV files listed in a metadata table
- Apply SQL transformations defined in the metadata
- Load the results into target tables
- Run file reads in parallel with dependency handling
- Keep everything modular and extensible via classes
✅ 1. SIMULATED METADATA TABLE
We’ll simulate the metadata as a list of dicts (in practice this could live in a Hive table or a JSON config):
```python
metadata_table = [
    {
        "step_id": 1,
        "source_path": "/data/file1.csv",
        "sql_transformation": "SELECT id, UPPER(name) AS name FROM input",
        "target_table": "silver.customer",
        "dependencies": []
    },
    {
        "step_id": 2,
        "source_path": "/data/file2.csv",
        "sql_transformation": "SELECT * FROM input WHERE status = 'ACTIVE'",
        "target_table": "silver.active_customers",
        "dependencies": [1]
    }
]
```
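If the metadata lives outside the code, the same list of dicts can be loaded at startup. A minimal sketch, assuming a hypothetical `metadata.json` file (a JSON array of step dicts) or a hypothetical Hive table `etl_metadata` with matching columns:

```python
import json


def load_metadata(spark, source="json", path="metadata.json"):
    """Load step metadata from a JSON config file or a Hive table."""
    if source == "json":
        # metadata.json is assumed to hold a JSON array shaped like metadata_table above
        with open(path) as f:
            return json.load(f)
    # Hive variant: assumes a table etl_metadata whose columns match the dict keys,
    # with dependencies stored as an array of ints
    return [row.asDict() for row in spark.sql("SELECT * FROM etl_metadata").collect()]
```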
✅ 2. COMMON CLASSES
🔧 reader.py
```python
class CSVReader:
    """Reads a CSV file (with a header row) into a DataFrame."""

    def __init__(self, spark):
        self.spark = spark

    def read(self, path):
        return self.spark.read.option("header", True).csv(path)
```
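The same interface extends to other sources; a sketch of the `ParquetReader` and `JSONReader` variants mentioned in the highlights below:

```python
class ParquetReader:
    def __init__(self, spark):
        self.spark = spark

    def read(self, path):
        return self.spark.read.parquet(path)


class JSONReader:
    def __init__(self, spark):
        self.spark = spark

    def read(self, path):
        return self.spark.read.json(path)
```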
🔧 transformer.py
```python
class SQLTransformer:
    """Registers the DataFrame as the temp view "input" and runs the metadata SQL against it."""

    def __init__(self, spark):
        self.spark = spark

    def apply_sql(self, df, sql):
        # The SQL in the metadata references this view name
        df.createOrReplaceTempView("input")
        return self.spark.sql(sql)
```
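A quick self-contained check of the transformer (the sample rows here are illustrative only):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("TransformerDemo").getOrCreate()
df = spark.createDataFrame([(1, "alice"), (2, "bob")], ["id", "name"])

result = SQLTransformer(spark).apply_sql(df, "SELECT id, UPPER(name) AS name FROM input")
result.show()  # names come back as ALICE and BOB
```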
🔧 writer.py
```python
class DataWriter:
    """Writes a DataFrame to a managed table."""

    def __init__(self, mode="overwrite"):
        self.mode = mode

    def write_table(self, df, table):
        df.write.mode(self.mode).saveAsTable(table)
```
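Other sinks follow the same shape; here is a sketch of a path-based writer in the spirit of the `S3Writer` from the highlights below (the class name `PathWriter` is mine, and the `s3a://` path is just an example):

```python
class PathWriter:
    """Writes a DataFrame as Parquet files to a path, e.g. s3a://bucket/prefix."""

    def __init__(self, mode="overwrite"):
        self.mode = mode

    def write_path(self, df, path):
        df.write.mode(self.mode).parquet(path)
```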
🔧 logger.py
```python
class LoggerMixin:
    def log(self, message):
        print(f"[{self.__class__.__name__}] {message}")
```
✅ 3. JOB EXECUTOR CLASS WITH DEPENDENCIES
```python
from logger import LoggerMixin
from reader import CSVReader
from transformer import SQLTransformer
from writer import DataWriter


class ETLStep(LoggerMixin):
    def __init__(self, spark, metadata):
        # Each step gets its own Spark session so the "input" temp view created by
        # SQLTransformer does not collide between steps running in parallel.
        self.spark = spark.newSession()
        self.meta = metadata
        self.reader = CSVReader(self.spark)
        self.transformer = SQLTransformer(self.spark)
        self.writer = DataWriter()

    def run(self):
        self.log(f"Running Step {self.meta['step_id']} - {self.meta['target_table']}")
        df = self.reader.read(self.meta["source_path"])
        df_transformed = self.transformer.apply_sql(df, self.meta["sql_transformation"])
        self.writer.write_table(df_transformed, self.meta["target_table"])
        self.log(f"Completed Step {self.meta['step_id']}")
```
✅ 4. DAG-STYLE PARALLEL EXECUTION WITH DEPENDENCY HANDLING
```python
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor

from logger import LoggerMixin
# ETLStep is defined in step 3 (import it here if it lives in its own module)


class ETLPipelineRunner(LoggerMixin):
    def __init__(self, spark, metadata_table):
        self.spark = spark
        self.metadata_table = metadata_table
        self.executed = set()
        self.step_map = {meta["step_id"]: meta for meta in metadata_table}
        self.dependency_graph = self._build_graph()

    def _build_graph(self):
        # Map each step to the steps that depend on it (downstream edges).
        graph = defaultdict(list)
        for meta in self.metadata_table:
            for dep in meta["dependencies"]:
                graph[dep].append(meta["step_id"])
        return graph

    def _runnable_steps(self):
        # A step is runnable once it has not run yet and all of its dependencies have completed.
        return [
            step_id
            for step_id, meta in self.step_map.items()
            if step_id not in self.executed
            and all(dep in self.executed for dep in meta["dependencies"])
        ]

    def run(self):
        while len(self.executed) < len(self.step_map):
            runnable = self._runnable_steps()
            if not runnable:
                raise Exception("Circular dependency or misconfigured DAG")
            self.log(f"Running steps: {runnable}")
            with ThreadPoolExecutor(max_workers=4) as executor:
                futures = {
                    executor.submit(ETLStep(self.spark, self.step_map[step_id]).run): step_id
                    for step_id in runnable
                }
                for future, step_id in futures.items():
                    # Re-raise any failure; only successful steps are marked as executed.
                    future.result()
                    self.executed.add(step_id)
```
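With the sample metadata, this runs in two waves: wave 1 executes step 1 (no dependencies) and wave 2 executes step 2 once step 1 is in `executed`. A tiny standalone helper (not part of the framework) that reproduces that grouping without Spark:

```python
def execution_waves(metadata_table):
    """Group step_ids into waves, where each wave depends only on earlier waves."""
    pending = {m["step_id"]: set(m["dependencies"]) for m in metadata_table}
    done, waves = set(), []
    while pending:
        wave = [step_id for step_id, deps in pending.items() if deps <= done]
        if not wave:
            raise Exception("Circular dependency or misconfigured DAG")
        waves.append(wave)
        done.update(wave)
        for step_id in wave:
            del pending[step_id]
    return waves


# execution_waves(metadata_table) -> [[1], [2]] for the sample metadata
```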
✅ 5. MAIN FILE TO EXECUTE
```python
from pyspark.sql import SparkSession

# Assumes ETLPipelineRunner (and the classes above) are importable from their modules.

if __name__ == "__main__":
    spark = SparkSession.builder.appName("MetadataETL").enableHiveSupport().getOrCreate()

    metadata_table = [
        {
            "step_id": 1,
            "source_path": "/data/file1.csv",
            "sql_transformation": "SELECT id, UPPER(name) AS name FROM input",
            "target_table": "silver.customer",
            "dependencies": []
        },
        {
            "step_id": 2,
            "source_path": "/data/file2.csv",
            "sql_transformation": "SELECT * FROM input WHERE status = 'ACTIVE'",
            "target_table": "silver.active_customers",
            "dependencies": [1]
        }
    ]

    runner = ETLPipelineRunner(spark, metadata_table)
    runner.run()
```
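Note: the `saveAsTable` targets assume the `silver` database already exists in the metastore; if it doesn't, create it first, e.g. `spark.sql("CREATE DATABASE IF NOT EXISTS silver")`.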
🧠 Highlights
| Feature | Explanation |
|---|---|
| ✅ OOP Principles | Each part of the ETL is a separate class |
| ✅ SQL via Metadata | SQL transformations are metadata-driven |
| ✅ Dependency Handling | Steps run based on DAG dependencies |
| ✅ Parallel Execution | Steps whose dependencies are satisfied run in parallel within a wave |
| ✅ Reusability | Add ParquetReader, JSONReader, S3Writer, etc. easily |