1. Exploratory Data Analysis (EDA) with Pandas in Banking – Converted to PySpark

While searching for a free Pandas project on Google, I found this link – Exploratory Data Analysis (EDA) with Pandas in Banking. I have tried to convert that Python script into a PySpark one.

First, let’s handle the initial steps of downloading and extracting the data:
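
A minimal sketch of those steps (assuming the archive is still hosted at the UCI URL below; adjust the link and paths for your environment):

import urllib.request
import zipfile

# Download the bank marketing dataset archive (URL is an assumption) and unzip it
url = "https://archive.ics.uci.edu/ml/machine-learning-databases/00222/bank-additional.zip"
urllib.request.urlretrieve(url, "bank-additional.zip")

with zipfile.ZipFile("bank-additional.zip") as zf:
    zf.extractall(".")  # creates the bank-additional/ folder used below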

In a PySpark context, we’ll assume the data is already available locally after the above steps.

Importing Libraries

import pandas as pd
import matplotlib.pyplot as plt
import numpy as np

%matplotlib inline
plt.rcParams["figure.figsize"] = (8, 6)

import warnings
warnings.filterwarnings('ignore')

Equivalent imports in PySpark:

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.window import Window
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np

# Initialize Spark session
spark = SparkSession.builder.appName("EDA_PySpark_Banking").getOrCreate()

Loading Data

Pandas:

df = pd.read_csv('bank-additional/bank-additional-full.csv', sep=';')
df.head(5)

PySpark:

df = spark.read.csv('bank-additional/bank-additional-full.csv', sep=';', header=True, inferSchema=True)
df.show(5)

Initial Exploration

Columns

Pandas:

df.columns

PySpark:

df.columns

Info

Pandas:

print(df.info())

PySpark:

df.printSchema()

Describe

Pandas:

df.describe()
df.describe(include=["object"])

PySpark:

df.describe().show()
# Rough stand-in for describe(include=["object"]): distinct value counts per column
df.select([countDistinct(c).alias(c) for c in df.columns]).show()
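
The distinct counts above are only part of what pandas' describe(include=["object"]) reports. A closer (still partial) analogue for a single categorical column – "marital" is used here purely as an example – could look like:

# Count, number of distinct values, and the most frequent value with its frequency
df.select(count("marital").alias("count"), countDistinct("marital").alias("unique")).show()
df.groupBy("marital").count().orderBy(col("count").desc()).show(1)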

Value Counts

Pandas:

df["y"].value_counts()
df["marital"].value_counts(normalize=True)

PySpark:

df.groupBy("y").count().show()
df.groupBy("marital").count().withColumn("normalize", col("count") / df.count()).show()

Sorting

Pandas:

df.sort_values(by="duration", ascending=False).head()
df.sort_values(by=["age", "duration"], ascending=[True, False]).head()

PySpark:

df.orderBy(col("duration").desc()).show(5)
df.orderBy(col("age").asc(), col("duration").desc()).show(5)

Applying Functions

Pandas:

df.apply(np.max)
d = {"no": 0, "yes": 1}
df["y"] = df["y"].map(d)

PySpark:

# PySpark does not have an apply method; use select with max for each column
df.select([max(c).alias(c) for c in df.columns]).show()

# Mapping values
df = df.withColumn("y", when(col("y") == "yes", 1).otherwise(0))

Further Analysis

Pandas:

print("Share of attracted clients =", '{:.1%}'.format(df["y"].mean()))
df[df["y"] == 1].mean()
acd = round(df[df["y"] == 1]["duration"].mean(), 2)
acd_in_min = acd // 60
print("Average call duration for attracted clients =", acd_in_min, "min", int(acd) % 60, "sec")
print("Average age of attracted clients =", int(df[(df["y"] == 1) & (df["marital"] == "single")]["age"].mean()), "years")
df[-1:]

PySpark:

df.groupBy().agg(mean("y")).show()
df.filter(df["y"] == 1).agg(*[mean(c).alias(c) for c in df.columns]).show()
acd = df.filter(df["y"] == 1).agg(round(mean("duration"), 2)).collect()[0][0]
acd_in_min = acd // 60
print("Average call duration for attracted clients =", acd_in_min, "min", int(acd) % 60, "sec")
avg_age = df.filter((df["y"] == 1) & (df["marital"] == "single")).agg(mean("age")).collect()[0][0]
print("Average age of attracted clients =", int(avg_age), "years")
# Spark DataFrames have no inherent row order, so there is no direct equivalent of df[-1:];
# here we simply show one row (the oldest client)
df.orderBy(desc("age")).limit(1).show()

Crosstab and Pivot Table

Pandas:

pd.crosstab(df["y"], df["marital"])
pd.crosstab(df["y"], df["marital"], normalize='index')
df.pivot_table(["age", "duration"], ["job"], aggfunc="mean").head(10)

PySpark:

df.groupBy("y", "marital").count().show()
df.groupBy("y", "marital").count().withColumn("normalize", col("count") / sum("count").over(Window.partitionBy("y"))).show()
df.groupBy("job").agg(mean("age"), mean("duration")).show()

Plots

Pandas:

pd.plotting.scatter_matrix(df[["age", "duration", "campaign"]], figsize=(15, 15), diagonal="kde")
plt.show()

df["age"].hist()
df.hist(color="k", bins=30, figsize=(15, 10))
plt.show()

df.boxplot(column="age", by="marital")
plt.show()

df.boxplot(column="age", by=["marital", "housing"], figsize=(20, 20))
plt.show()

PySpark:

# PySpark does not support direct plotting; use matplotlib for plotting
# Convert to Pandas for plotting
pandas_df = df.select("age", "duration", "campaign").toPandas()
pd.plotting.scatter_matrix(pandas_df, figsize=(15, 15), diagonal="kde")
plt.show()

df.select("age").toPandas().hist()
df.toPandas().hist(color="k", bins=30, figsize=(15, 10))
plt.show()

df.select("age", "marital").toPandas().boxplot(by="marital")
plt.show()

df.select("age", "marital", "housing").toPandas().boxplot(by=["marital", "housing"], figsize=(20, 20))
plt.show()

Combining Code

Here is the combined code:

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.window import Window
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np

# Initialize Spark session
spark = SparkSession.builder.appName("EDA_PySpark_Banking").getOrCreate()

# Load the data
df = spark.read.csv('bank-additional/bank-additional-full.csv', sep=';', header=True, inferSchema=True)

# Initial exploration
df.printSchema()
df.describe().show()

df.groupBy("y").count().show()
df.groupBy("marital").count().withColumn("normalize", col("count") / df.count()).show()

df.orderBy(col("duration").desc()).show(5)
df.orderBy(col("age").asc(), col("duration").desc()).show(5)

df.select([max(c).alias(c) for c in df.columns]).show()
df = df.withColumn("y", when(col("y") == "yes", 1).otherwise(0))

df.groupBy().agg(mean("y")).show()
df.filter(df["y"] == 1).agg(*[mean(c).alias(c) for c in df.columns]).show()

acd = df.filter(df["y"] == 1).agg(round(mean("duration"), 2)).collect()[0][0]
acd_in_min = acd // 60
print("Average call duration for attracted clients =", acd_in_min, "min", int(acd) % 60, "sec")

avg_age = df.filter((df["y"] == 1) & (df["marital"] == "single")).agg(mean("age")).collect()[0][0]
print("Average age of attracted clients =", int(avg_age), "years")
df.orderBy(desc("age")).limit(1).show()

df.groupBy("y", "marital").count().show()
df.groupBy("y", "marital").count().withColumn("normalize", col("count") / sum("count").over(Window.partitionBy("y"))).show()
df.groupBy("job").agg(mean("age"), mean("duration")).show()

# Plots
pandas_df = df.select("age", "duration", "campaign").toPandas()
pd.plotting.scatter_matrix(pandas_df, figsize=(15, 15), diagonal="kde")
plt.show()

df.select("age").toPandas().hist()
df.toPandas().hist(color="k", bins=30, figsize=(15, 10))
plt.show()

df.select("age", "marital").toPandas().boxplot(by="marital")
plt.show()

df.select("age", "marital", "housing").toPandas().boxplot(by=["marital", "housing"], figsize=(20, 20))
plt.show()


2. Dynamic List of Variables Creation for ETL Jobs

Let us create one or more dynamic lists of variables and save them in a dictionary, array, or other data structure for repeated use in PySpark projects, especially ETL jobs. The variable names are dynamic, for example Month_202401 back to Month_202312 for 24 months (take these 24 months backdated from the current month, or as per the current month's progression). Later we will use this dictionary to create 24 CSV files by filtering on year and month, and we will also build arrays based on this dynamic dictionary. We will additionally create custom Excel files for the 24 year-month combinations, where the column names follow the same year-month patterns (such as xyz_2404_04 to xyz_2404_12, and abc_xyz_2404_0405 to abc_xyz_2405_1201). This requirement is specifically for PySpark-based ETL projects.

Creating dynamic lists of variables and saving them in a dictionary or array for further use in PySpark ETL jobs is a good way to streamline and automate the process. Here's a step-by-step guide to achieve this in a PySpark-based project.

Step 1: Generate Dynamic Variable Names

First, we need to generate dynamic variable names for the past 24 months from the current month. We’ll store these variable names in a dictionary for easy access.

from datetime import datetime, timedelta
import pandas as pd
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
import calendar

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("DynamicVariablesETL") \
    .getOrCreate()

# Get current date
current_date = datetime.now()

# Create a dictionary to hold variable names
dynamic_vars = {}

# Generate variable names for the past 24 months
for i in range(24):
    date = current_date - timedelta(days=i*30)
    year_month = date.strftime('%Y%m')
    dynamic_vars[f'Month_{year_month}'] = None

# Print the generated variable names
print(dynamic_vars)
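
Note that stepping back in blocks of 30 days is only an approximation and can occasionally skip or duplicate a month. A sketch that walks back exact calendar months instead (kept in a separate dynamic_vars_exact so it does not overwrite the dictionary above):

# Walk back 24 exact calendar months from the current month
year, month = current_date.year, current_date.month
dynamic_vars_exact = {}
for _ in range(24):
    dynamic_vars_exact[f"Month_{year}{month:02d}"] = None
    month -= 1
    if month == 0:
        year, month = year - 1, 12

print(list(dynamic_vars_exact))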

Step 2: Create CSV Files by Filtering on Year and Month

Next, we’ll use the dynamic variable names to filter data and create CSV files. For simplicity, let’s assume we have a sample DataFrame to filter.

# Sample DataFrame
data = {
    'date': [datetime.now() - timedelta(days=i*15) for i in range(50)],
    'value': [i for i in range(50)]
}

df = pd.DataFrame(data)
spark_df = spark.createDataFrame(df)

# Function to create CSV files based on dynamic variables
def create_csv_files(spark_df, dynamic_vars):
    for var in dynamic_vars.keys():
        year_month = var.split('_')[1]
        year = int(year_month[:4])
        month = int(year_month[4:])
        
        filtered_df = spark_df.filter((F.year("date") == year) & (F.month("date") == month))
        
        if filtered_df.count() > 0:
            filtered_df.write.csv(f"/path/to/output/{var}.csv")

create_csv_files(spark_df, dynamic_vars)
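
Keep in mind that df.write.csv produces a directory of part files without headers by default. If a single, headered file per month is preferred, a variation like the one below can be used (coalesce(1) is acceptable here because each monthly slice is small):

# Variant that writes one headered CSV output per month
def create_single_csv_files(spark_df, dynamic_vars):
    for var in dynamic_vars.keys():
        year_month = var.split('_')[1]
        year, month = int(year_month[:4]), int(year_month[4:])
        monthly_df = spark_df.filter((F.year("date") == year) & (F.month("date") == month))
        if monthly_df.count() > 0:
            (monthly_df.coalesce(1)
                       .write.mode("overwrite")
                       .option("header", True)
                       .csv(f"/path/to/output/{var}"))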

Step 3: Create Arrays Based on Dynamic Dictionary

We can also create arrays based on the dynamic dictionary.

# Create arrays from dynamic dictionary
dynamic_arrays = [f'Month_{date.strftime("%Y%m")}' for date in [current_date - timedelta(days=i*30) for i in range(24)]]

print(dynamic_arrays)
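
The same list can also drive the column-name patterns mentioned in the requirement (names like xyz_2404_04); the exact pattern below is an assumption and should be adapted to the real naming rule:

# Derive xyz_<yy><mm>_<mm> style column names from the 24-month list
xyz_columns = []
for name in dynamic_arrays:
    yyyymm = name.split("_")[1]          # e.g. "202404"
    yy, mm = yyyymm[2:4], yyyymm[4:6]    # "24", "04"
    xyz_columns.append(f"xyz_{yy}{mm}_{mm}")

print(xyz_columns)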

Step 4: Create Custom Excel Files

Lastly, we can create custom Excel files with columns named based on the dynamic dictionary.

import openpyxl
from openpyxl.utils.dataframe import dataframe_to_rows

# Function to create custom Excel files
def create_excel_files(dynamic_vars):
    for var in dynamic_vars.keys():
        year_month = var.split('_')[1]
        year = year_month[:4]
        month = year_month[4:]

        # Create a DataFrame with custom columns
        data = {
            f'xyz_{year}_{month}': [i for i in range(10)],
            f'abc_xyz_{year}_{month}{month}': [i for i in range(10)]
        }
        
        df = pd.DataFrame(data)
        
        # Write the Excel file; the context manager saves and closes it
        # (ExcelWriter.save() was removed in newer pandas versions)
        file_name = f'/path/to/output/{var}.xlsx'
        with pd.ExcelWriter(file_name, engine='openpyxl') as writer:
            df.to_excel(writer, sheet_name='Sheet1', index=False)

create_excel_files(dynamic_vars)

Combining It All Together

Here is the complete script that combines all the steps above.

from datetime import datetime, timedelta
import pandas as pd
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
import calendar

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("DynamicVariablesETL") \
    .getOrCreate()

# Get current date
current_date = datetime.now()

# Create a dictionary to hold variable names
dynamic_vars = {}

# Generate variable names for the past 24 months
for i in range(24):
    date = current_date - timedelta(days=i*30)
    year_month = date.strftime('%Y%m')
    dynamic_vars[f'Month_{year_month}'] = None

# Sample DataFrame
data = {
    'date': [datetime.now() - timedelta(days=i*15) for i in range(50)],
    'value': [i for i in range(50)]
}

df = pd.DataFrame(data)
spark_df = spark.createDataFrame(df)

# Function to create CSV files based on dynamic variables
def create_csv_files(spark_df, dynamic_vars):
    for var in dynamic_vars.keys():
        year_month = var.split('_')[1]
        year = int(year_month[:4])
        month = int(year_month[4:])
        
        filtered_df = spark_df.filter((F.year("date") == year) & (F.month("date") == month))
        
        if filtered_df.count() > 0:
            filtered_df.write.csv(f"/path/to/output/{var}.csv")

create_csv_files(spark_df, dynamic_vars)

# Create arrays from dynamic dictionary
dynamic_arrays = [f'Month_{date.strftime("%Y%m")}' for date in [current_date - timedelta(days=i*30) for i in range(24)]]

# Function to create custom Excel files
def create_excel_files(dynamic_vars):
    for var in dynamic_vars.keys():
        year_month = var.split('_')[1]
        year = year_month[:4]
        month = year_month[4:]

        # Create a DataFrame with custom columns
        data = {
            f'xyz_{year}_{month}': [i for i in range(10)],
            f'abc_xyz_{year}_{month}{month}': [i for i in range(10)]
        }
        
        df = pd.DataFrame(data)
        
        # Write the Excel file; the context manager saves and closes it
        # (ExcelWriter.save() was removed in newer pandas versions)
        file_name = f'/path/to/output/{var}.xlsx'
        with pd.ExcelWriter(file_name, engine='openpyxl') as writer:
            df.to_excel(writer, sheet_name='Sheet1', index=False)

create_excel_files(dynamic_vars)

This script demonstrates how to generate dynamic variable names, filter data to create CSV files, and create custom Excel files with dynamically generated columns. You can customize the paths and logic as needed for your specific ETL job in PySpark.


3. SAS Project to PySpark Migration – Merging, Joining, Transposing, Lead/Rank Functions, Data Validation with PROC FREQ, Error Handling, Macro Variables, and Macros

Let us create a comprehensive SAS project that involves merging, joining, transposing large tables, applying PROC SQL lead/rank functions, performing data validation with PROC FREQ, and incorporating error handling, macro variables, and macros for various functional tasks.

Step 1: Set Up Macro Variables for Date Values

%let start_date = '01JAN2023'd;
%let end_date = '31DEC2023'd;

Step 2: Define Macros for Various Functional Tasks

%macro import_data(libname=, dataset=, filepath=);
    proc import datafile="&filepath" out=&libname..&dataset dbms=csv replace;
    getnames=yes;
    run;
%mend import_data;

%macro join_tables(libname=, table1=, table2=, out_table=, join_condition=);
    proc sql;
        create table &libname..&out_table as
        select a.*, b.*
        from &libname..&table1 as a
        left join &libname..&table2 as b
        on &join_condition;
    quit;
%mend join_tables;

%macro transpose_table(libname=, in_table=, out_table=, id_var=, var=);
    proc transpose data=&libname..&in_table out=&libname..&out_table;
    by &id_var;
    var &var;
    run;
%mend transpose_table;

%macro validate_data(libname=, table=, var=);
    proc freq data=&libname..&table;
    tables &var / missing;
    run;
%mend validate_data;

%macro apply_rank(libname=, table=, var=, out_table=);
    /* PROC SQL has no RANK() OVER window function, so PROC RANK is used */
    proc rank data=&libname..&table out=&libname..&out_table descending ties=low;
        var &var;
        ranks rank;
    run;
%mend apply_rank;

Step 3: Import Large Tables

%import_data(libname=work, dataset=table1, filepath='/path/to/table1.csv');
%import_data(libname=work, dataset=table2, filepath='/path/to/table2.csv');

Step 4: Merge and Join Tables

%join_tables(libname=work, table1=table1, table2=table2, out_table=merged_table, join_condition=a.id=b.id);

Step 5: Transpose the Data

%transpose_table(libname=work, in_table=merged_table, out_table=transposed_table, id_var=id, var=some_var);

Step 6: Apply the Rank Function

%apply_rank(libname=work, table=transposed_table, var=some_var, out_table=ranked_table);

Step 7: Validate Data with PROC FREQ

%validate_data(libname=work, table=ranked_table, var=some_var);

Step 8: Error Handling

Ensure error handling by checking the existence of datasets and proper execution of steps.

%macro check_dataset(libname=, dataset=);
    %if %sysfunc(exist(&libname..&dataset)) %then %do;
        %put NOTE: Dataset &libname..&dataset exists.;
    %end;
    %else %do;
        %put ERROR: Dataset &libname..&dataset does not exist.;
        %abort cancel;
    %end;
%mend check_dataset;

%check_dataset(libname=work, dataset=ranked_table);

Full Example in One Go

Putting it all together:

%let start_date = '01JAN2023'd;
%let end_date = '31DEC2023'd;

%macro import_data(libname=, dataset=, filepath=);
    proc import datafile="&filepath" out=&libname..&dataset dbms=csv replace;
    getnames=yes;
    run;
%mend import_data;

%macro join_tables(libname=, table1=, table2=, out_table=, join_condition=);
    proc sql;
        create table &libname..&out_table as
        select a.*, b.*
        from &libname..&table1 as a
        left join &libname..&table2 as b
        on &join_condition;
    quit;
%mend join_tables;

%macro transpose_table(libname=, in_table=, out_table=, id_var=, var=);
    proc transpose data=&libname..&in_table out=&libname..&out_table;
    by &id_var;
    var &var;
    run;
%mend transpose_table;

%macro validate_data(libname=, table=, var=);
    proc freq data=&libname..&table;
    tables &var / missing;
    run;
%mend validate_data;

%macro apply_rank(libname=, table=, var=, out_table=);
    /* PROC SQL has no RANK() OVER window function, so PROC RANK is used */
    proc rank data=&libname..&table out=&libname..&out_table descending ties=low;
        var &var;
        ranks rank;
    run;
%mend apply_rank;

%macro check_dataset(libname=, dataset=);
    %if %sysfunc(exist(&libname..&dataset)) %then %do;
        %put NOTE: Dataset &libname..&dataset exists.;
    %end;
    %else %do;
        %put ERROR: Dataset &libname..&dataset does not exist.;
        %abort cancel;
    %end;
%mend check_dataset;

/* Import large tables */
%import_data(libname=work, dataset=table1, filepath='/path/to/table1.csv');
%import_data(libname=work, dataset=table2, filepath='/path/to/table2.csv');

/* Merge and join tables */
%join_tables(libname=work, table1=table1, table2=table2, out_table=merged_table, join_condition=a.id=b.id);

/* Transpose the data */
%transpose_table(libname=work, in_table=merged_table, out_table=transposed_table, id_var=id, var=some_var);

/* Apply the rank function */
%apply_rank(libname=work, table=transposed_table, var=some_var, out_table=ranked_table);

/* Validate data with PROC FREQ */
%validate_data(libname=work, table=ranked_table, var=some_var);

/* Check the existence of the final dataset */
%check_dataset(libname=work, dataset=ranked_table);

Explanation:

  1. Macro Variables: Defined for date values.
  2. Macros for Functional Tasks: Defined for importing data, joining tables, transposing tables, validating data, and applying ranking functions.
  3. Data Import: Two large tables are imported using PROC IMPORT.
  4. Data Merge and Join: Tables are merged and joined using PROC SQL.
  5. Data Transposition: Transposing the merged table.
  6. Rank Application: Applying the rank function.
  7. Data Validation: Validating data using PROC FREQ.
  8. Error Handling: Ensuring datasets exist and handling errors accordingly.

This setup provides a robust framework for handling complex data processing tasks in SAS, incorporating various best practices and functionalities.

Method 1

Here is the equivalent PySpark code for the provided SAS script:

from pyspark.sql import SparkSession
from pyspark.sql.functions import rank, col
from pyspark.sql.window import Window

# Create a SparkSession object
spark = SparkSession.builder.appName("My App").getOrCreate()

# Define the start and end dates
start_date = "2023-01-01"
end_date = "2023-12-31"

# Import the data
table1 = spark.read.csv("/path/to/table1.csv", header=True, inferSchema=True)
table2 = spark.read.csv("/path/to/table2.csv", header=True, inferSchema=True)

# Register the DataFrames as temporary views
table1.createOrReplaceTempView("table1")
table2.createOrReplaceTempView("table2")

# Join the tables
merged_table = spark.sql("SELECT a.*, b.* FROM table1 a LEFT JOIN table2 b ON a.id = b.id")
merged_table.createOrReplaceTempView("merged_table")

# Transpose the data
transposed_table = merged_table.groupBy("id").pivot("some_var").sum()
transposed_table.createOrReplaceTempView("transposed_table")

# Apply the rank function
ranked_table = transposed_table.withColumn("rank", rank().over(Window.orderBy(col("some_var").desc())))
ranked_table.createOrReplaceTempView("ranked_table")

# Validate the data
ranked_table.select("some_var").summary().show()

# Check the existence of the final dataset
if ranked_table.count() > 0:
    print("Dataset exists.")
else:
    print("Dataset does not exist.")

Here’s a step-by-step explanation of the PySpark code:

  1. Create a SparkSession object, which is the entry point to programming Spark with the Dataset and DataFrame API.
  2. Define the start and end dates as strings.
  3. Import the data from CSV files using the read.csv method, which returns a DataFrame.
  4. Register the DataFrames as temporary views using the createOrReplaceTempView method, so they can be queried using SQL.
  5. Join the tables using a SQL query, which is executed using the sql method.
  6. Transpose the data using the groupBy and pivot methods, which are equivalent to the SAS proc transpose step.
  7. Apply the rank function using the withColumn and rank methods, which are equivalent to the SAS proc sql step with the rank function.
  8. Validate the data by computing summary statistics using the summary method, which serves a purpose similar to the SAS proc freq validation step.
  9. Check the existence of the final dataset by checking the count of rows using the count method. If the count is greater than 0, the dataset exists; otherwise, it does not exist.

Note that the PySpark code uses the DataFrame API, which is a more concise and expressive way of working with data compared to the SAS script. Additionally, the PySpark code uses SQL queries to perform the join and rank operations, which can be more efficient than using the SAS proc sql step.

Method 2

Here’s the modified PySpark code that translates SAS macros to functions and adds error handling:


from pyspark.sql import SparkSession
from pyspark.sql.functions import rank, col

# Create a SparkSession object
spark = SparkSession.builder.appName("My App").getOrCreate()

# Define a function for importing data
def import_data(libname, dataset, filepath):
    try:
        df = spark.read.csv(filepath, header=True, inferSchema=True)
        df.createOrReplaceTempView(f"{libname}_{dataset}")
        return df
    except Exception as e:
        print(f"Error importing data: {e}")
        return None

# Define a function for joining tables
def join_tables(libname, table1, table2, out_table, join_condition):
    try:
        spark.sql(f"""
            CREATE TABLE {libname}_{out_table} AS
            SELECT a.*, b.*
            FROM {libname}_{table1} a
            LEFT JOIN {libname}_{table2} b
            ON {join_condition}
        """)
        return spark.table(f"{libname}_{out_table}")
    except Exception as e:
        print(f"Error joining tables: {e}")
        return None

# Define a function for transposing data
def transpose_table(libname, in_table, out_table, id_var, var):
    try:
        # Spark SQL's PIVOT clause needs explicit IN values, so the DataFrame pivot API is used here
        in_df = spark.table(f"{libname}_{in_table}")
        pivoted_df = in_df.groupBy(id_var).pivot(var).count()
        pivoted_df.createOrReplaceTempView(f"{libname}_{out_table}")
        return pivoted_df
    except Exception as e:
        print(f"Error transposing data: {e}")
        return None

# Define a function for applying the rank function
def apply_rank(libname, table, var, out_table):
    try:
        spark.sql(f"""
            CREATE TABLE {libname}_{out_table} AS
            SELECT *, RANK() OVER (ORDER BY {var} DESC) AS rank
            FROM {libname}_{table}
        """)
        return spark.table(f"{libname}_{out_table}")
    except Exception as e:
        print(f"Error applying rank function: {e}")
        return None

# Define a function for validating data
def validate_data(libname, table, var):
    try:
        spark.sql(f"""
            SELECT {var}, COUNT(*) AS count
            FROM {libname}_{table}
            GROUP BY {var}
        """).show()
    except Exception as e:
        print(f"Error validating data: {e}")

# Define a function for checking the existence of a dataset
def check_dataset(libname, dataset):
    try:
        if spark.catalog.tableExists(f"{libname}_{dataset}"):
            print("Dataset exists.")
        else:
            print("Dataset does not exist.")
    except Exception as e:
        print(f"Error checking dataset existence: {e}")

# Call the functions
table1 = import_data("work", "table1", "/path/to/table1.csv")
table2 = import_data("work", "table2", "/path/to/table2.csv")

merged_table = join_tables("work", "table1", "table2", "merged_table", "a.id = b.id")

transposed_table = transpose_table("work", "merged_table", "transposed_table", "id", "some_var")

ranked_table = apply_rank("work", "transposed_table", "some_var", "ranked_table")

validate_data("work", "ranked_table", "some_var")

check_dataset("work", "ranked_table")

In this modified code, I’ve defined separate functions for each of the SAS macros, and added error handling using try-except blocks. If an error occurs during the execution of a function, the error message is printed and the function returns None.

Method 3

Here’s the modified PySpark code that uses the sql method for the transpose and rank operations, and defines classes, objects, and functions to mimic the SAS macros:


from pyspark.sql import SparkSession
from pyspark.sql.functions import rank, col

# Create a SparkSession object
spark = SparkSession.builder.appName("My App").getOrCreate()

# Define the start and end dates
start_date = "2023-01-01"
end_date = "2023-12-31"

# Define a class for importing data
class ImportData:
    def __init__(self, libname, dataset, filepath):
        self.libname = libname
        self.dataset = dataset
        self.filepath = filepath

    def execute(self):
        df = spark.read.csv(self.filepath, header=True, inferSchema=True)
        df.createOrReplaceTempView(f"{self.libname}_{self.dataset}")
        return df

# Define a class for joining tables
class JoinTables:
    def __init__(self, libname, table1, table2, out_table, join_condition):
        self.libname = libname
        self.table1 = table1
        self.table2 = table2
        self.out_table = out_table
        self.join_condition = join_condition

    def execute(self):
        spark.sql(f"""
            CREATE TABLE {self.libname}_{self.out_table} AS
            SELECT a.*, b.*
            FROM {self.libname}_{self.table1} a
            LEFT JOIN {self.libname}_{self.table2} b
            ON {self.join_condition}
        """)
        return spark.table(f"{self.libname}_{self.out_table}")

# Define a class for transposing data
class TransposeTable:
    def __init__(self, libname, in_table, out_table, id_var, var):
        self.libname = libname
        self.in_table = in_table
        self.out_table = out_table
        self.id_var = id_var
        self.var = var

    def execute(self):
        # Spark SQL's PIVOT clause needs explicit IN values, so the DataFrame pivot API is used here
        in_df = spark.table(f"{self.libname}_{self.in_table}")
        pivoted_df = in_df.groupBy(self.id_var).pivot(self.var).count()
        pivoted_df.createOrReplaceTempView(f"{self.libname}_{self.out_table}")
        return pivoted_df

# Define a class for applying the rank function
class ApplyRank:
    def __init__(self, libname, table, var, out_table):
        self.libname = libname
        self.table = table
        self.var = var
        self.out_table = out_table

    def execute(self):
        spark.sql(f"""
            CREATE TABLE {self.libname}_{self.out_table} AS
            SELECT *, RANK() OVER (ORDER BY {self.var} DESC) AS rank
            FROM {self.libname}_{self.table}
        """)
        return spark.table(f"{self.libname}_{self.out_table}")

# Define a class for validating data
class ValidateData:
    def __init__(self, libname, table, var):
        self.libname = libname
        self.table = table
        self.var = var

    def execute(self):
        spark.sql(f"""
            SELECT {self.var}, COUNT(*) AS count
            FROM {self.libname}_{self.table}
            GROUP BY {self.var}
        """).show()

# Define a class for checking the existence of a dataset
class CheckDataset:
    def __init__(self, libname, dataset):
        self.libname = libname
        self.dataset = dataset

    def execute(self):
        if spark.catalog.tableExists(f"{self.libname}_{self.dataset}"):
            print("Dataset exists.")
        else:
            print("Dataset does not exist.")

# Create instances of the classes and execute the methods
import_data = ImportData("work", "table1", "/path/to/table1.csv")
table1 = import_data.execute()

import_data = ImportData("work", "table2", "/path/to/table2.csv")
table2 = import_data.execute()

join_tables = JoinTables("work", "table1", "table2", "merged_table", "a.id = b.id")
merged_table = join_tables.execute()

transpose_table = TransposeTable("work", "merged_table", "transposed_table", "id", "some_var")
transposed_table = transpose_table.execute()

apply_rank = ApplyRank("work", "transposed_table", "some_var", "ranked_table")
ranked_table = apply_rank.execute()

validate_data = ValidateData("work", "ranked_table", "some_var")
validate_data.execute()

check_dataset = CheckDataset("work", "ranked_table")
check_dataset.execute()

In this modified code, we define classes for each of the SAS macros and create instances of these classes to execute each step in sequence.

Here is a step-by-step explanation of the modified PySpark code:

Step 1: Create a SparkSession object

spark = SparkSession.builder.appName("My App").getOrCreate()

This line creates a SparkSession object, which is the entry point to programming Spark with the Dataset and DataFrame API.

Step 2: Define the start and end dates

start_date = "2023-01-01"
end_date = "2023-12-31"

These lines define the start and end dates as strings.

Step 3: Define a class for importing data

class ImportData:
    def __init__(self, libname, dataset, filepath):
        self.libname = libname
        self.dataset = dataset
        self.filepath = filepath

    def execute(self):
        df = spark.read.csv(self.filepath, header=True, inferSchema=True)
        df.createOrReplaceTempView(f"{self.libname}_{self.dataset}")
        return df

This class defines a method for importing data from a CSV file. The execute method reads the CSV file using the spark.read.csv method, creates a temporary view using the createOrReplaceTempView method, and returns the DataFrame.

Step 4: Define a class for joining tables

class JoinTables:
    def __init__(self, libname, table1, table2, out_table, join_condition):
        self.libname = libname
        self.table1 = table1
        self.table2 = table2
        self.out_table = out_table
        self.join_condition = join_condition

    def execute(self):
        spark.sql(f"""
            CREATE TABLE {self.libname}_{self.out_table} AS
            SELECT a.*, b.*
            FROM {self.libname}_{self.table1} a
            LEFT JOIN {self.libname}_{self.table2} b
            ON {self.join_condition}
        """)
        return spark.table(f"{self.libname}_{self.out_table}")

This class defines a method for joining two tables. The execute method uses the spark.sql method to execute a SQL query that joins the two tables, creates a new table with the joined data, and returns the resulting DataFrame.

Step 5: Define a class for transposing data

class TransposeTable:
    def __init__(self, libname, in_table, out_table, id_var, var):
        self.libname = libname
        self.in_table = in_table
        self.out_table = out_table
        self.id_var = id_var
        self.var = var

    def execute(self):
        # Spark SQL's PIVOT clause needs explicit IN values, so the DataFrame pivot API is used here
        in_df = spark.table(f"{self.libname}_{self.in_table}")
        pivoted_df = in_df.groupBy(self.id_var).pivot(self.var).count()
        pivoted_df.createOrReplaceTempView(f"{self.libname}_{self.out_table}")
        return pivoted_df

This class defines a method for transposing data. The execute method pivots the distinct values of the chosen variable into columns with the DataFrame groupBy/pivot API (Spark SQL's PIVOT clause requires the pivot values to be listed explicitly), registers the result as a temporary view, and returns the resulting DataFrame.

Step 6: Define a class for applying the rank function

class ApplyRank:
    def __init__(self, libname, table, var, out_table):
        self.libname = libname
        self.table = table
        self.var = var
        self.out_table = out_table

    def execute(self):
        spark.sql(f"""
            CREATE TABLE {self.libname}_{self.out_table} AS
            SELECT *, RANK() OVER (ORDER BY {self.var} DESC) AS rank
            FROM {self.libname}_{self.table}
        """)
        return spark.table(f"{self.libname}_{self.out_table}")

This class defines a method for applying the rank function. The execute method uses the spark.sql method to execute a SQL query that applies the rank function, creates a new table with the ranked data, and returns the resulting DataFrame.

Step 7: Define a class for validating data

class ValidateData:
    def __init__(self, libname, table, var):
        self.libname = libname
        self.table = table
        self.var = var

    def execute(self):
        spark.sql(f"""
            SELECT {self.var}, COUNT(*) AS count
            FROM {self.libname}_{self.table}
            GROUP BY {self.var}
        """).show()

This class defines a method for validating data. The execute method uses the spark.sql method to execute a SQL query that groups the data by the specified variable and counts the number of rows in each group, and displays the results.
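
Step 8: Define a class for checking the existence of a dataset

class CheckDataset:
    def __init__(self, libname, dataset):
        self.libname = libname
        self.dataset = dataset

    def execute(self):
        if spark.catalog.tableExists(f"{self.libname}_{self.dataset}"):
            print("Dataset exists.")
        else:
            print("Dataset does not exist.")

This class checks whether the final table (or temporary view) was created. The execute method looks up the name with spark.catalog.tableExists and prints whether it exists, mirroring the SAS %check_dataset macro.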


4. Dividing a Monthly PySpark Script into Multiple Code Steps with a Control DataFrame

Here is a detailed approach for dividing a monthly PySpark script into multiple code steps. Each step will be saved in the code column of a control DataFrame and executed sequentially. The script will include error handling and pre-checks to ensure source tables are present or Oracle tables have data before starting each step. The control DataFrame and status DataFrame for each month will be appended to a Hive or Oracle table to maintain historical data.

Step 1: Prepare the Control DataFrame with Code Steps

Sample Monthly PySpark Script

Below is a sample PySpark script that reads data from Hive tables, performs transformations, and writes the output to Hive tables.

Sample PySpark Script

from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from datetime import datetime
import textwrap

spark = SparkSession.builder \
    .appName("Monthly PySpark Script") \
    .enableHiveSupport() \
    .getOrCreate()

# Define the current month in MMyyyy format
current_month = datetime.now().strftime("%m%Y")

# Sample code steps to be executed sequentially
code_steps = [
    # Step 1: Check if source Hive table exists and has data
    f"""
    source_table = "source_db.source_table"
    if not spark.catalog.tableExists(source_table):
        raise ValueError("Source table does not exist.")
    df = spark.table(source_table)
    if df.count() == 0:
        raise ValueError("Source table is empty.")
    """,

    # Step 2: Read data from an Oracle table (Assuming JDBC connection properties are set)
    f"""
    oracle_url = "jdbc:oracle:thin:@hostname:port:service_name"
    oracle_properties = {{
        "user": "username",
        "password": "password"
    }}
    oracle_df = spark.read.jdbc(oracle_url, "oracle_schema.oracle_table",   properties=oracle_properties)
    if oracle_df.count() == 0:
        raise ValueError("Oracle table is empty.")
    print("Data read from Oracle table:", oracle_df.count(), "rows")
    """,

    # Step 3: Perform a transformation
    f"""
    transformed_df = df.withColumn("new_col", col("existing_col") * 2)
    print("Transformation applied:", transformed_df.count(), "rows")
    """,

    # Step 4: Join with Oracle data
    f"""
    final_df = transformed_df.join(oracle_df, "join_key")
    print("Joined data:", final_df.count(), "rows")
    """,

    # Step 5: Write data to the target Hive table
    f"""
    target_table = "target_db.target_table"
    final_df.write.mode("overwrite").saveAsTable(target_table)
    print("Data written to target table")
    """
]

# Create a Control DataFrame
from pyspark.sql import Row
control_data = [Row(serial_no=i+1, code=step, month_period=current_month) for i, step in enumerate(code_steps)]
control_df = spark.createDataFrame(control_data)
control_df.show(truncate=False)

Step 2: Create a Status Table

Create a status table to store the execution status of each code step. This table can be created in Hive for simplicity.

# Create a status DataFrame
status_schema = "serial_no INT, status STRING, message STRING, month_period STRING, row_count INT"
spark.sql(f"CREATE TABLE IF NOT EXISTS control_db.status_table ({status_schema})")

Step 3: Execute Each Code Step with Error Handling and Status Tracking

Execute each code step sequentially, log the status, any error messages, and the row count of the output tables.

# Function to log status
def log_status(serial_no, status, message, row_count=0):
    spark.sql(f"""
        INSERT INTO control_db.status_table
        VALUES ({serial_no}, '{status}', '{message}', '{current_month}', {row_count})
    """)

# Execute each code snippet
code_snippets = control_df.select("serial_no", "code").collect()

for row in code_snippets:
    serial_no = row["serial_no"]
    code_snippet = row["code"]

    try:
        print(f"Executing step {serial_no}...")
        # Dedent so the embedded snippet starts at column 0 when exec'd
        exec(textwrap.dedent(code_snippet))

        # Fetch the row count of the output table if this step writes to a table
        if "write.mode" in code_snippet:
            output_table = code_snippet.split('"')[1]
            row_count = spark.table(output_table).count()
        else:
            row_count = 0

        log_status(serial_no, "SUCCESS", "Executed successfully", row_count)
    except Exception as e:
        error_message = str(e).replace("'", "")
        log_status(serial_no, "FAILED", error_message)
        print(f"Error executing step {serial_no}: {error_message}")
        break

# Example to display the status table
spark.sql("SELECT * FROM control_db.status_table").show()

Full Example Script

Here’s the complete script:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from datetime import datetime
from pyspark.sql import Row
import textwrap

# Initialize Spark session
spark = SparkSession.builder \
    .appName("Execute Code Snippets from Control DataFrame") \
    .enableHiveSupport() \
    .getOrCreate()

# Define the current month in MMyyyy format
current_month = datetime.now().strftime("%m%Y")

# Define code snippets
code_steps = [
    # Step 1: Check if source Hive table exists and has data
    f"""
    source_table = "source_db.source_table"
    if not spark.catalog.tableExists(source_table):
        raise ValueError("Source table does not exist.")
    df = spark.table(source_table)
    if df.count() == 0:
        raise ValueError("Source table is empty.")
    """,

    # Step 2: Read data from an Oracle table
    f"""
    oracle_url = "jdbc:oracle:thin:@hostname:port:service_name"
    oracle_properties = {{
        "user": "username",
        "password": "password"
    }}
    oracle_df = spark.read.jdbc(oracle_url, "oracle_schema.oracle_table", properties=oracle_properties)
    if oracle_df.count() == 0:
        raise ValueError("Oracle table is empty.")
    print("Data read from Oracle table:", oracle_df.count(), "rows")
    """,

    # Step 3: Perform a transformation
    f"""
    transformed_df = df.withColumn("new_col", col("existing_col") * 2)
    print("Transformation applied:", transformed_df.count(), "rows")
    """,

    # Step 4: Join with Oracle data
    f"""
    final_df = transformed_df.join(oracle_df, "join_key")
    print("Joined data:", final_df.count(), "rows")
    """,

    # Step 5: Write data to the target Hive table
    f"""
    target_table = "target_db.target_table"
    final_df.write.mode("overwrite").saveAsTable(target_table)
    print("Data written to target table")
    """
]

# Create a Control DataFrame
control_data = [Row(serial_no=i+1, code=step, month_period=current_month) for i, step in enumerate(code_steps)]
control_df = spark.createDataFrame(control_data)
control_df.show(truncate=False)

# Create a status table
status_schema = "serial_no INT, status STRING, message STRING, month_period STRING, row_count INT"
spark.sql(f"CREATE TABLE IF NOT EXISTS control_db.status_table ({status_schema})")

# Function to log status
def log_status(serial_no, status, message, row_count=0):
    spark.sql(f"""
        INSERT INTO control_db.status_table
        VALUES ({serial_no}, '{status}', '{message}', '{current_month}', {row_count})
    """)

# Execute each code snippet
code_snippets = control_df.select("serial_no", "code").collect()

for row in code_snippets:
    serial_no = row["serial_no"]
    code_snippet = row["code"]

    try:
        print(f"Executing step {serial_no}...")
        # Dedent so the embedded snippet starts at column 0 when exec'd
        exec(textwrap.dedent(code_snippet))

        # Fetch the row count of the output table if this step writes to a table
        if "write.mode" in code_snippet:
            output_table = code_snippet.split('"')[1]
            row_count = spark.table(output_table).count()
        else:
            row_count = 0

        log_status(serial_no, "SUCCESS", "Executed successfully", row_count)
    except Exception as e:
        error_message = str(e).replace("'", "")
        log_status(serial_no, "FAILED", error_message)
        print(f"Error executing step {serial_no}: {error_message}")
        break

# Example to display the status table
spark.sql("SELECT * FROM control_db.status_table").show()

To execute dynamic code snippets stored in a DataFrame (or any other source) in PySpark, you can use the exec function. This can be particularly useful when you have a control table that stores PySpark code snippets to be executed sequentially. Below is a complete example of how to achieve this, including error handling, logging, and updating status.

Example: Dynamic Code Execution in PySpark

1. Initialize Spark Session

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from datetime import datetime

# Initialize Spark session
spark = SparkSession.builder \
    .appName("PySpark Dynamic Code Execution Example") \
    .enableHiveSupport() \
    .getOrCreate()

2. Create Sample Control DataFrame

# Sample control DataFrame
control_data = [
    (1, "df1 = spark.range(0, 10)"),
    (2, "df2 = df1.withColumn('square', F.col('id') ** 2)"),
    (3, "df2.show()")
]
control_columns = ["serial_no", "code_snippet"]

control_df = spark.createDataFrame(control_data, control_columns)
control_df.show(truncate=False)

3. Create Log and Status DataFrames

# Define schema for log and status tables
log_schema = "timestamp STRING, message STRING, level STRING"
status_schema = "step INT, status STRING, row_count INT, timestamp STRING"

# Initialize empty log and status DataFrames
log_df = spark.createDataFrame([], schema=log_schema)
status_df = spark.createDataFrame([], schema=status_schema)

4. Define Functions for Error Handling and Logging

# Function to log messages
def log(message, level="INFO"):
    global log_df
    log_entry = [(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), message, level)]
    log_df = log_df.union(spark.createDataFrame(log_entry, schema=log_schema))

# Function to update status
def update_status(step, status, row_count=0):
    global status_df
    status_entry = [(step, status, row_count, datetime.now().strftime("%Y-%m-%d %H:%M:%S"))]
    status_df = status_df.union(spark.createDataFrame(status_entry, schema=status_schema))

5. Execute Code Snippets Sequentially

# Function to execute code snippets
def execute_code(control_df):
    for row in control_df.collect():
        step = row["serial_no"]
        code = row["code_snippet"]
        try:
            # Execute the code snippet
            exec(code, globals())
            log(f"Step {step} executed successfully", level="INFO")
            # Optionally, get the row count of a DataFrame if the snippet assigned one
            row_count = eval(code.split('=')[0].strip()).count() if '=' in code else 0
            update_status(step, "SUCCESS", row_count)
        except Exception as e:
            log(f"Step {step} failed: {e}", level="ERROR")
            update_status(step, "FAILED")
            break

# Execute the control DataFrame
execute_code(control_df)

# Show log and status tables
log_df.show(truncate=False)
status_df.show(truncate=False)

Explanation

  1. Initialization: A Spark session is initialized, and a sample control DataFrame is created with code snippets.
  2. Log and Status DataFrames: Empty DataFrames for logs and status are initialized with appropriate schemas.
  3. Logging Functions: Functions for logging messages and updating the status are defined.
  4. Execute Code Snippets: The execute_code function iterates over the control DataFrame, executing each code snippet using exec. It logs the execution status and updates the status table.
  5. Execution and Display: The control DataFrame is executed, and the log and status tables are displayed.
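
The introduction also calls for appending the control and status tables to a Hive (or Oracle) table so that each month's run is kept as history. A minimal sketch for the Hive side, assuming a control_db database already exists in the metastore:

# Append this run's control and status records so history accumulates month over month
run_month = datetime.now().strftime("%m%Y")
control_df.withColumn("month_period", F.lit(run_month)) \
    .write.mode("append").saveAsTable("control_db.control_history")
status_df.withColumn("month_period", F.lit(run_month)) \
    .write.mode("append").saveAsTable("control_db.status_history")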

Scheduling the Script

To schedule the script to run automatically, you can use scheduling tools like Apache Airflow, cron jobs, or any other job scheduler that fits your environment. Here’s an example of how you might schedule it using a simple cron job in Linux:

  1. Create a Python script (e.g., pyspark_script.py) with the above code.
  2. Add a cron job to execute the script at the desired interval.
# Open the cron table for editing
crontab -e

# Add a line to run the script at the desired schedule (e.g., every day at midnight)
0 0 * * * /path/to/spark/bin/spark-submit /path/to/pyspark_script.py

This setup will run your PySpark script automatically according to the schedule specified in the cron job.
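
If Airflow is preferred over cron, a minimal DAG wrapping the same spark-submit command could look like the sketch below (the DAG id, schedule, and paths are placeholders):

from datetime import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator

# Run the PySpark control-table job on the same daily schedule as the cron example
with DAG(
    dag_id="monthly_pyspark_control_job",
    start_date=datetime(2024, 1, 1),
    schedule_interval="0 0 * * *",
    catchup=False,
) as dag:
    run_script = BashOperator(
        task_id="run_pyspark_script",
        bash_command="/path/to/spark/bin/spark-submit /path/to/pyspark_script.py",
    )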


5. Building an ETL Data Pipeline in PySpark and Using Pandas and Matplotlib for Further Processing

Project alert: building an ETL data pipeline in PySpark and using Pandas and Matplotlib for further processing. For deployment we will consider using Bitbucket and Jenkins.

We will build a data pipeline that reads BDL Hive tables in PySpark and executes a PySpark script for complex transformations via PySpark SQL and the DataFrame API. Some of our sources will be Oracle tables and CSV files stored in a server-specific location. We will read these and join them with our processed BDL data to add more information (variables). Our target tables are BDL Hive tables in Parquet format saved in another schema. We will also consolidate the big Spark DataFrames into smaller datasets to be consumed as Pandas DataFrames, which will be used for analysis with Pandas and visualization with Matplotlib. These data can be saved as CSV, Excel, or Oracle tables.

Step 1: Environment Setup

  1. Spark Session Initialization: Set up Spark with Hive support.
  2. Dependencies: Ensure you have the necessary libraries installed.

Step 2: Data Extraction

  1. Hive Tables: Read from Hive.
  2. Oracle Tables: Use JDBC to read from Oracle.
  3. CSV Files: Read from local/remote storage.

Step 3: Data Transformation

  1. PySpark SQL and DataFrame API: Perform transformations.
  2. Joining Data: Combine data from different sources.

Step 4: Data Loading

  1. Write to Hive: Save results in Parquet format.
  2. Convert to Pandas: For further analysis.
  3. Save as CSV/Excel/Oracle: Export data for other uses.

Step 5: Data Analysis and Visualization

  1. Pandas for Analysis: Use Pandas DataFrame.
  2. Matplotlib for Visualization: Create charts and graphs.

Step 6: Deployment with Bitbucket and Jenkins

  1. Version Control with Bitbucket: Push code to Bitbucket.
  2. CI/CD with Jenkins: Automate deployment.

Example Code

Spark Session Initialization

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("ETL Pipeline") \
    .enableHiveSupport() \
    .getOrCreate()

Data Extraction

# Reading from Hive
hive_df = spark.sql("SELECT * FROM bdl_database.bdl_table")

# Reading from Oracle
oracle_df = spark.read.format("jdbc").options(
url="jdbc:oracle:thin:@//your_oracle_host:1521/your_oracle_db",
driver="oracle.jdbc.driver.OracleDriver",
dbtable="your_oracle_table",
user="your_user",
password="your_password"
).load()

# Reading from CSV
csv_df = spark.read.csv("path/to/your/csvfile.csv", header=True, inferSchema=True)

Data Transformation

# Example Transformation
hive_df_filtered = hive_df.filter(hive_df["column_name"] > 0)
oracle_df_selected = oracle_df.select("col1", "col2")
csv_df_transformed = csv_df.withColumn("new_col", csv_df["col3"] * 2)

# Joining Data
joined_df = hive_df_filtered.join(oracle_df_selected, "id", "inner") \
    .join(csv_df_transformed, "id", "inner")

Data Loading

#Save to Hive
joined_df.write.mode("overwrite").parquet("path/to/hive_table")

# Convert to Pandas for further analysis
pandas_df = joined_df.toPandas()

# Save as CSV and Excel
pandas_df.to_csv("path/to/save/yourdata.csv", index=False)
pandas_df.to_excel("path/to/save/yourdata.xlsx", index=False)
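
The target list above also mentions Oracle tables; one option is to push the consolidated Spark DataFrame back over JDBC before (or instead of) converting it to pandas. Connection details are placeholders for your environment:

# Optionally persist the consolidated result to an Oracle table via JDBC
joined_df.write.format("jdbc").options(
    url="jdbc:oracle:thin:@//your_oracle_host:1521/your_oracle_db",
    driver="oracle.jdbc.driver.OracleDriver",
    dbtable="your_target_oracle_table",
    user="your_user",
    password="your_password"
).mode("append").save()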

Data Analysis and Visualization

import matplotlib.pyplot as plt
import pandas as pd

# Pandas DataFrame Analysis
print(pandas_df.describe())

# Visualization
plt.figure(figsize=(10, 6))
pandas_df["your_column"].hist()
plt.title("Histogram of Your Column")
plt.xlabel("Values")
plt.ylabel("Frequency")
plt.show()

Deployment with Bitbucket and Jenkins

  1. Push Code to Bitbucket:
    • Initialize a Git repository.
    • Add your code and commit.
    • Push to your Bitbucket repository.
  2. Set Up Jenkins for CI/CD:
    • Install Jenkins and necessary plugins.
    • Create a new Jenkins job.
    • Configure the job to pull from your Bitbucket repository.
    • Add build steps to run your PySpark script.

Jenkins Pipeline Script Example

pipeline {
    agent any

    stages {
        stage('Clone Repository') {
            steps {
                git 'https://bitbucket.org/your_repo.git'
            }
        }
        stage('Build and Test') {
            steps {
                sh 'spark-submit --master local[2] your_script.py'
            }
        }
        stage('Deploy') {
            steps {
                // Add deployment steps here
            }
        }
    }
}


Simplified Version of the Complete Pipeline:

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
import pandas as pd
import matplotlib.pyplot as plt

# Initialize Spark Session
spark = SparkSession.builder \
    .appName("DataPipeline") \
    .enableHiveSupport() \
    .getOrCreate()

# Read Data from Hive
hive_df = spark.sql("SELECT * FROM bdl_database.bdl_table")

# Read Data from Oracle
oracle_df = spark.read.format("jdbc").options(
url="jdbc:oracle:thin:@//your_oracle_host:1521/your_oracle_db",
driver="oracle.jdbc.driver.OracleDriver",
dbtable="your_oracle_table",
user="your_user",
password="your_password"
).load()

# Read CSV Files
csv_df = spark.read.csv("path/to/your/csvfile.csv", header=True, inferSchema=True)

# Perform Transformations
# Example transformation: filter and select specific columns
hive_df_filtered = hive_df.filter(hive_df["column_name"] > 0)
oracle_df_selected = oracle_df.select("col1", "col2")
csv_df_transformed = csv_df.withColumn("new_col", csv_df["col3"] * 2)

# Join DataFrames
joined_df = hive_df_filtered.join(oracle_df_selected, hive_df_filtered["id"] == oracle_df_selected["id"], "inner")
final_df = joined_df.join(csv_df_transformed, joined_df["id"] == csv_df_transformed["id"], "inner")

# Save to Hive in Parquet format
final_df.write.mode("overwrite").parquet("path/to/hive_table")

# Convert to Pandas for further analysis
pandas_df = final_df.toPandas()

# Save Pandas DataFrame as CSV and Excel
pandas_df.to_csv("path/to/save/yourdata.csv", index=False)
pandas_df.to_excel("path/to/save/yourdata.xlsx", index=False)

# Visualize Data using Matplotlib
plt.figure(figsize=(10, 6))
pandas_df["your_column"].hist()
plt.title("Histogram of Your Column")
plt.xlabel("Values")
plt.ylabel("Frequency")
plt.show()

# Further Analysis with Pandas
print(pandas_df.describe())

# Close Spark Session
spark.stop()

