Here’s a detailed guide to column and row manipulation in PySpark DataFrames, with useful implementations:
1. Column Manipulation in PySpark DataFrames
1.1 Renaming Columns
Rename a Single Column
df = df.withColumnRenamed("old_column", "new_column")
Rename Multiple Columns
new_column_names = {"old1": "new1", "old2": "new2"}
for old, new in new_column_names.items():
    df = df.withColumnRenamed(old, new)
Add a Suffix to All Column Names
df = df.toDF(*[c + "_v1" for c in df.columns])
1.2 Checking Data Types
Check Data Type of a Specific Column
print(df.schema["column_name"].dataType)
Get Data Types of All Columns
df.dtypes # Returns a list of (column_name, data_type)
Check Schema of DataFrame
df.printSchema()
1.3 Apply Dynamic Logic to All Columns
Example: Trim All String Columns
from pyspark.sql.functions import col, trim
df = df.select([trim(col(c)).alias(c) if dtype == "string" else col(c) for c, dtype in df.dtypes])
Example: Convert All Integer Columns to Double
from pyspark.sql.functions import col
# Integer columns appear as "int" or "bigint" in df.dtypes
df = df.select([col(c).cast("double") if dtype in ("int", "bigint") else col(c) for c, dtype in df.dtypes])
Example: Replace Nulls in All String Columns with “Unknown”
df = df.fillna("Unknown")
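To make the scope explicit, the list of string columns can be built dynamically from df.dtypes and passed as the subset. A minimal sketch (fillna with a string value only affects string columns anyway):
# Collect the names of all string-typed columns, then fill nulls only in those
string_cols = [c for c, t in df.dtypes if t == "string"]
df = df.fillna("Unknown", subset=string_cols)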
2. Row-Based DataFrame Manipulation
2.1 Collecting Rows One by One
Convert DataFrame to a List of Rows
rows = df.collect()
for row in rows:
    print(row)
Using toLocalIterator() for Large DataFrames (Efficient)
for row in df.toLocalIterator():
    print(row)
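If only a sample of rows is needed on the driver, take() or limit() avoids pulling the entire DataFrame into memory. A small sketch:
# take(n) returns at most n Row objects to the driver
first_rows = df.take(5)
# limit(n) returns a new (smaller) DataFrame so processing can stay in Spark
df_sample = df.limit(100)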
2.2 Filtering Rows
Filter Rows Based on a Condition
df_filtered = df.filter(df["Age"] > 30)
Filter Multiple Conditions
df_filtered = df.filter((df["Age"] > 30) & (df["Gender"] == "Male"))
2.3 Sorting Rows
df_sorted = df.orderBy("Age", ascending=False)
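To sort on several columns with different directions, pass column expressions built with asc()/desc(). A sketch assuming the DataFrame has Age and Name columns:
from pyspark.sql.functions import asc, desc
# Oldest first; ties broken alphabetically by Name
df_sorted = df.orderBy(desc("Age"), asc("Name"))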
2.4 Adding a New Row (Using Union)
from pyspark.sql import Row
new_row = Row(ID=100, Name="John Doe", Age=40)
df_new = df.union(spark.createDataFrame([new_row], df.schema))
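union() matches columns by position, so the new row must follow df.schema exactly. When column order might differ, unionByName() matches by name instead; a minimal sketch assuming the same ID/Name/Age schema:
# Columns are matched by name, not by position
new_df = spark.createDataFrame([(40, "John Doe", 100)], ["Age", "Name", "ID"])
df_new = df.unionByName(new_df)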
3. Useful Implementations
3.1 Finding Duplicate Rows
df.groupBy(df.columns).count().filter("count > 1").show()
3.2 Removing Duplicate Rows
df = df.dropDuplicates()
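Both checks can also be restricted to a subset of columns when full-row comparison is too strict. A sketch assuming a Name column:
# Keep only the first row seen for each Name; other columns may differ
df_dedup = df.dropDuplicates(["Name"])
# Report duplicates based on Name alone
df.groupBy("Name").count().filter("count > 1").show()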
3.3 Adding a New Column Dynamically
from pyspark.sql.functions import lit
df = df.withColumn("NewColumn", lit("DefaultValue"))
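For a column whose value depends on a condition rather than a constant, combine withColumn() with when()/otherwise(). A sketch assuming an Age column:
from pyspark.sql.functions import when, col
# Label each row based on a condition; otherwise() supplies the default
df = df.withColumn("AgeGroup", when(col("Age") >= 30, "Senior").otherwise("Junior"))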
Conclusion
- PySpark allows flexible column manipulations like renaming, checking types, and applying transformations.
- Row operations like filtering, sorting, and iterating can be done efficiently.
- Collecting data should be handled carefully to avoid memory overload.
- Dynamic transformations make it easy to process large datasets.
# End-to-end script combining the column and row operations above
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.functions import col, trim, lit
# Initialize Spark Session
spark = SparkSession.builder.appName("DataFrame_Manipulation").getOrCreate()
# Sample Data
data = [(1, "Alice", 25, "Engineer"), (2, "Bob", 30, "Doctor"), (3, "Charlie", 35, "Teacher")]
df = spark.createDataFrame(data, ["ID", "Name", "Age", "Occupation"])
# 1. Column Manipulation
# Rename a Single Column
df = df.withColumnRenamed("Occupation", "Job")
# Rename Multiple Columns
column_rename_map = {"ID": "UserID", "Name": "FullName"}
for old, new in column_rename_map.items():
    df = df.withColumnRenamed(old, new)
# Add a Suffix to All Columns
df = df.toDF(*[c + "_v1" for c in df.columns])
# Check Data Types
df.printSchema()
# Apply Dynamic Logic to All Columns: Trim All String Columns
df = df.select([trim(col(c)).alias(c) if dtype == "string" else col(c) for c, dtype in df.dtypes])
# Convert All Integer Columns to Double (long columns appear as "bigint" in df.dtypes)
df = df.select([col(c).cast("double") if dtype in ("int", "bigint") else col(c) for c, dtype in df.dtypes])
# 2. Row-Based Manipulation
# Collect Rows One by One
for row in df.collect():
    print(row)
# Efficient Row Iteration
for row in df.toLocalIterator():
    print(row)
# Filtering Rows
df_filtered = df.filter((df["Age_v1"] > 28.0) & (df["FullName_v1"] != "Bob"))
df_filtered.show()
# Sorting Rows
df_sorted = df.orderBy("Age_v1", ascending=False)
df_sorted.show()
# Adding a New Row (the Row fields must line up with df.schema; the numeric columns are double after the cast above)
new_row = Row(UserID_v1=4.0, FullName_v1="David", Age_v1=40.0, Job_v1="Scientist")
df_new = df.union(spark.createDataFrame([new_row], df.schema))
df_new.show()
# Finding Duplicate Rows
df.groupBy(df.columns).count().filter("count > 1").show()
# Removing Duplicate Rows
df = df.dropDuplicates()
df.show()
# Adding a New Column Dynamically
df = df.withColumn("NewColumn_v1", lit("DefaultValue"))
df.show()
# Stop Spark Session
spark.stop()
Most Important PySpark DataFrame Transformation Operations 🚀
PySpark transformations are lazy operations that create a new DataFrame without modifying the original one. Here are the most essential transformation operations in PySpark:
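To illustrate the laziness, a small sketch: the transformations below only build a logical plan, and nothing executes until an action such as count(), show(), or collect() is called.
# Transformations: returned instantly, no Spark job is launched yet
df_adults = df.filter(df["age"] > 25).select("name", "age")
# Action: triggers execution of the whole plan above
print(df_adults.count())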
1. Selecting and Renaming Columns
Select Specific Columns
df_selected = df.select("column1", "column2")
Rename Columns
df_renamed = df.withColumnRenamed("old_col", "new_col")
2. Filtering Data
Filter Based on Condition
df_filtered = df.filter(df["age"] > 25)
Multiple Conditions
df_filtered = df.filter((df["age"] > 25) & (df["gender"] == "Male"))
3. Adding or Modifying Columns
Create a New Column
from pyspark.sql.functions import lit
df_new = df.withColumn("new_column", lit("default_value"))
Modify Existing Column
df_modified = df.withColumn("salary_incremented", df["salary"] * 1.10)
4. Dropping Columns
df_dropped = df.drop("column_to_remove")
5. Handling Missing Data
Fill Missing Values
df_filled = df.fillna({"age": 0, "name": "Unknown"})
Drop Rows with Nulls
df_cleaned = df.dropna()
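dropna() also accepts how, thresh, and subset parameters for finer control, as in this minimal sketch:
# Drop a row only when every column is null
df_cleaned = df.dropna(how="all")
# Drop rows that are missing either of these specific columns
df_cleaned = df.dropna(subset=["age", "name"])
# Keep rows with at least 2 non-null values
df_cleaned = df.dropna(thresh=2)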
6. Aggregations & Grouping
Group By and Aggregate
from pyspark.sql.functions import sum, avg, count
df_grouped = df.groupBy("department").agg(sum("salary").alias("total_salary"), avg("age"))
Count Distinct Values
df.select("department").distinct().count()
7. Sorting Data
df_sorted = df.orderBy("age", ascending=False)
8. Joining DataFrames
Inner Join
df_joined = df1.join(df2, df1["id"] == df2["id"], "inner")
Left Join
df_left = df1.join(df2, df1["id"] == df2["id"], "left")
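Joining on an expression like df1["id"] == df2["id"] keeps both id columns in the result. Passing the column name (or a list of names) deduplicates the join key; a minimal sketch:
# The join key "id" appears only once in the output
df_joined = df1.join(df2, "id", "inner")
df_left = df1.join(df2, ["id"], "left")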
9. Union (Appending DataFrames)
df_combined = df1.union(df2)
10. Exploding Nested Data
from pyspark.sql.functions import explode
df_exploded = df.withColumn("exploded_column", explode(df["nested_column"]))
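explode() expects an array (or map) column. If the nested data arrives as a delimited string, split() can build the array first; a sketch assuming a hypothetical tags column holding values like "a,b,c":
from pyspark.sql.functions import split, explode
# Turn "a,b,c" into ["a", "b", "c"], then emit one row per element
df_exploded = df.withColumn("tag", explode(split(df["tags"], ",")))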
Conclusion
- These transformations do not modify the original DataFrame but return a new one.
- PySpark applies lazy evaluation, meaning transformations are only executed when an action is performed.
Most Useful PySpark DataFrame Functions
PySpark provides many built-in functions for column aliasing, distinct values, transformations, and aggregations. Here’s a collection of the most useful ones:
1. Column Aliasing (alias())
- Used to rename a column temporarily within a query.
from pyspark.sql.functions import col
df_alias = df.select(col("name").alias("full_name"), col("age"))
df_alias.show()
2. Removing Duplicates (distinct())
- Removes duplicate rows from the DataFrame.
df_distinct = df.distinct()
df_distinct.show()
- Count distinct values in a column:
df.select("department").distinct().count()
3. Filtering Data (filter() & where())
- Using .filter():
df_filtered = df.filter(df["age"] > 25)
- Using .where() (same as filter, but with SQL-like syntax):
df_filtered = df.where("age > 25")
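The same SQL-flavoured syntax is available for column expressions through expr(), which can be used inside select() or withColumn(). A minimal sketch assuming name and age columns:
from pyspark.sql.functions import expr
df_expr = df.select("name", expr("age + 1 AS age_next_year"))
df_flagged = df.withColumn("is_adult", expr("age >= 18"))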
4. Column Operations
withColumn() – Create or Modify Columns
from pyspark.sql.functions import lit
df_new = df.withColumn("new_column", lit("default_value"))
cast() – Change Data Type
df_casted = df.withColumn("salary", df["salary"].cast("double"))
5. Aggregations
groupBy() with Aggregations
from pyspark.sql.functions import sum, avg, count
df_grouped = df.groupBy("department").agg(
    sum("salary").alias("total_salary"),
    avg("age").alias("average_age")
)
df_grouped.show()
6. Sorting (orderBy())
df_sorted = df.orderBy("age", ascending=False)
df_sorted.show()
7. Joins
df_joined = df1.join(df2, df1["id"] == df2["id"], "inner")
8. Exploding Nested Data (explode())
from pyspark.sql.functions import explode
df_exploded = df.withColumn("exploded_column", explode(df["nested_column"]))
df_exploded.show()
9. Collecting Rows
rows = df.collect()
for row in rows:
    print(row)
10. Row Numbering & Ranking
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number
windowSpec = Window.partitionBy("department").orderBy("salary")
df_ranked = df.withColumn("rank", row_number().over(windowSpec))
df_ranked.show()
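row_number() always yields a strictly increasing sequence within each partition, while rank() and dense_rank() treat ties differently; desc() reverses the ordering. A sketch on the same window:
from pyspark.sql.functions import rank, dense_rank, desc
windowDesc = Window.partitionBy("department").orderBy(desc("salary"))
df_ranked = df.withColumn("rank", rank().over(windowDesc))  # ties share a rank, gaps follow
df_ranked = df_ranked.withColumn("dense_rank", dense_rank().over(windowDesc))  # ties share a rank, no gaps
df_ranked.show()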
Conclusion
- .alias() is useful for renaming columns temporarily.
- .distinct() removes duplicates.
- .filter() and .where() allow conditional selection.
- .groupBy() and .orderBy() are useful for aggregations and sorting.
# End-to-end script exercising the functions above
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, sum, avg, count, explode, row_number, array
from pyspark.sql.window import Window
# Initialize Spark Session
spark = SparkSession.builder.appName("PySpark_Useful_Functions").getOrCreate()
# Sample Data
data = [(1, "Alice", 25, "HR", 50000),
(2, "Bob", 30, "IT", 60000),
(3, "Charlie", 35, "IT", 70000),
(4, "David", 40, "Finance", 80000),
(5, "Eve", 45, "Finance", 90000)]
columns = ["ID", "Name", "Age", "Department", "Salary"]
df = spark.createDataFrame(data, columns)
# 1. Alias (Renaming Columns Temporarily)
df_alias = df.select(col("Name").alias("Full_Name"), col("Age"))
df_alias.show()
# 2. Distinct (Remove Duplicates)
df_distinct = df.select("Department").distinct()
df_distinct.show()
# 3. Filtering Data
df_filtered = df.filter((df["Age"] > 30) & (df["Department"] == "IT"))
df_filtered.show()
# 4. Adding & Modifying Columns
df_new = df.withColumn("New_Column", lit("DefaultValue"))
df_casted = df.withColumn("Salary", df["Salary"].cast("double"))
df_new.show()
df_casted.printSchema()
# 5. Aggregations (Sum, Average, Count)
df_grouped = df.groupBy("Department").agg(
    sum("Salary").alias("Total_Salary"),
    avg("Age").alias("Average_Age")
)
df_grouped.show()
# 6. Sorting
df_sorted = df.orderBy("Age", ascending=False)
df_sorted.show()
# 7. Joining DataFrames
extra_data = [(1, "US"), (2, "Canada"), (3, "UK"), (4, "Germany"), (5, "India")]
columns_extra = ["ID", "Country"]
df_extra = spark.createDataFrame(extra_data, columns_extra)
df_joined = df.join(df_extra, "ID", "inner")
df_joined.show()
# 8. Exploding Nested Data
df_nested = df.withColumn("Hobbies", array(lit("Reading"), lit("Sports")))
df_exploded = df_nested.withColumn("Hobby", explode(df_nested["Hobbies"]))
df_exploded.show()
# 9. Collecting Rows
rows = df.collect()
for row in rows:
    print(row)
# 10. Row Numbering & Ranking
windowSpec = Window.partitionBy("Department").orderBy("Salary")
df_ranked = df.withColumn("Rank", row_number().over(windowSpec))
df_ranked.show()
# Stop Spark Session
spark.stop()