Project Alert: Building an ETL Data Pipeline in PySpark, Using Pandas and Matplotlib for Further Processing

by HintsToday Team | Jun 30, 2024 | PySpark

Project Alert: Building an ETL data pipeline in PySpark, using Pandas and Matplotlib for further processing. For deployment we will consider using Bitbucket and Jenkins.

We will build a data pipeline that reads BDL Hive tables in PySpark and executes PySpark scripts performing complex transformations via the PySpark SQL and DataFrame APIs. Some of our sources will be Oracle tables and CSV files stored in a server-specific location. We will read these and join them with our processed BDL data to enrich it with additional variables. Our target tables are BDL Hive tables in Parquet format, saved in another schema. We will also consolidate large Spark DataFrames into smaller result sets to be consumed as Pandas DataFrames, which will be used for analysis with Pandas and visualization with Matplotlib. These results can be saved as CSV files, Excel files, or Oracle tables.

Step 1: Environment Setup

  1. Spark Session Initialization: Set up Spark with Hive support.
  2. Dependencies: Ensure you have the necessary libraries installed.

Step 2: Data Extraction

  1. Hive Tables: Read from Hive.
  2. Oracle Tables: Use JDBC to read from Oracle.
  3. CSV Files: Read from local/remote storage.

Step 3: Data Transformation

  1. PySpark SQL and DataFrame API: Perform transformations.
  2. Joining Data: Combine data from different sources.

Step 4: Data Loading

  1. Write to Hive: Save results in Parquet format.
  2. Convert to Pandas: For further analysis.
  3. Save as CSV/Excel/Oracle: Export data for other uses.

Step 5: Data Analysis and Visualization

  1. Pandas for Analysis: Use Pandas DataFrame.
  2. Matplotlib for Visualization: Create charts and graphs.
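To make the Pandas side of Step 5 concrete, here is a minimal sketch of the kind of grouped aggregation meant here. The column names (`region`, `amount`) and the data are illustrative placeholders, not taken from the pipeline above.

```python
import pandas as pd

# Illustrative data standing in for the consolidated pipeline output
pandas_df = pd.DataFrame({
    "region": ["north", "south", "north", "south"],
    "amount": [100.0, 250.0, 175.0, 125.0],
})

# Per-group summary statistics: row count, mean, and total per region
summary = pandas_df.groupby("region")["amount"].agg(["count", "mean", "sum"])
print(summary)
```

The same `groupby` result can feed Matplotlib directly, e.g. `summary["sum"].plot(kind="bar")`.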

Step 6: Deployment with Bitbucket and Jenkins

  1. Version Control with Bitbucket: Push code to Bitbucket.
  2. CI/CD with Jenkins: Automate deployment.

Example Code

Spark Session Initialization

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("ETL Pipeline") \
    .enableHiveSupport() \
    .getOrCreate()

Data Extraction

# Reading from Hive
hive_df = spark.sql("SELECT * FROM bdl_database.bdl_table")

# Reading from Oracle
oracle_df = spark.read.format("jdbc").options(
    url="jdbc:oracle:thin:@//your_oracle_host:1521/your_oracle_db",
    driver="oracle.jdbc.driver.OracleDriver",
    dbtable="your_oracle_table",
    user="your_user",
    password="your_password"
).load()

# Reading from CSV
csv_df = spark.read.csv("path/to/your/csvfile.csv", header=True, inferSchema=True)

Data Transformation

# Example transformations (column names are placeholders)
hive_df_filtered = hive_df.filter(hive_df["column_name"] > 0)
oracle_df_selected = oracle_df.select("id", "col1", "col2")  # keep the "id" join key
csv_df_transformed = csv_df.withColumn("new_col", csv_df["col3"] * 2)

# Joining data on the shared "id" key
joined_df = hive_df_filtered.join(oracle_df_selected, "id", "inner") \
    .join(csv_df_transformed, "id", "inner")

Data Loading

# Save to Hive in Parquet format (saveAsTable registers the table in the metastore)
joined_df.write.mode("overwrite").format("parquet").saveAsTable("target_schema.target_table")

# Convert to Pandas for further analysis
# (toPandas() collects everything to the driver, so consolidate/aggregate first)
pandas_df = joined_df.toPandas()

# Save as CSV and Excel (to_excel requires openpyxl)
pandas_df.to_csv("path/to/save/yourdata.csv", index=False)
pandas_df.to_excel("path/to/save/yourdata.xlsx", index=False)
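Step 4 also lists Oracle tables as an export target. With Pandas this is typically done via `DataFrame.to_sql`; for Oracle you would pass a SQLAlchemy engine (e.g. one built on the `oracledb` driver), which is an assumption about your setup. The sketch below uses the standard-library `sqlite3` as a stand-in so the mechanics are runnable anywhere; only the connection changes for Oracle.

```python
import sqlite3
import pandas as pd

# Placeholder data standing in for the pipeline's pandas_df
pandas_df = pd.DataFrame({"id": [1, 2], "value": [10.5, 20.5]})

# For Oracle, replace this with a SQLAlchemy engine, e.g. (hypothetical DSN):
# create_engine("oracle+oracledb://user:pwd@host:1521/?service_name=your_db")
conn = sqlite3.connect(":memory:")
pandas_df.to_sql("your_table", conn, if_exists="replace", index=False)

# Read back to confirm the round trip
restored = pd.read_sql("SELECT * FROM your_table", conn)
print(restored)
conn.close()
```

`if_exists="replace"` mirrors the overwrite semantics used for the Hive target above; use `"append"` for incremental loads.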

Data Analysis and Visualization

import matplotlib.pyplot as plt
import pandas as pd

# Pandas DataFrame Analysis
print(pandas_df.describe())

# Visualization
plt.figure(figsize=(10, 6))
pandas_df["your_column"].hist()
plt.title("Histogram of Your Column")
plt.xlabel("Values")
plt.ylabel("Frequency")
plt.show()

Deployment with Bitbucket and Jenkins

  1. Push Code to Bitbucket:
    • Initialize a Git repository.
    • Add your code and commit.
    • Push to your Bitbucket repository.
  2. Set Up Jenkins for CI/CD:
    • Install Jenkins and necessary plugins.
    • Create a new Jenkins job.
    • Configure the job to pull from your Bitbucket repository.
    • Add build steps to run your PySpark script.

Jenkins Pipeline Script Example

pipeline {
    agent any

    stages {
        stage('Clone Repository') {
            steps {
                git 'https://bitbucket.org/your_repo.git'
            }
        }
        stage('Build and Test') {
            steps {
                sh 'spark-submit --master local[2] your_script.py'
            }
        }
        stage('Deploy') {
            steps {
                // Replace with your actual deployment steps
                echo 'Deploying...'
            }
        }
    }
}


Simplified Version of the Complete Pipeline

from pyspark.sql import SparkSession
import pandas as pd
import matplotlib.pyplot as plt

# Initialize Spark Session
spark = SparkSession.builder \
    .appName("DataPipeline") \
    .enableHiveSupport() \
    .getOrCreate()

# Read Data from Hive
hive_df = spark.sql("SELECT * FROM bdl_database.bdl_table")

# Read Data from Oracle
oracle_df = spark.read.format("jdbc").options(
    url="jdbc:oracle:thin:@//your_oracle_host:1521/your_oracle_db",
    driver="oracle.jdbc.driver.OracleDriver",
    dbtable="your_oracle_table",
    user="your_user",
    password="your_password"
).load()

# Read CSV Files
csv_df = spark.read.csv("path/to/your/csvfile.csv", header=True, inferSchema=True)

# Perform Transformations
# Example transformations: filter rows and derive a new column (names are placeholders)
hive_df_filtered = hive_df.filter(hive_df["column_name"] > 0)
oracle_df_selected = oracle_df.select("id", "col1", "col2")  # keep the "id" join key
csv_df_transformed = csv_df.withColumn("new_col", csv_df["col3"] * 2)

# Join DataFrames on the shared "id" key (joining on the column name avoids
# duplicate "id" columns and ambiguous references later)
joined_df = hive_df_filtered.join(oracle_df_selected, "id", "inner")
final_df = joined_df.join(csv_df_transformed, "id", "inner")

# Save to Hive in Parquet format (saveAsTable registers the table in the metastore)
final_df.write.mode("overwrite").format("parquet").saveAsTable("target_schema.target_table")

# Convert to Pandas for further analysis
# (toPandas() collects everything to the driver, so consolidate/aggregate first)
pandas_df = final_df.toPandas()

# Save Pandas DataFrame as CSV and Excel (to_excel requires openpyxl)
pandas_df.to_csv("path/to/save/yourdata.csv", index=False)
pandas_df.to_excel("path/to/save/yourdata.xlsx", index=False)

# Visualize Data using Matplotlib
plt.figure(figsize=(10, 6))
pandas_df["your_column"].hist()
plt.title("Histogram of Your Column")
plt.xlabel("Values")
plt.ylabel("Frequency")
plt.show()

# Further Analysis with Pandas
print(pandas_df.describe())

# Close Spark Session
spark.stop()

Written by HintsToday Team

