PySpark provides a powerful API for data manipulation, similar to pandas, but optimized for big data processing. Below is a comprehensive overview of DataFrame operations, functions, and syntax in PySpark with examples.

Creating DataFrames

Creating DataFrames from various sources is a common task in PySpark. Below are examples for creating DataFrames from CSV files, Excel files, Python List, Python Tuple, Python Dictionary, Pandas DataFrames, Hive tables, values, RDDs, Oracle databases, and HBase tables.

1. Creating DataFrames from CSV Files

from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder \
    .appName("Create DataFrames from CSV") \
    .getOrCreate()

# Read CSV file
df_csv = spark.read.csv("path/to/file.csv", header=True, inferSchema=True)
df_csv.show()

2. Creating DataFrames from Excel Files

To read Excel files, you need to add the third-party spark-excel connector as a dependency.

# Add the following dependency when initializing SparkSession
# .config("spark.jars.packages", "com.crealytics:spark-excel_2.12:0.13.5")

df_excel = spark.read \
    .format("com.crealytics.spark.excel") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("path/to/file.xlsx")
df_excel.show()
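
The dependency comment above only shows the configuration key; a minimal sketch of wiring it into the session might look like this (the package coordinates repeat the version mentioned above and are an assumption; pick the spark-excel release matching your Spark and Scala versions):

from pyspark.sql import SparkSession

# Build a session that pulls the spark-excel connector from Maven at startup
spark = SparkSession.builder \
    .appName("Create DataFrames from Excel") \
    .config("spark.jars.packages", "com.crealytics:spark-excel_2.12:0.13.5") \
    .getOrCreate()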

3. Creating DataFrames from Python List

from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder \
    .appName("Create DataFrames from List") \
    .getOrCreate()

# Create DataFrame from list
data = [("Alice", 30), ("Bob", 25), ("Cathy", 28)]
columns = ["name", "age"]
df_list = spark.createDataFrame(data, columns)
df_list.show()

4. Creating DataFrames from Python Tuple

from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder \
    .appName("Create DataFrames from Tuple") \
    .getOrCreate()

# Create DataFrame from tuple
data = (("Alice", 30), ("Bob", 25), ("Cathy", 28))
columns = ["name", "age"]
df_tuple = spark.createDataFrame(data, columns)
df_tuple.show()

5. Creating DataFrames from Python Dictionary

import pandas as pd
from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder \
    .appName("Create DataFrames from Dictionary") \
    .getOrCreate()

# Create DataFrame from dictionary
data = {"name": ["Alice", "Bob", "Cathy"], "age": [30, 25, 28]}
df_dict = spark.createDataFrame(pd.DataFrame(data))
df_dict.show()

6. Creating DataFrames from Pandas DataFrame

import pandas as pd
from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder \
    .appName("Create DataFrames from Pandas") \
    .getOrCreate()

# Create a Pandas DataFrame
pdf = pd.DataFrame({"name": ["Alice", "Bob", "Cathy"], "age": [30, 25, 28]})

# Convert Pandas DataFrame to Spark DataFrame
df_pandas = spark.createDataFrame(pdf)
df_pandas.show()

7. Creating DataFrames from Hive Tables

from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder \
    .appName("Create DataFrames from Hive") \
    .enableHiveSupport() \
    .getOrCreate()

# Read Hive table
df_hive = spark.sql("SELECT * FROM database.table_name")
df_hive.show()

8. Creating DataFrames from Values

from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder \
    .appName("Create DataFrames from Values") \
    .getOrCreate()

# Create DataFrame from values
data = [("Alice", 30), ("Bob", 25), ("Cathy", 28)]
columns = ["name", "age"]
df_values = spark.createDataFrame(data, columns)
df_values.show()

9. Creating DataFrames from RDDs

from pyspark.sql import Row, SparkSession

# Initialize Spark session
spark = SparkSession.builder \
    .appName("Create DataFrames from RDD") \
    .getOrCreate()

# Create an RDD
rdd = spark.sparkContext.parallelize([
    Row(name="Alice", age=30),
    Row(name="Bob", age=25),
    Row(name="Cathy", age=28)
])

# Convert RDD to DataFrame
df_rdd = spark.createDataFrame(rdd)
df_rdd.show()

10. Creating DataFrames from Oracle Database

To read from Oracle, you need to have the JDBC driver for Oracle in your classpath.

# Add the following dependency when initializing SparkSession
# .config("spark.jars", "path/to/ojdbc8.jar")

jdbc_url = "jdbc:oracle:thin:@hostname:port:SID"
connection_properties = {
"user": "username",
"password": "password",
"driver": "oracle.jdbc.driver.OracleDriver"
}

df_oracle = spark.read.jdbc(jdbc_url, "schema.table_name", properties=connection_properties)
df_oracle.show()

11. Creating DataFrames from HBase Tables

To read from HBase, you need the hbase-spark connector.

# Add the following dependency when initializing SparkSession
# .config("spark.jars.packages", "org.apache.hbase.connectors.spark:hbase-spark:1.0.0")

from pyspark.sql import SparkSession
from pyspark.sql import DataFrame

# Initialize Spark session
spark = SparkSession.builder \
    .appName("Create DataFrames from HBase") \
    .getOrCreate()

# Define HBase catalog
catalog = ''.join("""{
"table":{"namespace":"default", "name":"tablename"},
"rowkey":"key",
"columns":{
"col0":{"cf":"rowkey", "col":"key", "type":"string"},
"col1":{"cf":"cf1", "col":"col1", "type":"string"},
"col2":{"cf":"cf2", "col":"col2", "type":"string"}
}
}""".split())

df_hbase = spark.read \
    .options(catalog=catalog) \
    .format("org.apache.spark.sql.execution.datasources.hbase") \
    .load()

df_hbase.show()

12. Creating DataFrames from JSON Files

from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder \
    .appName("Create DataFrames from JSON") \
    .getOrCreate()

# Read JSON file
df_json = spark.read.json("path/to/file.json")
df_json.show()

13. Creating DataFrames from Parquet Files

from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder \
    .appName("Create DataFrames from Parquet") \
    .getOrCreate()

# Read Parquet file
df_parquet = spark.read.parquet("path/to/file.parquet")
df_parquet.show()

Viewing Data in PySpark

1. show(): View the first few rows

from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder.appName("example").getOrCreate()

data = [('Alice', 25), ('Bob', 30), ('Charlie', 35)]
columns = ['Name', 'Age']

sdf = spark.createDataFrame(data, columns)

# View the first 20 rows (default)
sdf.show()

# View the first n rows
sdf.show(10)

2. show() with truncate: Control column width

# Truncate long strings to 20 characters (default)
sdf.show(truncate=True)

# Do not truncate strings
sdf.show(truncate=False)

# Truncate strings to a specific length
sdf.show(truncate=5)

3. show() with vertical: Vertical display of rows

# Vertical display of rows
sdf.show(vertical=True)

4. printSchema(): Print the schema of the DataFrame

# Print the schema
sdf.printSchema()

5. describe(): Summary statistics

# Summary statistics of DataFrame
sdf.describe().show()

6. head(): Retrieve the first row or n rows

# Retrieve the first row
print(sdf.head())

# Retrieve the first n rows
print(sdf.head(5))

7. take(): Retrieve the first n rows

# Retrieve the first n rows as a list of Row objects
print(sdf.take(5))

8. collect(): Retrieve all rows

# Retrieve all rows as a list of Row objects
all_rows = sdf.collect()
for row in all_rows:
    print(row)

Summary of the viewing methods covered above:

  • show(): Flexible method for displaying rows, with options like truncate and vertical.
  • printSchema(): Prints the schema.
  • describe(): Computes summary statistics.
  • head() / take(): Retrieve a specific number of rows.
  • collect(): Retrieves all rows.

PySpark Operations on DataFrames

PySpark DataFrame operations can be broadly classified into two categories: Transformations and Actions.

Transformations

Transformations are operations that are applied on a DataFrame to produce a new DataFrame. Transformations are lazy, meaning they are not executed immediately. Instead, they are recorded as a lineage of transformations to be applied when an action is called.
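
To see lazy evaluation in action, here is a minimal sketch using the sdf DataFrame (Name, Age) created in the viewing examples above; the transformations only record a plan, and nothing runs on the cluster until the action fires:

# Transformations: each call returns a new DataFrame immediately; no Spark job runs yet
adults = sdf.filter(sdf["Age"] >= 30)
names_only = adults.select("Name")

# Action: only now does Spark execute the recorded lineage (filter + select)
print(names_only.count())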

Here are some common transformations:

select()

  • Description: Selects a subset of columns.
  • Syntax: df.select("col1", "col2", ...)
  • Example: df.select("name", "age").show()

The following are equivalent ways of referring to and selecting columns:

df_pyspark.select(['Vars', 'Month_Period']).show()
df_pyspark.select('Vars', 'Month_Period').show()

from pyspark.sql.functions import col
df_pyspark.select(col("Month_Period")).show()
df_pyspark.select(df_pyspark["Month_Period"]).show()

withColumn()

  • Description: Adds or replaces a column.
  • Syntax: df.withColumn("new_col", df["existing_col"] + 1)
  • Example: df.withColumn("new_age", df["age"] + 1).show()

Here are some common use cases for the withColumn() method in PySpark:

1. Conditional Column Creation

from pyspark.sql.functions import when, col

df = df.withColumn("is_adult", when(col("age") >= 18, "Yes").otherwise("No"))

2. Column Transformation using UDFs

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def convert_to_uppercase(name):
    return name.upper()

udf_convert_to_uppercase = udf(convert_to_uppercase, StringType())

df = df.withColumn("uppercase_name", udf_convert_to_uppercase(df["name"]))

3. Column Calculation using Multiple Columns

from pyspark.sql.functions import col

df = df.withColumn("total_amount", col("quantity") * col("price"))

4. Renaming and Dropping Columns

df = df.withColumnRenamed("old_name", "new_name")
df = df.drop("column_to_drop")

5. Creating a Column with a List or Array

from pyspark.sql.functions import array, lit

df = df.withColumn("colors", array(lit("red"), lit("green"), lit("blue")))

6. Creating a Column with a Struct

from pyspark.sql.functions import struct, lit

df = df.withColumn("address", struct(lit("123 Main St"), lit("Anytown"), lit("USA")))

7. Using Window Functions

from pyspark.sql.functions import row_number
from pyspark.sql.window import Window

window = Window.orderBy("date")

df = df.withColumn("row_num", row_number().over(window))

8. Creating a Column with a JSON Object

from pyspark.sql.functions import to_json, struct

df = df.withColumn("json_data", to_json(struct(df["name"], df["age"])))

9. Using Aggregate Functions

from pyspark.sql.functions import sum, avg

df = df.groupBy("group").agg(sum("value").alias("sum_value"), avg("value").alias("avg_value"))
df = df.withColumn("sum_value_squared", df["sum_value"] ** 2)

10. Using Pattern Matching with when and otherwise

from pyspark.sql.functions import when, col

df = df.withColumn("category",
                   when(col("value") < 10, "low").
                   when(col("value") < 20, "medium").
                   otherwise("high"))

drop()

  • Description: Drops specified columns.
  • Syntax: df.drop("col1", "col2")
  • Example: df.drop("age").show()

distinct()

  • Description: Returns distinct rows.
  • Syntax: df.distinct()
  • Example: df.distinct().show()

filter() or where()

  • Description: Filters rows based on a condition.
  • Syntax: df.filter(df["age"] > 30)
  • Example: df.filter(df["age"] > 30).show()

 

groupBy()

  • Description: Groups rows by specified columns.
  • Syntax: df.groupBy("department").agg({"salary": "mean"})
  • Example: df.groupBy("department").mean("salary").show()

orderBy() or sort()

  • Description: Sorts rows by specified columns.
  • Syntax: df.orderBy(df["age"].desc())
  • Example: df.orderBy(df["age"].desc()).show()

join()

  • Description: Joins two DataFrames.
  • Syntax: df1.join(df2, df1["key"] == df2["key"], "inner")
  • Example: df1.join(df2, df1["id"] == df2["id"], "inner").show()

union()

  • Description: Unions two DataFrames.
  • Syntax: df1.union(df2)
  • Example: df1.union(df2).show()

sample()

  • Description: Returns a sampled subset of rows.
  • Syntax: df.sample(withReplacement=False, fraction=0.1)
  • Example: df.sample(withReplacement=False, fraction=0.1).show()

Actions

Actions trigger the execution of the transformations recorded in the DAG and return results.

Here are some common actions:

collect()

  • Description: Returns all rows as a list of Row objects.
  • Syntax: df.collect()
  • Example:
      rows = df.collect()
      for row in rows:
          print(row)

first()

  • Description: Returns the first row.
  • Syntax: df.first()
  • Example: print(df.first())

take()

  • Description: Returns the first n rows.
  • Syntax: df.take(n)
  • Example: print(df.take(5))

describe()

  • Description: Computes basic statistics.
  • Syntax: df.describe()
  • Example: df.describe().show()

cache()

  • Description: Caches the DataFrame in memory.
  • Syntax: df.cache()
  • Example: df.cache()

persist()

  • Description: Persists the DF with specified storage level.
  • Syntax: df.persist(storageLevel)
  • Example:
      from pyspark import StorageLevel
      df.persist(StorageLevel.MEMORY_AND_DISK)

write()

  • Description: Writes the DataFrame to a specified format.
  • Syntax: df.write.format("parquet").save("path")
  • Example: df.write.format("parquet").save("path/to/parquet")

foreach()

  • Description: Applies a function to each row.
  • Syntax: df.foreach(f)
  • Example:
      def print_row(row):
          print(row)
      df.foreach(print_row)

You can use SQL expressions inside DataFrame operations, including the filter and where functions, by using the expr function from the pyspark.sql.functions module, which allows you to pass SQL expressions as strings.

Example Usage

1. How do you use SQL expressions inside the filter or where functions?

Here is an example demonstrating how to use SQL expressions inside the filter or where functions:

from pyspark.sql import SparkSession
from pyspark.sql.functions import expr

# Initialize Spark session
spark = SparkSession.builder \
    .appName("SQL Expression Example") \
    .getOrCreate()

# Sample DataFrame
data = [
(1, "Alice", 25, 2000),
(2, "Bob", 30, 3000),
(3, "Charlie", 35, 4000)
]

columns = ["id", "name", "age", "salary"]
df = spark.createDataFrame(data, columns)

# Using SQL expression in filter
filtered_df = df.filter(expr("age > 30"))

filtered_df.show()

Detailed Example

Let’s go through a more detailed example with multiple conditions and transformations using SQL expressions.

from pyspark.sql import SparkSession
from pyspark.sql.functions import expr

# Initialize Spark session
spark = SparkSession.builder \
    .appName("SQL Expression Example") \
    .getOrCreate()

# Sample DataFrame
data = [
(1, "Alice", 25, 2000),
(2, "Bob", 30, 3000),
(3, "Charlie", 35, 4000),
(4, "David", 40, 5000)
]

columns = ["id", "name", "age", "salary"]
df = spark.createDataFrame(data, columns)

# Filter using SQL expression
filtered_df = df.filter(expr("age > 30 AND salary > 3000"))

# Select specific columns using SQL expression
selected_df = filtered_df.selectExpr("id", "name", "salary * 1.1 AS adjusted_salary")

# Adding a new column using SQL expression
with_new_column_df = selected_df.withColumn("salary_category", expr("CASE WHEN adjusted_salary > 4000 THEN 'High' ELSE 'Medium' END"))

with_new_column_df.show()

Output

+---+-------+---------------+---------------+
| id| name|adjusted_salary|salary_category|
+---+-------+---------------+---------------+
| 3|Charlie| 4400.0| High|
| 4| David| 5500.0| High|
+---+-------+---------------+---------------+

2. select function syntax variations and examples:

The select function in PySpark is used to select a subset of columns or expressions from a DataFrame. It can be used in several ways to achieve different results.

Basic Syntax

The basic syntax for the select function is:

DataFrame.select(*cols)
Here, *cols can be a list of column names, Column objects, or expressions.

Selecting Specific Columns

df.select('column1', 'column2').show()
This will select and display the 'column1' and 'column2' columns.

Selecting Columns with Expressions

from pyspark.sql.functions import col, expr
df.select(col('column1'), expr('column2 + 1').alias('column2_plus_one')).show()

This will select ‘column1’ and an expression ‘column2 + 1’ as ‘column2_plus_one’.

Using Column Objects

Example:

from pyspark.sql.functions import col
df.select(col('column1'), col('column2')).show()
This will select 'column1' and 'column2' using Column objects.

Renaming Columns

df.select(col('column1').alias('new_column1')).show()
This will select 'column1' and rename it to 'new_column1'.

Selecting All Columns

df.select('*').show()
This will select all columns in the DataFrame.

Using SQL Expressions

df.selectExpr('column1', 'column2 + 1 as column2_plus_one').show()

This allows you to use SQL expressions directly.

 

3. PySpark DataFrame groupBy Operation

The groupBy operation in PySpark is used to group data based on one or more columns. Once grouped, you can apply various aggregation functions like count, sum, avg, max, min, etc.

Syntax

DataFrame.groupBy(*cols)

Here, *cols can be one or more column names or expressions.

Common Aggregation Functions

  • count()
  • sum(column)
  • avg(column)
  • max(column)
  • min(column)
  • agg(*exprs)

Examples

Sample DataFrame
data = [
("Alice", "Sales", 3000),
("Bob", "Sales", 4000),
("Alice", "Sales", 4000),
("Catherine", "HR", 4000),
("David", "HR", 5000),
("Eve", "IT", 3000)
]
columns = ["Name", "Department", "Salary"]

df = spark.createDataFrame(data, schema=columns)
df.show()
1. Group By Single Column and Count
df.groupBy("Department").count().show()

Output:

+----------+-----+
|Department|count|
+----------+-----+
| HR| 2|
| IT| 1|
| Sales| 3|
+----------+-----+
2. Group By Multiple Columns and Count
df.groupBy("Department", "Name").count().show()

Output:

+----------+-----+-----+
|Department| Name|count|
+----------+-----+-----+
| Sales|Alice| 2|
| HR|David| 1|
| HR|Catherine| 1|
| IT| Eve| 1|
| Sales| Bob| 1|
+----------+-----+-----+
3. Group By with Sum Aggregation
df.groupBy("Department").sum("Salary").show()

Output:

+----------+-----------+
|Department|sum(Salary)|
+----------+-----------+
| HR| 9000 |
| IT| 3000 |
| Sales| 11000 |
+----------+-----------+
4. Group By with Multiple Aggregations
df.groupBy("Department").agg(
count("*").alias("Total_Count"),
sum("Salary").alias("Total_Salary"),
avg("Salary").alias("Average_Salary"),
max("Salary").alias("Max_Salary"),
min("Salary").alias("Min_Salary")
).show()

Output:

+----------+-----------+-----------+--------------+-----------+-----------+
|Department|Total_Count|Total_Salary|Average_Salary|Max_Salary|Min_Salary|
+----------+-----------+-----------+--------------+-----------+-----------+
| HR| 2| 9000| 4500.0| 5000| 4000|
| IT| 1| 3000| 3000.0| 3000| 3000|
| Sales| 3| 11000| 3666.67| 4000| 3000|
+----------+-----------+-----------+--------------+-----------+-----------+
5. Using agg Method with Expressions
from pyspark.sql.functions import expr

df.groupBy("Department").agg(
expr("count(*) as Total_Count"),
expr("sum(Salary) as Total_Salary"),
expr("avg(Salary) as Average_Salary"),
expr("max(Salary) as Max_Salary"),
expr("min(Salary) as Min_Salary")
).show()

Output is the same as above.

6. Group By with Having Clause (Filtering Groups)
df.groupBy("Department").agg(
sum("Salary").alias("Total_Salary")
).filter(col("Total_Salary") > 5000).show()

Output:

+----------+-----------+
|Department|Total_Salary|
+----------+-----------+
| HR| 9000|
| Sales| 11000|
+----------+-----------+

4. PySpark orderBy() and sort() Operations

In PySpark, both orderBy() and sort() are used to sort the rows of a DataFrame. They can be used interchangeably, as they provide the same functionality.

Syntax

DataFrame.orderBy(*cols, ascending=True)
DataFrame.sort(*cols, ascending=True)
  • cols: List of column names or expressions to sort by.
  • ascending: Boolean or list of booleans. If a single boolean is provided, it applies to all columns. If a list is provided, it specifies the sort order for each corresponding column.

 

data = [
("Alice", 34, "HR", 3000),
("Bob", 45, "IT", 4000),
("Catherine", 29, "HR", 5000),
("David", 36, "IT", 2500),
("Eve", 28, "Sales", 2800)
]
columns = ["Name", "Age", "Department", "Salary"]

df = spark.createDataFrame(data, schema=columns)
df.show()
1. Order By Single Column Ascending
df.orderBy("Age").show()

Equivalent:

df.sort("Age").show()

Output:

+---------+---+----------+------+
| Name|Age|Department|Salary|
+---------+---+----------+------+
| Eve| 28| Sales| 2800|
|Catherine| 29| HR| 5000|
| Alice| 34| HR| 3000|
| David| 36| IT| 2500|
| Bob| 45| IT| 4000|
+---------+---+----------+------+
2. Order By Single Column Descending
df.orderBy(col("Age").desc()).show()
df.sort(col("Age").desc()).show()

Output:

+---------+---+----------+------+
| Name|Age|Department|Salary|
+---------+---+----------+------+
| Bob| 45| IT| 4000|
| David| 36| IT| 2500|
| Alice| 34| HR| 3000|
|Catherine| 29| HR| 5000|
| Eve| 28| Sales| 2800|
+---------+---+----------+------+
3. Order By Multiple Columns with Different Sort Orders
df.orderBy(["Department", "Age"], ascending=[True, False]).show()
df.sort(["Department", "Age"], ascending=[True, False]).show()

Output:

+---------+---+----------+------+
| Name|Age|Department|Salary|
+---------+---+----------+------+
|Catherine| 29| HR| 5000|
| Alice| 34| HR| 3000|
| Bob| 45| IT| 4000|
| David| 36| IT| 2500|
| Eve| 28| Sales| 2800|
+---------+---+----------+------+

Useful Examples

Sorting by Salary in Descending Order
df.orderBy(col("Salary").desc()).show()

Output:

+---------+---+----------+------+
| Name|Age|Department|Salary|
+---------+---+----------+------+
|Catherine| 29| HR| 5000|
| Bob| 45| IT| 4000|
| Alice| 34| HR| 3000|
| Eve| 28| Sales| 2800|
| David| 36| IT| 2500|
+---------+---+----------+------+
Sorting by Department and then by Salary within each Department
df.orderBy("Department", col("Salary").desc()).show()

Output:

+---------+---+----------+------+
| Name|Age|Department|Salary|
+---------+---+----------+------+
|Catherine| 29| HR| 5000|
| Alice| 34| HR| 3000|
| Bob| 45| IT| 4000|
| David| 36| IT| 2500|
| Eve| 28| Sales| 2800|
+---------+---+----------+------+
Sorting with Expression
df.orderBy(expr("Salary + Age").desc()).show()

Output:

+---------+---+----------+------+
| Name|Age|Department|Salary|
+---------+---+----------+------+
|Catherine| 29| HR| 5000|
| Bob| 45| IT| 4000|
| Alice| 34| HR| 3000|
| Eve| 28| Sales| 2800|
| David| 36| IT| 2500|
+---------+---+----------+------+

5. PySpark Join Operations

In PySpark, the join operation is used to combine two DataFrames based on a condition. The join operation can be performed in several ways, such as inner join, left join, right join, outer join, semi join, and anti join.

Syntax

DataFrame.join(other, on=None, how=None)
  • other: The DataFrame to join with.
  • on: The column(s) to join on. This can be a string representing a single column name, a list of column names, or a condition.
  • how: The type of join to perform. Default is inner.

Join Types

  • inner: Inner join.
  • left or left_outer: Left outer join.
  • right or right_outer: Right outer join.
  • outer or full or full_outer: Full outer join.
  • left_semi: Left semi join.
  • left_anti: Left anti join.

Examples

data1 = [
("Alice", 34, "HR"),
("Bob", 45, "IT"),
("Catherine", 29, "HR"),
("David", 36, "IT")
]
columns1 = ["Name", "Age", "Department"]

data2 = [
("HR", 3000),
("IT", 4000),
("Sales", 2800)
]
columns2 = ["Department", "Salary"]

df1 = spark.createDataFrame(data1, schema=columns1)
df2 = spark.createDataFrame(data2, schema=columns2)

df1.show()
df2.show()
1. Inner Join
inner_join_df = df1.join(df2, on="Department", how="inner")
inner_join_df.show()

Output:

+---------+---+----------+------+
| Name|Age|Department|Salary|
+---------+---+----------+------+
| Alice| 34| HR| 3000|
|Catherine| 29| HR| 3000|
| Bob| 45| IT| 4000|
| David| 36| IT| 4000|
+---------+---+----------+------+
2. Left Outer Join
left_join_df = df1.join(df2, on="Department", how="left")
left_join_df.show()

Output:

+---------+---+----------+------+
| Name|Age|Department|Salary|
+---------+---+----------+------+
| Alice| 34| HR| 3000|
|Catherine| 29| HR| 3000|
| Bob| 45| IT| 4000|
| David| 36| IT| 4000|
+---------+---+----------+------+
3. Right Outer Join
right_join_df = df1.join(df2, on="Department", how="right")
right_join_df.show()

Output:

+---------+----+----------+------+
| Name| Age|Department|Salary|
+---------+----+----------+------+
| Alice| 34| HR| 3000|
|Catherine| 29| HR| 3000|
| Bob| 45| IT| 4000|
| David| 36| IT| 4000|
| null|null| Sales| 2800|
+---------+----+----------+------+
4. Full Outer Join
outer_join_df = df1.join(df2, on="Department", how="outer")
outer_join_df.show()

Output:

+---------+----+----------+------+
| Name| Age|Department|Salary|
+---------+----+----------+------+
| Alice| 34| HR| 3000|
|Catherine| 29| HR| 3000|
| Bob| 45| IT| 4000|
| David| 36| IT| 4000|
| null|null| Sales| 2800|
+---------+----+----------+------+
5. Left Semi Join
semi_join_df = df1.join(df2, on="Department", how="left_semi")
semi_join_df.show()

Output:

+---------+---+----------+
| Name|Age|Department|
+---------+---+----------+
| Alice| 34| HR|
|Catherine| 29| HR|
| Bob| 45| IT|
| David| 36| IT|
+---------+---+----------+
6. Left Anti Join
anti_join_df = df1.join(df2, on="Department", how="left_anti")
anti_join_df.show()

Output:

+----+---+----------+
|Name|Age|Department|
+----+---+----------+
+----+---+----------+
7. Join on Multiple Columns
data3 = [
("Alice", 34, "HR", 3000),
("Bob", 45, "IT", 4000),
("Catherine", 29, "HR", 5000),
("David", 36, "IT", 2500),
("Eve", 28, "Sales", 2800)
]
columns3 = ["Name", "Age", "Department", "Salary"]

data4 = [
("HR", 34, 3000, "Manager"),
("IT", 45, 4000, "Developer"),
("Sales", 28, 2800, "Salesman")
]
columns4 = ["Department", "Age", "Salary", "Role"]

df3 = spark.createDataFrame(data3, schema=columns3)
df4 = spark.createDataFrame(data4, schema=columns4)

multi_col_join_df = df3.join(df4, on=["Department", "Age", "Salary"], how="inner")
multi_col_join_df.show()

Output:

+-----+---+----------+------+--------+
| Name|Age|Department|Salary| Role|
+-----+---+----------+------+--------+
|Alice| 34| HR| 3000| Manager|
| Bob| 45| IT| 4000|Developer|
+-----+---+----------+------+--------+



Example Project: Comprehensive ETL Pipeline in PySpark

Let’s create a comprehensive example project that demonstrates these operations.

  1. Setup Spark Session
  2. Load Data from CSV, Hive, Oracle, and create DataFrames
  3. Apply Transformations
  4. Perform Actions
  5. Persist DataFrames
  6. Convert to Pandas for further analysis and visualization

1. Setup Spark Session

from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder \
    .appName("ETL Pipeline Example") \
    .enableHiveSupport() \
    .getOrCreate()

2. Load Data

From CSV
df_csv = spark.read.csv("path/to/file.csv", header=True, inferSchema=True)
From Hive
df_hive = spark.sql("SELECT * FROM database.table_name")
From Oracle
jdbc_url = "jdbc:oracle:thin:@hostname:port:SID"
connection_properties = {
"user": "username",
"password": "password",
"driver": "oracle.jdbc.driver.OracleDriver"
}

df_oracle = spark.read.jdbc(jdbc_url, "schema.table_name", properties=connection_properties)

3. Apply Transformations

# Select columns
df_selected = df_csv.select("name", "age", "salary", "department")

# Filter rows
df_filtered = df_selected.filter(df_selected["age"] > 30)

# Group by and aggregate
df_grouped = df_filtered.groupBy("department").mean("salary")

# Join with another DataFrame
df_joined = df_grouped.join(df_hive, df_grouped["department"] == df_hive["dept_id"])

# Add new column
df_with_new_col = df_joined.withColumn("new_col", df_joined["salary"] + 1000)

# Drop column
df_final = df_with_new_col.drop("dept_id")

4. Perform Actions

# Show DataFrame
df_final.show()

# Collect DataFrame
rows = df_final.collect()
for row in rows:
    print(row)

# Count rows
print(df_final.count())

# Describe DataFrame
df_final.describe().show()

5. Persist DataFrames

# Cache DataFrame
df_final.cache()

# Persist DataFrame
from pyspark import StorageLevel
df_final.persist(StorageLevel.MEMORY_AND_DISK)

6. Convert to Pandas for Further Analysis and Visualization

import pandas as pd

# Convert to Pandas DataFrame
pdf = df_final.toPandas()

# Analyze with Pandas
print(pdf.describe())

# Visualize with Matplotlib
import matplotlib.pyplot as plt

pdf['salary'].hist()
plt.show()


Date and Time Functions- Pyspark Dataframes

https://www.hintstoday.com/pyspark-dataframe-programming-operations-functions-all-statements-syntax-with-examples/date-and-time-functions-pyspark-dataframes

String Manipulation on PySpark DataFrames

https://www.hintstoday.com/pyspark-dataframe-programming-operations-functions-all-statements-syntax-with-examples/string-manipulation-on-pyspark-dataframes

Window functions in PySpark on Dataframes

https://www.hintstoday.com/pyspark-dataframe-programming-operations-functions-all-statements-syntax-with-examples/window-functions-in-pyspark-on-dataframe


Some Common Questions

How do you handle missing data in PySpark?

Handling missing data is a common task in data processing workflows, and PySpark provides various methods to manage missing or null values in a DataFrame. You can choose to remove, replace, or impute missing data depending on the specific use case.

Here are some common techniques to handle missing data in PySpark:


1. Drop Rows with Missing Values (dropna)

The dropna() method allows you to drop rows that contain null or missing values.

Example: Drop Rows with Any Missing Values

This will remove any row that has at least one missing value in any column.

# Drop rows where any column has a null value
df_clean = df.dropna()

Example: Drop Rows Based on Specific Columns

You can specify which columns to check for missing values.

# Drop rows only if 'column1' or 'column2' has a null value
df_clean = df.dropna(subset=["column1", "column2"])

Example: Drop Rows with Threshold

You can also set a threshold, which means only rows with a certain number of non-null values will be retained.

# Drop rows that have less than 2 non-null values
df_clean = df.dropna(thresh=2)

2. Replace Missing Values (fillna)

The fillna() method replaces null or missing values with a specified value. You can fill nulls with a constant value or use specific values for different columns.

Example: Fill All Null Values with a Single Value

You can replace all null values in a DataFrame with a constant value like 0 or an empty string.

# Replace all null values with 0
df_filled = df.fillna(0)

Example: Fill Nulls in Specific Columns

You can fill nulls in specific columns with different values.

# Replace nulls in 'column1' with 0 and in 'column2' with 'unknown'
df_filled = df.fillna({"column1": 0, "column2": "unknown"})
 

3. Impute Missing Values with Mean, Median, or Mode

To fill missing values with statistical values like mean, median, or mode, you can use PySpark’s agg() function or the pyspark.ml.feature.Imputer.

Example: Fill Missing Values with Mean

You can calculate the mean of a column and then use fillna() to replace the missing values.

from pyspark.sql.functions import mean

# Calculate the mean of 'column1'
mean_value = df.select(mean(df['column1'])).collect()[0][0]

# Fill missing values in 'column1' with the mean
df_filled = df.fillna({"column1": mean_value})

Example: Use the Imputer from pyspark.ml

PySpark provides the Imputer class, which allows you to automatically fill missing values with the mean, median, or other strategies.

from pyspark.ml.feature import Imputer

# Create an Imputer object and set the strategy to 'mean'
imputer = Imputer(inputCols=["column1", "column2"], outputCols=["column1_imputed", "column2_imputed"])

# Fit the imputer model and transform the DataFrame
df_imputed = imputer.fit(df).transform(df)
  • inputCols: Columns where the missing values are found.
  • outputCols: Columns where the imputed values will be stored.
  • You can change the strategy to "median" or "mode" using imputer.setStrategy("median") (see the brief sketch below).
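
A minimal sketch of switching the imputation strategy (reusing the placeholder column names from the example above):

from pyspark.ml.feature import Imputer

# Impute with the column median instead of the default mean
imputer = Imputer(
    inputCols=["column1", "column2"],
    outputCols=["column1_imputed", "column2_imputed"]
)
imputer.setStrategy("median")

df_imputed = imputer.fit(df).transform(df)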

4. Identifying Rows with Missing Data

Before handling missing data, you may want to identify rows or columns that contain missing values.

Example: Filter Rows with Null Values

You can use the filter() or where() methods to filter rows with null values.

# Filter rows where 'column1' has a null value
df_nulls = df.filter(df['column1'].isNull())

Example: Count Missing Values in Each Column

You can count the number of missing values in each column.

from pyspark.sql.functions import col, count, when

# Count the number of missing values in each column
df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]).show()

5. Handling Missing Data in Complex Data Types

If you are working with complex types like arrays, structs, or maps, you might need to handle missing data within these nested structures.

Example: Fill Missing Values in Struct Columns

You can use the withColumn() method and a combination of functions like when(), col(), and lit() to handle missing values within nested fields.

from pyspark.sql.functions import when, col, struct, lit

# Assuming you have a StructType column called 'address' with a 'city' field
df_filled = df.withColumn("address", 
    when(col("address.city").isNull(), 
         struct(lit("Unknown").alias("city"), col("address.state"))
    ).otherwise(col("address"))
)

6. Dropping or Filling Columns with High Missing Rates

Sometimes, you may want to drop or fill entire columns if they contain too many missing values. You can first check the percentage of missing values and then decide whether to drop or fill the column.

Example: Drop Columns with a High Percentage of Missing Values

from pyspark.sql.functions import col

# Calculate the percentage of missing values for each column
threshold = 0.5  # Set a threshold, e.g., drop columns with more than 50% missing values
total_rows = df.count()

# Identify columns with more than 50% missing values
cols_to_drop = [c for c in df.columns if df.filter(col(c).isNull()).count() / total_rows > threshold]

# Drop these columns
df_clean = df.drop(*cols_to_drop)

7. Handling Null Values in Joins

When performing joins, missing values can affect the results, especially if you’re using keys that contain nulls. You might want to handle missing values before or after the join operation.

Example: Fill Nulls Before Join

# Fill missing values in the join key column before performing the join
df = df.fillna({"join_key": "default_value"})
other_df = other_df.fillna({"join_key": "default_value"})

# Perform the join
joined_df = df.join(other_df, on="join_key")

Summary of Methods to Handle Missing Data in PySpark:
  1. Drop rows with missing values: dropna().
  2. Replace missing values with a constant value: fillna().
  3. Impute missing values with mean, median, or mode: Use pyspark.ml.feature.Imputer.
  4. Filter or count missing values using filter(), where(), or count().
  5. Handle nested missing data in complex types like StructType, ArrayType, etc.
  6. Drop columns with too many missing values by calculating the percentage of nulls.

The approach you take depends on your specific requirements (e.g., whether the missing data can be safely removed, replaced with a default, or needs to be imputed).


NaN, None, or null

In PySpark, NaN (Not a Number), None, and null are commonly used to represent missing or undefined data in DataFrames, but they are not exactly the same. Let’s go over how these different types of missing data behave in PySpark.

Key Differences Between None, null, and NaN:

None in Python:

None is the Python equivalent of null in Spark and SQL. When you create a DataFrame with None in the data, Spark will treat it as a null value in the DataFrame.

data = [(1, "Alice"), (2, None)]
df = spark.createDataFrame(data, ["id", "name"])
df.show()
+---+-----+
| id| name|
+---+-----+
|  1|Alice|
|  2| null|
+---+-----+

null in PySpark:

# Checking for null values
df.filter(df["name"].isNull()).show()

NaN in PySpark:

from pyspark.sql.functions import isnan

# Checking for NaN values in a column
df.filter(isnan(df["column"])).show()

How to Handle None, null, and NaN in PySpark

1. Handling None and null

  • Replace null or None Values: You can use fillna() to replace None (or null) values in the DataFrame.
# Replace null values in the 'name' column with 'Unknown'
df_filled = df.fillna({"name": "Unknown"})
df_filled.show()
  • Drop Rows with null Values: You can use dropna() to remove rows containing null values.
# Drop rows where any column contains null values
df_clean = df.dropna()
df_clean.show()

2. Handling NaN Values

  • Replace NaN Values: You can use fillna() to replace NaN values in numeric columns.
# Replace NaN values in numeric columns with 0
df_filled = df.fillna(0)
  • Filter Out NaN Values: You can use the isnan() function to filter rows containing NaN values.
from pyspark.sql.functions import isnan

# Filter rows where a column contains NaN
df_filtered = df.filter(~isnan(df["column"]))
df_filtered.show()
  • Note: Unlike null or None, NaN is specific to numeric data types (e.g., FloatType and DoubleType). It does not apply to other data types like StringType.

3. Checking for None, null, and NaN

Check for null or None Values: Use isNull() or isNotNull() to check for null values in any column.

# Filter rows where the 'name' column is null 
df.filter(df["name"].isNull()).show()

Check for NaN Values: Use isnan() to check for NaN values in numeric columns.

from pyspark.sql.functions import isnan

# Filter rows where the 'score' column is NaN
df.filter(isnan(df["score"])).show()

Example: Handling None, null, and NaN Together

from pyspark.sql import SparkSession
from pyspark.sql.functions import isnan

# Initialize Spark session
spark = SparkSession.builder.master("local").appName("null_nan_example").getOrCreate()

# Example DataFrame with None (null) and NaN
data = [
    (1, "Alice", 50.0),
    (2, None, float('nan')),
    (3, "Bob", None),
    (4, "Charlie", 70.0)
]
columns = ["id", "name", "score"]

df = spark.createDataFrame(data, columns)

# Show the original DataFrame
df.show()

# Filter rows where 'name' is null (None)
df.filter(df["name"].isNull()).show()

# Filter rows where 'score' is NaN
df.filter(isnan(df["score"])).show()

# Replace NaN and null values
df_filled = df.fillna({"name": "Unknown", "score": 0})
df_filled.show()

Output:

+---+-------+-----+
| id| name|score|
+---+-------+-----+
| 1| Alice| 50.0|
| 2| null| NaN|
| 3| Bob| null|
| 4|Charlie| 70.0|
+---+-------+-----+

# Rows where 'name' is null
+---+----+-----+
| id|name|score|
+---+----+-----+
| 2|null| NaN|
+---+----+-----+

# Rows where 'score' is NaN
+---+----+-----+
| id|name|score|
+---+----+-----+
| 2|null| NaN|
+---+----+-----+

# DataFrame after replacing NaN and null values
+---+-------+-----+
| id| name|score|
+---+-------+-----+
| 1| Alice| 50.0|
| 2|Unknown| 0.0|
| 3| Bob| 0.0|
| 4|Charlie| 70.0|
+---+-------+-----+

Summary of Key Points:

  1. None and null: Both represent missing or undefined values in PySpark and can occur in any column type.
    • Use isNull() or fillna() to handle None/null values.
  2. NaN: Specific to numeric data types (FloatType, DoubleType), representing an invalid number.
    • Use isnan() to filter or handle NaN values in numeric columns.
  3. Handling nulls and NaNs: You can fill, filter, or drop rows with null or NaN values using PySpark functions like fillna(), dropna(), isNull(), and isnan().

All three (None, null, NaN) are common representations of missing data in PySpark, but they behave differently based on their data type and how you handle them.


.na.drop() vs dropna() in PySpark

In PySpark, both .na.drop() and dropna() are used to drop rows containing null values from a DataFrame, and they are essentially the same in functionality. The only difference is the way they are called.

1. .na.drop() vs dropna() in PySpark

  • .na.drop(): This is called through the na property of the DataFrame, which provides various functions to handle missing data.
  • dropna(): This is a more direct method call on the DataFrame and achieves the same result as .na.drop().

Example of .na.drop()

# Using .na.drop() to drop rows with null values
df_clean = df.na.drop()

Example of dropna()

# Using dropna() to drop rows with null values
df_clean = df.dropna()

Both examples will drop rows that contain any null values in any column.

Options for .na.drop() and dropna()

  • Both .na.drop() and dropna() support the same arguments like subset, how, and thresh.
    • subset: Specifies columns to check for null values.
    • how: Specifies whether to drop rows with any nulls ('any') or all nulls ('all').
    • thresh: Specifies a minimum number of non-null values required to keep a row (a thresh example follows the snippet below).

Example with subset and how:

# Drop rows where any value in 'column1' or 'column2' is null
df_clean = df.na.drop(subset=["column1", "column2"])

# Drop rows only if all columns are null
df_clean = df.dropna(how='all')
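
And a small sketch of thresh, which keeps a row only if it has at least the given number of non-null values:

# Keep only rows that have at least 2 non-null values across all columns
df_clean = df.na.drop(thresh=2)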

2. Equivalent in Pandas

In Pandas, the equivalent function for dropping rows with missing values is dropna().

Example in Pandas:

import pandas as pd

# Create a Pandas DataFrame
data = {'col1': [1, None, 3], 'col2': [4, 5, None]}
df = pd.DataFrame(data)

# Drop rows with any missing values
df_clean = df.dropna()

Pandas dropna() offers similar options to PySpark’s dropna(), such as:

  • subset: Specify which columns to consider for null values.
  • how: Drop rows with any or all missing values.
  • thresh: Keep rows that have at least a specified number of non-null values.

Summary of Equivalent Methods:

Operation            | PySpark                    | Pandas
Drop rows with nulls | df.na.drop() / df.dropna() | df.dropna()

Both PySpark and Pandas offer flexibility to drop rows with missing data using the respective dropna() method, with similar arguments to control behavior such as subset, how, and thresh.


What is the PySpark DataFrame API? How does it relate to PySpark SQL?

In PySpark, you can perform operations on DataFrames using two main APIs: the DataFrame API and the Spark SQL API. Both are powerful and can be used interchangeably to some extent.

Here’s a breakdown of key concepts and functionalities:

1. Creating DataFrames:

You can create a PySpark DataFrame from a list of Row objects:

from datetime import datetime, date
import pandas as pd
from pyspark.sql import Row

df = spark.createDataFrame([
Row(a=1, b=2., c='string1', d=date(2000, 1, 1), e=datetime(2000, 1, 1, 12, 0)),
Row(a=2, b=3., c='string2', d=date(2000, 2, 1), e=datetime(2000, 1, 2, 12, 0)),
Row(a=4, b=5., c='string3', d=date(2000, 3, 1), e=datetime(2000, 1, 3, 12, 0))
])
df
DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]

Create a PySpark DataFrame with an explicit schema.


df = spark.createDataFrame([
(1, 2., 'string1', date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)),
(2, 3., 'string2', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)),
(3, 4., 'string3', date(2000, 3, 1), datetime(2000, 1, 3, 12, 0))
], schema='a long, b double, c string, d date, e timestamp')
df
DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]

From existing data sources:

Read data from various sources like CSV, Parquet, JSON, or Hive tables using spark.read.format("format").load("path").

Leverage JDBC to connect to external databases and read tables.
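
A minimal sketch of both patterns (the path, table name, JDBC URL, and credentials below are placeholders, not real endpoints):

# Generic reader: pick a built-in format and a path
df_parquet = spark.read.format("parquet").load("path/to/data.parquet")

# JDBC reader: connect to a table in an external database
df_jdbc = spark.read.format("jdbc") \
    .option("url", "jdbc:postgresql://hostname:5432/dbname") \
    .option("dbtable", "schema.table_name") \
    .option("user", "username") \
    .option("password", "password") \
    .load()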

PySpark DataFrame from a pandas DataFrame

pandas_df = pd.DataFrame({
'a': [1, 2, 3],
'b': [2., 3., 4.],
'c': ['string1', 'string2', 'string3'],
'd': [date(2000, 1, 1), date(2000, 2, 1), date(2000, 3, 1)],
'e': [datetime(2000, 1, 1, 12, 0), datetime(2000, 1, 2, 12, 0), datetime(2000, 1, 3, 12, 0)]
})
df = spark.createDataFrame(pandas_df)
df

We can also create DataFrames from lists, dictionaries, or other Python data structures using spark.createDataFrame(data).
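
For example, a list of dictionaries works directly, with the keys becoming column names (a small illustrative sketch; some Spark versions emit a deprecation warning about inferring the schema from dicts):

data = [
    {"name": "Alice", "age": 30},
    {"name": "Bob", "age": 25}
]
df_from_dicts = spark.createDataFrame(data)
df_from_dicts.show()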

PySpark DataFrame from an RDD consisting of a list of tuples.

rdd = spark.sparkContext.parallelize([
(1, 2., 'string1', date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)),
(2, 3., 'string2', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)),
(3, 4., 'string3', date(2000, 3, 1), datetime(2000, 1, 3, 12, 0))
])
df = spark.createDataFrame(rdd, schema=['a', 'b', 'c', 'd', 'e'])
df
DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]
The DataFrames created above all have the same results and schema.
# All DataFrames above produce the same result.
df.show()
df.printSchema()

2. DataFrame Structure:

  • DataFrames are distributed collections of rows, where each row represents a record.
  • Each column has a name and a specific data type (e.g., integer, string, timestamp).
  • You can access columns by name (e.g., df.column_name).

3. DataFrame Operations:

  • Selection: Select specific columns using df.select("column1", "column2").
  • Filtering: Filter rows based on certain conditions using df.where(condition).
  • Sorting: Sort DataFrames using df.sort(col1.desc(), col2.asc()).
  • Aggregation: Calculate summary statistics using functions like df.groupBy("column").agg(F.avg("value")).
  • Joining: Combine DataFrames based on shared columns using df1.join(df2, on="column_name", how="inner").
  • Transformations: Apply custom logic to each row using df.withColumn("new_column", F.col("existing_column") * 2).
  • Action Operations: These operations return results or trigger computations, like df.show(), df.count(), df.write.parquet("path").

4. Key Features:

  • Laziness: DataFrame operations are not immediately executed but rather built into a logical plan. When an action operation is triggered (e.g., df.show()), the plan is optimized and executed on the cluster (see the sketch after this list).
  • Distributed Processing: Operations are spread across the Spark cluster for parallel processing, enabling efficient handling of large datasets.
  • Data Types: DataFrames support various data types for flexibility in data representation.
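
As a quick illustration of laziness, explain() prints the plan Spark has recorded without executing anything; this is a minimal sketch assuming a DataFrame df with name and salary columns:

# Build up transformations; nothing executes yet
high_paid = df.filter(df["salary"] > 4000).select("name", "salary")

# Inspect the plan Spark has recorded for these transformations
high_paid.explain()

# Trigger execution with an action
high_paid.show()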

5. Relationship with Spark SQL:

  • PySpark DataFrame API provides a programmatic way to interact with DataFrames.
  • Spark SQL offers a SQL-like syntax for querying DataFrames. You can seamlessly switch between these APIs for data manipulation.

The PySpark SQL API offers a powerful way to interact with structured data in PySpark using SQL-like syntax. Here’s a breakdown of key concepts and functionalities:

1. SparkSession:

  • The entry point for working with PySpark SQL is the SparkSession object.
  • It provides methods for creating DataFrames, interacting with catalogs (like Hive tables), and configuring Spark SQL settings.

2. DataFrames:

  • DataFrames are the fundamental data structures in PySpark SQL. They represent distributed collections of rows with named columns and specific data types.
  • You can create DataFrames from various sources (CSV, Parquet, JSON, Hive tables) or using Python collections (lists, dictionaries).

3. SQL-like Operations:

  • PySpark SQL allows you to write SQL-inspired queries on DataFrames.
  • You can perform actions like:
    • Selection: SELECT column1, column2 FROM df
    • Filtering: SELECT * FROM df WHERE condition
    • Aggregation: SELECT column, SUM(value) AS total FROM df GROUP BY column
    • Joining: SELECT * FROM df1 JOIN df2 ON df1.column = df2.column
    • Sorting: SELECT * FROM df ORDER BY column ASC

4. Key Features:

  • Integration with DataFrame API: You can seamlessly switch between PySpark SQL and DataFrame API for data manipulation.
  • SQL Functions: PySpark SQL provides a rich set of built-in functions for various operations (e.g., mathematical, string manipulation, date/time).
  • User-Defined Functions (UDFs): You can create custom functions in Python and use them within your SQL queries (see the sketch after this list).
  • Catalog Integration: PySpark SQL interacts with catalogs like Hive, allowing you to manage and query external tables.
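
A minimal sketch of registering a Python UDF so it can be called from SQL (the function name, logic, and the people view here are illustrative assumptions):

from pyspark.sql.types import StringType

# Register a Python function under the SQL name 'to_upper'
spark.udf.register("to_upper", lambda s: s.upper() if s is not None else None, StringType())

# Expose a DataFrame with a 'name' column to SQL and call the UDF
df.createOrReplaceTempView("people")
spark.sql("SELECT to_upper(name) AS upper_name FROM people").show()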

5. Benefits of PySpark SQL:

  • Readability: SQL-like syntax often feels more natural for data analysts and SQL-familiar users.
  • Conciseness: Complex data manipulations can be expressed concisely in SQL queries.
  • Familiarity: Existing SQL knowledge can be leveraged for working with PySpark.

6. Notes:

  • While PySpark SQL provides SQL-like syntax, it’s not a full-fledged SQL implementation. Some functionalities might differ from traditional SQL engines.
  • PySpark SQL leverages the power of PySpark’s distributed processing for efficient handling of large datasets.

Below is an explanation of each, along with examples.

DataFrame API

The DataFrame API provides a higher-level abstraction for structured data. It allows you to manipulate data in a more Pythonic way.

Example of DataFrame API

from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder.appName("DataFrame API Example").getOrCreate()

# Create a DataFrame
data = [("James", "Smith", "M", 3000),
("Anna", "Rose", "F", 4100),
("Robert", "Williams", "M", 6200)]

columns = ["FirstName", "LastName", "Gender", "Salary"]
df = spark.createDataFrame(data, schema=columns)

# Show DataFrame
df.show()

# Select specific columns
df.select("FirstName", "Salary").show()

# Filter rows
df.filter(df["Salary"] > 4000).show()

# Group by and aggregation
df.groupBy("Gender").avg("Salary").show()

# Add new column
df.withColumn("Bonus", df["Salary"] * 0.10).show()

# Stop Spark session
spark.stop()


Spark SQL API

The Spark SQL API allows you to run SQL queries on DataFrames, providing a SQL-like interface for querying data. This can be particularly useful for users who are familiar with SQL.

Example of Spark SQL API

from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder.appName("Spark SQL API Example").getOrCreate()

# Create a DataFrame
data = [("James", "Smith", "M", 3000),
("Anna", "Rose", "F", 4100),
("Robert", "Williams", "M", 6200)]

columns = ["FirstName", "LastName", "Gender", "Salary"]
df = spark.createDataFrame(data, schema=columns)

# Register DataFrame as a temporary SQL table
df.createOrReplaceTempView("employees")

# Show DataFrame using SQL
spark.sql("SELECT * FROM employees").show()

# Select specific columns using SQL
spark.sql("SELECT FirstName, Salary FROM employees").show()

# Filter rows using SQL
spark.sql("SELECT * FROM employees WHERE Salary > 4000").show()

# Group by and aggregation using SQL
spark.sql("SELECT Gender, AVG(Salary) AS AvgSalary FROM employees GROUP BY Gender").show()

# Add new column using SQL
spark.sql("SELECT *, Salary * 0.10 AS Bonus FROM employees").show()

# Stop Spark session
spark.stop()


When to Use Each API

  • DataFrame API: Use this for more programmatic operations and when you need to chain multiple transformations together. It is more Pythonic and integrates seamlessly with other PySpark operations.
  • Spark SQL API: Use this when you have complex queries that are easier to express in SQL or when working with people who are more familiar with SQL.

Combined Example

You can combine both APIs in a single application. For instance, you might load data and perform initial transformations with the DataFrame API, then register the DataFrame as a table and perform complex queries with Spark SQL.

from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder.appName("Combined Example").getOrCreate()

# Create a DataFrame
data = [("James", "Smith", "M", 3000),
("Anna", "Rose", "F", 4100),
("Robert", "Williams", "M", 6200)]

columns = ["FirstName", "LastName", "Gender", "Salary"]
df = spark.createDataFrame(data, schema=columns)

# DataFrame API
df = df.withColumn("Bonus", df["Salary"] * 0.10)

# Register DataFrame as a temporary SQL table
df.createOrReplaceTempView("employees")

# Spark SQL API
result_df = spark.sql("SELECT FirstName, LastName, Salary + Bonus AS TotalCompensation FROM employees")

# Show result
result_df.show()

# Stop Spark session
spark.stop()


This flexibility allows you to leverage the strengths of both APIs and use the most appropriate tool for each task in your ETL pipeline.

Questions-

How to run SQL queries in PySpark SQL?

Running SQL Queries in PySpark

To run SQL queries in PySpark, you can use the SparkSession object, which provides a SQL context for executing SQL queries.

SparkSession in PySpark SQL

SparkSession is the entry point to programming Spark with the Dataset and DataFrame API. It provides a SQL context for executing SQL queries.

Here’s an example of creating a SparkSession object:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("My App").getOrCreate()

SQL Context

In PySpark, a SQL context refers to the environment or scope in which SQL queries are executed. It provides a way to interact with Spark’s SQL engine, allowing you to execute SQL queries, create tables, and perform other SQL-related operations.

Think of a SQL context as a container that holds the state and metadata necessary to execute SQL queries. It’s like a workspace where you can write and execute SQL code, and it keeps track of the tables, views, and other objects you create.

When you create a SparkSession object, it automatically creates a SQL context for you. This SQL context is tied to the SparkSession object and is used to execute SQL queries, create tables, and perform other SQL-related operations.

Key Components of a SQL Context

  1. Catalog: The catalog is the metadata store that keeps track of tables, views, and other objects.
  2. SQL Parser: The SQL parser is responsible for parsing SQL queries and converting them into an abstract syntax tree (AST).
  3. Optimizer: The optimizer is responsible for optimizing the execution plan of SQL queries.
  4. Executor: The executor is responsible for executing the optimized execution plan.

SQL Context Operations

  1. Executing SQL Queries: You can execute SQL queries using the sql method.
  2. Creating Tables: You can create tables using the createTable method.
  3. Creating Views: You can create views using SQL statements such as CREATE VIEW, or by working with temporary views created from DataFrames.
  4. Registering DataFrames as Tables: You can register DataFrames as temporary views using the createOrReplaceTempView method (a combined sketch follows this list).
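
A minimal sketch tying these operations together (assuming an existing DataFrame df and a SparkSession named spark):

# Register a DataFrame as a temporary view so the SQL engine can see it
df.createOrReplaceTempView("employees")

# Execute a SQL query against the registered view
spark.sql("SELECT COUNT(*) AS total FROM employees").show()

# Inspect the catalog for registered tables and views
for table in spark.catalog.listTables():
    print(table.name, table.tableType)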

By understanding the SQL context, you can effectively use PySpark’s SQL features to analyze and manipulate data.

Multi-line SQL can be passed as a triple-quoted string, for example spark.sql(""" ... """).

