PySpark provides a powerful API for data manipulation, similar to pandas, but optimized for big data processing. Below is a comprehensive overview of DataFrame operations, functions, and syntax in PySpark with examples.
Contents
- 1 Creating DataFrames
- 1.1 1. Creating DataFrames from CSV Files
- 1.2 2. Creating DataFrames from Excel Files
- 1.3 3. Creating DataFrames from Python List
- 1.4 4. Creating DataFrames from Python Tuple
- 1.5 5. Creating DataFrames from Python Dictionary
- 1.6 6. Creating DataFrames from Pandas DataFrame
- 1.7 7. Creating DataFrames from Hive Tables
- 1.8 8. Creating DataFrames from Values
- 1.9 9. Creating DataFrames from RDDs
- 1.10 10. Creating DataFrames from Oracle Database
- 1.11 11. Creating DataFrames from HBase Tables
- 1.12 12. Creating DataFrames from JSON Files
- 1.13 13. Creating DataFrames from Parquet Files
- 2 PySpark Operations on DataFrames
- 3 Example Usage
- 3.1 1. How to use SQL expressions inside the filter or where functions?
- 3.2 2. select function syntax variations and examples
- 3.3 3. PySpark DataFrame groupBy Operation
- 3.3.1 Syntax
- 3.3.2 Common Aggregation Functions
- 3.3.3 Examples
- 3.3.3.1 Sample DataFrame
- 3.3.3.2 1. Group By Single Column and Count
- 3.3.3.3 2. Group By Multiple Columns and Count
- 3.3.3.4 3. Group By with Sum Aggregation
- 3.3.3.5 4. Group By with Multiple Aggregations
- 3.3.3.6 5. Using agg Method with Expressions
- 3.3.3.7 6. Group By with Having Clause (Filtering Groups)
- 3.4 4. PySpark orderBy() and sort() Operations
- 3.5 5. PySpark Join Operations
- 3.6 Date and Time Functions- Pyspark Dataframes
- 3.7 String Manipulation on PySpark DataFrames
- 3.8 Window functions in PySpark on Dataframes
Creating DataFrames
Creating DataFrames from various sources is a common task in PySpark. Below are examples for creating DataFrames from CSV files, Excel files, Python List, Python Tuple, Python Dictionary, Pandas DataFrames, Hive tables, values, RDDs, Oracle databases, and HBase tables.
1. Creating DataFrames from CSV Files
from pyspark.sql import SparkSession
# Initialize Spark session
spark = SparkSession.builder \
.appName("Create DataFrames from CSV") \
.getOrCreate()
# Read CSV file
df_csv = spark.read.csv("path/to/file.csv", header=True, inferSchema=True)
df_csv.show()
2. Creating DataFrames from Excel Files
To read Excel files, you need to install the spark-excel library.
# Add the following dependency when initializing SparkSession
# .config("spark.jars.packages", "com.crealytics:spark-excel_2.12:0.13.5")
df_excel = spark.read \
.format("com.crealytics.spark.excel") \
.option("header", "true") \
.option("inferSchema", "true") \
.load("path/to/file.xlsx")
df_excel.show()
3. Creating DataFrames from Python List
from pyspark.sql import SparkSession
# Initialize Spark session
spark = SparkSession.builder \
.appName("Create DataFrames from List") \
.getOrCreate()
# Create DataFrame from list
data = [("Alice", 30), ("Bob", 25), ("Cathy", 28)]
columns = ["name", "age"]
df_list = spark.createDataFrame(data, columns)
df_list.show()
4. Creating DataFrames from Python Tuple
from pyspark.sql import SparkSession
# Initialize Spark session
spark = SparkSession.builder \
.appName("Create DataFrames from Tuple") \
.getOrCreate()
# Create DataFrame from tuple
data = (("Alice", 30), ("Bob", 25), ("Cathy", 28))
columns = ["name", "age"]
df_tuple = spark.createDataFrame(data, columns)
df_tuple.show()
5. Creating DataFrames from Python Dictionary
from pyspark.sql import SparkSession
# Initialize Spark session
spark = SparkSession.builder \
.appName("Create DataFrames from Dictionary") \
.getOrCreate()
import pandas as pd

# Create DataFrame from dictionary (via an intermediate pandas DataFrame)
data = {"name": ["Alice", "Bob", "Cathy"], "age": [30, 25, 28]}
df_dict = spark.createDataFrame(pd.DataFrame(data))
df_dict.show()
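If you prefer not to route through pandas, a list of Row objects built from the same dictionary can be passed to createDataFrame directly; a minimal sketch using the data dictionary above:
from pyspark.sql import Row
# Build one Row per entry in the dictionary of lists
rows = [Row(name=n, age=a) for n, a in zip(data["name"], data["age"])]
df_rows = spark.createDataFrame(rows)
df_rows.show()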
6. Creating DataFrames from Pandas DataFrame
import pandas as pd
from pyspark.sql import SparkSession
# Initialize Spark session
spark = SparkSession.builder \
.appName("Create DataFrames from Pandas") \
.getOrCreate()
# Create a Pandas DataFrame
pdf = pd.DataFrame({"name": ["Alice", "Bob", "Cathy"], "age": [30, 25, 28]})
# Convert Pandas DataFrame to Spark DataFrame
df_pandas = spark.createDataFrame(pdf)
df_pandas.show()
7. Creating DataFrames from Hive Tables
from pyspark.sql import SparkSession
# Initialize Spark session
spark = SparkSession.builder \
.appName("Create DataFrames from Hive") \
.enableHiveSupport() \
.getOrCreate()
# Read Hive table
df_hive = spark.sql("SELECT * FROM database.table_name")
df_hive.show()
8. Creating DataFrames from Values
from pyspark.sql import SparkSession
# Initialize Spark session
spark = SparkSession.builder \
.appName("Create DataFrames from Values") \
.getOrCreate()
# Create DataFrame from values
data = [("Alice", 30), ("Bob", 25), ("Cathy", 28)]
columns = ["name", "age"]
df_values = spark.createDataFrame(data, columns)
df_values.show()
9. Creating DataFrames from RDDs
from pyspark.sql import Row, SparkSession
# Initialize Spark session
spark = SparkSession.builder \
.appName("Create DataFrames from RDD") \
.getOrCreate()
# Create an RDD
rdd = spark.sparkContext.parallelize([
Row(name="Alice", age=30),
Row(name="Bob", age=25),
Row(name="Cathy", age=28)
])
# Convert RDD to DataFrame
df_rdd = spark.createDataFrame(rdd)
df_rdd.show()
10. Creating DataFrames from Oracle Database
To read from Oracle, you need to have the JDBC driver for Oracle in your classpath.
# Add the following dependency when initializing SparkSession
# .config("spark.jars", "path/to/ojdbc8.jar")
jdbc_url = "jdbc:oracle:thin:@hostname:port:SID"
connection_properties = {
"user": "username",
"password": "password",
"driver": "oracle.jdbc.driver.OracleDriver"
}
df_oracle = spark.read.jdbc(jdbc_url, "schema.table_name", properties=connection_properties)
df_oracle.show()
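Equivalently, the generic JDBC reader can be used with explicit options; a sketch reusing the same illustrative connection details:
df_oracle = spark.read \
    .format("jdbc") \
    .option("url", jdbc_url) \
    .option("dbtable", "schema.table_name") \
    .option("user", "username") \
    .option("password", "password") \
    .option("driver", "oracle.jdbc.driver.OracleDriver") \
    .load()
df_oracle.show()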
11. Creating DataFrames from HBase Tables
To read from HBase, you need the hbase-spark connector.
# Add the following dependency when initializing SparkSession
# .config("spark.jars.packages", "org.apache.hbase.connectors.spark:hbase-spark:1.0.0")
from pyspark.sql import SparkSession
# Initialize Spark session
spark = SparkSession.builder \
.appName("Create DataFrames from HBase") \
.getOrCreate()
# Define HBase catalog
catalog = ''.join("""{
"table":{"namespace":"default", "name":"tablename"},
"rowkey":"key",
"columns":{
"col0":{"cf":"rowkey", "col":"key", "type":"string"},
"col1":{"cf":"cf1", "col":"col1", "type":"string"},
"col2":{"cf":"cf2", "col":"col2", "type":"string"}
}
}""".split())
df_hbase = spark.read \
.options(catalog=catalog) \
.format("org.apache.spark.sql.execution.datasources.hbase") \
.load()
df_hbase.show()
12. Creating DataFrames from JSON Files
from pyspark.sql import SparkSession
# Initialize Spark session
spark = SparkSession.builder \
.appName("Create DataFrames from JSON") \
.getOrCreate()
# Read JSON file
df_json = spark.read.json("path/to/file.json")
df_json.show()
13. Creating DataFrames from Parquet Files
from pyspark.sql import SparkSession
# Initialize Spark session
spark = SparkSession.builder \
.appName("Create DataFrames from Parquet") \
.getOrCreate()
# Read Parquet file
df_parquet = spark.read.parquet("path/to/file.parquet")
df_parquet.show()
PySpark Operations on DataFrames
PySpark DataFrame operations can be broadly classified into two categories: Transformations and Actions.
Transformations
Transformations are operations that are applied on a DataFrame to produce a new DataFrame. Transformations are lazy, meaning they are not executed immediately. Instead, they are recorded as a lineage of transformations to be applied when an action is called.
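For example, a minimal sketch of this lazy behaviour (assuming a DataFrame df with name and age columns):
# No Spark job runs here; the transformations are only recorded in the plan
adults = df.filter(df["age"] > 30)
names = adults.select("name")
# The action below triggers execution of the recorded transformations
names.show()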
Here are some common transformations:
select()
- Description: Selects a subset of columns.
- Syntax:
df.select("col1", "col2", ...)
Example:
df.select("name", "age").show()
filter() or where()
- Description: Filters rows based on a condition.
- Syntax:
df.filter(df["age"] > 30)
Example:
df.filter(df["age"] > 30).show()
groupBy()
- Description: Groups rows by specified columns.
- Syntax:
df.groupBy("department").agg({"salary": "mean"})
Example:
df.groupBy("department").mean("salary").show()
orderBy() or sort()
- Description: Sorts rows by specified columns.
- Syntax:
df.orderBy(df["age"].desc())
Example:
df.orderBy(df["age"].desc()).show()
withColumn()
- Description: Adds or replaces a column.
- Syntax:
df.withColumn("new_col", df["existing_col"] + 1)
Example:
df.withColumn("new_age", df["age"] + 1).show()
drop()
- Description: Drops specified columns.
- Syntax:
df.drop("col1", "col2")
Example:
df.drop("age").show()
distinct()
- Description: Returns distinct rows.
- Syntax:
df.distinct()
Example:
df.distinct().show()
join()
- Description: Joins two DataFrames.
- Syntax:
df1.join(df2, df1["key"] == df2["key"], "inner")
Example:
df1.join(df2, df1["id"] == df2["id"], "inner").show()
union()
- Description: Unions two DataFrames.
- Syntax:
df1.union(df2)
Example:
df1.union(df2).show()
sample()
- Description: Returns a sampled subset of rows.
- Syntax:
df.sample(withReplacement=False, fraction=0.1)
Example:
df.sample(withReplacement=False, fraction=0.1).show()
Actions
Actions trigger the execution of the transformations recorded in the DAG and return results.
Here are some common actions:
collect()
- Description: Returns all rows as a list of Row objects.
- Syntax:
df.collect()
Example:
rows = df.collect()
for row in rows:
    print(row)
first()
- Description: Returns the first row.
- Syntax:
df.first()
Example:
print(df.first())
take()
- Description: Returns the first n rows.
- Syntax:
df.take(n)
Example:
print(df.take(5))
describe()
- Description: Computes basic statistics.
- Syntax:
df.describe()
Example:
df.describe().show()
cache()
- Description: Caches the DataFrame in memory.
- Syntax:
df.cache()
Example:
df.cache()
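Note that cache() itself is lazy; the data is only materialized in memory when the next action runs. A minimal sketch:
df.cache()      # marks the DataFrame for caching; nothing is stored yet
df.count()      # the first action materializes the cached data
df.unpersist()  # optionally release the cached data when it is no longer needed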
persist()
- Description: Persists the DataFrame with a specified storage level.
- Syntax:
df.persist(storageLevel)
Example:
from pyspark import StorageLevel
df.persist(StorageLevel.MEMORY_AND_DISK)
write()
- Description: Writes the DataFrame to a specified format.
- Syntax:
df.write.format("parquet").save("path")
Example:
df.write.format("parquet").save("path/to/parquet")
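In practice, write() is usually combined with a save mode and, optionally, partitioning; a minimal sketch (the output path and partition column are illustrative):
df.write \
    .mode("overwrite") \
    .partitionBy("department") \
    .parquet("path/to/output")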
foreach()
- Description: Applies a function to each row.
- Syntax:
df.foreach(f)
Example:
def print_row(row):
    print(row)

df.foreach(print_row)
Example Project: Comprehensive ETL Pipeline in PySpark
Let’s create a comprehensive example project that demonstrates these operations.
- Setup Spark Session
- Load Data from CSV, Hive, Oracle, and create DataFrames
- Apply Transformations
- Perform Actions
- Persist DataFrames
- Convert to Pandas for further analysis and visualization
1. Setup Spark Session
from pyspark.sql import SparkSession
# Initialize Spark session
spark = SparkSession.builder \
.appName("ETL Pipeline Example") \
.enableHiveSupport() \
.getOrCreate()
2. Load Data
From CSV
df_csv = spark.read.csv("path/to/file.csv", header=True, inferSchema=True)
From Hive
df_hive = spark.sql("SELECT * FROM database.table_name")
From Oracle
jdbc_url = "jdbc:oracle:thin:@hostname:port:SID"
connection_properties = {
"user": "username",
"password": "password",
"driver": "oracle.jdbc.driver.OracleDriver"
}
df_oracle = spark.read.jdbc(jdbc_url, "schema.table_name", properties=connection_properties)
3. Apply Transformations
# Select columns (keep "department" so it is available for grouping and joining below)
df_selected = df_csv.select("name", "age", "salary", "department")
# Filter rows
df_filtered = df_selected.filter(df_selected["age"] > 30)
# Group by and aggregate
df_grouped = df_filtered.groupBy("department").mean("salary")
# Join with another DataFrame
df_joined = df_grouped.join(df_hive, df_grouped["department"] == df_hive["dept_id"])
# Add new column (GroupedData.mean("salary") names the aggregated column "avg(salary)")
df_with_new_col = df_joined.withColumn("new_col", df_joined["avg(salary)"] + 1000)
# Drop column
df_final = df_with_new_col.drop("dept_id")
4. Perform Actions
# Show DataFrame
df_final.show()
# Collect DataFrame
rows = df_final.collect()
for row in rows:
print(row)
# Count rows
print(df_final.count())
# Describe DataFrame
df_final.describe().show()
5. Persist DataFrames
# Cache DataFrame
df_final.cache()
# Persist DataFrame
from pyspark import StorageLevel
df_final.persist(StorageLevel.MEMORY_AND_DISK)
6. Convert to Pandas for Further Analysis and Visualization
import pandas as pd
# Convert to Pandas DataFrame
pdf = df_final.toPandas()
# Analyze with Pandas
print(pdf.describe())
# Visualize with Matplotlib
import matplotlib.pyplot as plt
pdf['salary'].hist()
plt.show()
You can use SQL expressions inside DataFrame operations, including the filter and where functions, in PySpark. This is done using the expr function from the pyspark.sql.functions module, which allows you to pass SQL expressions as strings.
Example Usage
1. How to use SQL expressions inside the filter or where functions?
Here is an example demonstrating how to use SQL expressions inside the filter or where functions:
from pyspark.sql import SparkSession
from pyspark.sql.functions import expr
# Initialize Spark session
spark = SparkSession.builder \
.appName("SQL Expression Example") \
.getOrCreate()
# Sample DataFrame
data = [
(1, "Alice", 25, 2000),
(2, "Bob", 30, 3000),
(3, "Charlie", 35, 4000)
]
columns = ["id", "name", "age", "salary"]
df = spark.createDataFrame(data, columns)
# Using SQL expression in filter
filtered_df = df.filter(expr("age > 30"))
filtered_df.show()
Detailed Example
Let’s go through a more detailed example with multiple conditions and transformations using SQL expressions.
from pyspark.sql import SparkSession
from pyspark.sql.functions import expr
# Initialize Spark session
spark = SparkSession.builder \
.appName("SQL Expression Example") \
.getOrCreate()
# Sample DataFrame
data = [
(1, "Alice", 25, 2000),
(2, "Bob", 30, 3000),
(3, "Charlie", 35, 4000),
(4, "David", 40, 5000)
]
columns = ["id", "name", "age", "salary"]
df = spark.createDataFrame(data, columns)
# Filter using SQL expression
filtered_df = df.filter(expr("age > 30 AND salary > 3000"))
# Select specific columns using SQL expression
selected_df = filtered_df.selectExpr("id", "name", "salary * 1.1 AS adjusted_salary")
# Adding a new column using SQL expression
with_new_column_df = selected_df.withColumn("salary_category", expr("CASE WHEN adjusted_salary > 4000 THEN 'High' ELSE 'Medium' END"))
with_new_column_df.show()
Output
+---+-------+---------------+---------------+
| id| name|adjusted_salary|salary_category|
+---+-------+---------------+---------------+
| 3|Charlie| 4400.0| High|
| 4| David| 5500.0| High|
+---+-------+---------------+---------------+
2. select function syntax variations and examples
The select function in PySpark is used to select a subset of columns or expressions from a DataFrame. It can be used in several ways to achieve different results.
Basic Syntax
The basic syntax for the select function is:
DataFrame.select(*cols)
Here, *cols can be a list of column names, Column objects, or expressions.
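Because the signature takes *cols, a Python list of column names can also be unpacked into select, which is handy when the column list is built at runtime; a minimal sketch assuming column1 and column2 exist:
cols_to_keep = ["column1", "column2"]
df.select(*cols_to_keep).show()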
Selecting Specific Columns
df.select('column1', 'column2').show()
This will select and display the 'column1' and 'column2' columns.
Selecting Columns with Expressions
from pyspark.sql.functions import col, expr
df.select(col('column1'), expr('column2 + 1').alias('column2_plus_one')).show()
This will select 'column1' and an expression 'column2 + 1' as 'column2_plus_one'.
Using Column Objects
Example:
from pyspark.sql.functions import col
df.select(col('column1'), col('column2')).show()
This will select 'column1' and 'column2' using Column objects.
Renaming Columns
df.select(col('column1').alias('new_column1')).show()
This will select 'column1' and rename it to 'new_column1'.
Selecting All Columns
df.select('*').show()
This will select all columns in the DataFrame.
Using SQL Expressions
df.selectExpr('column1', 'column2 + 1 as column2_plus_one').show()
This allows you to use SQL expressions directly.
3. PySpark DataFrame groupBy Operation
The groupBy operation in PySpark is used to group data based on one or more columns. Once grouped, you can apply various aggregation functions like count, sum, avg, max, min, etc.
Syntax
DataFrame.groupBy(*cols)
Here, *cols can be one or more column names or expressions.
Common Aggregation Functions
count()
sum(column)
avg(column)
max(column)
min(column)
agg(*exprs)
Examples
Sample DataFrame
data = [
("Alice", "Sales", 3000),
("Bob", "Sales", 4000),
("Alice", "Sales", 4000),
("Catherine", "HR", 4000),
("David", "HR", 5000),
("Eve", "IT", 3000)
]
columns = ["Name", "Department", "Salary"]
df = spark.createDataFrame(data, schema=columns)
df.show()
1. Group By Single Column and Count
df.groupBy("Department").count().show()
Output:
+----------+-----+
|Department|count|
+----------+-----+
| HR| 2|
| IT| 1|
| Sales| 3|
+----------+-----+
2. Group By Multiple Columns and Count
df.groupBy("Department", "Name").count().show()
Output:
+----------+---------+-----+
|Department|     Name|count|
+----------+---------+-----+
|     Sales|    Alice|    2|
|        HR|    David|    1|
|        HR|Catherine|    1|
|        IT|      Eve|    1|
|     Sales|      Bob|    1|
+----------+---------+-----+
3. Group By with Sum Aggregation
df.groupBy("Department").sum("Salary").show()
Output:
+----------+-----------+
|Department|sum(Salary)|
+----------+-----------+
|        HR|       9000|
|        IT|       3000|
|     Sales|      11000|
+----------+-----------+
4. Group By with Multiple Aggregations
df.groupBy("Department").agg(
count("*").alias("Total_Count"),
sum("Salary").alias("Total_Salary"),
avg("Salary").alias("Average_Salary"),
max("Salary").alias("Max_Salary"),
min("Salary").alias("Min_Salary")
).show()
Output:
+----------+-----------+------------+--------------+----------+----------+
|Department|Total_Count|Total_Salary|Average_Salary|Max_Salary|Min_Salary|
+----------+-----------+------------+--------------+----------+----------+
|        HR|          2|        9000|        4500.0|      5000|      4000|
|        IT|          1|        3000|        3000.0|      3000|      3000|
|     Sales|          3|       11000|       3666.67|      4000|      3000|
+----------+-----------+------------+--------------+----------+----------+
5. Using the agg Method with Expressions
from pyspark.sql.functions import expr
df.groupBy("Department").agg(
expr("count(*) as Total_Count"),
expr("sum(Salary) as Total_Salary"),
expr("avg(Salary) as Average_Salary"),
expr("max(Salary) as Max_Salary"),
expr("min(Salary) as Min_Salary")
).show()
Output is the same as above.
6. Group By with Having Clause (Filtering Groups)
df.groupBy("Department").agg(
sum("Salary").alias("Total_Salary")
).filter(col("Total_Salary") > 5000).show()
Output:
+----------+------------+
|Department|Total_Salary|
+----------+------------+
|        HR|        9000|
|     Sales|       11000|
+----------+------------+
4. PySpark orderBy() and sort() Operations
In PySpark, both orderBy() and sort() are used to sort the rows of a DataFrame. They can be used interchangeably, as they provide the same functionality.
Syntax
DataFrame.orderBy(*cols, ascending=True)
DataFrame.sort(*cols, ascending=True)
- cols: List of column names or expressions to sort by.
- ascending: Boolean or list of booleans. If a single boolean is provided, it applies to all columns. If a list is provided, it specifies the sort order for each corresponding column.
from pyspark.sql.functions import col, expr

data = [
("Alice", 34, "HR", 3000),
("Bob", 45, "IT", 4000),
("Catherine", 29, "HR", 5000),
("David", 36, "IT", 2500),
("Eve", 28, "Sales", 2800)
]
columns = ["Name", "Age", "Department", "Salary"]
df = spark.createDataFrame(data, schema=columns)
df.show()
1. Order By Single Column Ascending
df.orderBy("Age").show()
Equivalent:
df.sort("Age").show()
Output:
+---------+---+----------+------+
| Name|Age|Department|Salary|
+---------+---+----------+------+
| Eve| 28| Sales| 2800|
|Catherine| 29| HR| 5000|
| Alice| 34| HR| 3000|
| David| 36| IT| 2500|
| Bob| 45| IT| 4000|
+---------+---+----------+------+
2. Order By Single Column Descending
df.orderBy(col("Age").desc()).show()
Equivalent:
df.sort(col("Age").desc()).show()
Output:
+---------+---+----------+------+
| Name|Age|Department|Salary|
+---------+---+----------+------+
| Bob| 45| IT| 4000|
| David| 36| IT| 2500|
| Alice| 34| HR| 3000|
|Catherine| 29| HR| 5000|
| Eve| 28| Sales| 2800|
+---------+---+----------+------+
3. Order By Multiple Columns
df.orderBy("Department", "Age").show()
Equivalent:
df.sort("Department", "Age").show()
Output:
+---------+---+----------+------+
|     Name|Age|Department|Salary|
+---------+---+----------+------+
|Catherine| 29|        HR|  5000|
|    Alice| 34|        HR|  3000|
|    David| 36|        IT|  2500|
|      Bob| 45|        IT|  4000|
|      Eve| 28|     Sales|  2800|
+---------+---+----------+------+
4. Order By Multiple Columns with Different Sort Orders
df.orderBy(["Department", "Age"], ascending=[True, False]).show()
Equivalent:
df.sort(["Department", "Age"], ascending=[True, False]).show()
Output:
+---------+---+----------+------+
|     Name|Age|Department|Salary|
+---------+---+----------+------+
|    Alice| 34|        HR|  3000|
|Catherine| 29|        HR|  5000|
|      Bob| 45|        IT|  4000|
|    David| 36|        IT|  2500|
|      Eve| 28|     Sales|  2800|
+---------+---+----------+------+
Useful Examples
Sorting by Salary in Descending Order
df.orderBy(col("Salary").desc()).show()
Output:
+---------+---+----------+------+
| Name|Age|Department|Salary|
+---------+---+----------+------+
|Catherine| 29| HR| 5000|
| Bob| 45| IT| 4000|
| Alice| 34| HR| 3000|
| Eve| 28| Sales| 2800|
| David| 36| IT| 2500|
+---------+---+----------+------+
Sorting by Department and then by Salary within each Department
df.orderBy("Department", col("Salary").desc()).show()
Output:
+---------+---+----------+------+
| Name|Age|Department|Salary|
+---------+---+----------+------+
|Catherine| 29| HR| 5000|
| Alice| 34| HR| 3000|
| Bob| 45| IT| 4000|
| David| 36| IT| 2500|
| Eve| 28| Sales| 2800|
+---------+---+----------+------+
Sorting with Expression
df.orderBy(expr("Salary + Age").desc()).show()
Output:
+---------+---+----------+------+
|     Name|Age|Department|Salary|
+---------+---+----------+------+
|Catherine| 29|        HR|  5000|
|      Bob| 45|        IT|  4000|
|    Alice| 34|        HR|  3000|
|      Eve| 28|     Sales|  2800|
|    David| 36|        IT|  2500|
+---------+---+----------+------+
5. PySpark Join Operations
In PySpark, the join operation is used to combine two DataFrames based on a condition. The join operation can be performed in several ways, such as inner join, left join, right join, outer join, semi join, and anti join.
Syntax
DataFrame.join(other, on=None, how=None)
- other: The DataFrame to join with.
- on: The column(s) to join on. This can be a string representing a single column name, a list of column names, or a condition (see the condition-based sketch after the sample DataFrames below).
- how: The type of join to perform. Default is inner.
Join Types
- inner: Inner join.
- left or left_outer: Left outer join.
- right or right_outer: Right outer join.
- outer, full, or full_outer: Full outer join.
- left_semi: Left semi join.
- left_anti: Left anti join.
Examples
data1 = [
("Alice", 34, "HR"),
("Bob", 45, "IT"),
("Catherine", 29, "HR"),
("David", 36, "IT")
]
columns1 = ["Name", "Age", "Department"]
data2 = [
("HR", 3000),
("IT", 4000),
("Sales", 2800)
]
columns2 = ["Department", "Salary"]
df1 = spark.createDataFrame(data1, schema=columns1)
df2 = spark.createDataFrame(data2, schema=columns2)
df1.show()
df2.show()
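All of the examples below join on the shared column name with on="Department". When the key columns are named differently, or you want to keep both key columns, you can pass an explicit condition instead; a minimal sketch using the same df1 and df2:
cond_join_df = df1.join(df2, df1["Department"] == df2["Department"], "inner")
# A condition-based join retains both key columns; drop the duplicate from df2
cond_join_df = cond_join_df.drop(df2["Department"])
cond_join_df.show()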
1. Inner Join
inner_join_df = df1.join(df2, on="Department", how="inner")
inner_join_df.show()
Output:
+---------+---+----------+------+
| Name|Age|Department|Salary|
+---------+---+----------+------+
| Alice| 34| HR| 3000|
|Catherine| 29| HR| 3000|
| Bob| 45| IT| 4000|
| David| 36| IT| 4000|
+---------+---+----------+------+
2. Left Outer Join
left_join_df = df1.join(df2, on="Department", how="left")
left_join_df.show()
Output:
+---------+---+----------+------+
| Name|Age|Department|Salary|
+---------+---+----------+------+
| Alice| 34| HR| 3000|
|Catherine| 29| HR| 3000|
| Bob| 45| IT| 4000|
| David| 36| IT| 4000|
+---------+---+----------+------+
3. Right Outer Join
right_join_df = df1.join(df2, on="Department", how="right")
right_join_df.show()
Output:
+---------+----+----------+------+
| Name| Age|Department|Salary|
+---------+----+----------+------+
| Alice| 34| HR| 3000|
|Catherine| 29| HR| 3000|
| Bob| 45| IT| 4000|
| David| 36| IT| 4000|
| null|null| Sales| 2800|
+---------+----+----------+------+
4. Full Outer Join
outer_join_df = df1.join(df2, on="Department", how="outer")
outer_join_df.show()
Output:
+---------+----+----------+------+
| Name| Age|Department|Salary|
+---------+----+----------+------+
| Alice| 34| HR| 3000|
|Catherine| 29| HR| 3000|
| Bob| 45| IT| 4000|
| David| 36| IT| 4000|
| null|null| Sales| 2800|
+---------+----+----------+------+
5. Left Semi Join
semi_join_df = df1.join(df2, on="Department", how="left_semi")
semi_join_df.show()
Output:
+---------+---+----------+
| Name|Age|Department|
+---------+---+----------+
| Alice| 34| HR|
|Catherine| 29| HR|
| Bob| 45| IT|
| David| 36| IT|
+---------+---+----------+
6. Left Anti Join
anti_join_df = df1.join(df2, on="Department", how="left_anti")
anti_join_df.show()
Output:
+----+---+----------+
|Name|Age|Department|
+----+---+----------+
+----+---+----------+
7. Join on Multiple Columns
data3 = [
("Alice", 34, "HR", 3000),
("Bob", 45, "IT", 4000),
("Catherine", 29, "HR", 5000),
("David", 36, "IT", 2500),
("Eve", 28, "Sales", 2800)
]
columns3 = ["Name", "Age", "Department", "Salary"]
data4 = [
("HR", 34, 3000, "Manager"),
("IT", 45, 4000, "Developer"),
("Sales", 28, 2800, "Salesman")
]
columns4 = ["Department", "Age", "Salary", "Role"]
df3 = spark.createDataFrame(data3, schema=columns3)
df4 = spark.createDataFrame(data4, schema=columns4)
multi_col_join_df = df3.join(df4, on=["Department", "Age", "Salary"], how="inner")
multi_col_join_df.show()
Output:
+----------+---+------+-----+---------+
|Department|Age|Salary| Name|     Role|
+----------+---+------+-----+---------+
|        HR| 34|  3000|Alice|  Manager|
|        IT| 45|  4000|  Bob|Developer|
|     Sales| 28|  2800|  Eve| Salesman|
+----------+---+------+-----+---------+