Absolutely! Let's break down select(), withColumn(), and expr() in PySpark, with supercharged examples that combine:
- Column creation
- Complex logic
- Conditional expressions
- Regex, math, string ops
- Cross-column logic
- SQL-style expressions inside code
🔹 1. select() → Used to pick or transform columns
from pyspark.sql import functions as F  # all examples below use this import

df.select("name", "salary", (F.col("salary") * 0.10).alias("bonus")).show()
✅ select() is pure projection: useful when you want to create a new DataFrame with selected and derived columns.
🔹 2. withColumn() → Used to add or replace a column
df.withColumn("bonus", F.col("salary") * 0.10).show()
✅ withColumn() returns a new DataFrame with the column added, or replaced if it already exists (the DataFrame itself is immutable).
🔹 3. expr() → Used for SQL-style expressions in DataFrame code
df.selectExpr("name", "salary", "salary * 0.10 as bonus").show()
df.withColumn("bonus", F.expr("salary * 0.10")).show()
✅ expr() and selectExpr() allow complex SQL-style syntax directly inside PySpark.

🚀 Super Examples

✅ 1. Conditional Bonus (with expr)
df.withColumn("bonus",
F.expr("CASE WHEN salary > 10000 THEN salary * 0.2 ELSE salary * 0.1 END")
).show()
✅ 2. Select + Cast + Format + Rename
df.select(
F.col("salary").cast("double").alias("salary_dbl"),
F.date_format("hire_date", "MMM-yyyy").alias("hired_month"),
F.upper("dept").alias("DEPT_CAPS")
).show()
✅ 3. Create Score Category using withColumn() + when()
df.withColumn("grade",
F.when(F.col("score") >= 90, "A")
.when(F.col("score") >= 80, "B")
.otherwise("C")
).show()
✅ 4. Regex Column Extraction (withColumn + regexp_extract)
df.withColumn("region_code", F.regexp_extract(F.col("address"), r"(\d{2})$", 1)).show()
✅ 5. Cross-Column Logic (compute total = base + tax - discount)
df.withColumn("total", F.col("base") + F.col("tax") - F.col("discount")).show()
✅ 6. Use selectExpr() for Compact SQL Style
df.selectExpr(
"name",
"salary",
"salary * 0.1 AS bonus",
"CASE WHEN dept = 'HR' THEN 1 ELSE 0 END as is_hr"
).show()
✅ 7. Date Logic using expr
df.withColumn("last_30_days_flag", F.expr("date >= date_sub(current_date(), 30)")).show()
✅ 8. Chain Multiple withColumn() Calls
df.withColumn("bonus", F.col("salary") * 0.1) \
.withColumn("after_tax", F.col("bonus") * 0.7) \
.withColumn("flag", F.when(F.col("after_tax") > 1000, "High").otherwise("Low")) \
.show()
✅ 9. Null-safe Expressions with expr
df.withColumn("safe_sum", F.expr("coalesce(base, 0) + coalesce(tax, 0)")).show()
✅ 10. Dynamically Rename All Columns with select()
df.select([F.col(c).alias(f"{c}_new") for c in df.columns]).show()
🧠 When to Use What?

| Use Case | Use |
|---|---|
| Create/Replace column | withColumn() |
| Return new DataFrame with selected columns | select() |
| SQL-style expressions | expr() or selectExpr() |
| Loop or rename dynamically | select() with list comprehension |
| Multiple expressions in one call | selectExpr() is concise |
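💡 To make the table concrete, here is a minimal sketch computing the same bonus column three ways (assuming the name/salary DataFrame used above):

bonus_sel  = df.select("name", "salary", (F.col("salary") * 0.10).alias("bonus"))   # projection only
bonus_wc   = df.withColumn("bonus", F.col("salary") * 0.10)                          # keeps all other columns
bonus_expr = df.selectExpr("name", "salary", "salary * 0.10 AS bonus")               # SQL-style, same result as select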
Awesome! Here's your extended guide to the most-used PySpark DataFrame operations, including select, withColumn, expr, and a ton of practical examples, all grouped by category:

🔥 Master PySpark Operations - With Super Examples

🔹 1. select() → pick or transform columns
df.select("id", (F.col("amount") * 1.2).alias("adjusted")).show()
✅ Returns a new DataFrame with the specified columns
🔹 2. withColumn() → add or overwrite column
df.withColumn("tax", F.col("amount") * 0.18).show()
✅ Adds or replaces a column (returns a new DataFrame; the original is unchanged)
🔹 3. expr() / selectExpr() → SQL-style column logic
df.selectExpr("id", "amount * 1.1 as adjusted", "CASE WHEN amount > 1000 THEN 'High' ELSE 'Low' END as risk").show()
✅ SQL syntax inside DataFrame code
🔹 4. filter() / where() → row filtering
df.filter(F.col("amount") > 500).show()
df.where("amount > 500 AND status = 'PAID'").show()
✅ Same functionality: where() is an alias for filter(); both accept a Column expression (programmatic) or a SQL condition string
🔹 5. drop() / dropDuplicates()
df.drop("col1").dropDuplicates(["id", "month"]).show()
✅ Drop columns or deduplicate by subset
🔹 6. withColumnRenamed()
df.withColumnRenamed("oldName", "newName").show()
✅ Rename a column
🔹 7. groupBy() + agg()
df.groupBy("dept").agg(
F.sum("amount").alias("total"),
F.count("*").alias("count")
).show()
✅ Grouping + aggregate logic
🔹 8. orderBy() / sort()
df.orderBy("amount").show()
df.orderBy(F.col("amount").desc()).show()
✅ Ascending/descending sorting
🔹 9. distinct()
df.select("dept").distinct().show()
✅ Unique values
🔹 10. join()
df1.join(df2, on="id", how="inner").show()
✅ Supports inner, left, right, outer, cross
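💡 A quick sketch of the other join types (assuming df1 and df2 share an "id" column, as above):

df1.join(df2, on="id", how="left").show()    # all rows from df1, matching rows from df2
df1.join(df2, on="id", how="right").show()   # all rows from df2
df1.join(df2, on="id", how="outer").show()   # all rows from both sides
df1.crossJoin(df2).show()                    # Cartesian product (the "cross" case needs no join key)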
🔹 11. union() / unionByName()
df1.union(df2).show()
df1.unionByName(df2).show()
✅ Combine rows of two DataFrames (schemas should match)
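💡 unionByName() matches columns by name rather than by position; on Spark 3.1+ it can also tolerate missing columns, as a minimal sketch:

df1.unionByName(df2, allowMissingColumns=True).show()   # columns missing on one side are filled with nulls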
🔹 12. explode() → unnest array/struct
df.withColumn("tag", F.explode("tags")).show()
✅ Flatten arrays/structs into rows
🔹 13. array(), struct(), lit(), when()
df.withColumn("new_struct", F.struct("id", "amount")) \
.withColumn("new_array", F.array("id", "amount")) \
.withColumn("flag", F.when(F.col("amount") > 1000, "High").otherwise("Low")).show()
✅ Combine fields or conditionally assign values
🔹 14. isNull() / isNotNull() / fillna() / dropna()
df.filter(F.col("email").isNull()).show()
df.fillna(0, subset=["amount"]).show()
df.dropna(subset=["name", "dept"]).show()
✅ Null-safe handling
🔹 15. cast() → type conversion
df.withColumn("amount_int", F.col("amount").cast("int")).show()
✅ Data type conversions
🔹 16. repartition() / coalesce()
df.repartition(10).write.parquet("path") # Wider shuffle
df.coalesce(1).write.csv("out.csv") # Fewer files
✅ Control number of partitions before writing
🔹 17. cache() / persist()
from pyspark.storagelevel import StorageLevel

df.cache()
df.persist(StorageLevel.MEMORY_AND_DISK)
✅ Optimize re-use of DataFrames
🔹 18. describe() / summary()
df.describe("amount", "salary").show()
df.summary("count", "mean", "min", "max").show()
✅ Basic statistics
🔹 19. toPandas() (only for small datasets)
pdf = df.limit(1000).toPandas()
✅ Bring data to the driver for display or ML use
🔹 20. Window functions
from pyspark.sql.window import Window
w = Window.partitionBy("dept").orderBy("date")
df.withColumn("running_total", F.sum("amount").over(w)).show()
✅ Do rank, lead/lag, rolling stats within groups
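💡 A short sketch of ranking and lead/lag over the same window spec w defined above:

df.withColumn("row_num", F.row_number().over(w)) \
  .withColumn("prev_amount", F.lag("amount", 1).over(w)) \
  .withColumn("next_amount", F.lead("amount", 1).over(w)) \
  .show()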
🔹 21. regexp_extract / regexp_replace / split (regex magic)
df.withColumn("clean_code", F.regexp_extract("text", r"(\d{3})-(\d{2})", 1)).show()
✅ Pattern matching and string processing
Perfect. Here's the extended, interview-ready PySpark operations cheat sheet, now tailored for:
✅ Use in Interviews · ETL Pipelines · Notebooks · Metadata Frameworks
We'll go over:
1. 📌 Real-life use case with logic
2. 🧠 What interviewers expect
3. 💼 ETL-friendly expression
4. 🛠️ Reusable in Metadata Framework
🔹 1. Column Logic: withColumn, select, expr
✅ Example Use Case: Add 18% GST to amount if item type is 'TAXABLE'
df.withColumn("final_price",
F.when(F.col("type") == "TAXABLE", F.col("amount") * 1.18)
.otherwise(F.col("amount"))
)
🧠 Interview Q: How do you apply conditional logic across columns?
💼 ETL: Can be expressed via metadata like:
{
"column_name": "final_price",
"expression": "CASE WHEN type = 'TAXABLE' THEN amount * 1.18 ELSE amount END"
}
🛠️ Metadata layer: Execute using F.expr(metadata["expression"])
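💡 A minimal sketch of that metadata layer (the dict mirrors the JSON above; names are illustrative):

metadata = {
    "column_name": "final_price",
    "expression": "CASE WHEN type = 'TAXABLE' THEN amount * 1.18 ELSE amount END",
}
df = df.withColumn(metadata["column_name"], F.expr(metadata["expression"]))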
🔹 2. Filtering Logic
✅ Use Case: Only keep active users in the last 12 months
df.filter(
F.col("status") == "ACTIVE"
).filter(
F.col("last_login") >= F.add_months(F.current_date(), -12)
)
🧠 Interview Q: How would you implement rolling time windows?
💼 Use in A/B retention, churn logic.
🛠️ Use metadata["filter_condition"] = "status = 'ACTIVE' AND last_login >= add_months(current_date(), -12)"
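💡 Sketch: filter() accepts a SQL condition string directly, so the metadata entry can be applied as-is (the dict name is illustrative):

metadata = {"filter_condition": "status = 'ACTIVE' AND last_login >= add_months(current_date(), -12)"}
df_active = df.filter(metadata["filter_condition"])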
🔹 3. Dynamic Aggregations with groupBy
✅ Example:
agg_expr = {
"amount": "sum",
"bonus": "avg",
"region": "collect_set"
}
agg_df = df.groupBy("dept").agg(
*[getattr(F, func)(col).alias(f"{col}_{func}") for col, func in agg_expr.items()]
)
🧠 Interview Q: How do you dynamically generate aggregates?
💼 ETL: Flexible groupBy + metric configs
🛠️ Metadata JSON:
{
"group_by": ["dept"],
"metrics": {"amount": "sum", "bonus": "avg", "region": "collect_set"}
}
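💡 Sketch: driving groupBy/agg from that config (the config dict mirrors the JSON above):

config = {
    "group_by": ["dept"],
    "metrics": {"amount": "sum", "bonus": "avg", "region": "collect_set"},
}
agg_df = df.groupBy(*config["group_by"]).agg(
    *[getattr(F, func)(col).alias(f"{col}_{func}") for col, func in config["metrics"].items()]
)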
🔹 4. Joins & Lookups
df.join(dim_region, on="region_code", how="left")
🧠 Q: When to use broadcast, skew handling, z-order?
💼 Typical in dimension enrichment
🛠️ Metadata:
{
"join_type": "left",
"join_column": "region_code",
"lookup_table": "dim_region"
}
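💡 For the broadcast case, a small dimension table can be hinted explicitly so the large side avoids a shuffle (a sketch, assuming dim_region is small):

df.join(F.broadcast(dim_region), on="region_code", how="left")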
🔹 5. Window Function with Row Enrichment
from pyspark.sql.window import Window
w = Window.partitionBy("customer").orderBy("date")
df.withColumn("running_total", F.sum("amount").over(w))
🧠 Q: Difference between rowsBetween vs rangeBetween?
💼 Used in revenue trending, cohort metrics
🛠️ Can be parameterized:
{
"partition_by": "customer",
"order_by": "date",
"window_function": "sum(amount)",
"window_type": "rows",
"frame": ["unboundedPreceding", "currentRow"]
}
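💡 A sketch contrasting the two frame types (assumes amount is numeric; rangeBetween needs a numeric orderBy column for bounded offsets):

w_rows = Window.partitionBy("customer").orderBy("date") \
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)   # frame counted in physical rows
w_range = Window.partitionBy("customer").orderBy("amount") \
    .rangeBetween(-100, 0)                                       # frame defined by values: amounts within 100 below the current row
df.withColumn("running_total", F.sum("amount").over(w_rows))
df.withColumn("nearby_total", F.sum("amount").over(w_range))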
🔹 6. Explode Arrays / Structs
df.withColumn("tag", F.explode("tags"))
🧠 Q: When do you use explode vs inline?
💼 Used in JSON flattening, log processing
🛠️ Metadata flag:
{ "explode_column": "tags" }
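💡 Sketch of the related variants (assuming tags is an array column and items is an array-of-structs column; column names are illustrative):

df.withColumn("tag", F.explode_outer("tags"))   # like explode, but keeps rows whose array is null/empty
df.select("id", F.posexplode("tags"))           # also returns the element position ("pos", "col")
df.select("id", F.expr("inline(items)"))        # expands an array of structs into one column per field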
🔹 7. String Cleaning & Parsing
df.withColumn("cleaned", F.regexp_replace("email", r"[^a-zA-Z0-9@.]", ""))
🧠 Q: Regex functions in PySpark?
💼 Used in input cleansing
🛠️ Metadata mapping:
{
"column": "email",
"operation": "regexp_replace",
"pattern": "[^a-zA-Z0-9@.]",
"replacement": ""
}
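💡 Sketch: the mapping above can be applied generically by looking the operation up on pyspark.sql.functions (the rule dict is illustrative):

rule = {"column": "email", "operation": "regexp_replace", "pattern": "[^a-zA-Z0-9@.]", "replacement": ""}
df = df.withColumn(rule["column"],
                   getattr(F, rule["operation"])(rule["column"], rule["pattern"], rule["replacement"]))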
🔹 8. Schema Alignment for Safe Union
all_cols = list(set(df1.columns) | set(df2.columns))
df1_aligned = df1.select([F.col(c) if c in df1.columns else F.lit(None).alias(c) for c in all_cols])
df2_aligned = df2.select([F.col(c) if c in df2.columns else F.lit(None).alias(c) for c in all_cols])
df1_aligned.unionByName(df2_aligned)
🧠 Q: How do you perform safe union across mismatched schemas?
💼 ETL: Useful in multi-source pipelines
🛠️ Metadata: union list
🔹 9. Dynamic selectExpr() for Column Transformations
cols = ["id", "upper(name) as name_uc", "salary * 1.2 as updated_salary"]
df.selectExpr(*cols).show()
🧠 Q: How do you parameterize transformation logic?
💼 Declarative transformation via config
🛠️ Metadata example:
{
"select_exprs": ["id", "upper(name) as name_uc", "salary * 1.2 as updated_salary"]
}
🔹 10. Date Filtering Tricks (Dynamic)
def filter_last_n_months(df, date_col, months=12):
    # keep the last `months` full calendar months, ending with the previous month
    start = F.trunc(F.add_months(F.current_date(), -months), "MM")   # first day of the window
    end = F.last_day(F.add_months(F.current_date(), -1))             # last day of the previous month
    return df.filter((F.col(date_col) >= start) & (F.col(date_col) <= end))
🧠 Q: How do you apply rolling logic for time windows?
💼 Used in retention, churn, time-based KPIs
🛠️ Parameter:
{
"filter_type": "rolling_months",
"months_back": 12
}
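💡 Sketch: wiring that parameter block into the helper above (the date column name is illustrative):

params = {"filter_type": "rolling_months", "months_back": 12}
if params["filter_type"] == "rolling_months":
    df = filter_last_n_months(df, "last_login", months=params["months_back"])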