- This topic has 7 replies, 1 voice, and was last updated 2 hours, 46 minutes ago by
lochan2014.
- June 7, 2025 at 1:09 pm #6486
lochan2014
Keymaster
A list of the top 20 scenario-based PySpark questions that are frequently asked in interviews:
🎯 #1: You have a large dataset. How do you optimize reading only specific partitions using spark.read.parquet()?
🎯 #2: Explain how you would handle schema mismatch when reading multiple JSON/Parquet files with different structures.
🎯 #3: You want to write a DataFrame back to Parquet but keep the original file size consistent. What options do you use?
🎯 #4: Why might a join operation cause executor OOM errors, and how can you avoid it?
🎯 #5: You're reading a CSV with missing values. How would you replace nulls dynamically across all columns?
🎯 #6: How do you read a nested JSON and flatten it into a tabular format using PySpark?
🎯 #7: How would you implement slowly changing dimension Type 2 (SCD2) logic using PySpark?
🎯 #8: You need to read from a Kafka topic where each message is a JSON. How would you parse and process it?
🎯 #9: How would you perform incremental data loading from a source like MySQL to Delta Lake?
🎯 #10: What's the difference between cache() and persist()? When would you use one over the other?
🎯 #11: You have skewed data during a groupBy. How would you optimize this?
🎯 #12: How would you write unit tests for your PySpark code?
🎯 #13: You want to add a row number to a DataFrame partitioned by a key and ordered by timestamp. How do you do it?
🎯 #14: You're writing to a Delta table and want to merge only new records. How do you use MERGE INTO?
🎯 #15: Your Spark job is taking too long. How would you go about debugging and optimizing performance?
🎯 #16: How would you read a huge CSV file and write it as partitioned Parquet based on a date column?
🎯 #17: You want to broadcast a small DataFrame in a join. How do you do it and what are the caveats?
🎯 #18: You're processing streaming data from Kafka. How would you ensure exactly-once semantics?
🎯 #19: You have a list of dates per user and want to generate a daily activity flag for each day in a month. How do you do it?
🎯 #20: You have a PySpark script that runs fine locally but fails on the cluster. What could be the possible reasons?
Thanks to https://www.linkedin.com/feed/update/urn:li:activity:7337048186416439296/

June 7, 2025 at 1:22 pm #6493
lochan2014
Keymaster
To **optimize reading only specific partitions** using spark.read.parquet(), you can leverage **partition pruning**, which allows Spark to skip reading unnecessary files and load only the relevant partitions.

### ✅ Here's how:
---

### 💡 **Assume your dataset is partitioned like this:**

/data/events/year=2023/month=01/
/data/events/year=2023/month=02/
/data/events/year=2024/month=01/

---
### ✅ Option 1: **Partition Pruning via Filter**
If the Parquet data is partitioned by year and month, Spark will **automatically prune partitions** when you filter on those columns **after reading**:

df = spark.read.parquet("/data/events")
df_filtered = df.filter((df.year == 2023) & (df.month == 1))

➡️ Spark reads **only the matching folders** based on the directory structure, without scanning the full dataset.
---

### ✅ Option 2: **Partition Directory Path Filtering (Manual)**
You can also point directly at a partition path to **read only that portion** of the dataset:

df = spark.read.parquet("/data/events/year=2023/month=01")

➡️ This skips the full directory scan and reads only the specified partition path.
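Note that when a partition directory is read directly like this, the partition columns (year, month) are no longer part of the resulting DataFrame. If they are still needed, a minimal sketch using Spark's basePath option (paths are the example ones from above):

df = spark.read \
    .option("basePath", "/data/events") \
    .parquet("/data/events/year=2023/month=01")
# year and month are available as columns again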
---

### ✅ Option 3: **Passing multiple paths**
If you want to read a few selected partitions:

df = spark.read.parquet(
    "/data/events/year=2023/month=01",
    "/data/events/year=2024/month=01"
)

➡️ Only the specified paths are read, which is efficient when you know the exact partitions to load.
---

### 🛠️ Best Practices
* **Filter on partition columns early** (pruning is applied lazily at read time).
* Always **partition your data** based on query patterns (e.g., time-based like year/month/day).
* Avoid filtering on non-partitioned columns if read performance is a concern.
* **Don't cast partition columns**, as it can disable pruning (e.g., df.filter(df.year.cast("string") == "2023") may break pruning).

---
### 🔍 Check if pruning is applied:
You can verify partition pruning by inspecting the physical plan:

df_filtered.explain(True)

Look for PartitionFilters (partition pruning) and PushedFilters (predicate pushdown) in the FileScan node, and confirm that only the necessary partitions are being read.

June 7, 2025 at 1:39 pm #6494
lochan2014
Keymaster

from pyspark.sql import SparkSession
from pyspark.sql.functions import lit
from pyspark.sql.utils import AnalysisException

spark = SparkSession.builder \
    .appName("SchemaMismatchHandler") \
    .getOrCreate()

# Example file paths
file_paths = ["s3://bucket/data1.json", "s3://bucket/data2.json", "s3://bucket/data3.json"]

# Step 1: Read all files separately and collect schemas
dfs = []
all_fields = set()

for path in file_paths:
    try:
        df = spark.read.json(path)
        dfs.append(df)
        all_fields.update(df.schema.fieldNames())
    except AnalysisException as e:
        print(f"Failed to read {path}: {e}")

# Step 2: Align all DataFrames to a common schema
def align_schema(df, all_columns):
    for col in all_columns:
        if col not in df.columns:
            df = df.withColumn(col, lit(None))
    return df.select(sorted(all_columns))  # Sort for consistent column order

# Step 3: Apply schema alignment and union all
aligned_dfs = [align_schema(df, all_fields) for df in dfs]
final_df = aligned_dfs[0]
for df in aligned_dfs[1:]:
    final_df = final_df.unionByName(df)

# final_df is now a unified DataFrame with an aligned schema
final_df.show()

This handles schema mismatches by:
* Reading each file independently.
* Building a union of all column names.
* Padding missing columns with null.
* Ensuring consistent column order before unionByName.
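For Parquet inputs specifically, a lighter-weight alternative is Spark's built-in schema merging; a minimal sketch, assuming the files have compatible (promotable) column types and an illustrative path:

# Assumption: all inputs are Parquet files with compatible column types
df = spark.read.option("mergeSchema", "true").parquet("s3://bucket/parquet-data/")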
June 7, 2025 at 1:42 pm #6495
lochan2014
Keymaster

# Write a DataFrame to Parquet while controlling file size

# Step 1: Estimate the desired file size per output file (in bytes)
# For example, target ~128 MB per file
target_file_size = 128 * 1024 * 1024  # 128 MB

# Step 2: Use df.rdd.getNumPartitions() and the data size to tune the number of partitions
# (a sketch for deriving this number follows the key-options list below)
# Alternatively, use coalesce/repartition if the size is known or can be estimated
# Suppose you want 10 files of around 128 MB each
df = df.repartition(10)

# Step 3: Write with compression to reduce actual storage size
df.write \
    .mode("overwrite") \
    .option("compression", "snappy") \
    .parquet("s3://your-bucket/target-path/")

# Optional: use maxRecordsPerFile to cap records per file if you know the average row size
df.write \
    .mode("overwrite") \
    .option("compression", "snappy") \
    .option("maxRecordsPerFile", 1_000_000) \
    .parquet("s3://your-bucket/target-path/")

**Key options used:**
* .repartition(n) to control the number of output files.
* .option("maxRecordsPerFile", n) to cap the number of rows per file.
* Compression (snappy) to ensure consistent file size and performance.
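Step 2 above mentions deriving the partition count from the data size; a minimal sketch of that calculation, assuming total_input_bytes is estimated separately (e.g., from your storage system's metadata):

import math

# Assumption: total_input_bytes is an estimate of the data volume to be written
total_input_bytes = 1.3 * 1024**3  # example: ~1.3 GB
num_files = max(1, math.ceil(total_input_bytes / target_file_size))
df = df.repartition(num_files)     # roughly target_file_size per output file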
June 7, 2025 at 1:49 pm #6496
lochan2014
Keymaster

# REASON: Join operations can cause executor Out Of Memory (OOM) errors when:
# - One side of the join (especially in a shuffle join) is too large to fit in executor memory.
# - Skewed keys cause data to pile up in specific partitions.
# - Spark performs a wide transformation and shuffles too much data.

# HOW TO AVOID OOM IN JOINS:

# 1. Use a broadcast join (if one side is small enough)
from pyspark.sql.functions import broadcast

result = large_df.join(broadcast(small_df), "join_key")

# 2. Repartition by the join key to distribute data evenly
df1 = df1.repartition("join_key")
df2 = df2.repartition("join_key")

# 3. Select only the needed columns before the join to reduce the memory footprint
df1_trimmed = df1.select("join_key", "needed_col1")
df2_trimmed = df2.select("join_key", "needed_col2")
# 4. Handle skew by salting
from pyspark.sql.functions import rand, concat_ws, explode, array, lit

# Salt the skewed (large) side with a random suffix 0-9
df1 = df1.withColumn("salt", (rand() * 10).cast("int").cast("string"))
df1 = df1.withColumn("salted_key", concat_ws("_", "join_key", "salt"))
# Replicate the other side once per salt value so every salted key can still find its match
df2 = df2.withColumn("salt", explode(array(*[lit(str(i)) for i in range(10)])))
df2 = df2.withColumn("salted_key", concat_ws("_", "join_key", "salt"))
# Then join on salted_key
# 5. Use spark.sql.autoBroadcastJoinThreshold wisely
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 50 * 1024 * 1024)  # 50 MB

# 6. Monitor partition sizes and optimize memory settings
# Avoid shuffling massive partitions; tune spark.sql.shuffle.partitions

# 7. Cache intermediate DataFrames if reused
df1.cache()
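On Spark 3.x, adaptive query execution can also split skewed join partitions automatically; a minimal sketch of the relevant settings, assuming Spark 3.x with AQE available:

# Assumption: Spark 3.x, where AQE and skew-join handling are available
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")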
**Summary:**
OOM during joins typically occurs due to shuffling large or skewed data. Mitigate it using broadcast joins, repartitioning, filtering columns, salting for skew, tuning configs, and caching when needed.

June 7, 2025 at 1:51 pm #6497
lochan2014
Keymaster

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, when

spark = SparkSession.builder.appName("NullHandling").getOrCreate()

# Step 1: Read CSV with header and infer schema
df = spark.read.option("header", True).option("inferSchema", True).csv("path/to/file.csv")

# Step 2: Dynamically replace nulls based on column data types
for column, dtype in df.dtypes:
    if dtype in ["int", "bigint", "double", "float"]:
        df = df.withColumn(column, when(col(column).isNull(), lit(0)).otherwise(col(column)))
    elif dtype == "string":
        df = df.withColumn(column, when(col(column).isNull(), lit("missing")).otherwise(col(column)))
    elif dtype == "boolean":
        df = df.withColumn(column, when(col(column).isNull(), lit(False)).otherwise(col(column)))
    # Add other type-specific defaults as needed

# Final cleaned DataFrame
df.show()
### ✅ Summary:
* Automatically detects column types.
* Fills nulls with type-appropriate defaults: 0 for numbers, "missing" for strings, False for booleans.
* Avoids hardcoding column names.
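A more compact variant of the same idea (a sketch): build a defaults dict from df.dtypes and apply a single fillna() call. Booleans are left to the when()/otherwise() approach above, since fillna() support for them varies by Spark version:

defaults = {}
for column, dtype in df.dtypes:
    if dtype in ("int", "bigint", "double", "float"):
        defaults[column] = 0
    elif dtype == "string":
        defaults[column] = "missing"

df = df.fillna(defaults)  # one pass instead of per-column withColumn calls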
June 7, 2025 at 1:57 pm #6499
lochan2014
Keymaster
Absolutely! Here's a crisp, snippet-style answer for each of these commonly asked questions:
---

### 🔹 **How do you handle corrupt records while reading JSON/CSV?**

# For JSON (badRecordsPath is a Databricks-specific option; on open-source Spark use mode/columnNameOfCorruptRecord)
df = spark.read.option("badRecordsPath", "s3://path/to/badRecords").json("s3://path/to/json")

# For CSV
df = spark.read.option("mode", "PERMISSIVE") \
    .option("columnNameOfCorruptRecord", "_corrupt_record") \
    .csv("s3://path/to/csv")

# Options: PERMISSIVE (default), DROPMALFORMED, FAILFAST

---

### 🔹 **How do you handle duplicate rows in a DataFrame?**

# Remove exact duplicates
df = df.dropDuplicates()

# Remove duplicates based on specific columns
df = df.dropDuplicates(["col1", "col2"])
---

### 🔹 **How do you handle nulls before aggregation?**

# Replace nulls before groupBy
df = df.fillna(0)  # or df.fillna({"col1": 0, "col2": "unknown"})

# Then safely aggregate
df.groupBy("group_col").agg({"metric_col": "sum"})
---

### 🔹 **How do you read only specific columns from a Parquet file?**

df = spark.read.parquet("s3://path/to/data").select("col1", "col2")
---

### 🔹 **How do you optimize wide transformations like joins and groupBy?**

# Use a broadcast join if one side is small
from pyspark.sql.functions import broadcast
df = large_df.join(broadcast(small_df), "key")

# Repartition on the group key to reduce shuffle pressure
df = df.repartition("group_key")

# Cache intermediate results if reused
df.cache()
---

### 🔹 **How do you write partitioned Parquet files with overwrite behavior?**

df.write.mode("overwrite") \
    .partitionBy("year", "month") \
    .parquet("s3://path/output/")
---

### 🔹 **How do you check for skew in Spark join keys?**

df.groupBy("join_key").count().orderBy("count", ascending=False).show(10)

---

Would you like a few more on file formats, cache vs persist, or checkpointing?

June 7, 2025 at 1:58 pm #6500
lochan2014
Keymaster

🎯 #6: Flatten Nested JSON to Tabular Format

from pyspark.sql.functions import col

df = spark.read.json("path/to/nested.json")

# Flatten the nested structs by selecting their fields explicitly
flattened_df = df.selectExpr("id", "details.name", "details.address.city", "details.address.zip")
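If the structure is not known up front, a generic helper can walk the schema instead of hard-coding field names; a minimal sketch (structs only, array columns would still need explode):

from pyspark.sql.types import StructType
from pyspark.sql.functions import col

def flatten_structs(df):
    # Repeatedly expand StructType columns into top-level columns until none remain
    while any(isinstance(f.dataType, StructType) for f in df.schema.fields):
        cols = []
        for f in df.schema.fields:
            if isinstance(f.dataType, StructType):
                cols.extend(col(f"{f.name}.{c.name}").alias(f"{f.name}_{c.name}")
                            for c in f.dataType.fields)
            else:
                cols.append(col(f.name))
        df = df.select(cols)
    return df

flattened_df = flatten_structs(df)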
🎯 #7: SCD Type 2 in PySpark

from delta.tables import DeltaTable
from pyspark.sql.functions import current_date, lit

# Existing Delta table
target = DeltaTable.forPath(spark, "s3://path/delta_table")

# New data
incoming_df = spark.read.parquet("s3://path/incoming")

# Merge logic: expire the current version of changed records and insert brand-new ids
target.alias("t").merge(
    incoming_df.alias("s"),
    "t.id = s.id AND t.current_flag = true AND (t.attr1 != s.attr1 OR t.attr2 != s.attr2)"
).whenMatchedUpdate(set={
    "current_flag": lit(False),
    "end_date": current_date()
}).whenNotMatchedInsert(values={
    "id": "s.id",
    "attr1": "s.attr1",
    "attr2": "s.attr2",
    "start_date": current_date(),
    "end_date": lit(None),
    "current_flag": lit(True)
}).execute()
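# Note: with the change-detection predicate in the ON clause, a changed row is expired but its
# new version is not inserted, and an unchanged existing row falls into NOT MATCHED and would be
# re-inserted. A full SCD2 flow typically stages the source first (e.g., a union of new and
# changed rows under a surrogate merge key) or runs a second append step for the new versions.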
🎯 #8: Parse Kafka JSON Messages

from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StringType

schema = StructType().add("user_id", StringType()).add("action", StringType())

df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "host1:9092") \
    .option("subscribe", "topic-name") \
    .load()
# kafka.bootstrap.servers is required; the broker address above is a placeholder

json_df = df.select(from_json(col("value").cast("string"), schema).alias("data")).select("data.*")
🎯 #9: Incremental Load from MySQL to Delta

# Step 1: Load the latest watermark
last_ts = spark.read.format("delta").table("delta_table").agg({"updated_at": "max"}).collect()[0][0]

# Step 2: Pull only new rows
jdbc_df = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:mysql://...") \
    .option("query", f"SELECT * FROM table WHERE updated_at > '{last_ts}'") \
    .load()

# Step 3: Upsert into Delta
from delta.tables import DeltaTable
delta_table = DeltaTable.forPath(spark, "s3://delta_table")

delta_table.alias("t").merge(
    jdbc_df.alias("s"),
    "t.id = s.id"
).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()
🎯 #10: cache() vs persist()

df.cache()    # Uses MEMORY_AND_DISK by default for DataFrames
df.persist()  # Accepts a custom storage level, e.g., MEMORY_ONLY, DISK_ONLY

# Use cache() when memory suffices and the data is reused multiple times.
# Use persist() when memory may not be enough or when you want explicit control.
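A minimal sketch of passing an explicit storage level to persist() (DISK_ONLY is chosen here just as an example):

from pyspark import StorageLevel

df.persist(StorageLevel.DISK_ONLY)  # spill entirely to disk instead of memory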
🎯 #11: Optimize Skewed groupBy

# Add a random salt to reduce skew
from pyspark.sql.functions import expr

df = df.withColumn("salted_key", expr("concat(group_key, '_', floor(rand()*10))"))
df.groupBy("salted_key").agg(...)  # later remove the salt and re-aggregate (see the sketch below)
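The salt-removal step is usually a second aggregation; a minimal sketch, assuming a numeric column named value is being summed and that group_key values contain no underscores:

from pyspark.sql.functions import expr, sum as sum_

# Stage 1: partial aggregation on the salted key (spreads the hot key across partitions)
partial = df.groupBy("salted_key").agg(sum_("value").alias("partial_sum"))

# Stage 2: strip the salt and aggregate the partial results on the real key
final = partial.withColumn("group_key", expr("substring_index(salted_key, '_', 1)")) \
               .groupBy("group_key").agg(sum_("partial_sum").alias("total"))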
🎯 #12: Unit Tests in PySpark

import unittest
from pyspark.sql import SparkSession

class MyTest(unittest.TestCase):
    @classmethod
    def setUpClass(cls):
        cls.spark = SparkSession.builder.master("local[1]").appName("test").getOrCreate()

    def test_transformation(self):
        df = self.spark.createDataFrame([(1, "a"), (2, "b")], ["id", "val"])
        result = df.filter("id = 1").collect()
        self.assertEqual(result[0]["val"], "a")
🎯 #13: Row Number by Partition

from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

window_spec = Window.partitionBy("user_id").orderBy("event_time")
df = df.withColumn("row_num", row_number().over(window_spec))

🎯 #14: MERGE INTO Delta for New Records
from delta.tables import DeltaTable

delta = DeltaTable.forPath(spark, "s3://delta_table")
new_data = spark.read.parquet("s3://incoming")

delta.alias("t").merge(
    new_data.alias("s"),
    "t.id = s.id"
).whenNotMatchedInsertAll().execute()

🎯 #15: Debug & Optimize Slow Spark Jobs
# 🔍 Steps:
# 1. Use the Spark UI to check stages, skew, and shuffle sizes.
# 2. Cache/persist reused DataFrames.
# 3. Avoid UDFs unless necessary.
# 4. Repartition wisely.
# 5. Tune configs:
spark.conf.set("spark.sql.shuffle.partitions", 200)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 50 * 1024 * 1024)

🎯 #16: CSV → Partitioned Parquet by Date
df = spark.read.option("header", True).csv("s3://path/data.csv")

df.write.partitionBy("date_col") \
    .mode("overwrite") \
    .parquet("s3://path/output/")

🎯 #17: Broadcast Join + Caveats
from pyspark.sql.functions import broadcast

df = big_df.join(broadcast(small_df), "id")
# ⚠️ Caveat: ensure small_df comfortably fits in executor (and driver) memory, typically on the order of 10-50 MB

🎯 #18: Kafka Streaming with Exactly-Once
stream_df = spark.readStream.format("kafka")...  # your stream config
# Ensure checkpointing + an idempotent/transactional sink (like Delta)
stream_df.writeStream \
    .format("delta") \
    .option("checkpointLocation", "s3://chk/") \
    .outputMode("append") \
    .start("s3://delta/output")

🎯 #19: Daily Activity Flag
from pyspark.sql.functions import explode, sequence, lit, col, to_date

# List of active dates per user
df = df.withColumn("date", to_date("date"))

# Generate the full month range (cast the literals to dates so sequence() accepts them)
full_dates = spark.range(1).select(
    explode(sequence(to_date(lit("2024-01-01")), to_date(lit("2024-01-31")))).alias("date")
)

# Cross join users with the date range
users = df.select("user_id").distinct()
calendar = users.crossJoin(full_dates)

# Left join and flag activity (mark activity rows before joining so the flag survives the join)
activity = df.select("user_id", "date").withColumn("active", lit(1))
result = calendar.join(activity, ["user_id", "date"], "left") \
    .withColumn("active_flag", col("active").isNotNull().cast("int"))

🎯 #20: Script Fails on Cluster but Not Locally - Possible Causes
1. Missing files/resources on the cluster (e.g., JARs, dependencies).
2. Schema inference issues (e.g., inferSchema=True works locally but fails at cluster scale).
3. Cluster memory limits (OOM).
4. Code depends on local paths that are not accessible in distributed mode.
5. Local mode runs in a single JVM, while the cluster runs distributed execution with real shuffles, exercising different code paths.
6. Version mismatches (e.g., Spark/Python/PyArrow).
7. Missing permissions on HDFS/S3.