from pyspark.sql import SparkSession
from pyspark.sql import Row
# Initialize Spark Session (SparkSession internally manages SparkContext)
spark = SparkSession.builder.appName("RDD_Based_Processing").getOrCreate()
# Access SparkContext from SparkSession
sc = spark.sparkContext # Still available if needed
# Explanation:
# - RDDs require SparkContext (sc.parallelize()) because they are a low-level API.
# - DataFrames use SparkSession (spark.createDataFrame()), which manages SparkContext internally.
# Step 1: Creating an RDD from a list of tuples
data = [(1, "Alice", 25, "Engineer"), (2, "Bob", 30, "Doctor"), (3, "Charlie", 35, "Teacher")]
rdd = sc.parallelize(data) # Using SparkContext explicitly
# Step 2: Transforming RDD (Mapping to Rows)
row_rdd = rdd.map(lambda x: Row(ID=x[0], Name=x[1], Age=x[2], Occupation=x[3]))
# Step 3: Converting RDD to DataFrame
df = spark.createDataFrame(row_rdd) # Using SparkSession (No need for sc)
df.show()
# Step 4: Filtering Data
filtered_rdd = rdd.filter(lambda x: x[2] > 28) # Keep people older than 28
# Step 5: Applying Transformation (Mapping)
mapped_rdd = filtered_rdd.map(lambda x: (x[0], x[1].upper(), x[2] + 5, x[3]))
# Step 6: Reducing Data (Counting occupations)
occu_rdd = rdd.map(lambda x: (x[3], 1)).reduceByKey(lambda x, y: x + y)
# Step 7: Sorting Data
sorted_rdd = mapped_rdd.sortBy(lambda x: x[2], ascending=False)
# Step 8: Collecting and Printing Data
print("Filtered Data:", filtered_rdd.collect())
print("Mapped Data:", mapped_rdd.collect())
print("Occupation Count:", occu_rdd.collect())
print("Sorted Data:", sorted_rdd.collect())
# Stopping Spark Session
spark.stop()
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.functions import col, trim, lit
# Initialize Spark Session
spark = SparkSession.builder.appName("DataFrame_Manipulation").getOrCreate()
# Sample Data
data = [(1, "Alice", 25, "Engineer"), (2, "Bob", 30, "Doctor"), (3, "Charlie", 35, "Teacher")]
df = spark.createDataFrame(data, ["ID", "Name", "Age", "Occupation"])
# 1. Column Manipulation
# Rename a Single Column
df = df.withColumnRenamed("Occupation", "Job")
# Rename Multiple Columns
column_rename_map = {"ID": "UserID", "Name": "FullName"}
for old, new in column_rename_map.items():
    df = df.withColumnRenamed(old, new)
# Add a Suffix to All Columns
df = df.toDF(*[c + "_v1" for c in df.columns])
# Check Data Types
df.printSchema()
# Apply Dynamic Logic to All Columns: Trim All String Columns
df = df.select([trim(col(c)).alias(c) if dtype == "string" else col(c) for c, dtype in df.dtypes])
# Convert All Integer Columns to Double
df = df.select([col(c).cast("double") if dtype in ("int", "bigint") else col(c) for c, dtype in df.dtypes])  # Python ints are inferred as bigint (LongType)
# 2. Row-Based Manipulation
# Collect Rows One by One
for row in df.collect():
    print(row)
# Efficient Row Iteration
for row in df.toLocalIterator():
    print(row)
# Filtering Rows
df_filtered = df.filter((df["Age_v1"] > 28.0) & (df["FullName_v1"] != "Bob"))
df_filtered.show()
# Sorting Rows
df_sorted = df.orderBy("Age_v1", ascending=False)
df_sorted.show()
# Adding a New Row
df_new = df.union(spark.createDataFrame([Row(UserID_v1=4.0, FullName_v1="David", Age_v1=40.0, Job_v1="Scientist")], df.schema))
df_new.show()
# Finding Duplicate Rows
df.groupBy(df.columns).count().filter("count > 1").show()
# Removing Duplicate Rows
df = df.dropDuplicates()
df.show()
# Adding a New Column Dynamically
df = df.withColumn("NewColumn_v1", lit("DefaultValue"))
df.show()
# Stop Spark Session
spark.stop()
Where to Use Traditional Python Coding in PySpark Scripts
Using traditional Python coding in a PySpark script is common and beneficial for handling tasks that are not inherently distributed or do not involve large-scale data processing. Integrating Python with a PySpark script in a modular way ensures that different responsibilities are clearly separated and the system remains maintainable and scalable. Below, I’ll clarify how and when to use traditional Python coding in a PySpark script and provide a modular example of an ETL project.
Where to Use Traditional Python Coding in PySpark Scripts
- Configuration and Setup
  - Reading configuration files (YAML, JSON, or environment variables).
  - Setting up logging mechanisms.
  - Creating control tables for tracking execution.
  - Initializing parameters for the Spark session.
- Metadata Management
  - Maintaining control and job status tables (e.g., tracking step statuses and counts).
  - Querying metadata repositories.
- Error Handling and Notifications
  - Sending emails or alerts in case of failures.
  - Logging errors into a database or file.
- Pre/Post-Spark Tasks
  - Pre-processing lightweight data or metadata (e.g., generating file paths, validating inputs).
  - Post-processing tasks like moving processed files, updating external systems, or cleaning up resources.
- Small Operations
  - Lookup table creation in-memory using Python dictionaries for small datasets (see the sketch after this list).
  - Performing lightweight, non-parallel computations that don’t benefit from Spark.
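For example, a small, static lookup that fits in driver memory can stay a plain Python dictionary and only be turned into a Spark map expression when it is applied to a DataFrame. A minimal sketch, assuming an existing DataFrame df with a country_code column (the dictionary contents and column names are illustrative):
from itertools import chain
from pyspark.sql.functions import col, create_map, lit
# Small, static lookup built with plain Python (no Spark needed for this part)
country_lookup = {"US": "United States", "IN": "India", "DE": "Germany"}
# Convert the dict into a map expression: key1, value1, key2, value2, ...
mapping_expr = create_map(*[lit(x) for x in chain(*country_lookup.items())])
# Apply the lookup as a new column
df = df.withColumn("country_name", mapping_expr[col("country_code")])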
Example Modular ETL Project
Project Overview
A PySpark ETL pipeline for processing daily transaction data:
- Extracts data from Oracle, Hive, and CSV sources.
- Transforms the data (joins, aggregations, flag creation).
- Loads the results into Hive and sends summary reports via email.
Module Structure
1. Configuration Module (config.py)
Handles configuration settings, such as database credentials, Spark configurations, logging settings, etc.
import yaml
def load_config(config_path):
    with open(config_path, 'r') as file:
        return yaml.safe_load(file)
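A minimal usage sketch (the key names below are assumptions chosen to match main.py further down; adapt them to your actual config.yaml):
# Hypothetical usage; assumes config.yaml defines the keys read in main.py
conf = load_config("config.yaml")
print(conf["log_path"])       # e.g. a path for the pipeline log file
print(conf["control_table"])  # e.g. the Hive control table name
print(conf["target_table"])   # e.g. the Hive target table name
print(conf["recipients"])     # e.g. a list of notification email addresses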
2. Logging Module (logging_util.py)
Manages logging setup for tracking the pipeline.
import logging
def setup_logger(log_path):
    logger = logging.getLogger("ETL Pipeline")
    logger.setLevel(logging.INFO)
    handler = logging.FileHandler(log_path)
    formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    return logger
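Once setup_logger has run, any other module can reuse the same logger by asking the standard library for it by name; a small sketch:
import logging
# Returns the logger configured in setup_logger because the name matches
logger = logging.getLogger("ETL Pipeline")
logger.info("Reusing the shared pipeline logger")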
3. Notification Module (notification_util.py)
Sends email notifications for success or failure.
import smtplib
from email.mime.text import MIMEText
def send_email(subject, body, recipients):
    sender = "etl-notify@example.com"
    msg = MIMEText(body)
    msg['Subject'] = subject
    msg['From'] = sender
    msg['To'] = ', '.join(recipients)
    with smtplib.SMTP('smtp.example.com') as server:
        server.sendmail(sender, recipients, msg.as_string())
4. Control Table Module (control_table.py)
Manages control table updates and job status.
from pyspark.sql import SparkSession
def update_control_table(spark: SparkSession, table_name, step, status, counts):
    # Append one audit record (step name, status, row count) to the control table
    data = [{"step": step, "status": status, "counts": counts}]
    df = spark.createDataFrame(data)
    df.write.mode("append").saveAsTable(table_name)
5. ETL Steps (steps.py)
Implements each ETL step.
from pyspark.sql import SparkSession
def step1_extract_data(spark: SparkSession):
    return spark.read.format("csv").option("header", "true").load("s3://path-to-source-data/")
def step2_transform_data(df):
    return df.groupBy("category").agg({"amount": "sum"}).withColumnRenamed("sum(amount)", "total_amount")
def step3_load_data(spark: SparkSession, df, table_name):
    df.write.mode("overwrite").saveAsTable(table_name)
6. Main Script (main.py)
Coordinates the entire pipeline.
from pyspark.sql import SparkSession
import config
import logging_util
import notification_util
import control_table
import steps
if __name__ == "__main__":
    # Load configurations
    config_path = "config.yaml"
    conf = config.load_config(config_path)
    # Setup logging
    logger = logging_util.setup_logger(conf["log_path"])
    # Initialize Spark Session
    spark = SparkSession.builder.appName("ETL Pipeline").getOrCreate()
    try:
        # Extract
        logger.info("Starting Step 1: Extract")
        data = steps.step1_extract_data(spark)
        control_table.update_control_table(spark, conf["control_table"], "step1", "success", data.count())
        # Transform
        logger.info("Starting Step 2: Transform")
        transformed_data = steps.step2_transform_data(data)
        control_table.update_control_table(spark, conf["control_table"], "step2", "success", transformed_data.count())
        # Load
        logger.info("Starting Step 3: Load")
        steps.step3_load_data(spark, transformed_data, conf["target_table"])
        control_table.update_control_table(spark, conf["control_table"], "step3", "success", transformed_data.count())
        # Notify success
        notification_util.send_email("ETL Pipeline Success", "Pipeline completed successfully.", conf["recipients"])
    except Exception as e:
        logger.error(f"Pipeline failed: {str(e)}")
        notification_util.send_email("ETL Pipeline Failure", str(e), conf["recipients"])
    finally:
        spark.stop()
Integration Points
- Before Spark Session Initialization
  - Use Python for lightweight tasks like loading configurations, setting up logging, and reading control tables.
- Within the Spark Job
  - Use PySpark for distributed data processing tasks.
  - Use Python modules (e.g., logging_util, control_table) for logging, metadata updates, and notifications.
- After the Spark Job
  - Use Python for cleanup tasks (e.g., moving files, sending reports, archiving logs), as shown in the sketch after this list.
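As an illustration of such a post-Spark cleanup task, here is a minimal sketch in plain Python (the directory paths and file pattern are hypothetical):
import shutil
from pathlib import Path
def archive_processed_files(source_dir, archive_dir):
    # Move every processed CSV out of the landing area into an archive folder
    Path(archive_dir).mkdir(parents=True, exist_ok=True)
    for file_path in Path(source_dir).glob("*.csv"):
        shutil.move(str(file_path), str(Path(archive_dir) / file_path.name))
# Hypothetical usage after spark.stop():
# archive_processed_files("/data/landing/transactions", "/data/archive/transactions")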
This modular structure separates concerns, allowing you to easily update or replace components without disrupting the entire pipeline.