PySpark Interview Q&A Cheat Sheet — formatted consistently for flashcards, Excel, or quick review:
Question | Answer |
---|---|
How do you handle schema mismatch when reading multiple JSON/Parquet files with different structures? | Use .option("mergeSchema", "true") when reading Parquet files; for JSON, unify schemas by selecting common columns or by supplying an explicit schema and filling missing fields with nulls (see the schema-merge sketch after this table). |
You want to write a DataFrame back to Parquet but keep the original file size consistent. What options do you use? | Control file size with .option("parquet.block.size", sizeInBytes) and .option("parquet.page.size", sizeInBytes); also control the number of output files via .repartition() before writing. |
Why might a join operation cause executor OOM errors, and how can you avoid it? | Large shuffle data, skewed keys, or huge join sides cause OOM. Avoid by broadcasting small tables, repartitioning by join key, filtering data, and salting skewed keys. |
But I’m joining on an id key with around 5 million keys — should I do df1.repartition("join_key")? | Yes, repartition both DataFrames on join_key to optimize the shuffle if the key distribution is even; if it is skewed, consider salting (see the salted-join sketch after this table). |
You’re reading a CSV with missing values. How would you replace nulls dynamically across all columns? | Loop through df.dtypes and use .withColumn() with when(col.isNull(), default) per type: 0 for numbers, "missing" for strings, False for booleans, etc. (see the null-filling sketch after this table). |
How do you handle corrupt records while reading JSON/CSV? | For JSON: .option("badRecordsPath", "path") to divert corrupt records (Databricks-specific); for CSV: .option("mode", "PERMISSIVE") or "DROPMALFORMED", plus .option("columnNameOfCorruptRecord", "_corrupt_record") (see the corrupt-record sketch after this table). |
How do you handle duplicate rows in a DataFrame? | Use .dropDuplicates() to remove exact duplicates or .dropDuplicates([col1, col2]) for specific columns. |
How to handle nulls before aggregation? | Use .fillna() with appropriate defaults before groupBy and aggregation. |
How do you read only specific columns from a Parquet file? | Use .select("col1", "col2") after .read.parquet(); Parquet is columnar, so Spark prunes and reads only the selected columns. |
How do you optimize wide transformations like joins and groupBy? | Broadcast small DataFrames, repartition by join/group keys, cache reused data, filter early, and avoid unnecessary shuffle. |
How do you write partitioned Parquet files with overwrite behavior? | .write.mode("overwrite").partitionBy("year", "month").parquet("path") |
How do you check for skew in Spark join keys? | .groupBy("join_key").count().orderBy("count", ascending=False).show(10) to find skewed keys. |
#6: How do you read a nested JSON and flatten it into a tabular format using PySpark? | `from pyspark.sql.functions import explode, col; df = spark.read.json("path/to/json"); flat_df = df.select("id", explode("nested_array").alias("element"))` — then pull struct fields out with col("element.field"). |
#7: How would you implement slowly changing dimension Type 2 (SCD2) logic using PySpark? | Use Delta Lake MERGE: 1) match on the business key; 2) expire the old record by setting is_current = false (and an end date); 3) insert the new row with is_current = true (see the SCD2 merge sketch after this table). |
#8: You need to read from a Kafka topic where each message is a JSON. How would you parse and process it? | `from pyspark.sql.functions import from_json, col; df = spark.readStream.format("kafka").option("subscribe", "topic").load(); parsed_df = df.select(from_json(col("value").cast("string"), schema).alias("data")).select("data.*")` — also set kafka.bootstrap.servers (see the Kafka streaming sketch after this table). |
#9: How would you perform incremental data loading from a source like MySQL to Delta Lake? | Read from the source with last_updated > last_checkpoint; write to Delta using MERGE or append; track the watermark with an audit column or checkpoint table (see the incremental-load sketch after this table). |
#10: What’s the difference between cache() and persist()? When would you use one over the other? | cache() = shorthand for the default storage level (MEMORY_ONLY for RDDs, MEMORY_AND_DISK for DataFrames); persist() lets you choose a level such as MEMORY_AND_DISK or DISK_ONLY; use persist() when data is too large for memory or you need disk spill/replication. |
#11: You have skewed data during a groupBy . How would you optimize this? | Add a salting key, repartition by the skewed key, use map-side combine (partial aggregation), or filter/pre-aggregate before the shuffle. |
#12: How would you write unit tests for your PySpark code? | Use unittest or pytest with local SparkSession. Validate logic by comparing actual vs expected DataFrames. |
#13: You want to add a row number to a DataFrame partitioned by a key and ordered by timestamp. How do you do it? | `from pyspark.sql.window import Window; from pyspark.sql.functions import row_number; w = Window.partitionBy("user_id").orderBy("timestamp"); df = df.withColumn("row_num", row_number().over(w))` |
#14: You’re writing to a Delta table and want to merge only new records. How do you use MERGE INTO? | `deltaTable.alias("t").merge(source_df.alias("s"), "t.id = s.id").whenNotMatchedInsertAll().execute()` |
#15: Your Spark job is taking too long. How would you go about debugging and optimizing performance? | Use the Spark UI to inspect stages/tasks; optimize shuffle by avoiding wide transformations; use caching, repartitioning, and broadcasting judiciously. |
#16: How would you read a huge CSV file and write it as partitioned Parquet based on a date column? | `df = spark.read.csv("path", header=True, inferSchema=True); df.write.partitionBy("date_col").parquet("output_path")` |
#17: You want to broadcast a small DataFrame in a join. How do you do it and what are the caveats? | `from pyspark.sql.functions import broadcast; joined = df1.join(broadcast(df2), "key")` ⚠️ df2 must fit in driver and executor memory. |
#18: You’re processing streaming data from Kafka. How would you ensure exactly-once semantics? | Use the Kafka source with a Delta sink; enable checkpointing with .option("checkpointLocation", "chk_path"); Delta’s transactional writes plus checkpointed offsets give idempotent, exactly-once output. |
#19: You have a list of dates per user and want to generate a daily activity flag for each day in a month. How do you do it? | Create a full calendar using sequence() and explode, then left join the user activity and fill nulls with 0 (see the calendar sketch after this table). |
#20: Your PySpark script runs fine locally but fails on the cluster. What could be the possible reasons? | 1) Missing dependencies or JARs; 2) incorrect paths (local vs HDFS/S3); 3) memory/resource config mismatch; 4) Spark version conflicts. |
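
A minimal schema-merge sketch for the schema-mismatch question above; the paths and the country column are illustrative assumptions.

```python
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

spark = SparkSession.builder.getOrCreate()

# Parquet: merge compatible-but-different schemas across files
parquet_df = (spark.read
              .option("mergeSchema", "true")
              .parquet("/data/events/"))                 # hypothetical path

# JSON: impose one explicit schema; fields missing in older files come back as null
unified_schema = StructType([
    StructField("id", IntegerType()),
    StructField("name", StringType()),
    StructField("country", StringType()),                # only present in newer files
])
json_df = (spark.read
           .schema(unified_schema)
           .json("/data/raw_json/")
           .fillna({"country": "unknown"}))              # backfill nulls introduced by the merge
```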
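
A salted-join sketch for the skewed-join questions above; large_df, small_df, "join_key", and the bucket count are placeholders for your actual inputs.

```python
from pyspark.sql.functions import array, explode, floor, lit, rand

SALT_BUCKETS = 8   # assumption: tune to the observed skew

# Large, skewed side: add a random salt per row
salted_large = large_df.withColumn("salt", floor(rand() * SALT_BUCKETS))

# Smaller side: replicate each row once per salt value so every salted key finds a match
salted_small = small_df.withColumn(
    "salt", explode(array([lit(i) for i in range(SALT_BUCKETS)])))

joined = (salted_large
          .join(salted_small, ["join_key", "salt"])
          .drop("salt"))
```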
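
A null-filling sketch for the dynamic-null-replacement question; the defaults dictionary and input path are assumptions, and an active spark session is assumed.

```python
from pyspark.sql.functions import col, lit, when

# Hypothetical defaults keyed by Spark SQL type name
DEFAULTS = {"int": 0, "bigint": 0, "double": 0.0, "string": "missing", "boolean": False}

def fill_nulls_by_type(df):
    """Replace nulls in every column using a type-appropriate default."""
    for name, dtype in df.dtypes:          # df.dtypes -> [(column_name, type_string), ...]
        if dtype in DEFAULTS:
            df = df.withColumn(
                name,
                when(col(name).isNull(), lit(DEFAULTS[dtype])).otherwise(col(name)))
    return df

clean_df = fill_nulls_by_type(
    spark.read.csv("input.csv", header=True, inferSchema=True))
```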
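
A corrupt-record-handling sketch; file paths are illustrative, an active spark session is assumed, and badRecordsPath only applies on Databricks runtimes.

```python
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

schema = StructType([
    StructField("id", IntegerType()),
    StructField("name", StringType()),
    StructField("_corrupt_record", StringType()),        # receives the raw malformed line
])

csv_df = (spark.read
          .schema(schema)
          .option("mode", "PERMISSIVE")                  # keep bad rows instead of failing
          .option("columnNameOfCorruptRecord", "_corrupt_record")
          .csv("/data/input.csv"))

csv_df.cache()   # cache first so the corrupt-record column can be queried (Spark restriction)
bad_rows = csv_df.filter(csv_df["_corrupt_record"].isNotNull())

# Databricks runtimes only: divert corrupt JSON records to a side path instead
json_df = (spark.read
           .option("badRecordsPath", "/tmp/bad_records")
           .json("/data/input.json"))
```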
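
An SCD2 merge sketch, assuming delta-spark is installed, an active spark session, and that updates_df carries a customer_id business key plus an attr_hash change-detection column (all illustrative names).

```python
from delta.tables import DeltaTable
from pyspark.sql.functions import current_date, lit

dim_path = "/delta/dim_customer"                         # hypothetical table path
dim = DeltaTable.forPath(spark, dim_path)

# Step 1: expire the current version of any changed record
(dim.alias("t")
    .merge(updates_df.alias("s"),
           "t.customer_id = s.customer_id AND t.is_current = true")
    .whenMatchedUpdate(
        condition="t.attr_hash <> s.attr_hash",          # assumed change-detection column
        set={"is_current": lit(False), "end_date": current_date()})
    .execute())

# Step 2: append the new versions as current rows
(updates_df
    .withColumn("is_current", lit(True))
    .withColumn("start_date", current_date())
    .withColumn("end_date", lit(None).cast("date"))
    .write.format("delta").mode("append").save(dim_path))
```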
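
A Kafka streaming sketch for the JSON-parsing question; the broker address, topic, event schema, and checkpoint path are assumptions, and an active spark session is assumed.

```python
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StructField, StringType, LongType

event_schema = StructType([                              # assumed message layout
    StructField("user_id", StringType()),
    StructField("event", StringType()),
    StructField("ts", LongType()),
])

raw = (spark.readStream
       .format("kafka")
       .option("kafka.bootstrap.servers", "broker:9092") # hypothetical broker
       .option("subscribe", "events")
       .load())

parsed = (raw
          .select(from_json(col("value").cast("string"), event_schema).alias("data"))
          .select("data.*"))

query = (parsed.writeStream
         .format("console")
         .option("checkpointLocation", "/tmp/chk/events")
         .start())
```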
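
An incremental-load sketch, assuming JDBC access to MySQL and a Delta target; connection details, table names, and the checkpoint value are hypothetical, and an active spark session is assumed.

```python
from delta.tables import DeltaTable

last_checkpoint = "2024-01-01 00:00:00"                  # normally read from an audit/checkpoint store

incremental = (spark.read.format("jdbc")
               .option("url", "jdbc:mysql://db-host:3306/shop")   # hypothetical connection
               .option("dbtable",
                       f"(SELECT * FROM orders WHERE last_updated > '{last_checkpoint}') AS src")
               .option("user", "etl_user")
               .option("password", "***")
               .load())

# Upsert into the Delta target so reruns stay idempotent
target = DeltaTable.forPath(spark, "/delta/orders")
(target.alias("t")
       .merge(incremental.alias("s"), "t.order_id = s.order_id")
       .whenMatchedUpdateAll()
       .whenNotMatchedInsertAll()
       .execute())

# Then persist max(last_updated) from `incremental` as the next checkpoint
```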
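
A calendar sketch for the daily-activity-flag question; the sample data and month boundaries are illustrative, and an active spark session is assumed.

```python
from pyspark.sql.functions import col, explode, lit, sequence, to_date

# Assumed input: one row per (user_id, day) on which the user was active
activity_df = spark.createDataFrame(
    [("u1", "2024-01-03"), ("u1", "2024-01-05"), ("u2", "2024-01-03")],
    ["user_id", "day"]).withColumn("day", to_date(col("day")))

# Full calendar for the month
calendar = spark.range(1).select(
    explode(sequence(to_date(lit("2024-01-01")),
                     to_date(lit("2024-01-31")))).alias("day"))

flags = (activity_df.select("user_id").distinct()
         .crossJoin(calendar)                                      # one row per user per day
         .join(activity_df.withColumn("active", lit(1)), ["user_id", "day"], "left")
         .fillna({"active": 0}))                                   # 0 = no activity that day
```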
Next set of questions, with crisp answers in the same format:
Question | Answer |
---|---|
1. How can you optimize PySpark jobs for better performance? Discuss techniques like partitioning, caching, and broadcasting. | Partition data to reduce shuffle, cache/persist reused DataFrames, broadcast small datasets in joins to avoid shuffle, filter early, avoid wide transformations when possible. |
2. What are accumulators and broadcast variables in PySpark? How are they used? | Accumulators: variables used to aggregate information (like counters) across executors. Broadcast variables: read-only shared variables shipped once to each executor to avoid data duplication, mainly for small lookup datasets in joins (see the accumulator/broadcast sketch after this table). |
3. Describe how PySpark handles data serialization and the impact on performance. | Spark uses Java serialization by default and optionally Kryo for faster, more compact serialization; PySpark also pickles data moved between the JVM and Python workers (Arrow for Pandas UDFs). Inefficient serialization causes slow tasks and high GC overhead. |
4. How does PySpark manage memory, and what are some common issues related to memory management? | JVM heap divided into execution memory (shuffle, sort) and storage memory (cached data); issues include OOM errors due to skew, caching too much, or large shuffle spills. |
5. Explain the concept of checkpointing in PySpark and its importance in iterative algorithms. | Checkpoint saves RDD lineage to reliable storage to truncate DAG; helps avoid recomputation and stack overflow in iterative or long lineage jobs. |
6. How can you handle skewed data in PySpark to optimize performance? | Use salting keys, broadcast smaller side, repartition skewed keys separately, or filter/aggregate before join/groupBy. |
7. Discuss the role of the DAG (Directed Acyclic Graph) in PySpark’s execution model. | DAG represents the lineage of transformations; Spark creates stages from DAG to optimize task execution and scheduling. |
8. What are some common pitfalls when joining large datasets in PySpark, and how can they be mitigated? | Skewed joins causing OOM, shuffle explosion, not broadcasting small tables; mitigate by broadcasting, repartitioning, salting skew keys, filtering early. |
9. Describe the process of writing and running unit tests for PySpark applications. | Use a local SparkSession in the test setup, write test cases using unittest or pytest, and compare expected vs actual DataFrames using .collect() or DataFrame equality checks (see the unit-test sketch after this table). |
10. How does PySpark handle real-time data processing, and what are the key components involved? | Uses Structured Streaming API; key components: source (Kafka, socket), query with transformations, sink (console, Kafka, Delta), and checkpointing for fault tolerance. |
11. Discuss the importance of schema enforcement in PySpark and how it can be implemented. | Enforces data quality and prevents runtime errors; implemented via explicit schema definition when reading data or using StructType . |
12. What is the Tungsten execution engine in PySpark, and how does it improve performance? | Tungsten optimizes memory management using off-heap memory and code generation, improving CPU efficiency and reducing GC overhead. |
13. Explain the concept of window functions in PySpark and provide use cases where they are beneficial. | Perform calculations across rows related to the current row (e.g., running totals, rankings); useful in time-series analysis, sessionization, and cumulative metrics (see the window-function sketch after this table). |
14. How can you implement custom partitioning in PySpark, and when would it be necessary? | Use partitionBy in write or rdd.partitionBy() with a custom partitioner function; necessary to optimize joins or shuffles on specific keys. |
15. Discuss the methods available in PySpark for handling missing or null values in datasets. | Use .fillna() , .dropna() , or .replace() to handle nulls; conditional filling using .when() and .otherwise() . |
16. What are some strategies for debugging and troubleshooting PySpark applications? | Use Spark UI for logs and stages, enable verbose logging, test locally, isolate problem steps, and use accumulators or debug prints. |
17. What are some best practices for writing efficient PySpark code? | Use DataFrame API over RDD, avoid UDFs if possible, cache smartly, minimize shuffles, broadcast small tables, filter early, and use built-in functions. |
18. How can you monitor and tune the performance of PySpark applications in a production environment? | Use Spark UI, Ganglia, or Spark History Server; tune executor memory, cores, shuffle partitions; analyze DAG and optimize hotspots. |
19. How can you implement custom UDFs (User-Defined Functions) in PySpark, and what are the performance considerations? | Use pyspark.sql.functions.udf, or Pandas UDFs for vectorized performance; avoid plain Python UDFs when possible due to per-row serialization overhead (see the Pandas UDF sketch after this table). |
20. What are the key strategies for optimizing memory usage in PySpark applications, and how do you implement them? | Tune executor memory, use Tungsten optimizations, cache only needed data, avoid large shuffles, and repartition data wisely. |
21. How does PySpark’s Tungsten execution engine improve memory and CPU efficiency? | By using off-heap memory management, whole-stage code generation, and cache-friendly data structures to reduce CPU cycles and GC pauses. |
22. What are the different persistence storage levels in PySpark, and how do they impact memory management? | MEMORY_ONLY, MEMORY_AND_DISK, DISK_ONLY, MEMORY_AND_DISK_SER, etc.; choose based on dataset size and available memory to balance speed vs fault tolerance. |
23. How can you identify and resolve memory bottlenecks in a PySpark application? | Monitor Spark UI for GC times and shuffle spills, adjust memory fractions, optimize data skew, reduce cached data size, and tune serialization. |
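
An accumulator/broadcast sketch for question 2; the lookup mapping and sample RDD are illustrative.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

bad_rows = sc.accumulator(0)                              # counter updated on executors, read on the driver
country_lookup = sc.broadcast({"IN": "India", "US": "United States"})   # hypothetical small lookup

def parse(line):
    parts = line.split(",")
    if len(parts) != 2:
        bad_rows.add(1)                                   # accumulate; read .value only on the driver
        return None
    user_id, code = parts
    return (user_id, country_lookup.value.get(code, "unknown"))

rdd = sc.parallelize(["u1,IN", "u2,US", "broken_line"])
result = rdd.map(parse).filter(lambda r: r is not None).collect()
print(result, "bad rows:", bad_rows.value)
```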
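
A unit-test sketch for question 9 using pytest with a local SparkSession; the add_revenue transformation is a hypothetical function under test.

```python
# test_transforms.py — minimal pytest sketch; module and function names are illustrative
import pytest
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

@pytest.fixture(scope="session")
def spark():
    # Small local session shared across the test session
    return (SparkSession.builder
            .master("local[2]")
            .appName("unit-tests")
            .getOrCreate())

def add_revenue(df):
    """Hypothetical transformation under test."""
    return df.withColumn("revenue", col("qty") * col("price"))

def test_add_revenue(spark):
    input_df = spark.createDataFrame([(2, 10.0), (3, 5.0)], ["qty", "price"])
    expected = [(2, 10.0, 20.0), (3, 5.0, 15.0)]
    actual = [tuple(r) for r in add_revenue(input_df).collect()]
    assert actual == expected
```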
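
A window-function sketch for question 13 showing a running total and a ranking; the sample data is illustrative and an active spark session is assumed.

```python
from pyspark.sql.window import Window
from pyspark.sql.functions import col, rank, sum as spark_sum

sales = spark.createDataFrame(
    [("store_a", "2024-01-01", 100), ("store_a", "2024-01-02", 150), ("store_b", "2024-01-01", 80)],
    ["store", "day", "amount"])

w_running = (Window.partitionBy("store").orderBy("day")
             .rowsBetween(Window.unboundedPreceding, Window.currentRow))
w_rank = Window.partitionBy("store").orderBy(col("amount").desc())

result = (sales
          .withColumn("running_total", spark_sum("amount").over(w_running))   # cumulative metric
          .withColumn("amount_rank", rank().over(w_rank)))                    # ranking within each store
result.show()
```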
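
A UDF comparison sketch for question 19 contrasting a row-at-a-time Python UDF with a vectorized Pandas UDF (Spark 3.x type-hint style); an active spark session is assumed and the conversion function is illustrative.

```python
import pandas as pd
from pyspark.sql.functions import pandas_udf, udf
from pyspark.sql.types import DoubleType

# Row-at-a-time Python UDF: pays serialization overhead for every row
@udf(DoubleType())
def to_fahrenheit_py(c):
    return c * 9.0 / 5.0 + 32.0 if c is not None else None

# Vectorized Pandas UDF: operates on whole Arrow batches, much less overhead
@pandas_udf(DoubleType())
def to_fahrenheit_vec(c: pd.Series) -> pd.Series:
    return c * 9.0 / 5.0 + 32.0

temps = spark.createDataFrame([(20.0,), (35.5,), (None,)], ["celsius"])
temps.select(to_fahrenheit_py("celsius").alias("python_udf"),
             to_fahrenheit_vec("celsius").alias("pandas_udf")).show()
```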