Here’s a full Pytest project setup for unit testing Spark ETL code. It covers readers, transformers, writers, and the full pipeline.
1. Project Structure
```
etl_project/
├── base/
│   ├── reader.py
│   ├── transformer.py
│   ├── writer.py
│   └── ...
├── tests/
│   ├── test_reader.py
│   ├── test_transformer.py
│   ├── conftest.py        <-- Spark session fixture
│   └── test_writer.py
├── data/
│   └── test_data.csv
└── requirements.txt
```
2. requirements.txt
```
pytest
pyspark
delta-spark
```
Install with:
```
pip install -r requirements.txt
```
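3. conftest.py (Pytest Spark fixture)
```python
import pytest
from delta import configure_spark_with_delta_pip
from pyspark.sql import SparkSession


@pytest.fixture(scope="session")
def spark(tmp_path_factory):
    # Per-run warehouse directory, so managed tables don't leak between runs
    warehouse = str(tmp_path_factory.mktemp("spark-warehouse"))
    builder = (
        SparkSession.builder
        .appName("pytest-etl")
        .master("local[*]")
        # One shuffle partition keeps tiny test jobs fast
        .config("spark.sql.shuffle.partitions", "1")
        # In-memory catalog: no Hive metastore needed (so no enableHiveSupport())
        .config("spark.sql.catalogImplementation", "in-memory")
        .config("spark.sql.warehouse.dir", warehouse)
        # Delta Lake SQL extension and catalog, needed by the Delta writer test
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
        .config("spark.sql.catalog.spark_catalog",
                "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    )
    # Adds the delta-spark JARs matching the pip package (fetched from Maven on first run)
    spark = configure_spark_with_delta_pip(builder).getOrCreate()
    yield spark
    spark.stop()
```
Because the fixture is session-scoped, all test files share one Spark session: it is created on first use and stopped after the last test finishes.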
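4. test_reader.py
```python
from pathlib import Path

from base.reader import CSVReader

# Resolve the sample file relative to this test file, so pytest works from any directory
DATA_FILE = Path(__file__).resolve().parent.parent / "data" / "test_data.csv"


def test_csv_read_basic(spark):
    reader = CSVReader(spark)
    df = reader.read(str(DATA_FILE))
    assert df.count() > 0
    assert "name" in df.columns
```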
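The tests import classes from the project's `base/` package, which isn't shown here. As an assumption, this is a minimal, hypothetical `CSVReader` that would satisfy the reader test: it reads a CSV with a header row and inferred column types. Your real reader may take different options.

```python
class CSVReader:
    """Hypothetical minimal reader for base/reader.py; the real class may differ."""

    def __init__(self, spark):
        self.spark = spark

    def read(self, path, header=True, infer_schema=True):
        # header=True turns the first CSV line into column names (id, name, age)
        # inferSchema=True makes Spark scan the data to choose types (e.g. int for age)
        return (
            self.spark.read
            .option("header", header)
            .option("inferSchema", infer_schema)
            .csv(path)
        )
```

Without `header=True`, Spark would name the columns `_c0`, `_c1`, ... and the `"name" in df.columns` assertion would fail.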
5. test_transformer.py
```python
from base.transformer import SQLTransformer


def test_sql_uppercase(spark):
    df = spark.createDataFrame([(1, "alice")], ["id", "name"])
    transformer = SQLTransformer(spark)
    # The query reads FROM input, so apply_sql is expected to expose df as the view "input"
    df_out = transformer.apply_sql(df, "SELECT id, UPPER(name) AS name FROM input")
    row = df_out.collect()[0]
    assert row["name"] == "ALICE"
```
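Because the test's SQL selects `FROM input`, `apply_sql` presumably registers the incoming DataFrame under that view name. A hypothetical sketch under that assumption (the `view_name` parameter is illustrative, not from the original):

```python
class SQLTransformer:
    """Hypothetical minimal transformer for base/transformer.py; the real class may differ."""

    def __init__(self, spark, view_name="input"):
        self.spark = spark
        self.view_name = view_name  # assumed name that the SQL queries refer to

    def apply_sql(self, df, sql):
        # Register df as a session-scoped temp view so the SQL can reference it by name
        df.createOrReplaceTempView(self.view_name)
        # spark.sql() is lazy: it returns a DataFrame evaluated on collect()/count()
        return self.spark.sql(sql)
```

`createOrReplaceTempView` overwrites any earlier view with the same name, which is harmless when tests run one after another in the shared session.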
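6. test_writer.py (for Delta write)
```python
from base.writer import DeltaWriter


def test_delta_write(spark):
    # Start clean: the Spark session and its catalog are shared across all tests
    spark.sql("DROP TABLE IF EXISTS delta_test_table")
    df = spark.createDataFrame([(1, "A")], ["id", "val"])
    writer = DeltaWriter(spark)
    writer.write_delta(df, "delta_test_table", versioning=False)
    # Public catalog API (PySpark 3.3+) instead of the private _jsparkSession
    assert spark.catalog.tableExists("delta_test_table")
    assert spark.table("delta_test_table").count() == 1
    # DESCRIBE DETAIL is Delta-specific and reports the table format
    assert spark.sql("DESCRIBE DETAIL delta_test_table").first()["format"] == "delta"
```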
7. Sample test_data.csv (for local tests)
```
id,name,age
1,alice,30
2,bob,25
3,charlie,40
```
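8. Run All Tests
Run from the etl_project/ root. `python -m` adds the current directory to `sys.path`, so the `base` package is importable from the tests:
```
python -m pytest tests/
```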
Bonus: Testing Strategy Summary

| Layer | Test Type | Example |
|---|---|---|
| Reader | Functional test | Read sample CSV |
| Transformer | SQL logic test | UPPER(), filters, joins |
| Writer | Persistence test | Write Delta table, check it exists |
| Pipeline | End-to-end test | Run metadata ETL |
| Logging | Check audit rows | Count schema rows |
| Retry/Error | Simulate failure | Raise exception, retry |
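The Retry/Error row can be tested without Spark at all by injecting a step that fails a set number of times. A minimal sketch, assuming a hypothetical `run_with_retry` helper and `make_flaky_step` stub (the pipeline's real retry mechanism isn't shown in this post):

```python
import pytest


def run_with_retry(step, max_attempts=3):
    """Hypothetical helper: call step() until it succeeds or attempts run out."""
    last_error = None
    for _ in range(max_attempts):
        try:
            return step()
        except Exception as exc:  # broad on purpose: any failed step is retried
            last_error = exc
    raise RuntimeError(f"step failed after {max_attempts} attempts") from last_error


def make_flaky_step(failures):
    """Return a step that raises `failures` times, then succeeds; counts its calls."""
    calls = {"n": 0}

    def step():
        calls["n"] += 1
        if calls["n"] <= failures:
            raise OSError("simulated transient failure")
        return "ok"

    step.calls = calls
    return step


def test_retry_recovers_from_transient_failure():
    step = make_flaky_step(failures=2)
    assert run_with_retry(step, max_attempts=3) == "ok"
    assert step.calls["n"] == 3  # two failures, then one success


def test_retry_gives_up():
    step = make_flaky_step(failures=5)
    with pytest.raises(RuntimeError, match="after 3 attempts"):
        run_with_retry(step, max_attempts=3)
    assert step.calls["n"] == 3  # stopped at the attempt limit
```

In a real pipeline test, the flaky step would wrap a reader or writer call; injecting it keeps the failure deterministic instead of depending on actual I/O errors.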