PySpark Coding Practice Questions

Let's break down select(), withColumn(), and expr() in PySpark – with supercharged examples that combine:

  • Column creation
  • Complex logic
  • Conditional expressions
  • Regex, math, string ops
  • Cross-column logic
  • SQL-style expressions inside code
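
All the snippets below assume a SparkSession, the functions module imported as F, and a small DataFrame. A minimal setup sketch – the app name and the sample column names (name, salary, dept, score, hire_date) are only illustrative:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("pyspark-practice").getOrCreate()

# Tiny illustrative dataset; swap in your own data
df = spark.createDataFrame(
    [("Asha", 12000.0, "HR", 92, "2021-03-15"),
     ("Ravi", 8000.0, "IT", 78, "2022-07-01")],
    ["name", "salary", "dept", "score", "hire_date"],
)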

🔹 1. select() → Used to pick or transform columns

df.select("name", "salary", (F.col("salary") * 0.10).alias("bonus")).show()

✅ select() is pure projection – useful when you want to create a new DataFrame with selected and derived columns.


🔹 2. withColumn() → Used to add or replace a column

df.withColumn("bonus", F.col("salary") * 0.10).show()

✅ withColumn() returns a new DataFrame with a column added or replaced; DataFrames are immutable, so the original is left unchanged.


🔹 3. expr() → Used for SQL-style expressions in DataFrame code

df.selectExpr("name", "salary", "salary * 0.10 as bonus").show()
df.withColumn("bonus", F.expr("salary * 0.10")).show()

✅ expr() and selectExpr() allow complex SQL-style syntax directly inside PySpark.


🚀 Super Examples


✅ 1. Conditional Bonus (with expr)

df.withColumn("bonus",
    F.expr("CASE WHEN salary > 10000 THEN salary * 0.2 ELSE salary * 0.1 END")
).show()

✅ 2. Select + Cast + Format + Rename

df.select(
    F.col("salary").cast("double").alias("salary_dbl"),
    F.date_format("hire_date", "MMM-yyyy").alias("hired_month"),
    F.upper("dept").alias("DEPT_CAPS")
).show()

✅ 3. Create Score Category using withColumn() + when()

df.withColumn("grade",
    F.when(F.col("score") >= 90, "A")
     .when(F.col("score") >= 80, "B")
     .otherwise("C")
).show()

✅ 4. Regex Column Extraction (withColumn + regexp_extract)

df.withColumn("region_code", F.regexp_extract(F.col("address"), r"(\d{2})$", 1)).show()

✅ 5. Cross-Column Logic (compute total = base + tax - discount)

df.withColumn("total", F.col("base") + F.col("tax") - F.col("discount")).show()

✅ 6. Use selectExpr() for Compact SQL Style

df.selectExpr(
    "name",
    "salary",
    "salary * 0.1 AS bonus",
    "CASE WHEN dept = 'HR' THEN 1 ELSE 0 END as is_hr"
).show()

✅ 7. Date Logic using expr

df.withColumn("last_30_days_flag", F.expr("date >= date_sub(current_date(), 30)")).show()

✅ 8. Chain Multiple withColumn() Calls

df.withColumn("bonus", F.col("salary") * 0.1) \
  .withColumn("after_tax", F.col("bonus") * 0.7) \
  .withColumn("flag", F.when(F.col("after_tax") > 1000, "High").otherwise("Low")) \
  .show()

✅ 9. Null-safe Expressions with expr

df.withColumn("safe_sum", F.expr("coalesce(base, 0) + coalesce(tax, 0)")).show()

✅ 10. Dynamically Rename All Columns with select()

df.select([F.col(c).alias(f"{c}_new") for c in df.columns]).show()

🧠 When to Use What?

Use Case                                    | Use
Create/Replace column                       | withColumn()
Return new DataFrame with selected columns  | select()
SQL-style expressions                       | expr() or selectExpr()
Loop or rename dynamically                  | select() with list comprehension
Multiple expressions in one call            | selectExpr() (concise)
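
As a quick side-by-side, here is the same bonus logic written all three ways (note that withColumn() keeps every existing column, while the other two return only the listed columns):

# 1. select(): projection – returns only the listed columns
df.select("name", "salary", (F.col("salary") * 0.10).alias("bonus"))

# 2. withColumn(): keeps all existing columns, adds or replaces one
df.withColumn("bonus", F.col("salary") * 0.10)

# 3. selectExpr(): the same projection as select(), in SQL syntax
df.selectExpr("name", "salary", "salary * 0.10 AS bonus")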

Here's an extended guide to the most-used PySpark DataFrame operations – including select, withColumn, expr, and plenty of practical examples – all grouped by category:


🔥 Master PySpark Operations – With Super Examples


🔹 1. select() – pick or transform columns

df.select("id", (F.col("amount") * 1.2).alias("adjusted")).show()

✅ Returns a new DataFrame with the specified columns


🔹 2. withColumn() – add or overwrite column

df.withColumn("tax", F.col("amount") * 0.18).show()

✅ Adds or replaces a column (returns a new DataFrame; nothing is mutated in place)


🔹 3. expr() / selectExpr() – SQL-style column logic

df.selectExpr("id", "amount * 1.1 as adjusted", "CASE WHEN amount > 1000 THEN 'High' ELSE 'Low' END as risk").show()

✅ SQL syntax inside DataFrame code


🔹 4. filter() / where() – row filtering

df.filter(F.col("amount") > 500).show()
df.where("amount > 500 AND status = 'PAID'").show()

✅ filter() and where() are aliases – both accept either a Column expression or a SQL string


🔹 5. drop() / dropDuplicates()

df.drop("col1").dropDuplicates(["id", "month"]).show()

✅ Drop columns or deduplicate by subset


🔹 6. withColumnRenamed()

df.withColumnRenamed("oldName", "newName").show()

✅ Rename a column


🔹 7. groupBy() + agg()

df.groupBy("dept").agg(
    F.sum("amount").alias("total"),
    F.count("*").alias("count")
).show()

✅ Grouping + aggregate logic


🔹 8. orderBy() / sort()

df.orderBy("amount").show()
df.orderBy(F.col("amount").desc()).show()

✅ Ascending/descending sorting


🔹 9. distinct()

df.select("dept").distinct().show()

✅ Unique values


🔹 10. join()

df1.join(df2, on="id", how="inner").show()

✅ Supports inner, left, right, outer, cross


🔹 11. union() / unionByName()

df1.union(df2).show()
df1.unionByName(df2).show()

✅ Combine rows of two DataFrames – union() matches columns by position, unionByName() matches by column name
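
A minimal sketch of that difference, using two hypothetical DataFrames whose columns are in different order:

a = spark.createDataFrame([(1, "x")], ["id", "val"])
b = spark.createDataFrame([("y", 2)], ["val", "id"])

a.union(b).show()        # positional: "y" ends up under id, 2 under val
a.unionByName(b).show()  # by name: columns line up correctly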


🔹 12. explode() – unnest array/struct

df.withColumn("tag", F.explode("tags")).show()

✅ Flatten array (or map) columns into one row per element


🔹 13. array(), struct(), lit(), when()

df.withColumn("new_struct", F.struct("id", "amount")) \
  .withColumn("new_array", F.array("id", "amount")) \
  .withColumn("source", F.lit("manual")) \
  .withColumn("flag", F.when(F.col("amount") > 1000, "High").otherwise("Low")).show()

✅ Combine fields, add constants, or conditionally assign values


🔹 14. isNull() / isNotNull() / fillna() / dropna()

df.filter(F.col("email").isNull()).show()
df.fillna(0, subset=["amount"]).show()
df.dropna(subset=["name", "dept"]).show()

✅ Null-safe handling


🔹 15. cast() – type conversion

df.withColumn("amount_int", F.col("amount").cast("int")).show()

✅ Data type conversions


🔹 16. repartition() / coalesce()

df.repartition(10).write.parquet("path")  # full shuffle into exactly 10 partitions
df.coalesce(1).write.csv("out.csv")       # merge to fewer partitions (and files) without a full shuffle

✅ Control the number of partitions and output files before writing


🔹 17. cache() / persist()

from pyspark import StorageLevel

df.cache()
df.persist(StorageLevel.MEMORY_AND_DISK)

✅ Optimize re-use of DataFrames


🔹 18. describe() / summary()

df.describe("amount", "salary").show()
df.summary("count", "mean", "min", "max").show()

✅ Basic statistics


🔹 19. toPandas() (only for small datasets)

pdf = df.limit(1000).toPandas()

✅ Bring data to the driver for display or ML use


🔹 20. window() functions

from pyspark.sql.window import Window
w = Window.partitionBy("dept").orderBy("date")
df.withColumn("running_total", F.sum("amount").over(w)).show()

✅ Rank, lead/lag, and rolling stats within groups
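
Beyond running totals, the same Window spec pattern drives ranking and lead/lag lookups. A short sketch, assuming the same dept/amount columns as above:

w2 = Window.partitionBy("dept").orderBy(F.col("amount").desc())

df.withColumn("rank_in_dept", F.rank().over(w2)) \
  .withColumn("prev_amount", F.lag("amount", 1).over(w2)) \
  .withColumn("next_amount", F.lead("amount", 1).over(w2)) \
  .show()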


🔹 21. regexp_extract / replace / split (Regex magic)

df.withColumn("clean_code", F.regexp_extract("text", r"(\d{3})-(\d{2})", 1)).show()

✅ Pattern matching and string processing
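
The heading also mentions replace and split – a quick sketch of both, reusing the same free-text column named text:

df.withColumn("no_digits", F.regexp_replace("text", r"\d+", "")) \
  .withColumn("words", F.split("text", r"\s+")) \
  .show()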


Here's the extended, interview-ready PySpark operations cheat sheet, tailored for:


✅ Use in Interviews · ETL Pipelines · Notebooks · Metadata Frameworks

We'll go over:

1. 📌 Real-life use case with logic

2. 🧠 What interviewers expect

3. 💼 ETL-friendly expression

4. 🛠️ Reusable in Metadata Framework


🔹 1. Column Logic: withColumn, select, expr

✅ Example Use Case:

Add 18% GST to amount if item type is "TAXABLE"

df.withColumn("final_price", 
    F.when(F.col("type") == "TAXABLE", F.col("amount") * 1.18)
     .otherwise(F.col("amount"))
)

🧠 Interview Q: How do you apply conditional logic across columns?

💼 ETL: Can be expressed via metadata like:

{
  "column_name": "final_price",
  "expression": "CASE WHEN type = 'TAXABLE' THEN amount * 1.18 ELSE amount END"
}

🛠️ Metadata layer: Execute using F.expr(metadata["expression"])
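
A minimal sketch of that metadata-driven step, assuming the config is a plain Python dict loaded from the JSON above:

metadata = {
    "column_name": "final_price",
    "expression": "CASE WHEN type = 'TAXABLE' THEN amount * 1.18 ELSE amount END",
}

# The framework only needs a target column name and a SQL expression string
df = df.withColumn(metadata["column_name"], F.expr(metadata["expression"]))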


🔹 2. Filtering Logic

✅ Use Case:

Only keep active users in the last 12 months

df.filter(
    F.col("status") == "ACTIVE"
).filter(
    F.col("last_login") >= F.add_months(F.current_date(), -12)
)

🧠 Interview Q: How would you implement rolling time windows?

💼 Use in A/B retention, churn logic.

🛠️ Use metadata["filter_condition"] = "status = 'ACTIVE' AND last_login >= add_months(current_date(), -12)"


🔹 3. Dynamic Aggregations with GroupBy

✅ Example:

agg_expr = {
  "amount": "sum",
  "bonus": "avg",
  "region": "collect_set"
}

agg_df = df.groupBy("dept").agg(
    *[getattr(F, func)(col).alias(f"{col}_{func}") for col, func in agg_expr.items()]
)

🧠 Interview Q: How do you dynamically generate aggregates?

💼 ETL: Flexible groupBy + metric configs

🛠️ Metadata JSON:

{
  "group_by": ["dept"],
  "metrics": {"amount": "sum", "bonus": "avg", "region": "collect_set"}
}

🔹 4. Joins & Lookups

df.join(dim_region, on="region_code", how="left")

🧠 Q: When to use broadcast, skew, z-order?

💼 Typical in dimension enrichment

🛠️ Metadata:

{
  "join_type": "left",
  "join_column": "region_code",
  "lookup_table": "dim_region"
}
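
For the broadcast part of that question: when the lookup side is small, wrapping it in F.broadcast() hints Spark to ship it to every executor instead of shuffling both sides. A sketch using the same hypothetical dim_region table:

# Broadcast-hash join: avoids shuffling the large fact DataFrame
df.join(F.broadcast(dim_region), on="region_code", how="left")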

🔹 5. Window Function with Row Enrichment

from pyspark.sql.window import Window
w = Window.partitionBy("customer").orderBy("date")

df.withColumn("running_total", F.sum("amount").over(w))

🧠 Q: Difference between rowsBetween vs rangeBetween?

💼 Used in revenue trending, cohort metrics

🛠️ Can be parameterized:

{
  "partition_by": "customer",
  "order_by": "date",
  "window_function": "sum(amount)",
  "window_type": "rows",
  "frame": [-sys.maxsize, 0]
}
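
To make the rowsBetween vs rangeBetween question concrete, here is a sketch of two trailing-7 frames over the same partition (column names are the hypothetical customer/date/amount ones from above; rangeBetween needs a numeric ordering column, so the date is converted to a day index first):

# rows frame: physical rows – the current row plus the 6 rows before it
w_rows = Window.partitionBy("customer").orderBy("date").rowsBetween(-6, 0)

# range frame: logical values – everything within the last 7 calendar days
day_index = F.datediff(F.col("date"), F.lit("1970-01-01"))
w_range = Window.partitionBy("customer").orderBy(day_index).rangeBetween(-6, 0)

df.withColumn("sum_last_7_rows", F.sum("amount").over(w_rows)) \
  .withColumn("sum_last_7_days", F.sum("amount").over(w_range)) \
  .show()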

🔹 6. Explode Arrays / Structs

df.withColumn("tag", F.explode("tags"))

🧠 Q: When do you use explode vs inline?

💼 Used in JSON flattening, log processing

🛠️ Metadata flag:

{ "explode_column": "tags" }

🔹 7. String Cleaning & Parsing

df.withColumn("cleaned", F.regexp_replace("email", r"[^a-zA-Z0-9@.]", ""))

🧠 Q: Regex functions in PySpark?

💼 Used in input cleansing

🛠️ Metadata mapping:

{
  "column": "email",
  "operation": "regexp_replace",
  "pattern": "[^a-zA-Z0-9@.]",
  "replacement": ""
}

🔹 8. Schema Alignment for Safe Union

all_cols = list(set(df1.columns) | set(df2.columns))

df1_aligned = df1.select([F.col(c) if c in df1.columns else F.lit(None).alias(c) for c in all_cols])
df2_aligned = df2.select([F.col(c) if c in df2.columns else F.lit(None).alias(c) for c in all_cols])

df1_aligned.unionByName(df2_aligned)

🧠 Q: How do you perform safe union across mismatched schemas?

💼 ETL: Useful in multi-source pipelines

🛠️ Metadata: the aligned column list for the union


🔹 9. Dynamic selectExpr() for Column Transformations

cols = ["id", "upper(name) as name_uc", "salary * 1.2 as updated_salary"]
df.selectExpr(*cols).show()

🧠 Q: How do you parameterize transformation logic?

💼 Declarative transformation via config

🛠️ Metadata example:

{
  "select_exprs": ["id", "upper(name) as name_uc", "salary * 1.2 as updated_salary"]
}

🔹 10. Date Filtering Tricks (Dynamic)

def filter_last_n_months(df, date_col, months=12):
    # Keep the last `months` complete calendar months, excluding the current month
    start = F.trunc(F.add_months(F.current_date(), -months), "MM")
    end = F.last_day(F.add_months(F.current_date(), -1))
    return df.filter((F.col(date_col) >= start) & (F.col(date_col) <= end))
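
A usage example, with a hypothetical order_date column:

recent_orders = filter_last_n_months(df, "order_date", months=12)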

🧠 Q: How do you apply rolling logic for time windows?

💼 Used in retention, churn, time-based KPIs

🛠️ Parameter:

{
  "filter_type": "rolling_months",
  "months_back": 12
}

