HintsToday

Hints and Answers for Everything

SET 1
  • #6486
    lochan2014
    Keymaster

A list of the top 20 scenario-based PySpark questions frequently asked in interviews:
      🎯 #1: You have a large dataset. How do you optimize reading only specific partitions using spark.read.parquet()?
      🎯 #2: Explain how you would handle schema mismatch when reading multiple JSON/Parquet files with different structures.
      🎯 #3: You want to write a DataFrame back to Parquet but keep the original file size consistent. What options do you use?
🎯 #4: Why might a join operation cause executor OOM errors, and how can you avoid it?
      🎯 #5: You’re reading a CSV with missing values. How would you replace nulls dynamically across all columns?
      🎯 #6: How do you read a nested JSON and flatten it into a tabular format using PySpark?
      🎯 #7: How would you implement slowly changing dimension Type 2 (SCD2) logic using PySpark?
      🎯 #8: You need to read from a Kafka topic where each message is a JSON. How would you parse and process it?
      🎯 #9: How would you perform incremental data loading from a source like MySQL to Delta Lake?
      🎯 #10: What’s the difference between cache() and persist()? When would you use one over the other?
      🎯 #11: You have skewed data during a groupBy. How would you optimize this?
      🎯 #12: How would you write unit tests for your PySpark code?
      🎯 #13: You want to add a row number to a DataFrame partitioned by a key and ordered by timestamp. How do you do it?
      🎯 #14: You’re writing to a Delta table and want to merge only new records. How do you use MERGE INTO?
      🎯 #15: Your Spark job is taking too long. How would you go about debugging and optimizing performance?
      🎯 #16: How would you read a huge CSV file and write it as partitioned Parquet based on a date column?
      🎯 #17: You want to broadcast a small DataFrame in a join. How do you do it and what are the caveats?
      🎯 #18: You’re processing streaming data from Kafka. How would you ensure exactly-once semantics?
      🎯 #19: You have a list of dates per user and want to generate a daily activity flag for each day in a month. How do you do it?
      🎯 #20: You have a PySpark script that runs fine locally but fails on the cluster. What could be the possible reasons?
      Thanks to https://www.linkedin.com/feed/update/urn:li:activity:7337048186416439296/

      #6493
      lochan2014
      Keymaster

        To **optimize reading only specific partitions** using spark.read.parquet(), you can leverage **partition pruning**, which allows Spark to skip reading unnecessary files and only load relevant partitions.

### ✅ Here's how:

### 💡 **Assume your dataset is partitioned like this:**

```
/data/events/year=2023/month=01/
/data/events/year=2023/month=02/
/data/events/year=2024/month=01/
```

### ✅ Option 1: **Partition Pruning via Filter**

        If the Parquet data is partitioned by year and month, Spark will **automatically prune partitions** when you filter on those columns **after reading**:

```python
df = spark.read.parquet("/data/events")
df_filtered = df.filter((df.year == 2023) & (df.month == 1))
```

➡️ Spark reads **only the matching folders**, using the directory structure instead of scanning the full dataset.

### ✅ Option 2: **Partition Directory Path Filtering (Manual)**

        You can also directly specify the partition path to **only read that portion** of the dataset:

```python
df = spark.read.parquet("/data/events/year=2023/month=01")
```

➡️ This skips the full directory scan and reads only the data under the specified partition path. Note that when you point Spark directly at a partition subdirectory, the partition columns (year, month) no longer appear in the DataFrame unless you also set the basePath option.

### ✅ Option 3: **Passing multiple paths**

        If you want to read a few selected partitions:

```python
df = spark.read.parquet(
    "/data/events/year=2023/month=01",
    "/data/events/year=2024/month=01"
)
```

➡️ Only the specified paths are read, which is efficient when you know exactly which partitions to load.

### 🛠️ Best Practices

* **Filter on partition columns early** (Spark prunes them lazily at planning time).
* Always **partition your data** based on query patterns (e.g., time-based like year/month/day).
* Avoid filtering on non-partitioned columns if read performance is a concern.
* **Don't cast or wrap partition columns in functions**, as this can disable pruning (e.g., df.filter(df.year.cast("string") == "2023") may break pruning); see the sketch just below.
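For instance, a minimal sketch of the difference, reusing the year/month layout from above (behaviour can vary by Spark version, so verify with explain()):

```python
# Pruning-friendly: compare partition columns in their native type
pruned = df.filter((df.year == 2023) & (df.month == 1))

# May defeat pruning: wrapping the partition column in a cast or function
maybe_not_pruned = df.filter(df.year.cast("string") == "2023")
```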

### 🔍 Check if pruning is applied:

You can verify partition pruning by inspecting the physical plan:

```python
df_filtered.explain(True)
```

Look for the PartitionFilters: (and PushedFilters:) entries in the FileScan node and confirm that only the necessary partitions are being read.

        #6494
        lochan2014
        Keymaster

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit
from pyspark.sql.utils import AnalysisException

spark = SparkSession.builder \
    .appName("SchemaMismatchHandler") \
    .getOrCreate()

# Example file paths
file_paths = ["s3://bucket/data1.json", "s3://bucket/data2.json", "s3://bucket/data3.json"]

# Step 1: Read all files separately and collect schemas
dfs = []
all_fields = set()

for path in file_paths:
    try:
        df = spark.read.json(path)
        dfs.append(df)
        all_fields.update(df.schema.fieldNames())
    except AnalysisException as e:
        print(f"Failed to read {path}: {e}")

# Step 2: Align all DataFrames to a common schema
def align_schema(df, all_columns):
    for col_name in all_columns:
        if col_name not in df.columns:
            df = df.withColumn(col_name, lit(None))
    return df.select(sorted(all_columns))  # Sort for consistent column order

# Step 3: Apply schema alignment and union all
aligned_dfs = [align_schema(df, all_fields) for df in dfs]
final_df = aligned_dfs[0]
for df in aligned_dfs[1:]:
    final_df = final_df.unionByName(df)

# final_df is now a unified DataFrame with an aligned schema
final_df.show()
```

          This handles schema mismatches by:

          * Reading each file independently.
          * Building a union of all column names.
          * Padding missing columns with null.
          * Ensuring consistent column order before unionByName.
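For Parquet inputs with compatible but different schemas, Spark can also merge schemas at read time instead of aligning them manually; a minimal sketch (the path is a placeholder):

```python
# mergeSchema unions the columns of all Parquet files and fills missing ones with null
df = spark.read.option("mergeSchema", "true").parquet("s3://bucket/parquet-data/")
```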

          #6495
          lochan2014
          Keymaster

```python
# Write a DataFrame to Parquet while controlling output file size

# Step 1: Pick a target file size (in bytes), e.g. ~128 MB per file
target_file_size = 128 * 1024 * 1024  # 128 MB

# Step 2: Tune the number of output files.
# Use df.rdd.getNumPartitions() and the estimated data size to pick a partition count,
# then coalesce/repartition accordingly. Suppose you want 10 files of roughly 128 MB each:
df = df.repartition(10)

# Step 3: Write with compression to reduce the actual storage size
df.write \
    .mode("overwrite") \
    .option("compression", "snappy") \
    .parquet("s3://your-bucket/target-path/")

# Optional: cap records per file if you know the average row size
df.write \
    .mode("overwrite") \
    .option("compression", "snappy") \
    .option("maxRecordsPerFile", 1_000_000) \
    .parquet("s3://your-bucket/target-path/")
```

**Key options used:**

* .repartition(n) to control the number of output files.
* .option("maxRecordsPerFile", n) to cap the number of rows per file.
* Compression (snappy) to keep files compact with predictable size and good read performance.
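To tie the target file size to the partition count, a rough sizing sketch; the total-size figure here is an assumed estimate (e.g. taken from the source files' combined size), not something Spark reports for free:

```python
import math

estimated_total_bytes = 5 * 1024 ** 3   # assumed ~5 GB of data to write; replace with your estimate
target_file_size = 128 * 1024 * 1024    # ~128 MB per output file

num_files = max(1, math.ceil(estimated_total_bytes / target_file_size))
df = df.repartition(num_files)          # produces roughly num_files output files
```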

            #6496
            lochan2014
            Keymaster

```python
# REASON: Join operations can cause executor Out Of Memory (OOM) errors when:
# - One side of the join (especially in a shuffle join) is too large to fit in executor memory.
# - Skewed keys cause data to pile up in specific partitions.
# - Spark performs a wide transformation and shuffles too much data.

# HOW TO AVOID OOM IN JOINS:

# 1. Use a broadcast join (if one side is small enough)
from pyspark.sql.functions import broadcast

result = large_df.join(broadcast(small_df), "join_key")

# 2. Repartition by the join key to distribute data evenly
df1 = df1.repartition("join_key")
df2 = df2.repartition("join_key")

# 3. Select only the needed columns before joining to reduce the memory footprint
df1_trimmed = df1.select("join_key", "needed_col1")
df2_trimmed = df2.select("join_key", "needed_col2")

# 4. Handle skew by salting: add a random salt to the skewed side and replicate
#    the other side once per salt value so every key still finds its match
from pyspark.sql.functions import rand, floor, concat_ws, explode, array, lit

num_salts = 10
df1 = df1.withColumn(
    "salted_key",
    concat_ws("_", df1["join_key"], floor(rand() * num_salts).cast("string"))
)
df2 = df2.withColumn("salt", explode(array(*[lit(str(i)) for i in range(num_salts)]))) \
         .withColumn("salted_key", concat_ws("_", df2["join_key"], "salt"))

# Then join on salted_key

# 5. Use spark.sql.autoBroadcastJoinThreshold wisely
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 50 * 1024 * 1024)  # 50 MB

# 6. Monitor partition sizes and memory settings
# Avoid shuffling massive partitions; tune spark.sql.shuffle.partitions

# 7. Cache intermediate DataFrames if they are reused
df1.cache()
```

              **Summary:**
              OOM during joins typically occurs due to shuffling large or skewed data. Mitigate it using broadcast joins, repartitioning, filtering columns, salting for skew, tuning configs, and caching when needed.
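On Spark 3.x, adaptive query execution can also split skewed shuffle partitions automatically, which often avoids manual salting; a minimal sketch of the relevant settings:

```python
# Let AQE detect and split skewed partitions in shuffle joins at runtime
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
```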

              #6497
              lochan2014
              Keymaster

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, when

spark = SparkSession.builder.appName("NullHandling").getOrCreate()

# Step 1: Read the CSV with a header and inferred schema
df = spark.read.option("header", True).option("inferSchema", True).csv("path/to/file.csv")

# Step 2: Dynamically replace nulls based on column data types
for column, dtype in df.dtypes:
    if dtype in ["int", "bigint", "double", "float"]:
        df = df.withColumn(column, when(col(column).isNull(), lit(0)).otherwise(col(column)))
    elif dtype == "string":
        df = df.withColumn(column, when(col(column).isNull(), lit("missing")).otherwise(col(column)))
    elif dtype == "boolean":
        df = df.withColumn(column, when(col(column).isNull(), lit(False)).otherwise(col(column)))
    # Add other type-specific defaults as needed

# Final cleaned DataFrame
df.show()
```

### ✅ Summary:

* Automatically detects column types.
* Fills nulls with type-appropriate defaults: 0 for numbers, "missing" for strings, False for booleans.
* Avoids hardcoding column names.
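A more compact variant leans on fillna's type matching, where each default value applies only to columns of the same type; a sketch using the same defaults as above:

```python
# 0 -> numeric columns, "missing" -> string columns, False -> boolean columns
df = df.fillna(0).fillna("missing").fillna(False)
```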

                #6499
                lochan2014
                Keymaster

Here are crisp, snippet-style answers to a few more commonly asked questions:

### 🔹 **How do you handle corrupt records while reading JSON/CSV?**

```python
# For JSON
df = spark.read.option("badRecordsPath", "s3://path/to/badRecords").json("s3://path/to/json")

# For CSV
df = spark.read.option("mode", "PERMISSIVE") \
    .option("columnNameOfCorruptRecord", "_corrupt_record") \
    .csv("s3://path/to/csv")

# Read modes: PERMISSIVE (default), DROPMALFORMED, FAILFAST
```

### 🔹 **How do you handle duplicate rows in a DataFrame?**

```python
# Remove exact duplicates
df = df.dropDuplicates()

# Remove duplicates based on specific columns
df = df.dropDuplicates(["col1", "col2"])
```

### 🔹 **How to handle nulls before aggregation?**

```python
# Replace nulls before groupBy
df = df.fillna(0)  # or df.fillna({"col1": 0, "col2": "unknown"})

# Then safely aggregate
df.groupBy("group_col").agg({"metric_col": "sum"})
```

### 🔹 **How do you read only specific columns from a Parquet file?**

```python
df = spark.read.parquet("s3://path/to/data").select("col1", "col2")
```

### 🔹 **How do you optimize wide transformations like joins and groupBy?**

```python
# Use a broadcast join if one side is small
from pyspark.sql.functions import broadcast
df = large_df.join(broadcast(small_df), "key")

# Repartition on the group key to reduce shuffle pressure
df = df.repartition("group_key")

# Cache intermediate results if reused
df.cache()
```

### 🔹 **How do you write partitioned Parquet files with overwrite behavior?**

```python
df.write.mode("overwrite") \
    .partitionBy("year", "month") \
    .parquet("s3://path/output/")
```

### 🔹 **How do you check for skew in Spark join keys?**

```python
df.groupBy("join_key").count().orderBy("count", ascending=False).show(10)
```


                  #6500
                  lochan2014
                  Keymaster

🎯 #6: Flatten Nested JSON to Tabular Format

```python
from pyspark.sql.functions import col

df = spark.read.json("path/to/nested.json")

# Flatten selected nested struct fields into top-level columns
flattened_df = df.selectExpr("id", "details.name", "details.address.city", "details.address.zip")
```
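For arbitrary nesting you can walk the schema instead of hard-coding paths; a sketch with a hypothetical flatten_structs helper (arrays would still need explode, and deeper nesting needs repeated passes):

```python
from pyspark.sql.functions import col
from pyspark.sql.types import StructType

def flatten_structs(df):
    """Promote each nested struct field to a top-level column named parent_child."""
    flat_cols = []
    for field in df.schema.fields:
        if isinstance(field.dataType, StructType):
            for child in field.dataType.fields:
                flat_cols.append(col(f"{field.name}.{child.name}").alias(f"{field.name}_{child.name}"))
        else:
            flat_cols.append(col(field.name))
    return df.select(flat_cols)

flattened_df = flatten_structs(df)  # apply repeatedly until no StructType columns remain
```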

🎯 #7: SCD Type 2 in PySpark

```python
from delta.tables import DeltaTable
from pyspark.sql.functions import current_date, lit

# Existing Delta table
target = DeltaTable.forPath(spark, "s3://path/delta_table")

# New data
incoming_df = spark.read.parquet("s3://path/incoming")

# Merge logic: expire current rows whose tracked attributes changed,
# and insert rows for brand-new ids
target.alias("t").merge(
    incoming_df.alias("s"),
    "t.id = s.id AND t.current_flag = true AND (t.attr1 != s.attr1 OR t.attr2 != s.attr2)"
).whenMatchedUpdate(set={
    "current_flag": lit(False),
    "end_date": current_date()
}).whenNotMatchedInsert(values={
    "id": "s.id",
    "attr1": "s.attr1",
    "attr2": "s.attr2",
    "start_date": current_date(),
    "end_date": lit(None),
    "current_flag": lit(True)
}).execute()
```
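Note that this merge only expires changed rows and inserts brand-new ids; it does not insert the new version of a changed record. A common way to cover that is to stage the incoming rows twice using a helper mergeKey column (a sketch, assuming the id/attr1/attr2 columns above):

```python
from pyspark.sql.functions import col, lit

current = target.toDF().where("current_flag = true")

# New versions of changed records get a null mergeKey, so the MERGE sees them
# as "not matched" and inserts them as fresh current rows.
changed = incoming_df.alias("s").join(
        current.alias("t"), col("s.id") == col("t.id")
    ).where("s.attr1 != t.attr1 OR s.attr2 != t.attr2") \
     .select("s.*") \
     .withColumn("mergeKey", lit(None).cast(incoming_df.schema["id"].dataType))

staged = incoming_df.withColumn("mergeKey", col("id")).unionByName(changed)

# Then run the same MERGE with "t.id = s.mergeKey" as the match condition,
# using staged.alias("s") as the source.
```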

🎯 #8: Parse Kafka JSON Messages

```python
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StringType

schema = StructType().add("user_id", StringType()).add("action", StringType())

df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "broker:9092") \
    .option("subscribe", "topic-name") \
    .load()

# Kafka delivers the payload as bytes: cast to string, parse the JSON, then flatten
json_df = df.select(from_json(col("value").cast("string"), schema).alias("data")).select("data.*")
```

🎯 #9: Incremental Load from MySQL to Delta

```python
from delta.tables import DeltaTable

# Step 1: Load the latest watermark from the Delta table
last_ts = spark.read.format("delta").table("delta_table").agg({"updated_at": "max"}).collect()[0][0]

# Step 2: Pull only the new rows from MySQL
jdbc_df = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:mysql://…") \
    .option("query", f"SELECT * FROM table WHERE updated_at > '{last_ts}'") \
    .load()

# Step 3: Upsert into Delta
delta_table = DeltaTable.forPath(spark, "s3://delta_table")

delta_table.alias("t").merge(
    jdbc_df.alias("s"),
    "t.id = s.id"
).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()
```

🎯 #10: cache() vs persist()

```python
from pyspark import StorageLevel

df.cache()                             # DataFrame default storage level: MEMORY_AND_DISK
df.persist(StorageLevel.MEMORY_ONLY)   # persist() lets you choose, e.g. MEMORY_ONLY, DISK_ONLY

# Use cache() when memory suffices and the data is reused multiple times.
# Use persist() when you want explicit control over the storage level.
```

🎯 #11: Optimize a Skewed groupBy

```python
from pyspark.sql.functions import expr

# Add a random salt to spread hot keys across more partitions
df = df.withColumn("salted_key", expr("concat(group_key, '_', floor(rand()*10))"))

df.groupBy("salted_key").agg(...)  # then remove the salt via the two-stage rollup sketched below
```
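To recover per-key results, aggregate twice: once on the salted key, then again after stripping the salt (a sketch assuming a hypothetical metric column and that group_key itself contains no underscore):

```python
from pyspark.sql.functions import split, sum as spark_sum

# Stage 1: partial aggregation on the salted key spreads the hot key across partitions
partial = df.groupBy("salted_key").agg(spark_sum("metric").alias("partial_sum"))

# Stage 2: strip the salt and combine the partial results per original key
final = partial.withColumn("group_key", split("salted_key", "_").getItem(0)) \
               .groupBy("group_key").agg(spark_sum("partial_sum").alias("total"))
```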

🎯 #12: Unit Tests in PySpark

```python
import unittest
from pyspark.sql import SparkSession

class MyTest(unittest.TestCase):
    @classmethod
    def setUpClass(cls):
        cls.spark = SparkSession.builder.master("local[1]").appName("test").getOrCreate()

    def test_transformation(self):
        df = self.spark.createDataFrame([(1, "a"), (2, "b")], ["id", "val"])
        result = df.filter("id = 1").collect()
        self.assertEqual(result[0]["val"], "a")
```

🎯 #13: Row Number by Partition

```python
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

window_spec = Window.partitionBy("user_id").orderBy("event_time")

df = df.withColumn("row_num", row_number().over(window_spec))
```

🎯 #14: MERGE INTO Delta for New Records

```python
from delta.tables import DeltaTable

delta = DeltaTable.forPath(spark, "s3://delta_table")
new_data = spark.read.parquet("s3://incoming")

# Insert only rows whose id does not already exist in the target table
delta.alias("t").merge(
    new_data.alias("s"),
    "t.id = s.id"
).whenNotMatchedInsertAll().execute()
```

🎯 #15: Debug & Optimize Slow Spark Jobs

```python
# 🔍 Steps:
# 1. Use the Spark UI → check stages, skew, shuffle size.
# 2. Cache/persist reused DataFrames.
# 3. Avoid UDFs unless necessary.
# 4. Repartition wisely.
# 5. Tune configs:
spark.conf.set("spark.sql.shuffle.partitions", 200)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 50 * 1024 * 1024)
```

🎯 #16: CSV → Partitioned Parquet by Date

```python
df = spark.read.option("header", True).csv("s3://path/data.csv")

df.write.partitionBy("date_col") \
    .mode("overwrite") \
    .parquet("s3://path/output/")
```

🎯 #17: Broadcast Join + Caveats

```python
from pyspark.sql.functions import broadcast

df = big_df.join(broadcast(small_df), "id")

# ⚠️ Caveat: small_df must fit comfortably in driver and executor memory.
# The autoBroadcastJoinThreshold default is 10 MB; values around 50 MB are common,
# but broadcasting anything large risks OOM.
```

🎯 #18: Kafka Streaming with Exactly-Once

```python
stream_df = spark.readStream.format("kafka")...  # your stream config

# Ensure checkpointing + an idempotent/transactional sink (like Delta)
stream_df.writeStream \
    .format("delta") \
    .option("checkpointLocation", "s3://chk/") \
    .outputMode("append") \
    .start("s3://delta/output")
```

🎯 #19: Daily Activity Flag

```python
from pyspark.sql.functions import explode, sequence, lit, col, to_date, coalesce

# Active dates per user; mark each observed (user_id, date) row as active
df = df.withColumn("date", to_date("date")).withColumn("active_flag", lit(1))

# Generate the full month range
full_dates = spark.range(1).select(
    explode(sequence(to_date(lit("2024-01-01")), to_date(lit("2024-01-31")))).alias("date")
)

# Cross join users with the date range to build a per-user calendar
users = df.select("user_id").distinct()
calendar = users.crossJoin(full_dates)

# Left join and flag activity (days with no activity become 0)
result = calendar.join(df, ["user_id", "date"], "left") \
    .withColumn("active_flag", coalesce(col("active_flag"), lit(0)))
```

                    🎯 #20: Script Fails on Cluster but Not Locally – Possible Causes

1. Missing files/resources on the cluster (e.g., JARs, dependencies).
2. Schema inference issues (e.g., inferSchema=True works locally but behaves differently at cluster scale).
3. Cluster memory limits (OOM).
4. Code depends on local paths that are not accessible in distributed mode.
5. Local mode runs in a single JVM, while the cluster performs distributed shuffles and serialization, exposing issues that never appear locally.
6. Version mismatches (e.g., Spark/Python/PyArrow).
7. Missing permissions on HDFS/S3.
