SET 1

  • #6486
    lochan2014
    Keymaster

Here is a list of the top 20 scenario-based PySpark questions frequently asked in interviews:
    🎯 #1: You have a large dataset. How do you optimize reading only specific partitions using spark.read.parquet()?
    🎯 #2: Explain how you would handle schema mismatch when reading multiple JSON/Parquet files with different structures.
    🎯 #3: You want to write a DataFrame back to Parquet but keep the original file size consistent. What options do you use?
🎯 #4: Why might a join operation cause executor OOM errors, and how can you avoid it?
    🎯 #5: You’re reading a CSV with missing values. How would you replace nulls dynamically across all columns?
    🎯 #6: How do you read a nested JSON and flatten it into a tabular format using PySpark?
    🎯 #7: How would you implement slowly changing dimension Type 2 (SCD2) logic using PySpark?
    🎯 #8: You need to read from a Kafka topic where each message is a JSON. How would you parse and process it?
    🎯 #9: How would you perform incremental data loading from a source like MySQL to Delta Lake?
    🎯 #10: What’s the difference between cache() and persist()? When would you use one over the other?
    🎯 #11: You have skewed data during a groupBy. How would you optimize this?
    🎯 #12: How would you write unit tests for your PySpark code?
    🎯 #13: You want to add a row number to a DataFrame partitioned by a key and ordered by timestamp. How do you do it?
    🎯 #14: You’re writing to a Delta table and want to merge only new records. How do you use MERGE INTO?
    🎯 #15: Your Spark job is taking too long. How would you go about debugging and optimizing performance?
    🎯 #16: How would you read a huge CSV file and write it as partitioned Parquet based on a date column?
    🎯 #17: You want to broadcast a small DataFrame in a join. How do you do it and what are the caveats?
    🎯 #18: You’re processing streaming data from Kafka. How would you ensure exactly-once semantics?
    🎯 #19: You have a list of dates per user and want to generate a daily activity flag for each day in a month. How do you do it?
    🎯 #20: You have a PySpark script that runs fine locally but fails on the cluster. What could be the possible reasons?
    Thanks to https://www.linkedin.com/feed/update/urn:li:activity:7337048186416439296/

    #6493
    lochan2014
    Keymaster

    To **optimize reading only specific partitions** using spark.read.parquet(), you can leverage **partition pruning**, which allows Spark to skip reading unnecessary files and only load relevant partitions.

### ✅ Here’s how:

### 💡 **Assume your dataset is partitioned like this:**

```
/data/events/year=2023/month=01/
/data/events/year=2023/month=02/
/data/events/year=2024/month=01/
```

### ✅ Option 1: **Partition Pruning via Filter**

    If the Parquet data is partitioned by year and month, Spark will **automatically prune partitions** when you filter on those columns **after reading**:

```python
df = spark.read.parquet("/data/events")
df_filtered = df.filter((df.year == 2023) & (df.month == 1))
```

➡️ Spark reads **only the matching folders**, using the directory structure instead of scanning the full dataset.

### ✅ Option 2: **Partition Directory Path Filtering (Manual)**

    You can also directly specify the partition path to **only read that portion** of the dataset:

```python
df = spark.read.parquet("/data/events/year=2023/month=01")
```

➡️ This skips the directory scan entirely and reads only the data under the specified partition path. (Note: when you point directly at a partition directory, the partition columns no longer appear in the DataFrame unless you also set the basePath option.)

### ✅ Option 3: **Passing multiple paths**

    If you want to read a few selected partitions:

```python
df = spark.read.parquet(
    "/data/events/year=2023/month=01",
    "/data/events/year=2024/month=01"
)
```

➡️ Only the specified paths are read, which is efficient when you know exactly which partitions to load.

### 🛠️ Best Practices

* **Filter on partition columns as early as possible** so Spark can prune at read time.
* **Partition your data** to match query patterns (e.g., time-based columns like year/month/day).
* Remember that filters on non-partition columns do not prune directories; only partition-column filters reduce what gets read.
* **Don’t cast or transform partition columns in filters**, as it can disable pruning (e.g., df.filter(df.year.cast("string") == "2023") may break pruning); see the sketch below.
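A minimal sketch of the last point, reusing the hypothetical /data/events layout partitioned by year and month:

```python
df = spark.read.parquet("/data/events")

# Pruning-friendly: compare the partition columns directly
pruned = df.filter((df.year == 2023) & (df.month == 1))

# May defeat pruning: casting/transforming the partition column can force Spark
# to evaluate the predicate against data instead of the directory values
maybe_not_pruned = df.filter(df.year.cast("string") == "2023")

# Compare the physical plans: the first should show a narrow PartitionFilters list
pruned.explain(True)
maybe_not_pruned.explain(True)
```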

### 🔍 Check if pruning is applied:

    You can verify partition pruning by enabling physical plan explain:

```python
df_filtered.explain(True)
```

In the physical plan, check PartitionFilters: (and PushedFilters: for non-partition predicates) and confirm that only the necessary partitions are being scanned.

    #6494
    lochan2014
    Keymaster

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit
from pyspark.sql.utils import AnalysisException

spark = SparkSession.builder \
    .appName("SchemaMismatchHandler") \
    .getOrCreate()

# Example file paths
file_paths = ["s3://bucket/data1.json", "s3://bucket/data2.json", "s3://bucket/data3.json"]

# Step 1: Read all files separately and collect schemas
dfs = []
all_fields = set()

for path in file_paths:
    try:
        df = spark.read.json(path)
        dfs.append(df)
        all_fields.update(df.schema.fieldNames())
    except AnalysisException as e:
        print(f"Failed to read {path}: {e}")

# Step 2: Align all DataFrames to a common schema
def align_schema(df, all_columns):
    for col in all_columns:
        if col not in df.columns:
            df = df.withColumn(col, lit(None))
    return df.select(sorted(all_columns))  # Sort for consistent column order

# Step 3: Apply schema alignment and union all
aligned_dfs = [align_schema(df, all_fields) for df in dfs]
final_df = aligned_dfs[0]
for df in aligned_dfs[1:]:
    final_df = final_df.unionByName(df)

# final_df is now a unified DataFrame with aligned schema
final_df.show()
```

    This handles schema mismatches by:

    * Reading each file independently.
    * Building a union of all column names.
    * Padding missing columns with null.
    * Ensuring consistent column order before unionByName.
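When the mismatched files are Parquet rather than JSON, a lighter-weight alternative is Spark’s built-in schema merging on read. A minimal sketch, assuming the files have compatible (non-conflicting) column types and a hypothetical path:

```python
# Merge schemas across Parquet files at read time; columns missing in some
# files come back as null. This reconciles added/missing columns only,
# not conflicting types.
df = (
    spark.read
    .option("mergeSchema", "true")
    .parquet("s3://bucket/parquet-data/")
)
df.printSchema()
```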

    #6495
    lochan2014
    Keymaster

```python
# Write DataFrame to Parquet while controlling file size

# Step 1: Estimate the desired file size per file (in bytes)
# For example, target ~128MB per file
target_file_size = 128 * 1024 * 1024  # 128 MB

# Step 2: Use df.rdd.getNumPartitions() and the input data size to tune partitions
# Alternatively, use coalesce/repartition if the size is known or can be estimated
# Suppose you want 10 files of around 128MB each
df = df.repartition(10)

# Step 3: Write with compression to reduce actual storage size
df.write \
    .mode("overwrite") \
    .option("compression", "snappy") \
    .parquet("s3://your-bucket/target-path/")

# Optional: Use maxRecordsPerFile to cap records per file if you know the average row size
df.write \
    .mode("overwrite") \
    .option("compression", "snappy") \
    .option("maxRecordsPerFile", 1_000_000) \
    .parquet("s3://your-bucket/target-path/")
```

    **Key options used:**

    * .repartition(n) to control the number of output files.
    * .option("maxRecordsPerFile", n) to cap the number of rows per file.
* Compression (snappy) to keep files compact without hurting read performance (it does not by itself make sizes consistent).
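A rough way to derive the repartition count from the data size, as a minimal sketch; estimated_input_bytes is a hypothetical figure you would supply (e.g., from the source file listing):

```python
import math

target_file_size = 128 * 1024 * 1024   # ~128 MB per output file
estimated_input_bytes = 5 * 1024 ** 3  # hypothetical: ~5 GB of input data

# Number of output files needed to land near the target size
num_files = max(1, math.ceil(estimated_input_bytes / target_file_size))

df.repartition(num_files).write \
    .mode("overwrite") \
    .option("compression", "snappy") \
    .parquet("s3://your-bucket/target-path/")
```

Because snappy-compressed output is smaller than the raw input, the resulting files will land below the target; adjust the estimate if exact sizes matter.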

    #6496
    lochan2014
    Keymaster

```python
# REASON: Join operations can cause executor Out Of Memory (OOM) errors when:
# - One side of the join (especially in a shuffle join) is too large to fit in executor memory.
# - Skewed keys cause data to pile up in specific partitions.
# - Spark performs a wide transformation and shuffles too much data.

# HOW TO AVOID OOM IN JOINS:

# 1. Use a broadcast join (if one side is small enough)
from pyspark.sql.functions import broadcast

result = large_df.join(broadcast(small_df), "join_key")

# 2. Repartition by the join key to evenly distribute data
df1 = df1.repartition("join_key")
df2 = df2.repartition("join_key")

# 3. Select only the needed columns before the join to reduce the memory footprint
df1_trimmed = df1.select("join_key", "needed_col1")
df2_trimmed = df2.select("join_key", "needed_col2")

# 4. Handle skew by salting: add a random salt to the skewed (large) side,
#    and replicate the other side once per salt value so the keys still match
from pyspark.sql.functions import array, concat_ws, explode, lit, rand

num_salts = 10
df1_salted = df1.withColumn(
    "salted_key", concat_ws("_", df1["join_key"], (rand() * num_salts).cast("int"))
)
df2_replicated = df2.withColumn(
    "salt", explode(array([lit(i) for i in range(num_salts)]))
).withColumn("salted_key", concat_ws("_", df2["join_key"], "salt"))

# Then join on salted_key
result = df1_salted.join(df2_replicated, "salted_key")

# 5. Use spark.sql.autoBroadcastJoinThreshold wisely
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 50 * 1024 * 1024)  # 50MB

# 6. Monitor partition sizes and optimize memory settings
# Avoid shuffling massive partitions; tune spark.sql.shuffle.partitions

# 7. Cache intermediate DataFrames if reused
df1.cache()
```

    **Summary:**
OOM during joins typically occurs when large or skewed data is shuffled. Mitigate it with broadcast joins, repartitioning, column pruning before the join, salting for skew, config tuning, and caching where it helps. On Spark 3.x, adaptive query execution can also handle much of this automatically; see the sketch below.
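A minimal sketch of the adaptive-execution route, assuming Spark 3.x (df1/df2 stand in for the DataFrames above):

```python
# Let adaptive query execution re-plan joins at runtime: it can downgrade
# sort-merge joins to broadcast joins and split skewed shuffle partitions.
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")

# With AQE on, a plain join is often enough; Spark splits the skewed keys itself.
result = df1.join(df2, "join_key")
```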

    #6497
    lochan2014
    Keymaster

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, when

spark = SparkSession.builder.appName("NullHandling").getOrCreate()

# Step 1: Read CSV with header and infer schema
df = spark.read.option("header", True).option("inferSchema", True).csv("path/to/file.csv")

# Step 2: Dynamically replace nulls based on column data types
for column, dtype in df.dtypes:
    if dtype in ["int", "bigint", "double", "float"]:
        df = df.withColumn(column, when(col(column).isNull(), lit(0)).otherwise(col(column)))
    elif dtype == "string":
        df = df.withColumn(column, when(col(column).isNull(), lit("missing")).otherwise(col(column)))
    elif dtype == "boolean":
        df = df.withColumn(column, when(col(column).isNull(), lit(False)).otherwise(col(column)))
    # Add other type-specific defaults as needed

# Final cleaned DataFrame
df.show()
```

### ✅ Summary:

    * Automatically detects column types.
    * Fills nulls with type-appropriate defaults:
    0 for numbers, "missing" for strings, False for booleans.
    * Avoids hardcoding column names.
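The same defaults can also be applied in one pass with fillna, which only touches columns whose type matches each value; a minimal sketch reusing the df from the snippet above:

```python
# Build a per-column default map from the column types, then apply it in one pass
defaults = {}
for column, dtype in df.dtypes:
    if dtype in ["int", "bigint", "double", "float"]:
        defaults[column] = 0
    elif dtype == "string":
        defaults[column] = "missing"
    elif dtype == "boolean":
        defaults[column] = False

df_clean = df.fillna(defaults)
```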

    #6499
    lochan2014
    Keymaster

Here’s a crisp, snippet-style answer for each of these commonly asked questions:

### 🔹 **How do you handle corrupt records while reading JSON/CSV?**

```python
# For JSON: route unparsable records to a separate path
# (badRecordsPath is a Databricks-specific option)
df = spark.read.option("badRecordsPath", "s3://path/to/badRecords").json("s3://path/to/json")

# For CSV: keep corrupt rows in a dedicated column
df = spark.read.option("mode", "PERMISSIVE") \
    .option("columnNameOfCorruptRecord", "_corrupt_record") \
    .csv("s3://path/to/csv")

# Modes: PERMISSIVE (default), DROPMALFORMED, FAILFAST
```

### 🔹 **How do you handle duplicate rows in a DataFrame?**

```python
# Remove exact duplicates
df = df.dropDuplicates()

# Remove duplicates based on specific columns
df = df.dropDuplicates(["col1", "col2"])
```

### 🔹 **How do you handle nulls before aggregation?**

```python
# Replace nulls before groupBy
df = df.fillna(0)  # or df.fillna({"col1": 0, "col2": "unknown"})

# Then safely aggregate
df.groupBy("group_col").agg({"metric_col": "sum"})
```

### 🔹 **How do you read only specific columns from a Parquet file?**

```python
# Parquet is columnar, so Spark reads only the selected columns from disk
df = spark.read.parquet("s3://path/to/data").select("col1", "col2")
```

### 🔹 **How do you optimize wide transformations like joins and groupBy?**

```python
# Use a broadcast join if one side is small
from pyspark.sql.functions import broadcast
df = large_df.join(broadcast(small_df), "key")

# Repartition on the group key to reduce shuffle pressure
df = df.repartition("group_key")

# Cache intermediate results if reused
df.cache()
```

### 🔹 **How do you write partitioned Parquet files with overwrite behavior?**

```python
df.write.mode("overwrite") \
    .partitionBy("year", "month") \
    .parquet("s3://path/output/")
```
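By default, mode("overwrite") replaces the entire output path. If only the partitions present in the incoming DataFrame should be rewritten, dynamic partition overwrite can be enabled (available since Spark 2.3); a minimal sketch:

```python
# Only the partitions that appear in df are rewritten; other partitions are left alone
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

df.write.mode("overwrite") \
    .partitionBy("year", "month") \
    .parquet("s3://path/output/")
```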

### 🔹 **How do you check for skew in Spark join keys?**

```python
df.groupBy("join_key").count().orderBy("count", ascending=False).show(10)
```

    Would you like a few more on file formats, cache vs persist, or checkpointing?

    #6500
    lochan2014
    Keymaster

    🎯 #6: Flatten Nested JSON to Tabular Format

```python
from pyspark.sql.functions import col

df = spark.read.json("path/to/nested.json")

# Select nested struct fields and expose them as top-level columns
flattened_df = df.selectExpr("id", "details.name", "details.address.city", "details.address.zip")
```
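Hard-coding the field paths only works when the schema is known up front. A generic alternative is to walk the schema and expand struct fields until none remain; a minimal sketch (flatten_structs is a hypothetical helper, not a built-in):

```python
from pyspark.sql.types import StructType
from pyspark.sql.functions import col

def flatten_structs(df):
    # Repeatedly expand struct columns into parent_child top-level columns.
    # Arrays are left as-is; they would need explode, which is not handled here.
    while True:
        has_structs = any(isinstance(f.dataType, StructType) for f in df.schema.fields)
        if not has_structs:
            return df
        cols = []
        for f in df.schema.fields:
            if isinstance(f.dataType, StructType):
                cols += [col(f"{f.name}.{c}").alias(f"{f.name}_{c}") for c in f.dataType.fieldNames()]
            else:
                cols.append(col(f.name))
        df = df.select(cols)

flat_df = flatten_structs(spark.read.json("path/to/nested.json"))
```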

    🎯 #7: SCD Type 2 in PySpark

```python
from delta.tables import DeltaTable
from pyspark.sql.functions import current_date, lit

# Existing Delta table
target = DeltaTable.forPath(spark, "s3://path/delta_table")

# New data
incoming_df = spark.read.parquet("s3://path/incoming")

# Merge logic: close out current rows whose tracked attributes changed,
# and insert rows for ids not present yet
target.alias("t").merge(
    incoming_df.alias("s"),
    "t.id = s.id AND t.current_flag = true AND (t.attr1 != s.attr1 OR t.attr2 != s.attr2)"
).whenMatchedUpdate(set={
    "current_flag": lit(False),
    "end_date": current_date()
}).whenNotMatchedInsert(values={
    "id": "s.id",
    "attr1": "s.attr1",
    "attr2": "s.attr2",
    "start_date": current_date(),
    "end_date": lit(None),
    "current_flag": lit(True)
}).execute()

# Note: this closes changed rows and inserts brand-new ids; a complete SCD2 flow
# also inserts the new version of each changed row (commonly done by staging the
# changed records and running a second merge or an append).
```

    🎯 #8: Parse Kafka JSON Messages

```python
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StringType

schema = StructType().add("user_id", StringType()).add("action", StringType())

df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "host:port") \
    .option("subscribe", "topic-name") \
    .load()

# Kafka values arrive as bytes: cast to string, then parse the JSON payload
json_df = df.select(from_json(col("value").cast("string"), schema).alias("data")).select("data.*")
```

    🎯 #9: Incremental Load from MySQL to Delta

```python
# Step 1: Load the latest watermark from the Delta table
last_ts = spark.read.format("delta").table("delta_table").agg({"updated_at": "max"}).collect()[0][0]

# Step 2: Pull only new rows from MySQL
jdbc_df = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:mysql://...") \
    .option("query", f"SELECT * FROM table WHERE updated_at > '{last_ts}'") \
    .load()

# Step 3: Upsert into Delta
from delta.tables import DeltaTable
delta_table = DeltaTable.forPath(spark, "s3://delta_table")

delta_table.alias("t").merge(
    jdbc_df.alias("s"),
    "t.id = s.id"
).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()
```

    🎯 #10: cache() vs persist()

```python
from pyspark import StorageLevel

df.cache()                          # DataFrame shorthand for persist(MEMORY_AND_DISK)
df.persist(StorageLevel.DISK_ONLY)  # Pick a storage level explicitly, e.g. MEMORY_ONLY, DISK_ONLY

# Use cache() when memory suffices and the data is reused multiple times.
# Use persist() when memory may not be enough or you want explicit control over the storage level.
```

    🎯 #11: Optimize Skewed groupBy

```python
# Add a random salt to spread hot keys across partitions
from pyspark.sql.functions import expr

df = df.withColumn("salted_key", expr("concat(group_key, '_', floor(rand()*10))"))

# Aggregate on the salted key first, then aggregate again on the original key
# to remove the salt (two-stage aggregation)
df.groupBy("salted_key").agg(...)
```

    🎯 #12: Unit Tests in PySpark

```python
import unittest
from pyspark.sql import SparkSession

class MyTest(unittest.TestCase):
    @classmethod
    def setUpClass(cls):
        cls.spark = SparkSession.builder.master("local[1]").appName("test").getOrCreate()

    @classmethod
    def tearDownClass(cls):
        cls.spark.stop()

    def test_transformation(self):
        df = self.spark.createDataFrame([(1, "a"), (2, "b")], ["id", "val"])
        result = df.filter("id = 1").collect()
        self.assertEqual(result[0]["val"], "a")

if __name__ == "__main__":
    unittest.main()
```

    🎯 #13: Row Number by Partition

```python
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

window_spec = Window.partitionBy("user_id").orderBy("event_time")

df = df.withColumn("row_num", row_number().over(window_spec))
```

    🎯 #14: MERGE INTO Delta for New Records

```python
from delta.tables import DeltaTable

delta = DeltaTable.forPath(spark, "s3://delta_table")
new_data = spark.read.parquet("s3://incoming")

delta.alias("t").merge(
    new_data.alias("s"),
    "t.id = s.id"
).whenNotMatchedInsertAll().execute()
```

    🎯 #15: Debug & Optimize Slow Spark Jobs

```python
# 🔍 Steps:
# 1. Use the Spark UI: check stages, skew, and shuffle sizes.
# 2. Cache/persist reused DataFrames.
# 3. Avoid UDFs unless necessary.
# 4. Repartition wisely.
# 5. Tune configs:
spark.conf.set("spark.sql.shuffle.partitions", 200)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 50 * 1024 * 1024)
```

    🎯 #16: CSV β†’ Partitioned Parquet by Date

```python
df = spark.read.option("header", True).csv("s3://path/data.csv")

df.write.partitionBy("date_col") \
    .mode("overwrite") \
    .parquet("s3://path/output/")
```

    🎯 #17: Broadcast Join + Caveats

```python
from pyspark.sql.functions import broadcast

df = big_df.join(broadcast(small_df), "id")

# ⚠️ Caveat: small_df must fit comfortably in driver and executor memory;
# broadcasts are typically kept to the low tens of MB
# (the default autoBroadcastJoinThreshold is 10MB).
```

    🎯 #18: Kafka Streaming with Exactly-Once

```python
stream_df = spark.readStream.format("kafka")...  # your stream config

# Ensure checkpointing + an idempotent/transactional sink (like Delta)
stream_df.writeStream \
    .format("delta") \
    .option("checkpointLocation", "s3://chk/") \
    .outputMode("append") \
    .start("s3://delta/output")
```

    🎯 #19: Daily Activity Flag

```python
from pyspark.sql.functions import explode, sequence, lit, col, to_date, coalesce

# List of active dates per user
df = df.withColumn("date", to_date("date"))

# Generate the full month of dates
full_dates = spark.range(1).select(
    explode(sequence(to_date(lit("2024-01-01")), to_date(lit("2024-01-31")))).alias("date")
)

# Cross join users with the date range to build a per-user calendar
users = df.select("user_id").distinct()
calendar = users.crossJoin(full_dates)

# Left join against the activity dates and flag the days that matched
activity = df.select("user_id", "date").withColumn("active_flag", lit(1))
result = calendar.join(activity, ["user_id", "date"], "left") \
    .withColumn("active_flag", coalesce(col("active_flag"), lit(0)))
```

    🎯 #20: Script Fails on Cluster but Not Locally – Possible Causes

    1. Missing files/resources on cluster (e.g., JARs, dependencies).
    2. Schema inference issues (e.g., inferSchema=True works locally but fails on cluster scale).
    3. Cluster memory limits (OOM).
    4. Code depends on local paths not accessible in distributed mode.
5. Local mode runs in a single JVM, while the cluster performs real distributed shuffles, exposing serialization and shuffle issues that never appear locally.
    6. Version mismatch (e.g., Spark/Python/PyArrow).
    7. Missing permissions on HDFS/S3.
