You’re asking for a full PySpark OOP-based ETL framework:

✅ A full working PySpark project template (inline)
✅ SQL transformation steps driven by a metadata table
✅ Parallel file loading and dependency handling

Let’s build this step-by-step, all inline and complete.


🔥 PROJECT: OOP-Based Metadata-Driven ETL Framework (PySpark)


🎯 Goal:

  • Read CSV files listed in a metadata table
  • Apply SQL transformations defined in the metadata
  • Load the results into target tables
  • Run independent steps in parallel, with dependency handling
  • Keep the whole pipeline modular and extensible via classes

✅ 1. SIMULATED METADATA TABLE

We’ll simulate the metadata as a list of dicts; in practice it could come from a Hive table or a JSON config (a loading sketch follows the list below):

metadata_table = [
    {
        "step_id": 1,
        "source_path": "/data/file1.csv",
        "sql_transformation": "SELECT id, UPPER(name) AS name FROM input",
        "target_table": "silver.customer",
        "dependencies": []
    },
    {
        "step_id": 2,
        "source_path": "/data/file2.csv",
        "sql_transformation": "SELECT * FROM input WHERE status = 'ACTIVE'",
        "target_table": "silver.active_customers",
        "dependencies": [1]
    }
]
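
The same structure can be loaded from a JSON config instead of being hard-coded. A minimal sketch, assuming the list above is stored in a file named metadata.json (a placeholder name):

import json

def load_metadata(path):
    # Expects a JSON array of step definitions with the same keys as above:
    # step_id, source_path, sql_transformation, target_table, dependencies.
    with open(path) as f:
        return json.load(f)

metadata_table = load_metadata("metadata.json")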

✅ 2. COMMON CLASSES


🔧 reader.py

class CSVReader:
    def __init__(self, spark):
        self.spark = spark

    def read(self, path):
        # The header row supplies column names; all columns are read as strings
        # unless .option("inferSchema", True) is added.
        return self.spark.read.option("header", True).csv(path)
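
Usage is a one-liner (assuming an active SparkSession named spark and the sample path from the metadata):

customers_df = CSVReader(spark).read("/data/file1.csv")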

🔧 transformer.py

class SQLTransformer:
    def __init__(self, spark):
        self.spark = spark

    def apply_sql(self, df, sql):
        # The metadata SQL references the source DataFrame as "input", so it is
        # registered under that temp-view name. All steps share one SparkSession,
        # so steps running in parallel overwrite each other's "input" view; use
        # per-step view names if concurrent steps must query it at the same time.
        df.createOrReplaceTempView("input")
        return self.spark.sql(sql)
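
A quick standalone check of the transformer, using a small in-memory DataFrame purely for illustration (again assuming a SparkSession named spark):

demo_df = spark.createDataFrame([(1, "alice"), (2, "bob")], ["id", "name"])
transformer = SQLTransformer(spark)
result = transformer.apply_sql(demo_df, "SELECT id, UPPER(name) AS name FROM input")
result.show()  # names come back upper-cased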

🔧 writer.py

class DataWriter:
    def __init__(self, mode="overwrite"):
        self.mode = mode

    def write_table(self, df, table):
        # saveAsTable writes to the session catalog (Hive support is enabled in
        # the main script); the target database must already exist.
        df.write.mode(self.mode).saveAsTable(table)

🔧 logger.py

class LoggerMixin:
    def log(self, message):
        # print keeps the demo simple; swap in Python's logging module for real jobs.
        print(f"[{self.__class__.__name__}] {message}")

✅ 3. JOB EXECUTOR CLASS WITH DEPENDENCIES

class ETLStep(LoggerMixin):
    def __init__(self, spark, metadata):
        self.spark = spark
        self.meta = metadata
        self.reader = CSVReader(spark)
        self.transformer = SQLTransformer(spark)
        self.writer = DataWriter()

    def run(self):
        self.log(f"Running Step {self.meta['step_id']} - {self.meta['target_table']}")
        df = self.reader.read(self.meta["source_path"])
        df_transformed = self.transformer.apply_sql(df, self.meta["sql_transformation"])
        self.writer.write_table(df_transformed, self.meta["target_table"])
        self.log(f"Completed Step {self.meta['step_id']}")
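
For a quick smoke test, a single step can be run outside the runner (assuming spark and the sample metadata_table from step 1 are in scope):

step_one = ETLStep(spark, metadata_table[0])
step_one.run()  # reads /data/file1.csv, applies the SQL, writes silver.customer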

✅ 4. DAG-STYLE PARALLEL EXECUTION WITH DEPENDENCY HANDLING

from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor

class ETLPipelineRunner(LoggerMixin):
    def __init__(self, spark, metadata_table):
        self.spark = spark
        self.metadata_table = metadata_table
        self.executed = set()
        self.step_map = {meta['step_id']: meta for meta in metadata_table}
        self.dependency_graph = self._build_graph()

    def _build_graph(self):
        # Maps each step_id to the steps that depend on it. The scheduler only
        # consults each step's own "dependencies" list; the graph is kept for
        # inspection and debugging.
        graph = defaultdict(list)
        for meta in self.metadata_table:
            for dep in meta['dependencies']:
                graph[dep].append(meta['step_id'])
        return graph

    def _runnable_steps(self):
        # A step is runnable once all of its dependencies have completed.
        return [step_id for step_id, meta in self.step_map.items()
                if step_id not in self.executed and all(dep in self.executed for dep in meta['dependencies'])]

    def run(self):
        # Execute the DAG in waves: each pass schedules every step whose
        # dependencies are already satisfied, runs that wave in parallel,
        # then re-evaluates until all steps have run.
        while len(self.executed) < len(self.step_map):
            runnable = self._runnable_steps()
            if not runnable:
                raise RuntimeError("Circular dependency or misconfigured DAG")
            self.log(f"Running steps: {runnable}")

            with ThreadPoolExecutor(max_workers=4) as executor:
                futures = []
                for step_id in runnable:
                    meta = self.step_map[step_id]
                    job = ETLStep(self.spark, meta)
                    futures.append(executor.submit(job.run))
                    self.executed.add(step_id)
                # Wait for the whole wave to finish (and surface any failure)
                # before scheduling the next one.
                for future in futures:
                    future.result()
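
With the sample metadata, the runner executes two waves: step 1 first, then step 2 once its dependency is satisfied. The scheduling logic can be checked without touching Spark using a small dry-run helper (dry_run_waves is a hypothetical function, not part of the framework):

def dry_run_waves(metadata_table):
    # Mirrors ETLPipelineRunner's scheduling: each wave contains the steps
    # whose dependencies were all satisfied by earlier waves.
    step_map = {m["step_id"]: m for m in metadata_table}
    executed, waves = set(), []
    while len(executed) < len(step_map):
        wave = [sid for sid, m in step_map.items()
                if sid not in executed and all(d in executed for d in m["dependencies"])]
        if not wave:
            raise RuntimeError("Circular dependency or misconfigured DAG")
        waves.append(wave)
        executed.update(wave)
    return waves

print(dry_run_waves(metadata_table))  # [[1], [2]] for the sample metadata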

✅ 5. MAIN FILE TO EXECUTE

from pyspark.sql import SparkSession

if __name__ == "__main__":
    spark = SparkSession.builder.appName("MetadataETL").enableHiveSupport().getOrCreate()

    metadata_table = [
        {
            "step_id": 1,
            "source_path": "/data/file1.csv",
            "sql_transformation": "SELECT id, UPPER(name) AS name FROM input",
            "target_table": "silver.customer",
            "dependencies": []
        },
        {
            "step_id": 2,
            "source_path": "/data/file2.csv",
            "sql_transformation": "SELECT * FROM input WHERE status = 'ACTIVE'",
            "target_table": "silver.active_customers",
            "dependencies": [1]
        }
    ]

    runner = ETLPipelineRunner(spark, metadata_table)
    runner.run()
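
One practical note: saveAsTable into "silver.*" assumes the silver database already exists in the catalog. If it might not, a guarded creation statement placed before runner.run() keeps the first write from failing:

# Create the target database up front so saveAsTable("silver.customer") succeeds.
spark.sql("CREATE DATABASE IF NOT EXISTS silver")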

🧠 Highlights

Feature | Explanation
✅ OOP Principles | Each part of the ETL is a separate class
✅ SQL via Metadata | SQL transformations are metadata-driven
✅ Dependency Handling | Steps run based on DAG dependencies
✅ Parallel Execution | Steps whose dependencies are satisfied run in parallel
✅ Reusability | Add ParquetReader, JSONReader, S3Writer, etc. easily (see the sketch below)
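
To make the reusability point concrete, here is a sketch of a ParquetReader with the same read(path) interface as CSVReader. It is not part of the code above; wiring it in would also require ETLStep to pick a reader per step, for example from a hypothetical source_format field in the metadata:

class ParquetReader:
    # Drop-in alternative to CSVReader: same constructor, same read(path) contract.
    def __init__(self, spark):
        self.spark = spark

    def read(self, path):
        return self.spark.read.parquet(path)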
