PySpark Project Alert: Creating Dynamic Variable Lists for ETL Jobs

by HintsToday Team | Jul 7, 2024 | Pyspark

Let us create one or more dynamic lists of variables and store them in a dictionary, array, or other data structure for repeated use in PySpark projects, especially ETL jobs. The variable names follow a dynamic pattern, for example Month_202401 to Month_202312 for 24 months (taken backdated from the current month, or progressing with it). Later we will use this dictionary to create 24 CSV files by filtering on year and month, and we will also use it to build arrays. Finally, we will create custom Excel files for the 24 year-month combinations, where the column names follow the same pattern { such as xyz_2404_04 to xyz_2404_12, abc_xyz_2404_0405 to abc_xyz_2405_1201 }. This requirement is specific to PySpark-based ETL projects.

Creating dynamic lists of variables and saving them in a dictionary or array for further use in PySpark ETL jobs is an effective way to streamline and automate the process. Here is a step-by-step guide to achieving this in a PySpark-based project.

Step 1: Generate Dynamic Variable Names

First, we need to generate dynamic variable names for the past 24 months from the current month. We’ll store these variable names in a dictionary for easy access.

from datetime import datetime, timedelta
import pandas as pd
import pyspark.sql.functions as F
from pyspark.sql import SparkSession

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("DynamicVariablesETL") \
    .getOrCreate()

# Get current date
current_date = datetime.now()

# Create a dictionary to hold variable names
dynamic_vars = {}

# Generate variable names for the past 24 months using proper month
# arithmetic -- stepping back in 30-day blocks can skip or repeat months
for i in range(24):
    y, m = divmod(current_date.month - 1 - i, 12)
    year_month = f"{current_date.year + y}{m + 1:02d}"
    dynamic_vars[f'Month_{year_month}'] = None

# Print the generated variable names
print(dynamic_vars)
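To sanity-check the month rollover, here is a small standalone sketch of the same label logic (the function name `past_months` is illustrative, not part of the original script), using a fixed anchor date so the output is reproducible:

```python
from datetime import date

def past_months(anchor: date, n: int = 24) -> list:
    """Return 'YYYYMM' labels for the anchor month and the n-1 months before it."""
    labels = []
    for i in range(n):
        # divmod handles the year rollover when the month index goes negative
        y, m = divmod(anchor.month - 1 - i, 12)
        labels.append(f"{anchor.year + y}{m + 1:02d}")
    return labels

print(past_months(date(2024, 1, 15), 4))  # ['202401', '202312', '202311', '202310']
```

Note how a January anchor correctly rolls back into December of the previous year, which the 30-day approximation would not guarantee.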

Step 2: Create CSV Files by Filtering on Year and Month

Next, we’ll use the dynamic variable names to filter data and create CSV files. For simplicity, let’s assume we have a sample DataFrame to filter.

# Sample DataFrame
data = {
    'date': [datetime.now() - timedelta(days=i*15) for i in range(50)],
    'value': [i for i in range(50)]
}

df = pd.DataFrame(data)
spark_df = spark.createDataFrame(df)

# Function to create CSV files based on dynamic variables
def create_csv_files(spark_df, dynamic_vars):
    for var in dynamic_vars.keys():
        year_month = var.split('_')[1]
        year = int(year_month[:4])
        month = int(year_month[4:])
        
        filtered_df = spark_df.filter((F.year("date") == year) & (F.month("date") == month))
        
        if filtered_df.head(1):  # head(1) avoids a full count() just to check emptiness
            # Spark writes a directory of part files at this path
            filtered_df.write.mode("overwrite").csv(f"/path/to/output/{var}.csv", header=True)

create_csv_files(spark_df, dynamic_vars)
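Both the CSV step above and the Excel step below re-derive year and month from the variable name. A tiny helper keeps that parsing in one place (the name `parse_year_month` is hypothetical, introduced here for illustration):

```python
def parse_year_month(var_name: str) -> tuple:
    """Extract (year, month) as ints from a label like 'Month_202401'."""
    year_month = var_name.split('_')[1]
    return int(year_month[:4]), int(year_month[4:])

print(parse_year_month('Month_202401'))   # (2024, 1)
print(parse_year_month('Month_202312'))   # (2023, 12)
```

As an aside, if all 24 months live in one DataFrame, a single partitioned write (`spark_df.write.partitionBy("year", "month")`) can replace the 24-pass filter loop with one Spark job.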

Step 3: Create Arrays Based on Dynamic Dictionary

We can also create arrays based on the dynamic dictionary.

# Create an array of the same month labels directly from the dynamic dictionary
dynamic_arrays = list(dynamic_vars.keys())

print(dynamic_arrays)
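These labels can also drive the dynamically named columns used in Step 4. A minimal sketch, assuming the `xyz` prefix placeholder from the requirement (the helper name `column_names` is illustrative):

```python
def column_names(labels, prefix="xyz"):
    """Derive column names like 'xyz_2024_01' from 'Month_202401' labels."""
    cols = []
    for label in labels:
        ym = label.split('_')[1]
        cols.append(f"{prefix}_{ym[:4]}_{ym[4:]}")
    return cols

print(column_names(['Month_202401', 'Month_202312']))  # ['xyz_2024_01', 'xyz_2023_12']
```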

Step 4: Create Custom Excel Files

Lastly, we can create custom Excel files with columns named based on the dynamic dictionary.

# openpyxl must be installed for pandas' Excel engine (pip install openpyxl)

# Function to create custom Excel files
def create_excel_files(dynamic_vars):
    for var in dynamic_vars.keys():
        year_month = var.split('_')[1]
        year = year_month[:4]
        month = year_month[4:]

        # Create a DataFrame with custom columns
        data = {
            f'xyz_{year}_{month}': [i for i in range(10)],
            f'abc_xyz_{year}_{month}{month}': [i for i in range(10)]
        }
        
        df = pd.DataFrame(data)
        
        # Write the Excel file; the context manager saves and closes the workbook
        # (ExcelWriter.save() was removed in pandas 2.0)
        file_name = f'/path/to/output/{var}.xlsx'
        with pd.ExcelWriter(file_name, engine='openpyxl') as writer:
            df.to_excel(writer, sheet_name='Sheet1', index=False)

create_excel_files(dynamic_vars)
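The per-month frame construction can be isolated from the file writing, which makes the dynamic column naming easy to verify without touching the filesystem. A sketch, assuming the placeholder prefixes from the requirement (the helper name `month_frame` is hypothetical):

```python
import pandas as pd

def month_frame(year: str, month: str, n_rows: int = 10) -> pd.DataFrame:
    """Build a per-month frame whose column names embed the year and month."""
    return pd.DataFrame({
        f'xyz_{year}_{month}': range(n_rows),
        f'abc_xyz_{year}_{month}{month}': range(n_rows),
    })

df = month_frame('2024', '04')
print(list(df.columns))  # ['xyz_2024_04', 'abc_xyz_2024_0404']
```

Keeping frame construction separate also means the same builder can feed `df.to_excel(...)`, `df.to_csv(...)`, or a Spark conversion unchanged.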

Combining It All Together

Here is the complete script that combines all the steps above.

from datetime import datetime, timedelta
import pandas as pd
import pyspark.sql.functions as F
from pyspark.sql import SparkSession

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("DynamicVariablesETL") \
    .getOrCreate()

# Get current date
current_date = datetime.now()

# Create a dictionary to hold variable names
dynamic_vars = {}

# Generate variable names for the past 24 months using proper month
# arithmetic -- stepping back in 30-day blocks can skip or repeat months
for i in range(24):
    y, m = divmod(current_date.month - 1 - i, 12)
    year_month = f"{current_date.year + y}{m + 1:02d}"
    dynamic_vars[f'Month_{year_month}'] = None

# Sample DataFrame
data = {
    'date': [datetime.now() - timedelta(days=i*15) for i in range(50)],
    'value': [i for i in range(50)]
}

df = pd.DataFrame(data)
spark_df = spark.createDataFrame(df)

# Function to create CSV files based on dynamic variables
def create_csv_files(spark_df, dynamic_vars):
    for var in dynamic_vars.keys():
        year_month = var.split('_')[1]
        year = int(year_month[:4])
        month = int(year_month[4:])
        
        filtered_df = spark_df.filter((F.year("date") == year) & (F.month("date") == month))
        
        if filtered_df.head(1):  # head(1) avoids a full count() just to check emptiness
            # Spark writes a directory of part files at this path
            filtered_df.write.mode("overwrite").csv(f"/path/to/output/{var}.csv", header=True)

create_csv_files(spark_df, dynamic_vars)

# Create an array of the same month labels directly from the dynamic dictionary
dynamic_arrays = list(dynamic_vars.keys())

# Function to create custom Excel files
def create_excel_files(dynamic_vars):
    for var in dynamic_vars.keys():
        year_month = var.split('_')[1]
        year = year_month[:4]
        month = year_month[4:]

        # Create a DataFrame with custom columns
        data = {
            f'xyz_{year}_{month}': [i for i in range(10)],
            f'abc_xyz_{year}_{month}{month}': [i for i in range(10)]
        }
        
        df = pd.DataFrame(data)
        
        # Write the Excel file; the context manager saves and closes the workbook
        # (ExcelWriter.save() was removed in pandas 2.0)
        file_name = f'/path/to/output/{var}.xlsx'
        with pd.ExcelWriter(file_name, engine='openpyxl') as writer:
            df.to_excel(writer, sheet_name='Sheet1', index=False)

create_excel_files(dynamic_vars)

This script demonstrates how to generate dynamic variable names, filter data to create CSV files, and create custom Excel files with dynamically generated columns. You can customize the paths and logic as needed for your specific ETL job in PySpark.

Written by HintsToday Team
