Window functions in PySpark perform calculations across a "window" of rows related to the current row, without collapsing those rows the way groupBy() does. They mirror SQL window functions but use PySpark's DataFrame syntax, and they are the standard tool for ranking, cumulative sums, moving averages, and row-to-row comparisons. This post walks through the main PySpark DataFrame window functions, compares them with their Spark SQL equivalents, and provides examples, including a larger sample dataset.
Syntax Structure
- Define a window specification: the Window object specifies how rows are partitioned and ordered for the operation.
from pyspark.sql.window import Window
window_spec = Window.partitionBy("column1").orderBy("column2")
- Apply the window function: use PySpark functions such as row_number(), rank(), dense_rank(), etc., with the window specification.
from pyspark.sql.functions import row_number, rank, dense_rank, sum
df.withColumn("row_num", row_number().over(window_spec))
Window Specification Options
| Option | Description | Syntax |
|---|---|---|
| partitionBy() | Divides the data into partitions for independent calculations. | Window.partitionBy("column1") |
| orderBy() | Specifies the order of rows within each partition. | Window.orderBy("column2") |
| rowsBetween() | Defines a window frame by physical rows relative to the current row. | .rowsBetween(-1, 1) |
| rangeBetween() | Defines a window frame based on the range of values in the ordering column. | .rangeBetween(-10, 10) |
| unboundedPreceding | Indicates all rows before the current row in the partition. | Window.rowsBetween(Window.unboundedPreceding, 0) |
| unboundedFollowing | Indicates all rows after the current row in the partition. | Window.rowsBetween(0, Window.unboundedFollowing) |
| currentRow | Refers to the current row in the partition. | Window.rowsBetween(Window.currentRow, Window.currentRow) |
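The difference between rowsBetween() and rangeBetween() is easiest to see side by side. Here is a minimal hedged sketch (the grp/val data is illustrative, not part of the examples below): rowsBetween counts physical rows around the current row, while rangeBetween includes every row whose ordering value falls within the given offsets of the current row's value.
from pyspark.sql import SparkSession
from pyspark.sql.functions import sum
from pyspark.sql.window import Window

spark = SparkSession.builder.appName("FrameDemo").getOrCreate()
demo = spark.createDataFrame(
    [("A", 10), ("A", 20), ("A", 20), ("A", 50)], ["grp", "val"]
)

# Physical frame: previous row, current row, next row
rows_win = Window.partitionBy("grp").orderBy("val").rowsBetween(-1, 1)
# Value-based frame: all rows whose val lies within +/-10 of the current row's val
range_win = Window.partitionBy("grp").orderBy("val").rangeBetween(-10, 10)

# For the val=50 row, rows_sum still includes its neighbour (20),
# while range_sum includes only 50 itself (no other val within 10 of 50).
demo.withColumn("rows_sum", sum("val").over(rows_win)) \
    .withColumn("range_sum", sum("val").over(range_win)) \
    .show()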
Common PySpark Window Functions
| Function | Description |
|---|---|
| row_number() | Assigns a unique sequential number to each row in a window. |
| rank() | Assigns a rank to each row, with gaps after ties. |
| dense_rank() | Assigns a rank to each row, without gaps after ties. |
| ntile(n) | Divides rows into n buckets and assigns a bucket number to each row. |
| lead(column, n) | Returns the value of the column from n rows ahead of the current row. |
| lag(column, n) | Returns the value of the column from n rows behind the current row. |
| first() | Returns the first value in the window frame. |
| last() | Returns the last value in the window frame. |
| sum() | Computes the sum of the column over the window frame. |
| avg() | Computes the average of the column over the window frame. |
| max() | Returns the maximum value of the column over the window frame. |
| min() | Returns the minimum value of the column over the window frame. |
| count() | Returns the count of rows in the window frame. |
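first() and last() appear in the table above but are not demonstrated in the examples that follow, so here is a minimal hedged sketch (the dept/salary data is illustrative). Note that with the default frame of an ordered window, last() effectively returns the current row's value, so the frame is widened here to cover the whole partition.
from pyspark.sql import SparkSession
from pyspark.sql.functions import first, last
from pyspark.sql.window import Window

spark = SparkSession.builder.appName("FirstLastDemo").getOrCreate()
emp = spark.createDataFrame(
    [("HR", 3000), ("HR", 4000), ("IT", 5000), ("IT", 7000)], ["dept", "salary"]
)

# Widen the frame to the whole partition so last() sees every row
full_win = (Window.partitionBy("dept").orderBy("salary")
            .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing))

emp.withColumn("lowest_salary", first("salary").over(full_win)) \
   .withColumn("highest_salary", last("salary").over(full_win)) \
   .show()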
Examples
1. Ranking Employees by Salary
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, rank, dense_rank
data = [(1, "Alice", 5000), (2, "Bob", 6000), (3, "Charlie", 4000), (4, "Alice", 7000)]
columns = ["EmpID", "Name", "Salary"]
df = spark.createDataFrame(data, columns)
window_spec = Window.partitionBy("Name").orderBy("Salary")
df = df.withColumn("row_number", row_number().over(window_spec)) \
.withColumn("rank", rank().over(window_spec)) \
.withColumn("dense_rank", dense_rank().over(window_spec))
df.show()
Output:
| EmpID | Name | Salary | row_number | rank | dense_rank |
|---|---|---|---|---|---|
| 3 | Charlie | 4000 | 1 | 1 | 1 |
| 1 | Alice | 5000 | 1 | 1 | 1 |
| 4 | Alice | 7000 | 2 | 2 | 2 |
| 2 | Bob | 6000 | 1 | 1 | 1 |
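In this output the three functions agree because no Name partition contains tied salaries; the difference only shows up with ties in the ordering column. A small hedged sketch with hypothetical tied values:
from pyspark.sql import SparkSession
from pyspark.sql.functions import row_number, rank, dense_rank
from pyspark.sql.window import Window

spark = SparkSession.builder.appName("TiesDemo").getOrCreate()
ties = spark.createDataFrame(
    [("Sales", 4000), ("Sales", 4000), ("Sales", 5000)], ["Dept", "Salary"]
)

w = Window.partitionBy("Dept").orderBy("Salary")
# The two 4000 rows tie: both get rank 1, while row_number still distinguishes them.
# The 5000 row gets rank 3 (gap after the tie) but dense_rank 2 (no gap).
ties.withColumn("row_number", row_number().over(w)) \
    .withColumn("rank", rank().over(w)) \
    .withColumn("dense_rank", dense_rank().over(w)) \
    .show()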
2. Cumulative Sum
from pyspark.sql.functions import sum
window_spec = Window.partitionBy("Name").orderBy("Salary").rowsBetween(Window.unboundedPreceding, Window.currentRow)
df = df.withColumn("cumulative_sum", sum("Salary").over(window_spec))
df.show()
Output:
| EmpID | Name | Salary | cumulative_sum |
|---|---|---|---|
| 3 | Charlie | 4000 | 4000 |
| 1 | Alice | 5000 | 5000 |
| 4 | Alice | 7000 | 12000 |
| 2 | Bob | 6000 | 6000 |
Options for Handling NULLs
- Control where NULLs sort: use the NULLS FIRST / NULLS LAST variants of the ordering expressions in orderBy(), such as asc_nulls_last() or desc_nulls_last().
Window.orderBy(col("Salary").desc_nulls_last())
- Exclude NULLs entirely: use .filter() before applying the window function.
df.filter(col("Salary").isNotNull())
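A small hedged sketch of both options, using a hypothetical Salary column that contains a NULL:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, row_number
from pyspark.sql.window import Window

spark = SparkSession.builder.appName("NullHandlingDemo").getOrCreate()
pay = spark.createDataFrame(
    [("Alice", 5000), ("Bob", None), ("Cara", 7000)], ["Name", "Salary"]
)

# Option 1: keep NULL salaries but sort them after all non-NULL values
w_nulls_last = Window.orderBy(col("Salary").desc_nulls_last())
pay.withColumn("salary_rank", row_number().over(w_nulls_last)).show()

# Option 2: drop NULL salaries before applying the window function
w_desc = Window.orderBy(col("Salary").desc())
pay.filter(col("Salary").isNotNull()) \
   .withColumn("salary_rank", row_number().over(w_desc)) \
   .show()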
Important Notes
- partitionBy: Breaks data into logical groups for independent calculations.
- orderBy: Determines the order within each partition.
- Frame specification: Allows cumulative, rolling, or specific-frame calculations using rowsBetween or rangeBetween.
Setting Up the Environment
First, let’s set up the environment and create a sample dataset.
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import col, row_number, rank, dense_rank, percent_rank, ntile, lag, lead, sum, avg
# Initialize Spark session
spark = SparkSession.builder \
    .appName("PySpark Window Functions") \
    .getOrCreate()
# Create a sample dataset
data = [(1, "Alice", 1000),
(2, "Bob", 1200),
(3, "Catherine", 1200),
(4, "David", 800),
(5, "Eve", 950),
(6, "Frank", 800),
(7, "George", 1200),
(8, "Hannah", 1000),
(9, "Ivy", 950),
(10, "Jack", 1200)]
columns = ["id", "name", "salary"]
df = spark.createDataFrame(data, schema=columns)
df.show()
PySpark Window Functions
1. Row Number
The row_number
function assigns a unique number to each row within a window partition.
windowSpec = Window.partitionBy("salary").orderBy("id")
df.withColumn("row_number", row_number().over(windowSpec)).show()
2. Rank
The rank
function provides ranks to rows within a window partition, with gaps in ranking.
df.withColumn("rank", rank().over(windowSpec)).show()
3. Dense Rank
The dense_rank
function provides ranks to rows within a window partition, without gaps in ranking.
df.withColumn("dense_rank", dense_rank().over(windowSpec)).show()
4. Percent Rank
The percent_rank
function calculates the percentile rank of rows within a window partition.
df.withColumn("percent_rank", percent_rank().over(windowSpec)).show()
5. NTile
The ntile
function divides the rows within a window partition into n
buckets.
df.withColumn("ntile", ntile(4).over(windowSpec)).show()
6. Lag
The lag
function provides access to a row at a given physical offset before the current row within a window partition.
df.withColumn("lag", lag("salary", 1).over(windowSpec)).show()
7. Lead
The lead
function provides access to a row at a given physical offset after the current row within a window partition.
df.withColumn("lead", lead("salary", 1).over(windowSpec)).show()
8. Cumulative Sum
The sum function calculates the cumulative sum of values within a window partition. Because the window specification has an orderBy and no explicit frame, Spark applies the default frame RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, so each row receives the running total up to and including itself.
df.withColumn("cumulative_sum", sum("salary").over(windowSpec)).show()
9. Moving Average
The avg function over the same ordered window computes a running average of the values seen so far in each partition. For a true moving average over a fixed number of rows, bound the frame with rowsBetween, as shown in the sketch below.
df.withColumn("moving_avg", avg("salary").over(windowSpec)).show()
There are multiple ways to apply window functions on DataFrames in PySpark. While withColumn
is the most commonly used method to add a new column with a window function, there are other approaches to apply window functions, depending on the specific use case.
Here are different methods for applying window functions:
1. Using select()
with window functions
Instead of withColumn()
, you can use select()
to directly apply a window function to the columns of the DataFrame. This is useful when you only want to return a subset of columns along with the windowed column.
from pyspark.sql.functions import row_number
from pyspark.sql import Window
# Define the window specification
windowSpec = Window.partitionBy("salary").orderBy("id")
# Use select to apply the window function
df.select("id", "salary", row_number().over(windowSpec).alias("row_number")).show()
2. Using agg()
with window functions
Window functions can also be combined with aggregations. Note that a window expression used inside agg() must only reference grouping columns or aggregated values; a reliable pattern is to aggregate first and then apply the window function over the aggregated result, for example to turn per-group totals into a running total.
from pyspark.sql.functions import sum
from pyspark.sql import Window
# Aggregate first: total salary per salary group
salary_totals = df.groupBy("salary").agg(sum("salary").alias("total_salary"))
# Then apply a window function over the aggregated result
windowSpec = Window.orderBy("salary")
salary_totals.withColumn("running_total", sum("total_salary").over(windowSpec)).show()
3. Using filter()
or where()
Sometimes, window functions are used in conjunction with filters to extract specific rows, such as filtering the first or last row per partition.
from pyspark.sql.functions import row_number
# Define the window specification
windowSpec = Window.partitionBy("salary").orderBy("id")
# Apply window function and filter based on the rank
ranked_df = df.withColumn("row_number", row_number().over(windowSpec))
ranked_df.filter(ranked_df["row_number"] == 1).show() # Filter to get the first row per partition
4. Using groupBy()
with window functions
Though groupBy()
is usually used for aggregations, you can combine it with window functions. Window functions won’t replace groupBy()
, but you can apply them after aggregations.
from pyspark.sql.functions import rank, col
# First, group by some column and aggregate
grouped_df = df.groupBy("salary").count()
# Then define a window over columns that exist in the grouped result ("id" is gone after grouping)
windowSpec = Window.orderBy(col("count").desc())
grouped_df.withColumn("rank", rank().over(windowSpec)).show()
5. Using withColumnRenamed()
with window functions
You can also rename the result of a window function when adding it as a new column.
from pyspark.sql.functions import row_number
# Define the window specification
windowSpec = Window.partitionBy("salary").orderBy("id")
# Apply the window function and rename the column
df.withColumn("row_number", row_number().over(windowSpec)).withColumnRenamed("row_number", "rank").show()
6. Combining multiple window functions in one step
You can apply multiple window functions in a single step using either select()
or withColumn()
.
from pyspark.sql.functions import row_number, rank
# Define the window specification
windowSpec = Window.partitionBy("salary").orderBy("id")
# Apply multiple window functions
df.select("id", "salary",
row_number().over(windowSpec).alias("row_number"),
rank().over(windowSpec).alias("rank")
).show()
Comparison with Spark SQL Window Functions
All of the above operations can also be performed with Spark SQL, provided the DataFrame is first registered as a temporary view named df (see the snippet below). Here are the equivalent SQL queries:
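For these queries to run, the DataFrame has to be exposed to SQL as a temporary view; a minimal sketch, assuming the df created in the setup section:
# Register the DataFrame so it can be referenced as "df" in SQL
df.createOrReplaceTempView("df")

# Example: run the first query below through spark.sql()
spark.sql("""
    SELECT id, name, salary,
           ROW_NUMBER() OVER (PARTITION BY salary ORDER BY id) AS row_number
    FROM df
""").show()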
1. Row Number
SELECT id, name, salary,
ROW_NUMBER() OVER (PARTITION BY salary ORDER BY id) AS row_number
FROM df
2. Rank
SELECT id, name, salary,
RANK() OVER (PARTITION BY salary ORDER BY id) AS rank
FROM df
3. Dense Rank
SELECT id, name, salary,
DENSE_RANK() OVER (PARTITION BY salary ORDER BY id) AS dense_rank
FROM df
4. Percent Rank
SELECT id, name, salary,
PERCENT_RANK() OVER (PARTITION BY salary ORDER BY id) AS percent_rank
FROM df
5. NTile
SELECT id, name, salary,
NTILE(4) OVER (PARTITION BY salary ORDER BY id) AS ntile
FROM df
6. Lag
SELECT id, name, salary,
LAG(salary, 1) OVER (PARTITION BY salary ORDER BY id) AS lag
FROM df
7. Lead
SELECT id, name, salary,
LEAD(salary, 1) OVER (PARTITION BY salary ORDER BY id) AS lead
FROM df
8. Cumulative Sum
SELECT id, name, salary,
SUM(salary) OVER (PARTITION BY salary ORDER BY id ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS cumulative_sum
FROM df
9. Moving Average
SELECT id, name, salary,
AVG(salary) OVER (PARTITION BY salary ORDER BY id ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS moving_avg
FROM df
Large Sample Dataset Example
Let’s create a larger dataset and apply window functions.
import random
# Create a larger dataset
large_data = [(i, f"Name_{i}", random.choice([1000, 1200, 950, 800])) for i in range(1, 101)]
large_df = spark.createDataFrame(large_data, schema=columns)
large_df.show(10)
# Apply window functions
large_windowSpec = Window.partitionBy("salary").orderBy("id")
large_df.withColumn("row_number", row_number().over(large_windowSpec)).show(10)
large_df.withColumn("rank", rank().over(large_windowSpec)).show(10)
large_df.withColumn("dense_rank", dense_rank().over(large_windowSpec)).show(10)
large_df.withColumn("percent_rank", percent_rank().over(large_windowSpec)).show(10)
large_df.withColumn("ntile", ntile(4).over(large_windowSpec)).show(10)
large_df.withColumn("lag", lag("salary", 1).over(large_windowSpec)).show(10)
large_df.withColumn("lead", lead("salary", 1).over(large_windowSpec)).show(10)
large_df.withColumn("cumulative_sum", sum("salary").over(large_windowSpec)).show(10)
large_df.withColumn("moving_avg", avg("salary").over(large_windowSpec)).show(10)
Window functions in PySpark and Spark SQL are powerful tools for data analysis. They allow you to perform complex calculations and transformations on subsets of your data, similar to SQL window functions. By using window functions, you can easily implement features like ranking, cumulative sums, and moving averages in your PySpark applications.
Examples
1. Remove duplicates based on specific columns, then order by different columns
To remove duplicates from a PySpark DataFrame based on specific columns and order the remaining rows by different columns, you can combine the dropDuplicates() function with orderBy() (or sort()).
Here is an example that demonstrates this process:
- Remove duplicates based on specific columns.
- Order the resulting DataFrame by different columns.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
# Initialize SparkSession
spark = SparkSession.builder.appName("RemoveDuplicatesAndOrder").getOrCreate()
# Sample data
data = [
(1, "Alice", 29),
(2, "Bob", 30),
(3, "Alice", 29),
(4, "David", 35),
(5, "Alice", 25)
]
# Create DataFrame
columns = ["id", "name", "age"]
df = spark.createDataFrame(data, columns)
# Show the original DataFrame
print("Original DataFrame:")
df.show()
# Step 1: Remove duplicates based on specific columns (e.g., "name", "age")
df_no_duplicates = df.dropDuplicates(["name", "age"])
# Step 2: Order the resulting DataFrame by different columns (e.g., "age" in descending order)
df_ordered = df_no_duplicates.orderBy(col("age").desc())
# Show the resulting DataFrame
print("DataFrame after removing duplicates and ordering:")
df_ordered.show()
# Stop SparkSession
spark.stop()
Explanation:
- Initialization and Data Preparation: A SparkSession is created, sample data is provided, and a DataFrame is built from that data.
- Removing Duplicates: The dropDuplicates() function removes rows that have the same values in the specified columns ("name" and "age" in this case).
- Ordering Data: The orderBy() function sorts the DataFrame by the specified columns; here the DataFrame is ordered by "age" in descending order.
- Displaying Results: The original and resulting DataFrames are displayed using the show() function.
Example Output:
Original DataFrame:
+---+-----+---+
| id| name|age|
+---+-----+---+
| 1|Alice| 29|
| 2| Bob| 30|
| 3|Alice| 29|
| 4|David| 35|
| 5|Alice| 25|
+---+-----+---+
DataFrame after removing duplicates and ordering:
+---+-----+---+
| id| name|age|
+---+-----+---+
| 4|David| 35|
| 2| Bob| 30|
| 1|Alice| 29|
| 5|Alice| 25|
+---+-----+---+
Additional Notes:
- dropDuplicates(): Removes duplicate rows based on the specified columns. If no columns are specified, it removes rows that are identical across all columns.
- orderBy()/sort(): Order the DataFrame. You can specify multiple columns and the sorting direction (ascending or descending) for each column.
- You can chain multiple DataFrame operations together, for example combining dropDuplicates() and orderBy() in a single statement:
df.dropDuplicates(["name", "age"]).orderBy(col("age").desc())
2. Remove duplicates based on specific columns while ordering by different columns
To remove duplicates from a PySpark DataFrame based on specific columns while controlling which duplicate is kept, you can use window functions. This approach lets you decide how duplicates are resolved through the ordering within each partition.
Below is an example that removes duplicates based on specific columns (name, age) while ordering the rows within each partition by age descending and id ascending:
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import col, row_number
# Initialize SparkSession
spark = SparkSession.builder.appName("RemoveDuplicatesWithOrdering").getOrCreate()
# Sample data
data = [
(1, "Alice", 29),
(2, "Bob", 30),
(3, "Alice", 29),
(4, "David", 35),
(5, "Alice", 25)
]
# Create DataFrame
columns = ["id", "name", "age"]
df = spark.createDataFrame(data, columns)
# Show the original DataFrame
print("Original DataFrame:")
df.show()
# Define a window specification
windowSpec = Window.partitionBy("name", "age").orderBy(col("age").desc(), col("id").asc())
# Add a row number to each partition
df_with_row_number = df.withColumn("row_number", row_number().over(windowSpec))
# Filter the rows to keep only the first occurrence of each combination of name and age
df_no_duplicates = df_with_row_number.filter(col("row_number") == 1).drop("row_number")
# Show the resulting DataFrame
print("DataFrame after removing duplicates and ordering:")
df_no_duplicates.show()
# Stop SparkSession
spark.stop()
Explanation:
- Initialization and Data Preparation: A SparkSession is created, sample data is provided, and a DataFrame is built from that data.
- Define Window Specification: Window.partitionBy("name", "age").orderBy(col("age").desc(), col("id").asc()) partitions the data by the columns name and age and orders the rows within each partition by age in descending order and id in ascending order.
- Add Row Number: The row_number() function adds a row number to each row within the specified window, which identifies the first occurrence in each partition.
- Filter Rows: The DataFrame is filtered to keep only the rows where row_number is 1, which removes duplicates while respecting the specified ordering. (Note that the display order of the final DataFrame is not guaranteed; add an explicit orderBy() if you need the output sorted.)
- Display Results: The original and resulting DataFrames are displayed using the show() function.
Example Output:
Original DataFrame:
+---+-----+---+
| id| name|age|
+---+-----+---+
| 1|Alice| 29|
| 2| Bob| 30|
| 3|Alice| 29|
| 4|David| 35|
| 5|Alice| 25|
+---+-----+---+
DataFrame after removing duplicates and ordering:
+---+-----+---+
| id| name|age|
+---+-----+---+
| 4|David| 35|
| 2| Bob| 30|
| 1|Alice| 29|
| 5|Alice| 25|
+---+-----+---+
Notes:
- Window Specification: The Window specification defines how the data should be partitioned and ordered.
- Row Number: The row_number() function assigns a unique row number within each window partition.
- Filter and Drop: The resulting DataFrame is filtered to keep only the rows where row_number is 1, and the temporary row_number column is dropped.
This approach ensures that duplicates are removed based on the specified columns while maintaining the desired order of the data. You can adjust the partitioning and ordering columns according to your specific requirements.
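For instance, here is a hedged variation on the same pattern that keeps only the highest-age record per name (run it before the spark.stop() call in the example above; the per_name window name is just illustrative):
from pyspark.sql.window import Window
from pyspark.sql.functions import col, row_number

# Partition by "name" alone and order each partition by age descending,
# so row 1 in every partition is that person's highest-age record.
per_name = Window.partitionBy("name").orderBy(col("age").desc(), col("id").asc())

df.withColumn("row_number", row_number().over(per_name)) \
  .filter(col("row_number") == 1) \
  .drop("row_number") \
  .show()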
Code for Revising PySpark Window Functions
# Sample data
data = [
(1, "Alice", 10, 8000, "New York"),
(2, "Bob", 11, 9000, "New York"),
(3, "Charlie", 10, 10000, "Chicago"),
(4, "David", 12, 9000, "New York"),
(6, "Eve", 13, 9000, "Chicago"),
(7, "GEve", 13, 10000, "Chicago"),
(8, "REve", 13, 5000, "Chicago"),
(9, "ScEve", 14, 5600, "LA"),
(10, "DEve", 15, 11000, "LA"),
(11, "Ram", 14, 11000, "LA"),
(12, "Hem", 10, 8000, "LA"),
(13, "Hith", 11, 6000, "Chicago"),
(14, "Pit", 15, 13000, "Chicago"),
(15, "Evelyn", 15, 14000, "New York"),
(16, "FteEve", 12, 9200, "New York"),
(17, "sctuEve", 12, None, "Chicago"),
]
# Define schema
columns = ["EmpID", "Emp_name", "Manager_id", "Salary", "Location"]
df = spark.createDataFrame(data, schema=columns)
df.show()
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, rank, dense_rank, col

wf = Window.partitionBy("Location").orderBy("Salary")
# Calculate row_number, rank, and dense_rank separately
row_number_col = row_number().over(wf).alias("row_number")
rank_col = rank().over(wf).alias("rank")
dense_rank_col = dense_rank().over(wf).alias("dense_rank")
# Select columns including calculated window function results
df.select(
"EmpID",
"Emp_name",
"Manager_id",
"salary",
"Location",
row_number_col,
rank_col,
dense_rank_col
).show()
df.select(
"EmpID",
"Emp_name",
"Manager_id",
"salary",
"Location",
row_number().over(wf).alias("row_number"),
rank().over(wf).alias("rank"),
dense_rank().over(wf).alias("dense_rank")
).show()
#Using withColumn with window functions
df.withColumn("row_number", row_number().over(wf)) \
.withColumn("rank", rank().over(wf)) \
.withColumn("dense_rank", dense_rank().over(wf)) \
.show()
#Using selectExpr with window functions
df.selectExpr(
"EmpID",
"Emp_name",
"Manager_id",
"salary",
"Location",
"row_number() OVER (PARTITION BY Location ORDER BY Salary) AS row_number", # Define window here
"rank() OVER (PARTITION BY Location ORDER BY Salary) AS rank", # Define window here
"dense_rank() OVER (PARTITION BY Location ORDER BY Salary) AS dense_rank" # Define window here
).show()
#Using withColumn with window functions and chaining
df.withColumn("row_number", row_number().over(wf)) \
.withColumn("rank", rank().over(wf)) \
.withColumn("dense_rank", dense_rank().over(wf)) \
.drop("salary") \
.filter(col("row_number") == 1) \
.show()
df.createOrReplaceTempView("dfview")
spark.sql(""" select EmpID,Emp_name,Manager_id,salary,Location,row_number() OVER (PARTITION BY Location ORDER BY Salary) AS row_number,
rank() OVER (PARTITION BY Location ORDER BY Salary) AS rank,dense_rank() OVER (PARTITION BY Location ORDER BY Salary) AS dense_rank
from dfview """ ) .show()
spark.sql("""
SELECT EmpID, Emp_name, Manager_id, salary, Location,
row_number() OVER w AS row_number,
rank() OVER w AS rank,
dense_rank() OVER w AS dense_rank
FROM dfview
WINDOW w AS (PARTITION BY Location ORDER BY Salary)
""").show()