50+ Realistic Data Engineering Coding-Based Interview Questions (PySpark Focused)
DATAFRAME TRANSFORMATION & QUERYING
- Read a JSON file with nested schema. Flatten all levels into separate columns.
- Load a CSV file and remove duplicate rows based on specific columns.
- Extract domain names from an email column.
- Filter records where a column contains any of a list of values.
- Cast a column from string to timestamp with error handling.
- Fill missing values differently for different columns.
- Count number of nulls per column in a DataFrame.
- Replace all occurrences of “N/A”, “null”, “missing” with None.
- Identify and drop columns that are completely null.
- Select top 3 highest values per group (e.g., top 3 sales per region).
- Compare two DataFrames for schema and row-level differences.
- Pivot a DataFrame and fill missing values with zero.
- Unpivot a wide DataFrame to long format.
- Write PySpark code to extract year and month from a date column.
- Rename all columns to lowercase.
JOINS & ADVANCED WINDOWING
- Perform an inner join and remove duplicate columns post-join.
- Handle skewed joins using salting or broadcast joins.
- Rank users within a group by revenue using Window functions.
- Compute moving average over past 3 rows per group.
- Create a lagged column and use it to detect trend direction.
- Calculate cumulative sum and reset at partition boundaries.
- Fill NULLs in a column with the last non-null value using windowing.
- Perform join between two DataFrames with different column names.
- Implement full outer join and tag source of each row (left, right, both).
COMPLEX LOGIC & OPTIMIZATION
- Read multiple files from a folder and apply schema inference only once.
- Apply multiple filters dynamically using a list of conditions.
- Create a DataFrame from multiple heterogeneous JSON structures.
- Perform union on two DataFrames with differing schemas.
- Detect and log rows violating specific data quality rules.
- Handle column name collisions in chained joins.
- Debug why a join causes data duplication.
- Optimize a transformation-heavy job using cache/persist correctly.
- Show impact of wide vs narrow transformations on DAG.
- Detect skewed data distributions by key.
- Optimize slow Parquet read using predicate pushdown.
DELTA LAKE + FILE I/O
- Implement SCD Type 1 and Type 2 with Delta Lake.
- Use Delta time travel to read older version of a table.
- Merge streaming and batch data into the same Delta table.
- Write a partitioned Parquet file with compression.
- Handle schema evolution when writing to a Delta table.
- Explain and mitigate the small-file problem in Delta Lake.
- Use Delta’s OPTIMIZE and VACUUM correctly in a pipeline.
PIPELINE / REAL-WORLD FLOWS
- Design an ingestion pipeline from S3 to Hive using Spark.
- Apply business rules from metadata/config JSON to transform data.
- Parameterize file paths and job configuration dynamically.
- Write log entries at each stage of the Spark job.
- Implement retry and failure logic for tasks that intermittently fail.
- Validate data schema against an expected contract.
- Convert raw CSV data to bronze/silver/gold layers.
- Cleanse and join real-world IoT datasets from multiple devices.
DEBUGGING, TESTING, INTERVIEW TRICKS
- Identify bottlenecks using Spark UI.
- Trace shuffle operations from logical plan.
- Write a PyTest test case for Spark DataFrame comparison.
- Track lineage of a column through multiple transformations.
- Handle corrupt record detection and quarantine.
50+ Realistic Data Engineering Coding-Based Interview Questions (PySpark Focused) – With Solutions
DATAFRAME TRANSFORMATION & QUERYING
- Flatten JSON Schema
Q: Read a nested JSON file and flatten it.
from pyspark.sql.functions import col
from pyspark.sql.types import StructType
def flatten_df(df):
    # Collect all struct-typed columns that still need flattening
    complex_fields = {field.name: field.dataType
                      for field in df.schema.fields
                      if isinstance(field.dataType, StructType)}
    while complex_fields:
        col_name = list(complex_fields.keys())[0]
        # Promote each nested field to a top-level column named parent_child
        expanded = [col(col_name + '.' + k).alias(col_name + '_' + k)
                    for k in [n.name for n in complex_fields[col_name]]]
        df = df.select("*", *expanded).drop(col_name)
        # Re-scan: newly expanded columns may themselves be structs
        complex_fields = {field.name: field.dataType
                          for field in df.schema.fields
                          if isinstance(field.dataType, StructType)}
    return df
nested_df = spark.read.json("nested.json")
flat_df = flatten_df(nested_df)
flat_df.show()
- Drop Duplicates on Specific Columns
df.dropDuplicates(["id", "name"]).show()
- Extract Domain from Email
from pyspark.sql.functions import split, col
df.withColumn("domain", split(col("email"), "@")[1]).show()
- Filter Where Column in List
values = ["A", "B", "C"]
df.filter(col("category").isin(values)).show()
- Cast Column to Timestamp With Fallback
from pyspark.sql.functions import to_timestamp, when, col
df = df.withColumn("ts", to_timestamp("ts_str", "yyyy-MM-dd HH:mm:ss"))
# Fall back to a second format for rows the first pattern could not parse
df = df.withColumn("ts",
                   when(col("ts").isNull(), to_timestamp("ts_str", "MM/dd/yyyy"))
                   .otherwise(col("ts")))
- Fill Missing Values Per Column
fill_map = {"name": "Unknown", "age": 0}
df.fillna(fill_map).show()
- Count Nulls Per Column
from pyspark.sql.functions import sum, col
null_counts = df.select([sum(col(c).isNull().cast("int")).alias(c) for c in df.columns])
null_counts.show()
- Replace “N/A”, “null”, etc. with None
replace_vals = ["N/A", "null", "missing"]
df = df.replace(replace_vals, None)
- Drop Fully Null Columns
# Count non-nulls for every column in one pass instead of one count() job per column
from pyspark.sql.functions import count, col
counts = df.select([count(col(c)).alias(c) for c in df.columns]).first()
non_null_df = df.select([c for c in df.columns if counts[c] > 0])
- Top-N by Group
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number
w = Window.partitionBy("region").orderBy(col("sales").desc())
df.withColumn("rnk", row_number().over(w)).filter("rnk <= 3").show()
- Compare Two DataFrames Row-by-Row
df1.exceptAll(df2).show()
df2.exceptAll(df1).show()
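exceptAll only surfaces row-level differences; the schemas can be diffed field-by-field as well. A minimal sketch using plain (name, type) tuples, the shape produced by `[(f.name, f.dataType.simpleString()) for f in df.schema.fields]` (`schema_diff` is an illustrative helper, not a Spark API):

```python
# Compare two schemas field-by-field: columns missing on either side,
# plus columns present in both but with different types.
def schema_diff(left, right):
    left_map, right_map = dict(left), dict(right)
    return {
        "only_left": sorted(set(left_map) - set(right_map)),
        "only_right": sorted(set(right_map) - set(left_map)),
        "type_mismatch": sorted(
            name for name in set(left_map) & set(right_map)
            if left_map[name] != right_map[name]
        ),
    }

# With Spark DataFrames you would feed it:
# schema_diff([(f.name, f.dataType.simpleString()) for f in df1.schema.fields],
#             [(f.name, f.dataType.simpleString()) for f in df2.schema.fields])
diff = schema_diff([("id", "int"), ("name", "string")],
                   [("id", "bigint"), ("name", "string"), ("age", "int")])
# diff == {"only_left": [], "only_right": ["age"], "type_mismatch": ["id"]}
```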
- Pivot with Fill
df.groupBy("id").pivot("month").agg(sum("sales")).fillna(0).show()
- Unpivot (Melt)
melt_cols = ["math", "physics", "chem"]
df_melted = df.selectExpr(
    "id", "name",
    f"stack({len(melt_cols)}, "
    + ", ".join([f"'{c}', {c}" for c in melt_cols])
    + ") as (subject, score)")
df_melted.show()
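The generated stack() expression is easy to get wrong, so it helps to build the string on its own and inspect it before handing it to selectExpr; for the three columns above it expands as shown in the comment:

```python
# Build the stack() expression used by selectExpr and inspect the result.
melt_cols = ["math", "physics", "chem"]
stack_expr = (f"stack({len(melt_cols)}, "
              + ", ".join(f"'{c}', {c}" for c in melt_cols)
              + ") as (subject, score)")
# stack_expr == "stack(3, 'math', math, 'physics', physics, 'chem', chem) as (subject, score)"
```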
- Extract Year & Month
from pyspark.sql.functions import year, month
df.withColumn("year", year("order_date")).withColumn("month", month("order_date")).show()
- Rename Columns to Lowercase
df = df.toDF(*[c.lower() for c in df.columns])
ADVANCED TRANSFORMATIONS & JOINS (Q16–Q30)
- Join with Null-safe Equality
df1.join(df2, df1["id"].eqNullSafe(df2["id"])).show()  # "<=>" is SQL-only syntax; the Python API uses eqNullSafe
- Anti Join (records in left not in right)
df1.join(df2, on="id", how="left_anti").show()
- Semi Join (records in left with matches in right)
df1.join(df2, on="id", how="left_semi").show()
- Join with Broadcast Hint
from pyspark.sql.functions import broadcast
df1.join(broadcast(df2), on="id").show()
- Self Join with Alias
left = df.alias("left")
right = df.alias("right")
left.join(right, left["manager_id"] == right["id"]).select("left.name", "right.name").show()
- Join with Expression Keys
from pyspark.sql.functions import expr
# expr() can only resolve columns through registered aliases, not variable names
df1.alias("a").join(df2.alias("b"),
                    expr("a.id = b.parent_id AND a.status = 'active'"), "inner")
- Detect Join Skew
df.groupBy("join_key").count().orderBy("count", ascending=False).show()
- Skew Join Handling with Salting
from pyspark.sql.functions import rand, expr
# Spread the hot keys of the large side across 10 random salt buckets
salted = df1.withColumn("salt", (rand() * 10).cast("int"))
# Replicate every row of the small side once per salt value
lookup = df2.withColumn("salt", expr("explode(array(0,1,2,3,4,5,6,7,8,9))"))
joined = salted.join(lookup, ["key", "salt"])
- Join and Aggregate on Same Column
from pyspark.sql.functions import sum
joined = df1.join(df2, "id").groupBy("category").agg(sum("amount"))
- Detect Duplicate Records Across DFs
duplicates = df1.intersect(df2)
duplicates.show()
- Chained Transformations with Aliases
# Note: .alias() inside withColumn is a no-op; the column name comes from withColumn's first argument
df.withColumn("net", col("price") - col("discount")).filter("net > 1000").show()
- Z-Score Normalization
from pyspark.sql.functions import mean, stddev
mean_val = df.select(mean("score")).collect()[0][0]
std_val = df.select(stddev("score")).collect()[0][0]
df.withColumn("zscore", (col("score") - mean_val) / std_val).show()
- Explode Array Column
from pyspark.sql.functions import explode
df.withColumn("skill", explode("skills")).show()
- Create Lag-Based Flag
from pyspark.sql.window import Window
from pyspark.sql.functions import lag
w = Window.partitionBy("user").orderBy("timestamp")
df = df.withColumn("prev_action", lag("action").over(w))
- Filter Based on Aggregated Condition
from pyspark.sql.functions import avg
agg_df = df.groupBy("region").agg(avg("sales").alias("avg_sales"))
df.join(agg_df, "region").filter("sales > avg_sales").show()
DELTA LAKE OPERATIONS (Q31–Q35)
- Create a Delta Table
df.write.format("delta").mode("overwrite").save("/mnt/delta/bronze")
- Time Travel to Specific Version
spark.read.format("delta").option("versionAsOf", 3).load("/mnt/delta/bronze").show()
- Upsert with MERGE INTO (SCD1)
spark.sql("""
MERGE INTO target USING source
ON target.id = source.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
""")
- SCD Type 2 Logic in Delta
spark.sql("""
MERGE INTO scd_table AS tgt
USING updates AS src
ON tgt.id = src.id AND tgt.current = true
WHEN MATCHED AND tgt.data <> src.data THEN
UPDATE SET current = false, end_date = current_date()
WHEN NOT MATCHED THEN
INSERT (id, data, current, start_date, end_date)
VALUES (src.id, src.data, true, current_date(), null)
""")
# Note: this MERGE only closes out changed rows; the new version of a matched,
# changed key still has to be inserted in a second step (a common pattern is to
# union those "new version" rows into the source before merging).
- OPTIMIZE + VACUUM
spark.sql("OPTIMIZE delta.`/mnt/delta/bronze`")
spark.sql("VACUUM delta.`/mnt/delta/bronze` RETAIN 168 HOURS")
PERFORMANCE + DEBUGGING + OPTIMIZATION (Q36–Q50)
- Cache & Unpersist
df.cache()
df.count()
df.unpersist()
- Persist to DISK_ONLY
from pyspark.storagelevel import StorageLevel
df.persist(StorageLevel.DISK_ONLY)
- Repartition Before Wide Join
df1 = df1.repartition(100, "join_key")
df2 = df2.repartition(100, "join_key")
- Explain Plan Debugging
df.explain(True)
- Trigger Shuffle Exception (for learning)
# Force skewed data and observe DAG
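Skew can also be reasoned about off-cluster. A minimal sketch: simulate the output of `df.groupBy("join_key").count()` with plain Python counts and quantify how dominant the heaviest key is (`skew_ratio` is an illustrative helper, not a Spark API):

```python
# The same check you would run on df.groupBy("join_key").count() output,
# simulated with plain counts so it runs without a cluster.
def skew_ratio(counts):
    """Ratio of the heaviest key's row count to the mean count; >> 1 means skew."""
    mean = sum(counts.values()) / len(counts)
    return max(counts.values()) / mean

# 90% of all rows land on one hot key -- the classic skewed-join shape.
counts = {"hot": 9000, **{f"k{i}": 1 for i in range(1000)}}
ratio = skew_ratio(counts)  # the hot key's task gets ~900x the average work
```

A key with a ratio far above 1 is the one worth salting or broadcasting around.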
- Log Row Counts Per Stage
print("Row Count:", df.count())
- Checkpointing for Fault Recovery
spark.sparkContext.setCheckpointDir("/tmp/checkpoints")
df = df.checkpoint()  # checkpoint() returns the checkpointed DataFrame; assign it
- Coalesce to Reduce Output Files
df.coalesce(1).write.csv("/single_file")
- PartitionBy When Saving Table
df.write.partitionBy("year", "month").parquet("/partitioned")
- Custom Partition Column for Joins
df = df.withColumn("bucket", col("user_id") % 10)
df = df.repartition("bucket")  # assign the result; repartition returns a new DataFrame
- Broadcast Join Threshold Check
spark.conf.get("spark.sql.autoBroadcastJoinThreshold")
- Run Code on Specific Executor Cores
# Core placement is not set from PySpark code; configure it at submit time:
# spark-submit --executor-cores 4 --num-executors 10 job.py
- Handling Nulls in Window Rank
from pyspark.sql.window import Window
from pyspark.sql.functions import rank, col
df = df.withColumn("rank", rank().over(Window.orderBy(col("value").desc_nulls_last())))
- Retry Logic for Job Step
for i in range(3):
    try:
        df.write.mode("overwrite").save("/output")  # overwrite lets a retry rerun cleanly
        break
    except Exception as e:
        print(f"Retry {i+1}:", e)
else:
    raise RuntimeError("all 3 attempts failed")
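The inline loop above can be wrapped into a reusable helper; a minimal sketch with illustrative names (`with_retries` is not a Spark API):

```python
import time

def with_retries(fn, attempts=3, delay_s=0.0):
    """Call fn(); re-raise the last error once all attempts are exhausted."""
    last_err = None
    for _ in range(attempts):
        try:
            return fn()
        except Exception as e:
            last_err = e
            time.sleep(delay_s)  # back off between attempts
    raise last_err

# Simulate a step that fails twice before succeeding.
calls = {"n": 0}
def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("transient")
    return "ok"

result = with_retries(flaky)  # succeeds on the third attempt
```

In a real job, `fn` would be something like `lambda: df.write.mode("overwrite").save("/output")`.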
- Unit Testing PySpark Jobs (Pytest)
# Use spark.createDataFrame() with sample inputs + assert expected outputs
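One way to make that comparison concrete: collect both DataFrames and compare the rows order-insensitively, since collect() order is not guaranteed. A sketch with an illustrative helper; Row objects compare like tuples, so plain tuples stand in here:

```python
def assert_rows_equal(actual_rows, expected_rows):
    """Order-insensitive row comparison for collected DataFrame output."""
    assert sorted(actual_rows) == sorted(expected_rows), (
        f"mismatch: {sorted(actual_rows)} != {sorted(expected_rows)}")

# In a real PyTest test (spark would come from a session fixture):
# def test_transform(spark):
#     result = transform(spark.createDataFrame([(1, "a")], ["id", "val"]))
#     assert_rows_equal(result.collect(), [(1, "A")])
assert_rows_equal([(2, "b"), (1, "a")], [(1, "a"), (2, "b")])
```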