PySpark Coding Practice Questions

🔥 50+ Realistic Data Engineering Coding-Based Interview Questions (PySpark Focused)


🌟 DATAFRAME TRANSFORMATION & QUERYING

  1. Read a JSON file with nested schema. Flatten all levels into separate columns.
  2. Load a CSV file and remove duplicate rows based on specific columns.
  3. Extract domain names from an email column.
  4. Filter records where a column contains any of a list of values.
  5. Cast a column from string to timestamp with error handling.
  6. Fill missing values differently for different columns.
  7. Count number of nulls per column in a DataFrame.
  8. Replace all occurrences of “N/A”, “null”, “missing” with None.
  9. Identify and drop columns that are completely null.
  10. Select top 3 highest values per group (e.g., top 3 sales per region).
  11. Compare two DataFrames for schema and row-level differences.
  12. Pivot a DataFrame and fill missing values with zero.
  13. Unpivot a wide DataFrame to long format.
  14. Write PySpark code to extract year and month from a date column.
  15. Rename all columns to lowercase.

🪨 JOINS & ADVANCED WINDOWING

  1. Perform an inner join and remove duplicate columns post-join.
  2. Handle skewed joins using salting or broadcast joins.
  3. Rank users within a group by revenue using Window functions.
  4. Compute moving average over past 3 rows per group.
  5. Create a lagged column and use it to detect trend direction.
  6. Calculate cumulative sum and reset at partition boundaries.
  7. Fill NULLs in a column with the last non-null value using windowing.
  8. Perform join between two DataFrames with different column names.
  9. Implement full outer join and tag source of each row (left, right, both).

✨ COMPLEX LOGIC & OPTIMIZATION

  1. Read multiple files from a folder and apply schema inference only once.
  2. Apply multiple filters dynamically using a list of conditions.
  3. Create a DataFrame from multiple heterogeneous JSON structures.
  4. Perform union on two DataFrames with differing schemas.
  5. Detect and log rows violating specific data quality rules.
  6. Handle column name collisions in chained joins.
  7. Debug why a join causes data duplication.
  8. Optimize a transformation-heavy job using cache/persist correctly.
  9. Show impact of wide vs narrow transformations on DAG.
  10. Detect skewed data distributions by key.
  11. Optimize slow Parquet read using predicate pushdown.

🚀 DELTA LAKE + FILE I/O

  1. Implement SCD Type 1 and Type 2 with Delta Lake.
  2. Use Delta time travel to read older version of a table.
  3. Merge streaming and batch data into the same Delta table.
  4. Write a partitioned Parquet file with compression.
  5. Handle schema evolution when writing to a Delta table.
  6. Describe and optimize small file problem in Delta Lake.
  7. Use Delta’s OPTIMIZE and VACUUM correctly in pipeline.

⚙️ PIPELINE / REAL-WORLD FLOWS

  1. Design an ingestion pipeline from S3 to Hive using Spark.
  2. Apply business rules from metadata/config JSON to transform data.
  3. Parameterize file paths and job configuration dynamically.
  4. Write log entries at each stage of the Spark job.
  5. Implement retry and failure logic for tasks that intermittently fail.
  6. Validate data schema against an expected contract.
  7. Convert raw CSV data to bronze/silver/gold layers.
  8. Cleanse and join real-world IoT datasets from multiple devices.

🔧 DEBUGGING, TESTING, INTERVIEW TRICKS

  1. Identify bottlenecks using Spark UI.
  2. Trace shuffle operations from logical plan.
  3. Write a PyTest test case for Spark DataFrame comparison.
  4. Track lineage of a column through multiple transformations.
  5. Handle corrupt record detection and quarantine.

🔥 50+ Realistic Data Engineering Coding-Based Interview Questions (PySpark Focused) - With Solutions


🌟 DATAFRAME TRANSFORMATION & QUERYING

  1. Flatten JSON Schema
    Q: Read a nested JSON file and flatten it.
from pyspark.sql.functions import col
from pyspark.sql.types import StructType

def flatten_df(df):
    # repeatedly expand top-level struct columns; array columns are not flattened here
    complex_fields = dict([(field.name, field.dataType)
        for field in df.schema.fields
        if isinstance(field.dataType, StructType)])
    while complex_fields:
        col_name = list(complex_fields.keys())[0]
        expanded = [col(col_name + '.' + k).alias(col_name + '_' + k) 
                    for k in [n.name for n in complex_fields[col_name]]]
        df = df.select("*", *expanded).drop(col_name)
        complex_fields = dict([(field.name, field.dataType)
            for field in df.schema.fields 
            if isinstance(field.dataType, StructType)])
    return df

nested_df = spark.read.json("nested.json")
flat_df = flatten_df(nested_df)
flat_df.show()
  2. Drop Duplicates on Specific Columns
df.dropDuplicates(["id", "name"]).show()
  3. Extract Domain from Email
from pyspark.sql.functions import split, col

df.withColumn("domain", split(col("email"), "@")[1]).show()
  4. Filter Where Column in List
values = ["A", "B", "C"]
df.filter(col("category").isin(values)).show()
  5. Cast Column to Timestamp With Fallback
from pyspark.sql.functions import to_timestamp, when, col

df = df.withColumn("ts", to_timestamp("ts_str", "yyyy-MM-dd HH:mm:ss"))
df = df.withColumn("ts", when(col("ts").isNull(),
                              to_timestamp("ts_str", "MM/dd/yyyy"))
                         .otherwise(col("ts")))
  6. Fill Missing Values Per Column
fill_map = {"name": "Unknown", "age": 0}
df.fillna(fill_map).show()
  7. Count Nulls Per Column
from pyspark.sql.functions import sum, col

null_counts = df.select([sum(col(c).isNull().cast("int")).alias(c) for c in df.columns])
null_counts.show()
  8. Replace “N/A”, “null”, etc. with None
replace_vals = ["N/A", "null", "missing"]
df = df.replace(replace_vals, None)
  9. Drop Fully Null Columns
from pyspark.sql.functions import count

# count(c) counts non-null values, so one aggregation job covers every column
non_null_counts = df.select([count(c).alias(c) for c in df.columns]).first().asDict()
non_null_df = df.select([c for c, n in non_null_counts.items() if n > 0])
  10. Top-N by Group
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

w = Window.partitionBy("region").orderBy(col("sales").desc())
df.withColumn("rnk", row_number().over(w)).filter("rnk <= 3").show()
  11. Compare Two DataFrames (Schema and Rows)
print(df1.schema == df2.schema)   # schema-level comparison
df1.exceptAll(df2).show()         # rows in df1 missing from df2
df2.exceptAll(df1).show()         # rows in df2 missing from df1
  12. Pivot with Fill
from pyspark.sql.functions import sum

df.groupBy("id").pivot("month").agg(sum("sales")).fillna(0).show()
  13. Unpivot (Melt)
melt_cols = ["math", "physics", "chem"]
df_melted = df.selectExpr("id", "name",
    f"stack({len(melt_cols)}, " +
    ", ".join([f"'{c}', {c}" for c in melt_cols]) +
    ") as (subject, score)")
df_melted.show()
  14. Extract Year & Month
from pyspark.sql.functions import year, month

df.withColumn("year", year("order_date")).withColumn("month", month("order_date")).show()
  15. Rename Columns to Lowercase
df = df.toDF(*[c.lower() for c in df.columns])


🔍 ADVANCED TRANSFORMATIONS & JOINS (Q16–Q30)

  16. Join with Null-safe Equality
# SQL's <=> operator maps to eqNullSafe in the DataFrame API
df1.join(df2, df1["id"].eqNullSafe(df2["id"])).show()
  17. Anti Join (records in left not in right)
df1.join(df2, on="id", how="left_anti").show()
  18. Semi Join (records in left with matches in right)
df1.join(df2, on="id", how="left_semi").show()
  19. Join with Broadcast Hint
from pyspark.sql.functions import broadcast
df1.join(broadcast(df2), on="id").show()
  20. Self Join with Alias
from pyspark.sql.functions import col

left = df.alias("left")
right = df.alias("right")
left.join(right, col("left.manager_id") == col("right.id")) \
    .select(col("left.name").alias("employee"), col("right.name").alias("manager")).show()
  21. Join with Expression Keys
from pyspark.sql.functions import expr

# alias the DataFrames so the expression can reference them by name
df1.alias("a").join(df2.alias("b"),
    expr("a.id = b.parent_id AND a.status = 'active'"), "inner")
  22. Detect Join Skew
df.groupBy("join_key").count().orderBy("count", ascending=False).show()
  23. Skew Join Handling with Salting
from pyspark.sql.functions import rand, expr

# spread the hot keys of the large side across 10 salt buckets
salted = df1.withColumn("salt", (rand() * 10).cast("int"))
# replicate every row of the small side once per salt value
lookup = df2.withColumn("salt", expr("explode(array(0,1,2,3,4,5,6,7,8,9))"))
joined = salted.join(lookup, ["key", "salt"])
  24. Join and Aggregate on Same Column
from pyspark.sql.functions import sum

joined = df1.join(df2, "id").groupBy("category").agg(sum("amount"))
  25. Detect Duplicate Records Across DFs
duplicates = df1.intersect(df2)
duplicates.show()
  26. Chained Transformations with Aliases
from pyspark.sql.functions import col

df.withColumn("net", col("price") - col("discount")).filter("net > 1000").show()
  27. Z-Score Normalization
from pyspark.sql.functions import mean, stddev, col

# one pass collects both statistics instead of two separate jobs
stats = df.select(mean("score").alias("mu"), stddev("score").alias("sigma")).first()
df.withColumn("zscore", (col("score") - stats["mu"]) / stats["sigma"]).show()
  28. Explode Array Column
from pyspark.sql.functions import explode
df.withColumn("skill", explode("skills")).show()
  29. Create Lag-Based Flag
from pyspark.sql.window import Window
from pyspark.sql.functions import lag

w = Window.partitionBy("user").orderBy("timestamp")
df = df.withColumn("prev_action", lag("action").over(w))
  30. Filter Based on Aggregated Condition
from pyspark.sql.functions import avg

agg_df = df.groupBy("region").agg(avg("sales").alias("avg_sales"))
df.join(agg_df, "region").filter("sales > avg_sales").show()

🧊 DELTA LAKE OPERATIONS (Q31–Q35)

  31. Create a Delta Table
df.write.format("delta").mode("overwrite").save("/mnt/delta/bronze")
  32. Time Travel to Specific Version
spark.read.format("delta").option("versionAsOf", 3).load("/mnt/delta/bronze").show()
  33. Upsert with MERGE INTO (SCD1)
spark.sql("""
MERGE INTO target USING source
ON target.id = source.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
""")
  34. SCD Type 2 Logic in Delta
spark.sql("""
MERGE INTO scd_table AS tgt
USING updates AS src
ON tgt.id = src.id AND tgt.current = true
WHEN MATCHED AND tgt.data <> src.data THEN
  UPDATE SET current = false, end_date = current_date()
WHEN NOT MATCHED THEN
  INSERT (id, data, current, start_date, end_date)
  VALUES (src.id, src.data, true, current_date(), null)
""")
  35. OPTIMIZE + VACUUM
spark.sql("OPTIMIZE delta.`/mnt/delta/bronze`")
spark.sql("VACUUM delta.`/mnt/delta/bronze` RETAIN 168 HOURS")

🚀 PERFORMANCE + DEBUGGING + OPTIMIZATION (Q36–Q50)

  36. Cache & Unpersist
df.cache()
df.count()
df.unpersist()
  37. Persist to DISK_ONLY
from pyspark import StorageLevel

df.persist(StorageLevel.DISK_ONLY)
  38. Repartition Before Wide Join
df1 = df1.repartition(100, "join_key")
df2 = df2.repartition(100, "join_key")
  39. Explain Plan Debugging
df.explain(True)
  40. Trigger Shuffle Skew (for learning)
# Force skewed data and observe DAG
  41. Log Row Counts Per Stage
print("Row Count:", df.count())
  42. Checkpointing for Fault Recovery
spark.sparkContext.setCheckpointDir("/tmp/checkpoints")
df.checkpoint()
  43. Coalesce to Reduce Output Files
# coalesce(1) funnels all output through one task; only safe for small results
df.coalesce(1).write.csv("/single_file")
  44. PartitionBy When Saving Table
df.write.partitionBy("year", "month").parquet("/partitioned")
  45. Custom Partition Column for Joins
from pyspark.sql.functions import col

df = df.withColumn("bucket", col("user_id") % 10)
df = df.repartition("bucket")
  46. Broadcast Join Threshold Check
spark.conf.get("spark.sql.autoBroadcastJoinThreshold")
  47. Run Code on Specific Executor Cores
# Not direct in PySpark, handled via spark-submit --executor-cores
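Executor sizing is set at submit time rather than inside PySpark code; a typical invocation (all values illustrative, `job.py` is a placeholder) looks like:

```shell
spark-submit \
  --master yarn \
  --num-executors 4 \
  --executor-cores 4 \
  --executor-memory 8g \
  job.py
```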
  48. Handling Nulls in Window Rank
from pyspark.sql.window import Window
from pyspark.sql.functions import rank, col

df = df.withColumn("rank", rank().over(Window.orderBy(col("value").desc_nulls_last())))
  49. Retry Logic for Job Step
import time

for i in range(3):
    try:
        df.write.mode("overwrite").save("/output")
        break
    except Exception as e:
        print(f"Retry {i+1} failed:", e)
        time.sleep(2 ** i)  # simple exponential backoff
else:
    raise RuntimeError("Job step failed after 3 attempts")
  50. Unit Testing PySpark Jobs (Pytest)
# Use spark.createDataFrame() with sample inputs + assert expected outputs
