Below is the complete, enhanced ETL framework code that dynamically processes a specific stage of a program using metadata, with additional features such as parameterization, logging, error handling, and retries.
Dynamic ETL Framework Code
Save this as dynamic_etl_framework.py:
from pyspark.sql import SparkSession
from datetime import datetime
import time
import sys

# Initialize SparkSession with Hive support so metadata, source, and target tables are accessible
spark = (
    SparkSession.builder
    .appName("Dynamic ETL Framework")
    .enableHiveSupport()
    .getOrCreate()
)
# Input parameters from the command line
if len(sys.argv) < 4:
    print("Usage: dynamic_etl_framework.py <program_name> <stage_name> <month_year>")
    sys.exit(1)

program_name_filter = sys.argv[1]  # e.g., "Risk Program"
stage_name_filter = sys.argv[2]    # e.g., "Stage 1"
month_year = sys.argv[3]           # e.g., "202412"

# Retry configuration
MAX_RETRIES = 3
RETRY_DELAY = 5  # Seconds
# Load metadata rows for the requested program, stage, and month, ordered by priority
metadata = spark.sql(f"""
    SELECT *
    FROM metadatatable
    WHERE month_year = '{month_year}'
      AND program_name = '{program_name_filter}'
      AND stage_name = '{stage_name_filter}'
""").orderBy("stage_priority", "steps_priority")

# Define Log Table
log_table = "log_table"
log_entries = []
# Function to record the outcome of a step
def log_status(product_name, program_name, stage_name, step_name, status,
               error_message=None, start_time=None, end_time=None, records_processed=None):
    execution_time_seconds = None
    if start_time and end_time:
        execution_time_seconds = (end_time - start_time).total_seconds()
    log_entries.append({
        "product_name": product_name,
        "program_name": program_name,
        "stage_name": stage_name,
        "step_name": step_name,
        "status": status,
        "error_message": error_message,
        # Keep datetime objects so they map to the TIMESTAMP columns of the log table
        "start_time": start_time,
        "end_time": end_time,
        "records_processed": records_processed,
        "execution_time_seconds": execution_time_seconds
    })
# Function to write a result DataFrame to its target table
def write_to_table(df, table_name, write_mode, snapshot_mode):
    if write_mode == "snapshot":
        # Persist a month-stamped snapshot table and point a view with the base name at it
        snapshot_table_name = f"{table_name}_{month_year}"
        df.write.mode("overwrite").saveAsTable(snapshot_table_name)
        spark.sql(f"CREATE OR REPLACE VIEW {table_name} AS SELECT * FROM {snapshot_table_name}")
    else:
        df.write.mode(write_mode).saveAsTable(table_name)
# Process each step in the metadata, in priority order
for row in metadata.collect():
    step = row.asDict()
    product_name = step["product_name"]
    program_name = step["program_name"]
    stage_name = step["stage_name"]
    step_name = step["step_name"]
    operation_type = step["operation_type"]
    query = step["query"]
    custom_logic = step["custom_logic"]
    temp_view_name = step["temp_view_name"]
    table_name = step["table_name"]
    write_mode = step["write_mode"]
    snapshot_mode = step["snapshot_mode"]

    start_time = datetime.now()
    retry_count = 0
    success = False

    while retry_count < MAX_RETRIES and not success:
        try:
            print(f"Executing Step: {product_name} > {program_name} > {stage_name} > {step_name}")
            df = None

            if operation_type == "SQL":
                # Execute the SQL query and optionally expose the result as a temp view
                df = spark.sql(query)
                if temp_view_name:
                    df.createOrReplaceTempView(temp_view_name)
            elif operation_type == "DataFrame":
                # Execute the DataFrame logic in an isolated namespace and pick up the resulting "df"
                exec_scope = {"spark": spark}
                exec(custom_logic, exec_scope)
                df = exec_scope.get("df")
                if df is not None and temp_view_name:
                    df.createOrReplaceTempView(temp_view_name)
            else:
                raise ValueError(f"Unknown operation type: {operation_type}")

            # Count records processed
            records_processed = df.count() if df is not None else None

            # Write to the target table if one is configured
            if table_name:
                write_to_table(df, table_name, write_mode, snapshot_mode)

            # Log success
            end_time = datetime.now()
            log_status(product_name, program_name, stage_name, step_name,
                       "SUCCESS", start_time=start_time, end_time=end_time,
                       records_processed=records_processed)
            success = True

        except Exception as e:
            retry_count += 1
            if retry_count >= MAX_RETRIES:
                # Log failure after max retries
                end_time = datetime.now()
                log_status(product_name, program_name, stage_name, step_name,
                           "FAILED", error_message=str(e), start_time=start_time, end_time=end_time)
                print(f"Error in Step: {step_name}, Error: {str(e)}")
                break
            else:
                print(f"Retrying Step: {step_name} ({retry_count}/{MAX_RETRIES}) after error: {str(e)}")
                time.sleep(RETRY_DELAY)
# Write accumulated log entries to the log table. An explicit schema avoids
# inference failures when a column (e.g. error_message) is NULL in every row.
if log_entries:
    log_schema = (
        "product_name STRING, program_name STRING, stage_name STRING, step_name STRING, "
        "status STRING, error_message STRING, start_time TIMESTAMP, end_time TIMESTAMP, "
        "records_processed BIGINT, execution_time_seconds DOUBLE"
    )
    log_df = spark.createDataFrame(log_entries, schema=log_schema)
    log_df.write.mode("append").saveAsTable(log_table)

print("ETL Process Completed!")
Execution
Save the script as dynamic_etl_framework.py and run it with spark-submit:
spark-submit dynamic_etl_framework.py "Risk Program" "Stage 1" "202412"
Additional Features in This Version
- Parameterization: runs for a specific program_name, stage_name, and month_year, all passed as command-line arguments for flexibility.
- Error handling and retries: retries each step up to MAX_RETRIES times before logging it as a failure, with a delay between attempts to ride out transient issues.
- Logging: captures detailed logs, including status, error_message, records_processed, and execution time, and stores them in log_table (see the sample query after this list).
- Dynamic write modes: supports overwrite, append, and snapshot modes for target tables.
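As an example of how the logs can be used, the query below lists failed steps for a run; it is a minimal sketch assuming the log_table schema shown later in this section.

-- Illustrative query: review failed steps for the Risk Program run
SELECT step_name, error_message, start_time, end_time
FROM log_table
WHERE status = 'FAILED'
  AND program_name = 'Risk Program'
ORDER BY start_time;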
Sample Metadata for Testing
Here is sample metadata that filters and processes CIBIL data:
Sample Metadata Table
INSERT INTO metadatatable VALUES
('Risk Program', 'Personal Loan', 'Stage 1', 'Filter CIBIL', 'SQL',
 'SELECT * FROM cibil_data WHERE score > 700', NULL, 'filtered_cibil', NULL, NULL, NULL, 1, 1, '202412'),
('Risk Program', 'Personal Loan', 'Stage 1', 'Add Risk Column', 'DataFrame', NULL,
 'df = spark.table("filtered_cibil"); df = df.withColumn("risk_score", df["score"] * 1.1)',
 'cibil_risk_view', NULL, NULL, NULL, 1, 2, '202412'),
('Risk Program', 'Personal Loan', 'Stage 1', 'Write Data', 'DataFrame', NULL,
 'df = spark.table("cibil_risk_view")',
 NULL, 'final_risk_data', 'overwrite', NULL, 1, 3, '202412');
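The DDL for metadatatable is not included in this section; the sketch below is one possible definition, assuming the column order used by the INSERT above and the column names referenced by the framework code.

-- Hypothetical metadatatable definition (adjust names and types to your environment)
CREATE TABLE IF NOT EXISTS metadatatable (
    program_name STRING,
    product_name STRING,
    stage_name STRING,
    step_name STRING,
    operation_type STRING,   -- 'SQL' or 'DataFrame'
    query STRING,            -- SQL text for SQL steps
    custom_logic STRING,     -- PySpark code for DataFrame steps
    temp_view_name STRING,   -- optional temp view to register the result under
    table_name STRING,       -- optional target table
    write_mode STRING,       -- 'overwrite', 'append', or 'snapshot'
    snapshot_mode STRING,
    stage_priority INT,
    steps_priority INT,
    month_year STRING
);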
Log Table Schema
CREATE TABLE IF NOT EXISTS log_table (
    program_name STRING,
    product_name STRING,
    stage_name STRING,
    step_name STRING,
    status STRING,
    error_message STRING,
    start_time TIMESTAMP,
    end_time TIMESTAMP,
    records_processed BIGINT,
    execution_time_seconds DOUBLE
);
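Once logs accumulate, the same table supports simple monitoring queries; the aggregation below is an illustrative sketch, not part of the framework itself.

-- Illustrative query: average runtime and total records per successful step
SELECT program_name, stage_name, step_name,
       AVG(execution_time_seconds) AS avg_seconds,
       SUM(records_processed)      AS total_records
FROM log_table
WHERE status = 'SUCCESS'
GROUP BY program_name, stage_name, step_name
ORDER BY avg_seconds DESC;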
This approach ensures a robust, dynamic, and extensible ETL pipeline, with clear logging and retry mechanisms. Let me know if you need further refinements!