PySpark Coding Practice Questions

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, ArrayType, MapType

spark = SparkSession.builder.appName("AdvancedPySpark").getOrCreate()

# Sample DataFrame
data = [
    ("Sales", "Jan", 1000),
    ("Sales", "Feb", 1200),
    ("HR", "Jan", 800),
    ("HR", "Feb", 900),
    ("IT", "Jan", 2000),
    ("IT", "Feb", 2100)
]
df = spark.createDataFrame(data, ["dept", "month", "amount"])
df.show()

# 11. Row-Level Trick: Flag High Value Rows
df.withColumn("is_high", F.col("amount") > 1000).show()

# 12. Summary Aggregation: Monthly Totals and Grand Total
# GROUPING SETS is available through SQL (the DataFrame API offers rollup/cube instead)
df.createOrReplaceTempView("sales")
spark.sql("""
    SELECT dept, month, SUM(amount) AS total_amount
    FROM sales
    GROUP BY GROUPING SETS ((dept, month), ())
""").show()
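
# Related built-in alternative on the DataFrame API: rollup() produces the
# groupings (dept, month), (dept), and (), i.e. it adds per-dept subtotals
# on top of the monthly totals and the grand total.
df.rollup("dept", "month").agg(F.sum("amount").alias("total_amount")).show()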

# 13. Column-Level Manipulation: Rename All Columns to Uppercase
df_upper = df.toDF(*[c.upper() for c in df.columns])
df_upper.show()

# 14. Dynamic Column Renaming with Prefix
prefix = "PRE_"
df_prefixed = df.select([F.col(c).alias(f"{prefix}{c}") for c in df.columns])
df_prefixed.show()

# 15. Cross-Row Comparison: Rank Within Dept
rank_window = Window.partitionBy("dept").orderBy(F.col("amount").desc())
df.withColumn("rank_in_dept", F.rank().over(rank_window)).show()

# 16. Schema Trick: Safe Union of Two Mismatched DataFrames
df1 = df.select("dept", "month", "amount")
df2 = df1.withColumn("bonus", F.lit(500))

all_cols = df1.columns + [c for c in df2.columns if c not in df1.columns]
df1_safe = df1.select([F.col(c) if c in df1.columns else F.lit(None).alias(c) for c in all_cols])
df2_safe = df2.select([F.col(c) if c in df2.columns else F.lit(None).alias(c) for c in all_cols])
df1_safe.unionByName(df2_safe).show()
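
# Since Spark 3.1, unionByName can do this in one call by filling the
# missing columns with nulls, so no manual column alignment is needed:
df1.unionByName(df2, allowMissingColumns=True).show()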

# 17. Schema Trick: Explode Tags and Group
tags_data = [(1, ["a", "b"]), (2, ["b", "c"])]
tags_df = spark.createDataFrame(tags_data, ["id", "tags"])
tags_df.withColumn("tag", F.explode("tags")) \
       .groupBy("tag") \
       .count() \
       .show()

# 18. Join + Group + Window: Compare Amount with Dept Average After Join
avg_df = df.groupBy("dept").agg(F.avg("amount").alias("avg_amount"))
df.join(avg_df, on="dept") \
  .withColumn("above_avg", F.col("amount") > F.col("avg_amount")) \
  .show()

# 19. Join + Row Number in Joined Result
joined = df.join(avg_df, on="dept")
window_spec = Window.partitionBy("dept").orderBy("amount")
joined.withColumn("row_num", F.row_number().over(window_spec)).show()

# 20. Bonus: Concatenate all months per dept into one string
concat_window = Window.partitionBy("dept")
df.withColumn("all_months", F.concat_ws(",", F.collect_list("month").over(concat_window))) \
  .select("dept", "month", "all_months") \
  .distinct() \
  .show()

# 21. JSON Flattening: Extract Fields from Nested JSON Column
json_data = [(1, '{"name": "Alice", "age": 30}'), (2, '{"name": "Bob", "age": 25}')]
json_df = spark.createDataFrame(json_data, ["id", "json_str"])
json_parsed = json_df.withColumn("parsed", F.from_json("json_str", StructType([
    StructField("name", StringType()),
    StructField("age", IntegerType())
])))
json_parsed.select("id", "parsed.name", "parsed.age").show()
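
# Alternative sketch without declaring a schema: get_json_object() pulls
# individual fields by JSONPath (results come back as strings, so cast as needed).
json_df.select(
    "id",
    F.get_json_object("json_str", "$.name").alias("name"),
    F.get_json_object("json_str", "$.age").cast("int").alias("age")
).show()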

# 22. Pivot Trick: Pivot Month and Fill Missing with 0
df.groupBy("dept") \
  .pivot("month", ["Jan", "Feb", "Mar"]) \
  .agg(F.sum("amount")) \
  .na.fill(0) \
  .show()

# 23. Advanced Dynamic Aggregation Using Metadata
aggs = {"amount": "sum", "dept": "count"}  # From metadata
agg_exprs = [getattr(F, func)(col).alias(f"{func}_{col}") for col, func in aggs.items()]
df.agg(*agg_exprs).show()

# 24. Equivalent SQL Style Example for Above Queries
df.createOrReplaceTempView("sales")
spark.sql("SELECT dept, month, SUM(amount) as total FROM sales GROUP BY dept, month").show()
spark.sql("SELECT dept, SUM(CASE WHEN month = 'Jan' THEN amount ELSE 0 END) AS jan_total FROM sales GROUP BY dept").show()

# 25. Null Handling: Replace Nulls and Drop Null Rows
df_with_null = df.withColumn("bonus", F.when(F.col("dept") == "HR", None).otherwise(100))
df_with_null.na.fill({"bonus": 0}).show()
df_with_null.na.drop().show()

# 26. Map Type: Access and Explode Map Columns
map_data = [(1, {"a": 10, "b": 20}), (2, {"x": 5, "y": 15})]
map_df = spark.createDataFrame(map_data, ["id", "scores"])
map_df.withColumn("key", F.explode(F.map_keys("scores"))) \
      .withColumn("value", F.col("scores")[F.col("key")]) \
      .select("id", "key", "value").show()

# 27. Flatten Deeply Nested Structs
nested_data = [(1, ("Alice", 30)), (2, ("Bob", 25))]
nested_schema = StructType([
    StructField("id", IntegerType()),
    StructField("person", StructType([
        StructField("name", StringType()),
        StructField("age", IntegerType())
    ]))
])
nested_df = spark.createDataFrame(nested_data, schema=nested_schema)
nested_df.select("id", "person.name", "person.age").show()
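
# A minimal sketch of a generic flattener (assumption: only struct nesting,
# no arrays): walk df.schema recursively, build dotted select paths, and
# rename "person.name" to "person_name" and so on.
def flatten_structs(df):
    def field_paths(schema, prefix=""):
        paths = []
        for field in schema.fields:
            name = f"{prefix}{field.name}"
            if isinstance(field.dataType, StructType):
                paths += field_paths(field.dataType, f"{name}.")
            else:
                paths.append(name)
        return paths
    return df.select([F.col(p).alias(p.replace(".", "_")) for p in field_paths(df.schema)])

flatten_structs(nested_df).show()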

# 28. Wrapper Function for Metadata-Driven GroupBy and Aggregation
def dynamic_groupby(df, group_cols, agg_config, pivot_col=None, pivot_vals=None):
    agg_exprs = [getattr(F, func)(col).alias(f"{func}_{col}") for col, func in agg_config.items()]
    grouped = df.groupBy(group_cols)
    if pivot_col:
        return grouped.pivot(pivot_col, pivot_vals).agg(*agg_exprs)
    else:
        return grouped.agg(*agg_exprs)
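
# Example (hypothetical metadata): sum of amount per dept, pivoted by month
dynamic_groupby(df, ["dept"], {"amount": "sum"}, pivot_col="month", pivot_vals=["Jan", "Feb"]).show()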

# 29. Wrapper for Dynamic Filtering Using Metadata
def apply_filters(df, filters):
    for f in filters:
        col, op, val = f["col"], f["op"], f["val"]
        if op == "eq":
            df = df.filter(F.col(col) == val)
        elif op == "gt":
            df = df.filter(F.col(col) > val)
        elif op == "lt":
            df = df.filter(F.col(col) < val)
        elif op == "in":
            df = df.filter(F.col(col).isin(val))
    return df

# Example
filters_meta = [
    {"col": "dept", "op": "eq", "val": "Sales"},
    {"col": "amount", "op": "gt", "val": 1000}
]
apply_filters(df, filters_meta).show()

# 30. Wrapper for Conditional Column Creation from Metadata
def apply_conditions(df, condition_list):
    for cond in condition_list:
        # Read keys explicitly instead of relying on dict value ordering
        new_col, when_col = cond["new_col"], cond["when_col"]
        op, threshold = cond["op"], cond["threshold"]
        true_val, false_val = cond["true_val"], cond["false_val"]
        if op == "gt":
            expr = F.when(F.col(when_col) > threshold, true_val).otherwise(false_val)
        elif op == "lt":
            expr = F.when(F.col(when_col) < threshold, true_val).otherwise(false_val)
        elif op == "eq":
            expr = F.when(F.col(when_col) == threshold, true_val).otherwise(false_val)
        else:
            raise ValueError(f"Unsupported operator: {op}")
        df = df.withColumn(new_col, expr)
    return df

# Example
conditions = [
    {"new_col": "amount_label", "when_col": "amount", "op": "gt", "threshold": 1000, "true_val": "High", "false_val": "Low"}
]
apply_conditions(df, conditions).show()

# 31. Wrapper for Safe Join Using Metadata
def safe_join(df1, df2, join_keys, how="inner"):
    try:
        return df1.join(df2, on=join_keys, how=how)
    except Exception as e:
        print(f"Join failed: {e}")
        return df1

# Example join
dept_avg = df.groupBy("dept").agg(F.avg("amount").alias("avg_amount"))
safe_join(df, dept_avg, ["dept"]).show()

# 32. Error Handling in DataFrame Ops
def safe_with_column(df, col_name, func):
    try:
        return df.withColumn(col_name, func)
    except Exception as e:
        print(f"Column creation failed: {e}")
        return df

# Example
safe_with_column(df, "new_col", F.col("amount") * 2).show()
# PySpark Advanced DataFrame Tricks & Interview Scenarios
# ---------------------------------------------------------
# Covers Row-level Tricks, Summary Aggregations, Column Manipulation,
# Cross-row Comparisons, Schema Tricks, Join + Group + Window Combos,
# Dynamic Queries, Null Handling, Map Types, Error Handling,
# Date & String Manipulations, and Advanced Window Functions

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, ArrayType, MapType, TimestampType

spark = SparkSession.builder.appName("AdvancedPySpark").getOrCreate()

# Sample DataFrame for date, amount, and category
from datetime import datetime

data = [
    ("Sales", "2024-01-15", 1000, "East Zone"),
    ("Sales", "2024-02-20", 1200, "West Zone"),
    ("HR", "2023-12-10", 800, "East-Zone"),
    ("HR", "2024-01-05", 900, "West_Zone"),
    ("IT", "2024-01-30", 2000, "North.Zone"),
    ("IT", "2024-02-25", 2100, "South Zone")
]
df = spark.createDataFrame(data, ["dept", "date_str", "amount", "region"])
df = df.withColumn("date", F.to_date("date_str"))
df = df.drop("date_str")
df.show()

# ----------------------------------
# 33. Date Filtering: Filter Specific Month or Year
# ----------------------------------
df.filter(F.year("date") == 2024).show()
df.filter((F.year("date") == 2024) & (F.month("date") == 1)).show()
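
# Alternative sketch: filter on an explicit date range with between()
df.filter(F.col("date").between("2024-01-01", "2024-01-31")).show()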

# ----------------------------------
# 34. String Cleaning: Standardize Region Names
# ----------------------------------
df.withColumn("region_clean", F.regexp_replace(F.lower("region"), "[^a-z]", " ")) \
  .withColumn("region_final", F.trim(F.regexp_replace("region_clean", "\\s+", " "))) \
  .select("region", "region_final").distinct().show()

# ----------------------------------
# 35. Regex Extract: Extract Zone Info
# ----------------------------------
df.withColumn("zone", F.regexp_extract(F.col("region"), r"([A-Za-z]+)[ -._]Zone", 1)).show()

# ----------------------------------
# 36. Window Function: Running Total by Dept
# ----------------------------------
window_spec = Window.partitionBy("dept").orderBy("date")
df.withColumn("running_total", F.sum("amount").over(window_spec)).show()

# ----------------------------------
# 37. Advanced Window: Lag, Lead, and Percent Change
# ----------------------------------
df = df.withColumn("lag_amount", F.lag("amount").over(window_spec))
df = df.withColumn("pct_change", ((F.col("amount") - F.col("lag_amount")) / F.col("lag_amount")) * 100)
df.select("dept", "date", "amount", "lag_amount", "pct_change").show()

# ----------------------------------
# 38. Window + Filter: Most Recent Entry Per Dept
# ----------------------------------
rank_spec = Window.partitionBy("dept").orderBy(F.col("date").desc())
df.withColumn("rank", F.row_number().over(rank_spec)) \
  .filter("rank = 1").select("dept", "date", "amount").show()

# ----------------------------------
# 39. Rolling Average (3-row window)
# ----------------------------------
rolling_window = Window.partitionBy("dept").orderBy("date").rowsBetween(-2, 0)
df.withColumn("rolling_avg", F.avg("amount").over(rolling_window)).show()

# ----------------------------------
# 40. Combine Date + String Logic: Label Quarter and Clean Names
# ----------------------------------
df.withColumn("quarter", F.quarter("date")) \
  .withColumn("dept_std", F.upper(F.col("dept"))) \
  .withColumn("region_std", F.initcap(F.regexp_replace(F.col("region"), "[._-]", " "))) \
  .select("dept_std", "region_std", "quarter", "amount").show()

# END
