The goal is to build a dynamic, automated process where the steps (SQL queries or DataFrame operations) are read from a metadata table, so the pipeline is driven by configuration rather than hard-coded logic and can serve as a flexible framework.

Below is the complete ETL framework code. It dynamically processes a specific stage of a program using metadata, with parameterization, logging, error handling, and retries.


Dynamic ETL Framework Code

Save this as dynamic_etl_framework.py:

from pyspark.sql import SparkSession
from datetime import datetime
import time
import sys

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("Dynamic ETL Framework") \
    .enableHiveSupport() \
    .getOrCreate()

# Input parameters from the command line
if len(sys.argv) < 4:
    print("Usage: dynamic_etl_framework.py <program_name> <stage_name> <month_year>")
    sys.exit(1)

program_name_filter = sys.argv[1]  # e.g., "Risk Program"
stage_name_filter = sys.argv[2]    # e.g., "Stage 1"
month_year = sys.argv[3]           # e.g., "202412"

# Retry configuration
MAX_RETRIES = 3
RETRY_DELAY = 5  # Seconds

# Load Metadata Table for the specific program and stage
metadata = spark.sql(f"""
    SELECT *
    FROM metadatatable
    WHERE month_year = '{month_year}'
      AND program_name = '{program_name_filter}'
      AND stage_name = '{stage_name_filter}'
""").orderBy("stage_priority", "steps_priority")

# Define Log Table
log_table = "log_table"
log_entries = []

# Function to log status
def log_status(product_name, program_name, stage_name, step_name, status,
               error_message=None, start_time=None, end_time=None, records_processed=None):
    execution_time_seconds = None
    if start_time and end_time:
        execution_time_seconds = (end_time - start_time).total_seconds()

    log_entries.append({
        "product_name": product_name,
        "program_name": program_name,
        "stage_name": stage_name,
        "step_name": step_name,
        "status": status,
        "error_message": error_message,
        "start_time": start_time.strftime("%Y-%m-%d %H:%M:%S") if start_time else None,
        "end_time": end_time.strftime("%Y-%m-%d %H:%M:%S") if end_time else None,
        "records_processed": records_processed,
        "execution_time_seconds": execution_time_seconds
    })

# Function to write a DataFrame to a target table.
# "snapshot" mode writes to a month-stamped table and repoints a view with the
# base table name at it; any other write_mode (overwrite, append, ...) is passed
# straight to Spark. snapshot_mode is read from metadata but not used yet.
def write_to_table(df, table_name, write_mode, snapshot_mode):
    if write_mode == "snapshot":
        snapshot_table_name = f"{table_name}_{month_year}"
        df.write.mode("overwrite").saveAsTable(snapshot_table_name)
        spark.sql(f"CREATE OR REPLACE VIEW {table_name} AS SELECT * FROM {snapshot_table_name}")
    else:
        df.write.mode(write_mode).saveAsTable(table_name)

# Process each step in the metadata
for row in metadata.collect():
    step = row.asDict()
    product_name = step["product_name"]
    program_name = step["program_name"]
    stage_name = step["stage_name"]
    step_name = step["step_name"]
    operation_type = step["operation_type"]
    query = step["query"]
    custom_logic = step["custom_logic"]
    temp_view_name = step["temp_view_name"]
    table_name = step["table_name"]
    write_mode = step["write_mode"]
    snapshot_mode = step["snapshot_mode"]

    start_time = datetime.now()
    retry_count = 0
    success = False

    while retry_count < MAX_RETRIES and not success:
        try:
            print(f"Executing Step: {product_name} > {program_name} > {stage_name} > {step_name}")

            if operation_type == "SQL":
                # Execute SQL query
                df = spark.sql(query)
                if temp_view_name:
                    df.createOrReplaceTempView(temp_view_name)
            elif operation_type == "DataFrame":
                # Execute the DataFrame logic in its own namespace; the custom
                # logic is expected to assign its result to a variable named `df`
                exec_ns = {"spark": spark}
                exec(custom_logic, exec_ns)
                df = exec_ns.get("df")
                if df is not None and temp_view_name:
                    df.createOrReplaceTempView(temp_view_name)
            else:
                raise ValueError(f"Unknown operation type: {operation_type}")

            # Count records processed
            records_processed = df.count() if df is not None else None

            # Write to table
            if table_name:
                write_to_table(df, table_name, write_mode, snapshot_mode)

            # Log success
            end_time = datetime.now()
            log_status(product_name, program_name, stage_name, step_name,
                       "SUCCESS", start_time=start_time, end_time=end_time, records_processed=records_processed)
            success = True

        except Exception as e:
            retry_count += 1
            if retry_count >= MAX_RETRIES:
                # Log failure after max retries
                end_time = datetime.now()
                log_status(product_name, program_name, stage_name, step_name,
                           "FAILED", error_message=str(e), start_time=start_time, end_time=end_time)
                print(f"Error in Step: {step_name}, Error: {str(e)}")
                break
            else:
                print(f"Retrying Step: {step_name} ({retry_count}/{MAX_RETRIES}) after error: {str(e)}")
                time.sleep(RETRY_DELAY)

# Write log entries to the Log Table with an explicit schema, so that columns
# that happen to be entirely NULL (e.g. error_message on a clean run) do not
# break Spark's type inference
if log_entries:
    log_schema = ("product_name string, program_name string, stage_name string, "
                  "step_name string, status string, error_message string, "
                  "start_time timestamp, end_time timestamp, "
                  "records_processed long, execution_time_seconds double")
    log_df = spark.createDataFrame(log_entries, schema=log_schema)
    log_df.write.mode("append").saveAsTable(log_table)

print("ETL Process Completed!")

Execution

Run the script with spark-submit, passing the program name, stage name, and month_year:

spark-submit dynamic_etl_framework.py "Risk Program" "Stage 1" "202412"
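
Before submitting, it can help to sanity-check which steps the run will pick up. The query below mirrors the filter and ordering the script itself applies (table and column names as used in the code above):

SELECT step_name, operation_type, table_name, write_mode
FROM metadatatable
WHERE month_year = '202412'
  AND program_name = 'Risk Program'
  AND stage_name = 'Stage 1'
ORDER BY stage_priority, steps_priority;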

Additional Features in This Version

  1. Parameterization:
    • Runs for specific program_name, stage_name, and month_year.
    • Passed as command-line arguments for flexibility.
  2. Error Handling and Retries:
    • Attempts each step up to MAX_RETRIES times before logging it as a failure.
    • Adds a delay between retries to handle transient issues.
  3. Logging:
    • Captures detailed logs, including status, error_message, records_processed, and execution time.
    • Logs are stored in the log_table.
  4. Dynamic Write Modes:
    • Supports overwrite, append, and snapshot write modes for target tables (snapshot behavior is illustrated just below).
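
To make the snapshot mode concrete: if a step targeting final_risk_data were configured with write_mode = 'snapshot' for month_year 202412 (the sample metadata below uses overwrite, so this is only an illustration), write_to_table would save the data into a month-stamped table and issue SQL equivalent to:

-- Physical data lands in final_risk_data_202412;
-- the base name becomes a view over the latest snapshot
CREATE OR REPLACE VIEW final_risk_data AS
SELECT * FROM final_risk_data_202412;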

Sample Metadata for Testing

Here is sample metadata that filters and processes CIBIL data (a sketch of the assumed metadatatable layout follows the INSERT):

Sample Metadata Table

INSERT INTO metadatatable VALUES
('Risk Program', 'Personal Loan', 'Stage 1', 'Filter CIBIL', 'SQL',
 'SELECT * FROM cibil_data WHERE score > 700', NULL, 'filtered_cibil', NULL, NULL, NULL, 1, 1, '202412'),

('Risk Program', 'Personal Loan', 'Stage 1', 'Add Risk Column', 'DataFrame', NULL,
 'df = spark.table("filtered_cibil"); df = df.withColumn("risk_score", df["score"] * 1.1)',
 'cibil_risk_view', NULL, NULL, 1, 2, '202412'),

('Risk Program', 'Personal Loan', 'Stage 1', 'Write Data', 'DataFrame', NULL,
 'df = spark.table("cibil_risk_view"); df.write.mode("overwrite").saveAsTable("final_risk_data")',
 NULL, 'final_risk_data', 'overwrite', NULL, 1, 3, '202412');
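
For reference, here is a minimal sketch of the metadatatable layout this framework assumes. The column names are the ones the script reads; the order follows the sample INSERT above, and the types are illustrative:

CREATE TABLE IF NOT EXISTS metadatatable (
    program_name   STRING,
    product_name   STRING,
    stage_name     STRING,
    step_name      STRING,
    operation_type STRING,   -- 'SQL' or 'DataFrame'
    query          STRING,   -- SQL text for SQL steps
    custom_logic   STRING,   -- PySpark code for DataFrame steps
    temp_view_name STRING,
    table_name     STRING,
    write_mode     STRING,   -- overwrite / append / snapshot
    snapshot_mode  STRING,
    stage_priority INT,
    steps_priority INT,
    month_year     STRING
);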

Log Table Schema

CREATE TABLE IF NOT EXISTS log_table (
    program_name STRING,
    product_name STRING,
    stage_name STRING,
    step_name STRING,
    status STRING,
    error_message STRING,
    start_time TIMESTAMP,
    end_time TIMESTAMP,
    records_processed BIGINT,
    execution_time_seconds DOUBLE
);
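
Once a run completes, the log table can be queried directly to review the outcome, for example:

SELECT step_name, status, records_processed,
       execution_time_seconds, error_message
FROM log_table
WHERE program_name = 'Risk Program'
  AND stage_name = 'Stage 1'
ORDER BY start_time;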

This approach ensures a robust, dynamic, and extensible ETL pipeline, with clear logging and retry mechanisms. Let me know if you need further refinements!
