Forum Replies Created
lochan2014
Keymaster
#6: Flatten Nested JSON to Tabular Format
from pyspark.sql.functions import col

df = spark.read.json("path/to/nested.json")

# Flatten the nested struct fields into top-level columns
flattened_df = df.selectExpr("id", "details.name", "details.address.city", "details.address.zip")
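If the nested fields are not known up front, a small helper can walk the schema and build the select list automatically. A minimal sketch (an assumption, not part of the original snippet; it handles struct nesting only and does not expand arrays):

from pyspark.sql.types import StructType
from pyspark.sql.functions import col

def flatten_structs(df):
    """Promote every first-level struct field to a top-level column."""
    cols = []
    for field in df.schema.fields:
        if isinstance(field.dataType, StructType):
            for nested in field.dataType.fields:
                cols.append(col(f"{field.name}.{nested.name}").alias(f"{field.name}_{nested.name}"))
        else:
            cols.append(col(field.name))
    return df.select(cols)

flattened_df = flatten_structs(df)  # repeat until no struct columns remain for deeper nesting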
#7: SCD Type 2 in PySpark
from delta.tables import DeltaTable
from pyspark.sql.functions import current_date, lit

# Existing Delta table
target = DeltaTable.forPath(spark, "s3://path/delta_table")

# New data
incoming_df = spark.read.parquet("s3://path/incoming")

# Merge logic
target.alias("t").merge(
    incoming_df.alias("s"),
    "t.id = s.id AND t.current_flag = true AND (t.attr1 != s.attr1 OR t.attr2 != s.attr2)"
).whenMatchedUpdate(set={
    "current_flag": lit(False),
    "end_date": current_date()
}).whenNotMatchedInsert(values={
    "id": "s.id",
    "attr1": "s.attr1",
    "attr2": "s.attr2",
    "start_date": current_date(),
    "end_date": lit(None),
    "current_flag": lit(True)
}).execute()
# Note: this expires the old version of changed rows, but those source rows are "matched",
# so their new version is not inserted here; see the staged-merge sketch below.
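A common workaround is to stage the source so that changed rows appear twice: once to expire the existing version and once (with a NULL merge key) to insert the new version. A hedged sketch under the same column assumptions as above (id, attr1, attr2, start_date, end_date, current_flag):

from pyspark.sql.functions import col, lit, current_date

# Rows whose tracked attributes changed need a brand-new "current" version
current_rows = target.toDF().filter("current_flag = true")
changed = (incoming_df.alias("s")
           .join(current_rows.alias("t"), "id")
           .where("t.attr1 <> s.attr1 OR t.attr2 <> s.attr2")
           .select("s.*"))

# Stage the source: changed rows get a NULL mergeKey so they fall into the insert branch,
# while every incoming row keeps mergeKey = id so its old version can be expired.
staged = (incoming_df.withColumn("mergeKey", col("id"))
          .unionByName(changed.withColumn("mergeKey", lit(None))))

(target.alias("t").merge(
    staged.alias("s"),
    "t.id = s.mergeKey AND t.current_flag = true"
).whenMatchedUpdate(
    condition="t.attr1 <> s.attr1 OR t.attr2 <> s.attr2",
    set={"current_flag": lit(False), "end_date": current_date()}
).whenNotMatchedInsert(values={
    "id": "s.id",
    "attr1": "s.attr1",
    "attr2": "s.attr2",
    "start_date": current_date(),
    "end_date": lit(None),
    "current_flag": lit(True)
}).execute())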
#8: Parse Kafka JSON Messages
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StringType

schema = StructType().add("user_id", StringType()).add("action", StringType())

df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "host:port") \
    .option("subscribe", "topic-name") \
    .load()

json_df = df.select(from_json(col("value").cast("string"), schema).alias("data")).select("data.*")
#9: Incremental Load from MySQL to Delta
# Step 1: Load the latest watermark
last_ts = spark.read.format("delta").table("delta_table").agg({"updated_at": "max"}).collect()[0][0]

# Step 2: Pull only new rows
jdbc_df = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:mysql://…") \
    .option("query", f"SELECT * FROM table WHERE updated_at > '{last_ts}'") \
    .load()

# Step 3: Upsert into Delta
from delta.tables import DeltaTable

delta_table = DeltaTable.forPath(spark, "s3://delta_table")

delta_table.alias("t").merge(
    jdbc_df.alias("s"),
    "t.id = s.id"
).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()

#10: cache() vs persist()
df.cache() # Uses MEMORY_AND_DISK by default
df.persist() # Customize the storage level, e.g., MEMORY_ONLY, DISK_ONLY

# Use cache() when memory suffices and the data is reused multiple times.
# Use persist() when memory may not be enough or you want control over the storage level.
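For explicit control, persist() accepts a StorageLevel. A minimal sketch:

from pyspark import StorageLevel

df.persist(StorageLevel.MEMORY_ONLY)   # keep blocks in memory only; recompute if evicted
# ... run the actions that reuse df ...
df.unpersist()                         # release the cached blocks when done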
#11: Optimize Skewed groupBy
# Add a random salt to reduce skew
from pyspark.sql.functions import expr

df = df.withColumn("salted_key", expr("concat(group_key, '_', floor(rand()*10))"))
df.groupBy("salted_key").agg(…)  # Aggregate on the salted key, then remove the salt in a second pass (see below)
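To make the second pass concrete, here is a minimal sketch of a salted sum; the group_key and amount column names are assumptions:

from pyspark.sql.functions import expr, sum as spark_sum

# Pass 1: partial aggregate on (group_key, salt) so a hot key is spread across ~10 partitions
salted = df.withColumn("salt", expr("floor(rand() * 10)"))
partial = salted.groupBy("group_key", "salt").agg(spark_sum("amount").alias("partial_sum"))

# Pass 2: drop the salt and combine the partial results per original key
final = partial.groupBy("group_key").agg(spark_sum("partial_sum").alias("total_amount"))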
#12: Unit Tests in PySpark
import unittest
from pyspark.sql import SparkSession

class MyTest(unittest.TestCase):
    @classmethod
    def setUpClass(cls):
        cls.spark = SparkSession.builder.master("local[1]").appName("test").getOrCreate()

    def test_transformation(self):
        df = self.spark.createDataFrame([(1, "a"), (2, "b")], ["id", "val"])
        result = df.filter("id = 1").collect()
        self.assertEqual(result[0]["val"], "a")

#13: Row Number by Partition
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

window_spec = Window.partitionBy("user_id").orderBy("event_time")
df = df.withColumn("row_num", row_number().over(window_spec))
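A common follow-up use of this pattern is deduplication, e.g. keeping only each user's first event:

from pyspark.sql.functions import col

first_events = df.filter(col("row_num") == 1).drop("row_num")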
#14: MERGE INTO Delta for New Records
from delta.tables import DeltaTable

delta = DeltaTable.forPath(spark, "s3://delta_table")
new_data = spark.read.parquet("s3://incoming")

delta.alias("t").merge(
    new_data.alias("s"),
    "t.id = s.id"
).whenNotMatchedInsertAll().execute()

#15: Debug & Optimize Slow Spark Jobs
# Steps:
# 1. Use the Spark UI to check stages, skew, and shuffle size.
# 2. Cache/persist reused DataFrames.
# 3. Avoid UDFs unless necessary.
# 4. Repartition wisely.
# 5. Tune configs:
spark.conf.set("spark.sql.shuffle.partitions", 200)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 50*1024*1024)
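On Spark 3.x, Adaptive Query Execution can handle shuffle-partition sizing and skewed joins automatically; a minimal sketch of the relevant settings:

# Spark 3.x: let AQE coalesce small shuffle partitions and split skewed ones at runtime
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")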
#16: CSV to Partitioned Parquet by Date
df = spark.read.option("header", True).csv("s3://path/data.csv")

df.write.partitionBy("date_col") \
    .mode("overwrite") \
    .parquet("s3://path/output/")

#17: Broadcast Join + Caveats
from pyspark.sql.functions import broadcast

df = big_df.join(broadcast(small_df), "id")
# Caveat: make sure small_df comfortably fits in executor memory (roughly 10-50 MB is a safe range)
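If the "small" side can grow over time, one defensive pattern is to decide at runtime whether to broadcast; a rough sketch (the row-count threshold is an arbitrary assumption):

from pyspark.sql.functions import broadcast

# Disable implicit auto-broadcast so the explicit choice below is the only broadcast path
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

if small_df.count() < 1_000_000:          # assumed heuristic; tune to your row width and memory
    joined = big_df.join(broadcast(small_df), "id")
else:
    joined = big_df.join(small_df, "id")  # fall back to a shuffle join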
#18: Kafka Streaming with Exactly-Once
stream_df = spark.readStream.format("kafka")...  # your stream config
# Ensure checkpointing + an idempotent sink (like Delta)
stream_df.writeStream \
    .format("delta") \
    .option("checkpointLocation", "s3://chk/") \
    .outputMode("append") \
    .start("s3://delta/output")
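For reference, a minimal sketch of the elided read side, assuming a broker at kafka:9092 and a topic named events (both placeholders):

stream_df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")  # placeholder broker address
    .option("subscribe", "events")                    # placeholder topic name
    .option("startingOffsets", "earliest")
    .load()
)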
#19: Daily Activity Flag
from pyspark.sql.functions import explode, sequence, lit, col, to_date, coalesce

# Active dates per user
df = df.withColumn("date", to_date("date"))
activity = df.select("user_id", "date").distinct().withColumn("is_active", lit(1))

# Generate the full month range (sequence needs date-typed bounds)
full_dates = spark.range(1).select(
    explode(sequence(to_date(lit("2024-01-01")), to_date(lit("2024-01-31")))).alias("date")
)

# Cross join users with the date range
users = df.select("user_id").distinct()
calendar = users.crossJoin(full_dates)

# Left join and flag activity
result = calendar.join(activity, ["user_id", "date"], "left") \
    .withColumn("active_flag", coalesce(col("is_active"), lit(0)))

#20: Script Fails on Cluster but Not Locally: Possible Causes
1. Missing files/resources on the cluster (e.g., JARs, dependencies).
2. Schema inference issues (e.g., inferSchema=True works locally but fails at cluster scale).
3. Cluster memory limits (OOM).
4. Code depends on local paths that are not accessible in distributed mode.
5. Local mode runs in a single JVM, whereas the cluster uses distributed shuffles, so the real execution paths differ.
6. Version mismatch (e.g., Spark/Python/PyArrow); a quick check is shown below.
7. Missing permissions on HDFS/S3.
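One quick way to rule out cause 6 is to print the runtime versions on both the driver and the executors; a minimal sketch:

import sys
import pyspark

# Driver-side versions
print("Spark:", spark.version, "| Python:", sys.version.split()[0], "| PySpark:", pyspark.__version__)

# Executor-side Python version (a mismatch here often breaks UDFs / Arrow)
print(spark.sparkContext
      .parallelize([0], 1)
      .map(lambda _: __import__("sys").version.split()[0])
      .collect())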
lochan2014
Keymaster
Absolutely! Here's a crisp, snippet-style answer for each of the commonly asked questions:
—
### **How do you handle corrupt records while reading JSON/CSV?**
# For JSON
df = spark.read.option("badRecordsPath", "s3://path/to/badRecords").json("s3://path/to/json")

# For CSV
df = spark.read.option("mode", "PERMISSIVE") \
    .option("columnNameOfCorruptRecord", "_corrupt_record") \
    .csv("s3://path/to/csv")

# Options: PERMISSIVE (default), DROPMALFORMED, FAILFAST
### **How do you handle duplicate rows in a DataFrame?**
# Remove exact duplicates
df = df.dropDuplicates()

# Remove duplicates based on specific columns
df = df.dropDuplicates(["col1", "col2"])
—
### **How to handle nulls before aggregation?**
# Replace nulls before groupBy
df = df.fillna(0)  # or df.fillna({"col1": 0, "col2": "unknown"})

# Then safely aggregate
df.groupBy("group_col").agg({"metric_col": "sum"})
—
### **How do you read only specific columns from a Parquet file?**
df = spark.read.parquet("s3://path/to/data").select("col1", "col2")
—
### **How do you optimize wide transformations like joins and groupBy?**
# Use a broadcast join if one side is small
from pyspark.sql.functions import broadcast
df = large_df.join(broadcast(small_df), "key")

# Repartition on the group key to reduce shuffle pressure
df = df.repartition("group_key")

# Cache intermediate results if reused
df.cache()
—
### **How do you write partitioned Parquet files with overwrite behavior?**
df.write.mode("overwrite") \
    .partitionBy("year", "month") \
    .parquet("s3://path/output/")
—
### **How do you check for skew in Spark join keys?**
df.groupBy("join_key").count().orderBy("count", ascending=False).show(10)
—
Would you like a few more on file formats, cache vs persist, or checkpointing?
lochan2014
Keymaster
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, when

spark = SparkSession.builder.appName("NullHandling").getOrCreate()
# Step 1: Read CSV with header and infer schema
df = spark.read.option("header", True).option("inferSchema", True).csv("path/to/file.csv")

# Step 2: Dynamically replace nulls based on column data types
for column, dtype in df.dtypes:
    if dtype in ["int", "bigint", "double", "float"]:
        df = df.withColumn(column, when(col(column).isNull(), lit(0)).otherwise(col(column)))
    elif dtype == "string":
        df = df.withColumn(column, when(col(column).isNull(), lit("missing")).otherwise(col(column)))
    elif dtype == "boolean":
        df = df.withColumn(column, when(col(column).isNull(), lit(False)).otherwise(col(column)))
    # Add other type-specific defaults as needed

# Final cleaned DataFrame
df.show()
### Summary:
* Automatically detects column types.
* Fills nulls with type-appropriate defaults: 0 for numbers, "missing" for strings, False for booleans.
* Avoids hardcoding column names.
lochan2014
Keymaster
# REASON: Join operations can cause executor Out Of Memory (OOM) errors when:
# - One side of the join (especially in a shuffle join) is too large to fit in executor memory.
# - Skewed keys cause data to pile up in specific partitions.
# - Spark performs a wide transformation and shuffles too much data.

# HOW TO AVOID OOM IN JOINS:

# 1. Use a broadcast join (if one side is small enough)
from pyspark.sql.functions import broadcast

result = large_df.join(broadcast(small_df), "join_key")
# 2. Repartition by the join key to evenly distribute data
df1 = df1.repartition("join_key")
df2 = df2.repartition("join_key")

# 3. Drop unnecessary columns before the join to reduce the memory footprint
df1_trimmed = df1.select("join_key", "needed_col1")
df2_trimmed = df2.select("join_key", "needed_col2")

# 4. Handle skew by salting
from pyspark.sql.functions import rand, concat_ws, explode, array, lit, col

# Salt the skewed (large) side with a random suffix 0-9
df1 = df1.withColumn("salted_key", concat_ws("_", col("join_key"), (rand() * 10).cast("int")))
# Replicate the other side once per salt value so every salted key can find its match
df2 = df2.withColumn("salt", explode(array(*[lit(i) for i in range(10)]))) \
    .withColumn("salted_key", concat_ws("_", col("join_key"), col("salt")))
# Then join on salted_key
# 5. Use spark.sql.autoBroadcastJoinThreshold wisely
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 50 * 1024 * 1024)  # 50MB

# 6. Monitor partition sizes and optimize memory settings
#    Avoid shuffling massive partitions; tune spark.sql.shuffle.partitions

# 7. Cache intermediate DataFrames if reused
df1.cache()
**Summary:**
OOM during joins typically occurs due to shuffling large or skewed data. Mitigate it using broadcast joins, repartitioning, column pruning, salting for skew, config tuning, and caching when needed.
lochan2014
Keymaster
# Write a DataFrame to Parquet while controlling file size

# Step 1: Decide the desired size per file (in bytes)
# For example, target ~128MB per file
target_file_size = 128 * 1024 * 1024  # 128 MB

# Step 2: Use df.rdd.getNumPartitions() and the data size to tune the partition count
# Alternatively, use coalesce/repartition if the size is known or can be estimated
# Suppose you want 10 files of around 128MB each
df = df.repartition(10)

# Step 3: Write with compression to reduce actual storage size
df.write \
    .mode("overwrite") \
    .option("compression", "snappy") \
    .parquet("s3://your-bucket/target-path/")

# Optional: use maxRecordsPerFile to cap records per file if you know the average row size
df.write \
    .mode("overwrite") \
    .option("compression", "snappy") \
    .option("maxRecordsPerFile", 1_000_000) \
    .parquet("s3://your-bucket/target-path/")
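If the total output size can be estimated (the figure below is a placeholder assumption), the file count can be derived instead of hardcoding repartition(10); a rough sketch:

import math

estimated_total_bytes = 1.2 * 1024**3   # assumption: ~1.2 GB of data to write
target_file_size = 128 * 1024 * 1024    # ~128 MB per file

num_files = max(1, math.ceil(estimated_total_bytes / target_file_size))
df.repartition(num_files).write \
    .mode("overwrite") \
    .option("compression", "snappy") \
    .parquet("s3://your-bucket/target-path/")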
**Key options used:**
* .repartition(n) to control the number of output files.
* .option("maxRecordsPerFile", n) to cap the number of rows per file.
* Compression (snappy) to keep files compact without hurting read performance.
lochan2014
Keymaster
from pyspark.sql import SparkSession
from pyspark.sql.utils import AnalysisException
from pyspark.sql.functions import lit

spark = SparkSession.builder \
    .appName("SchemaMismatchHandler") \
    .getOrCreate()

# Example file paths
file_paths = ["s3://bucket/data1.json", "s3://bucket/data2.json", "s3://bucket/data3.json"]

# Step 1: Read all files separately and collect schemas
dfs = []
all_fields = set()

for path in file_paths:
    try:
        df = spark.read.json(path)
        dfs.append(df)
        all_fields.update(df.schema.fieldNames())
    except AnalysisException as e:
        print(f"Failed to read {path}: {e}")

# Step 2: Align all DataFrames to a common schema
def align_schema(df, all_columns):
    for col_name in all_columns:
        if col_name not in df.columns:
            df = df.withColumn(col_name, lit(None))
    return df.select(sorted(all_columns))  # Sort for a consistent column order

# Step 3: Apply schema alignment and union all
aligned_dfs = [align_schema(df, all_fields) for df in dfs]
final_df = aligned_dfs[0]
for df in aligned_dfs[1:]:
    final_df = final_df.unionByName(df)

# final_df is now a unified DataFrame with an aligned schema
final_df.show()
This handles schema mismatches by:
* Reading each file independently.
* Building a union of all column names.
* Padding missing columns with null.
* Ensuring a consistent column order before unionByName.
lochan2014
Keymaster
To **optimize reading only specific partitions** using spark.read.parquet(), you can leverage **partition pruning**, which lets Spark skip unnecessary files and load only the relevant partitions.

### Here's how:
—
### **Assume your dataset is partitioned like this:**
/data/events/year=2023/month=01/
/data/events/year=2023/month=02/
/data/events/year=2024/month=01/
—
### Option 1: **Partition Pruning via Filter**
If the Parquet data is partitioned by year and month, Spark will **automatically prune partitions** when you filter on those columns **after reading**:
df = spark.read.parquet("/data/events")
df_filtered = df.filter((df.year == 2023) & (df.month == 1))
Spark reads **only the matching folders** from the directory structure, without scanning the full dataset.
—
### Option 2: **Partition Directory Path Filtering (Manual)**
You can also specify the partition path directly to **read only that portion** of the dataset:
df = spark.read.parquet("/data/events/year=2023/month=01")
This skips the full directory scan and reads only the data under the specified partition path.
—
### Option 3: **Passing multiple paths**
If you want to read a few selected partitions:
df = spark.read.parquet(
    "/data/events/year=2023/month=01",
    "/data/events/year=2024/month=01"
)
Only the specified paths are read, which is efficient when you know the exact partitions to load.
—
### Best Practices
* **Filter on partition columns early** so Spark can prune at planning time.
* Always **partition your data** based on query patterns (e.g., time-based like year/month/day).
* Avoid filtering on non-partitioned columns if read performance is a concern.
* **Don't cast partition columns**, as it can disable pruning (e.g., df.filter(df.year.cast("string") == "2023") may break pruning).

—
### Check if pruning is applied:
You can verify partition pruning by inspecting the physical plan:
df_filtered.explain(True)

In the FileScan node, look for PartitionFilters: (and PushedFilters:) and confirm that only the necessary partitions are being read.