Here’s a full Pytest project setup for Spark unit testing, focused on testing your ETL components: readers, transformers, writers, and even the full pipeline.


βœ… 1. πŸ“ Project Structure

etl_project/
β”œβ”€β”€ base/
β”‚   β”œβ”€β”€ reader.py
β”‚   β”œβ”€β”€ transformer.py
β”‚   β”œβ”€β”€ writer.py
β”‚   └── ...
β”œβ”€β”€ tests/
β”‚   β”œβ”€β”€ test_reader.py
β”‚   β”œβ”€β”€ test_transformer.py
β”‚   β”œβ”€β”€ conftest.py  <-- Spark session fixture
β”‚   └── test_writer.py
β”œβ”€β”€ data/
β”‚   └── test_data.csv
β”œβ”€β”€ requirements.txt
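
Note: for imports like from base.reader import CSVReader to resolve when pytest runs from the project root, the root directory has to be on sys.path. One way to do that (assuming pytest β‰₯ 7, which added the pythonpath ini option) is a small pytest.ini at the project root:

[pytest]
pythonpath = .
testpaths = tests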

βœ… 2. requirements.txt

pytest
pyspark
delta-spark

Install with:

pip install -r requirements.txt
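
delta-spark releases track specific Spark versions, so it is safer to pin a compatible pair. The exact numbers below are only an illustration of one known-compatible combination, not a requirement:

pytest==8.2.0
pyspark==3.5.1
delta-spark==3.2.0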

βœ… 3. conftest.py (Pytest Spark fixture)

import pytest
from delta import configure_spark_with_delta_pip
from pyspark.sql import SparkSession

@pytest.fixture(scope="session")
def spark():
    # enableHiveSupport() and an in-memory catalog contradict each other, so
    # we stick with the default in-memory catalog for local tests. The Delta
    # configs below are what test_writer.py needs to write Delta tables.
    builder = SparkSession.builder \
        .appName("pytest-etl") \
        .master("local[*]") \
        .config("spark.sql.shuffle.partitions", "1") \
        .config("spark.sql.warehouse.dir", "/tmp/spark-warehouse") \
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

    # configure_spark_with_delta_pip adds the matching Delta jars to the session
    spark = configure_spark_with_delta_pip(builder).getOrCreate()

    yield spark
    spark.stop()
  • This provides a shared Spark session for all test files, created once per test run (scope="session").

βœ… 4. test_reader.py

from base.reader import CSVReader

def test_csv_read_basic(spark):
    reader = CSVReader(spark)
    df = reader.read("data/test_data.csv")
    assert df.count() > 0
    assert "name" in df.columns

βœ… 5. test_transformer.py

from base.transformer import SQLTransformer

def test_sql_uppercase(spark):
    df = spark.createDataFrame([(1, "alice")], ["id", "name"])
    transformer = SQLTransformer(spark)
    df_out = transformer.apply_sql(df, "SELECT id, UPPER(name) as name FROM input")
    
    row = df_out.collect()[0]
    assert row["name"] == "ALICE"

βœ… 6. test_writer.py (for Delta write)

from base.writer import DeltaWriter

def test_delta_write(spark):
    df = spark.createDataFrame([(1, "A")], ["id", "val"])

    writer = DeltaWriter(spark)
    writer.write_delta(df, "delta_test_table", versioning=False)

    # Use the public Catalog API instead of the private spark._jsparkSession handle
    assert spark.catalog.tableExists("delta_test_table")
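
A sketch of what base/writer.py might contain; the versioning flag’s mapping to append vs. overwrite here is an assumption for illustration, not documented behavior:

# base/writer.py -- sketch; "versioning" -> append/overwrite is assumed
class DeltaWriter:
    def __init__(self, spark):
        self.spark = spark

    def write_delta(self, df, table_name, versioning=True):
        mode = "append" if versioning else "overwrite"
        df.write.format("delta").mode(mode).saveAsTable(table_name)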

βœ… 7. Sample test_data.csv for local tests

id,name,age
1,alice,30
2,bob,25
3,charlie,40
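
If you’d rather not commit test data, a fixture in conftest.py can generate the same file on the fly; csv_path here is a hypothetical fixture name:

import pytest

@pytest.fixture
def csv_path(tmp_path):
    # Write the same sample data into a pytest-managed temp directory
    p = tmp_path / "test_data.csv"
    p.write_text("id,name,age\n1,alice,30\n2,bob,25\n3,charlie,40\n")
    return str(p)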

βœ… 8. Run All Tests

pytest tests/
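
Useful variations while iterating:

pytest tests/ -v                 # verbose: show each test name
pytest tests/ -k transformer     # run only tests matching a keyword
pytest tests/ -x                 # stop at the first failure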

πŸ”’ Bonus: Testing Strategy Summary

Layer        | Test Type         | Example
Reader       | Functional test   | Read sample CSV
Transformer  | SQL logic test    | UPPER(), filters, joins
Writer       | Persistence test  | Write Delta table, check it exists
Pipeline     | End-to-end test   | Run metadata ETL
Logging      | Check audit rows  | Count schema rows
Retry/Error  | Simulate failure  | Raise exception, retry
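
The last row of the table hints at a retry test. One way to write it without touching Spark at all is to mock the writer; run_with_retry below is a hypothetical helper, not part of the project above:

from unittest.mock import MagicMock

def run_with_retry(fn, retries=2):
    # Hypothetical helper: retry fn, re-raising after the last attempt
    for attempt in range(retries):
        try:
            return fn()
        except RuntimeError:
            if attempt == retries - 1:
                raise

def test_retry_on_transient_failure():
    writer = MagicMock()
    # Fail once, then succeed
    writer.write_delta.side_effect = [RuntimeError("transient"), None]

    run_with_retry(lambda: writer.write_delta("df", "tbl"))
    assert writer.write_delta.call_count == 2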
