βœ… PART 2: Data Engineering Project Using OOP + PySpark

🎯 Problem Statement:

Build a Metadata-driven ETL Framework in Python using OOP principles, powered by PySpark.


πŸ“¦ Project Modules:

| Module | Purpose | OOP Feature Used |
|---|---|---|
| DataReader | Abstract file reader class | Abstract class |
| CSVReader, JSONReader | Concrete file readers | Inheritance |
| Transformer | Encapsulates transformations | Composition |
| LoggerMixin | Adds logging to any class | Mixin |
| ETLJob | Encapsulates the full job | High-level class |
| PipelineRunner | Manages job execution flow | Reusability |

🧱 Project Structure:

etl_framework/
β”œβ”€β”€ base/
β”‚   β”œβ”€β”€ reader.py
β”‚   β”œβ”€β”€ transformer.py
β”‚   β”œβ”€β”€ logger.py
β”œβ”€β”€ jobs/
β”‚   β”œβ”€β”€ etl_job.py
β”œβ”€β”€ main.py

🧰 reader.py

from abc import ABC, abstractmethod
from pyspark.sql import SparkSession

# Reuse the active session (or create one) so all readers share a single SparkSession
spark = SparkSession.builder.getOrCreate()

class DataReader(ABC):
    @abstractmethod
    def read(self, path):
        pass

class CSVReader(DataReader):
    def read(self, path):
        return spark.read.option("header", True).csv(path)

class JSONReader(DataReader):
    def read(self, path):
        return spark.read.option("multiLine", True).json(path)
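Because every reader implements the same read() contract, callers can pick a concrete reader by file extension instead of branching on format logic throughout the codebase. A minimal sketch of such a factory (the get_reader helper and READERS registry are assumptions, not part of the framework above; stub read bodies stand in for the Spark calls so the sketch runs without a cluster):

```python
from abc import ABC, abstractmethod
from pathlib import Path

class DataReader(ABC):
    @abstractmethod
    def read(self, path): ...

# Stub readers: the real ones would call spark.read as shown above
class CSVReader(DataReader):
    def read(self, path):
        return f"csv dataframe from {path}"

class JSONReader(DataReader):
    def read(self, path):
        return f"json dataframe from {path}"

# Registry mapping file extensions to reader classes (hypothetical helper)
READERS = {".csv": CSVReader, ".json": JSONReader}

def get_reader(path):
    """Pick a concrete DataReader based on the file extension."""
    ext = Path(path).suffix.lower()
    try:
        return READERS[ext]()
    except KeyError:
        raise ValueError(f"No reader registered for {ext!r}")

print(type(get_reader("/data/sample.csv")).__name__)  # CSVReader
```

New formats then plug in by registering one more class, with no change to calling code.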

βš™οΈ transformer.py

from pyspark.sql.functions import col, upper

class Transformer:
    def __init__(self, df):
        self.df = df

    def drop_nulls(self):
        self.df = self.df.dropna()
        return self

    def uppercase(self, column):
        # upper() uppercases the values; the column keeps its original name
        self.df = self.df.withColumn(column, upper(col(column)))
        return self

    def get(self):
        return self.df
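The return self idiom is what makes the calls chainable: each transformation mutates the wrapped DataFrame and hands the same Transformer back. A Spark-free sketch of the same fluent pattern over plain Python rows (ListTransformer is an illustrative stand-in, not the framework class):

```python
class ListTransformer:
    """Same fluent pattern as Transformer, over a list of dicts."""
    def __init__(self, rows):
        self.rows = rows

    def drop_nulls(self):
        # Keep only rows in which no value is None
        self.rows = [r for r in self.rows if None not in r.values()]
        return self

    def uppercase(self, column):
        self.rows = [{**r, column: str(r[column]).upper()} for r in self.rows]
        return self

    def get(self):
        return self.rows

rows = [{"name": "alice"}, {"name": None}]
print(ListTransformer(rows).drop_nulls().uppercase("name").get())
# [{'name': 'ALICE'}]
```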

πŸ“ logger.py

class LoggerMixin:
    def log(self, message):
        print(f"[{self.__class__.__name__}] {message}")

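The mixin adds behavior through multiple inheritance: any class that lists LoggerMixin as a base gets log(), with the subclass's own name in the prefix. A quick check (IngestJob is a made-up example class):

```python
class LoggerMixin:
    def log(self, message):
        print(f"[{self.__class__.__name__}] {message}")

class IngestJob(LoggerMixin):
    pass

IngestJob().log("hello")  # [IngestJob] hello
```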
πŸš€ etl_job.py

from base.logger import LoggerMixin
from base.reader import CSVReader
from base.transformer import Transformer

class ETLJob(LoggerMixin):
    def __init__(self, path):
        self.path = path
        self.reader = CSVReader()

    def run(self):
        self.log("Starting job")
        df = self.reader.read(self.path)
        df = Transformer(df).drop_nulls().get()
        df.show()
        self.log("Job finished")

▢️ main.py

from jobs.etl_job import ETLJob

if __name__ == "__main__":
    job = ETLJob("/data/sample.csv")
    job.run()

πŸ“ˆ Advanced Add-Ons

  • Add MetadataTableReader class (reads from Hive metastore)
  • Add RetryMixin, AuditLoggerMixin
  • Add ParameterStore as Singleton class
  • Add versioned schema enforcement
  • Track success/failure using a LogWriter class
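Of these add-ons, the Singleton ParameterStore is the quickest to sketch: overriding __new__ guarantees that every job shares one parameter dictionary. The class body below is an assumption about what such a store might hold:

```python
class ParameterStore:
    """Singleton: every instantiation returns the same object."""
    _instance = None

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance.params = {}
        return cls._instance

    def set(self, key, value):
        self.params[key] = value

    def get(self, key, default=None):
        return self.params.get(key, default)

ParameterStore().set("env", "dev")
print(ParameterStore().get("env"))          # dev -- same instance everywhere
print(ParameterStore() is ParameterStore())  # True
```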

πŸ”₯ Benefits of OOP in Data Engineering

| Benefit | Explanation |
|---|---|
| Reusability | Reuse readers/transformers across jobs |
| Encapsulation | Logic isolated in classes |
| Testability | Easy to unit test each class |
| Extensibility | Add new readers, transformations, loggers |
| Maintainability | Easier to manage larger projects |
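Testability in practice: because each class has a narrow contract, it can be unit tested without a Spark cluster by substituting a stub. A sketch using unittest (FakeReader, DropNulls, and the test class are illustrative stand-ins, not part of the framework):

```python
import unittest

class FakeReader:
    """Stub that satisfies the DataReader contract without Spark."""
    def read(self, path):
        return [{"name": "alice"}, {"name": None}]

class DropNulls:
    """Tiny stand-in for Transformer.drop_nulls over plain rows."""
    def __init__(self, rows):
        self.rows = rows

    def run(self):
        return [r for r in self.rows if None not in r.values()]

class EtlPiecesTest(unittest.TestCase):
    def test_fake_reader_feeds_transform(self):
        rows = FakeReader().read("/any/path")
        self.assertEqual(DropNulls(rows).run(), [{"name": "alice"}])

unittest.main(argv=["etl"], exit=False, verbosity=0)
```

The same injection idea lets ETLJob accept any DataReader, so tests can pass a fake while production passes CSVReader.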
