lochan2014

Forum Replies Created

Viewing 7 posts - 1 through 7 (of 7 total)
  • in reply to: SET 1 #6500
    lochan2014
    Keymaster

    🎯 #6: Flatten Nested JSON to Tabular Format

    from pyspark.sql.functions import col

df = spark.read.json("path/to/nested.json")

# Flatten selected nested fields into top-level columns
flattened_df = df.selectExpr("id", "details.name", "details.address.city", "details.address.zip")
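If the nested fields aren't known in advance, a generic recursive flattener is a common alternative. The helper below is a sketch (the flatten name and the underscore-joined column naming are my own choices, and arrays of structs would additionally need explode):

from pyspark.sql.functions import col
from pyspark.sql.types import StructType

def flatten(df):
    # Expand every StructType column one level, then recurse until none remain
    flat_cols, has_struct = [], False
    for field in df.schema.fields:
        if isinstance(field.dataType, StructType):
            has_struct = True
            flat_cols += [col(f"{field.name}.{sub.name}").alias(f"{field.name}_{sub.name}")
                          for sub in field.dataType.fields]
        else:
            flat_cols.append(col(field.name))
    df = df.select(flat_cols)
    return flatten(df) if has_struct else df

flattened_df = flatten(df)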

    🎯 #7: SCD Type 2 in PySpark

    from delta.tables import DeltaTable
    from pyspark.sql.functions import current_date, lit

    # Existing Delta table
target = DeltaTable.forPath(spark, "s3://path/delta_table")

# New data
incoming_df = spark.read.parquet("s3://path/incoming")

# Merge logic
target.alias("t").merge(
    incoming_df.alias("s"),
    "t.id = s.id AND t.current_flag = true AND (t.attr1 != s.attr1 OR t.attr2 != s.attr2)"
).whenMatchedUpdate(set={
    "current_flag": lit(False),
    "end_date": current_date()
}).whenNotMatchedInsert(values={
    "id": "s.id",
    "attr1": "s.attr1",
    "attr2": "s.attr2",
    "start_date": current_date(),
    "end_date": lit(None),
    "current_flag": lit(True)
}).execute()
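One gap worth flagging: the merge above closes out changed rows and inserts brand-new ids, but it never inserts the new version of a changed row. A common workaround is to stage each incoming record twice, once with the real key and once with a null key, so the second copy always falls into the insert branch. A sketch, with merge_key as a hypothetical helper column:

from pyspark.sql.functions import lit

# Copy 1 matches the existing row (and closes it out); copy 2 has a NULL merge key,
# so it never matches and gets inserted as the new current version.
updates_df = incoming_df.withColumn("merge_key", incoming_df["id"])
inserts_df = incoming_df.withColumn("merge_key", lit(None).cast("string"))  # cast to the id type (assumed string)
staged_df = updates_df.unionByName(inserts_df)

# Then run the same merge with staged_df as the source and
# "t.id = s.merge_key" as the join condition.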

    🎯 #8: Parse Kafka JSON Messages

    from pyspark.sql.functions import from_json, col
    from pyspark.sql.types import StructType, StringType

schema = StructType().add("user_id", StringType()).add("action", StringType())

df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "host1:9092") \
    .option("subscribe", "topic-name") \
    .load()

json_df = df.select(from_json(col("value").cast("string"), schema).alias("data")).select("data.*")

    🎯 #9: Incremental Load from MySQL to Delta

    # Step 1: Load latest watermark
last_ts = spark.read.format("delta").table("delta_table").agg({"updated_at": "max"}).collect()[0][0]
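# If the Delta table is empty (e.g., on the very first run), last_ts will be None;
# fall back to an early default watermark so the JDBC query below still works.
if last_ts is None:
    last_ts = "1970-01-01 00:00:00"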

    # Step 2: Pull only new rows
jdbc_df = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:mysql://…") \
    .option("query", f"SELECT * FROM table WHERE updated_at > '{last_ts}'") \
    .load()

# Step 3: Upsert into Delta
from delta.tables import DeltaTable
delta_table = DeltaTable.forPath(spark, "s3://delta_table")

delta_table.alias("t").merge(
    jdbc_df.alias("s"),
    "t.id = s.id"
).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()

    🎯 #10: cache() vs persist()

    df.cache() # Uses MEMORY_AND_DISK by default
    df.persist() # Customize storage level, e.g., MEMORY_ONLY, DISK_ONLY

    # Use cache() when memory suffices and the data is reused multiple times.
    # Use persist() when memory may not be enough or if you want control.
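For completeness, a minimal sketch of persist() with an explicit storage level (the work in between is implied):

from pyspark import StorageLevel

df.persist(StorageLevel.MEMORY_ONLY)  # or DISK_ONLY, MEMORY_AND_DISK_2, ...
# ... run several actions that reuse df ...
df.unpersist()  # release the cached data when done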

    🎯 #11: Optimize Skewed groupBy

    # Add random salt to reduce skew
    from pyspark.sql.functions import expr

df = df.withColumn("salted_key", expr("concat(group_key, '_', floor(rand()*10))"))

df.groupBy("salted_key").agg(...)  # then strip the salt and re-aggregate (see the sketch below)
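To finish the pattern, a sketch of the second stage that strips the salt and combines the partial results, assuming a hypothetical amount column being summed and that group_key itself contains no underscores:

from pyspark.sql.functions import sum as _sum, split, col

partial = df.groupBy("salted_key").agg(_sum("amount").alias("partial_sum"))
final = partial.withColumn("group_key", split(col("salted_key"), "_").getItem(0)) \
    .groupBy("group_key").agg(_sum("partial_sum").alias("total"))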

    🎯 #12: Unit Tests in PySpark

    import unittest
    from pyspark.sql import SparkSession

class MyTest(unittest.TestCase):
    @classmethod
    def setUpClass(cls):
        cls.spark = SparkSession.builder.master("local[1]").appName("test").getOrCreate()

    def test_transformation(self):
        df = self.spark.createDataFrame([(1, "a"), (2, "b")], ["id", "val"])
        result = df.filter("id = 1").collect()
        self.assertEqual(result[0]["val"], "a")
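A small addition that usually belongs in the same class (a sketch): stop the SparkSession once all tests have run.

# inside MyTest
@classmethod
def tearDownClass(cls):
    cls.spark.stop()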

    🎯 #13: Row Number by Partition

    from pyspark.sql.window import Window
    from pyspark.sql.functions import row_number

window_spec = Window.partitionBy("user_id").orderBy("event_time")

df = df.withColumn("row_num", row_number().over(window_spec))

    🎯 #14: MERGE INTO Delta for New Records

    from delta.tables import DeltaTable

delta = DeltaTable.forPath(spark, "s3://delta_table")
new_data = spark.read.parquet("s3://incoming")

delta.alias("t").merge(
    new_data.alias("s"),
    "t.id = s.id"
).whenNotMatchedInsertAll().execute()

    🎯 #15: Debug & Optimize Slow Spark Jobs

# 🔍 Steps:
# 1. Use Spark UI → check stages, skew, shuffle size.
# 2. Cache/persist reused DataFrames.
# 3. Avoid UDFs unless necessary.
# 4. Repartition wisely.
# 5. Tune configs:
spark.conf.set("spark.sql.shuffle.partitions", 200)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 50 * 1024 * 1024)

🎯 #16: CSV → Partitioned Parquet by Date

df = spark.read.option("header", True).csv("s3://path/data.csv")

df.write.partitionBy("date_col") \
    .mode("overwrite") \
    .parquet("s3://path/output/")

    🎯 #17: Broadcast Join + Caveats

    from pyspark.sql.functions import broadcast

df = big_df.join(broadcast(small_df), "id")

# ⚠️ Caveat: ensure small_df fits comfortably in driver and executor memory
# (typically tens of MB; the default spark.sql.autoBroadcastJoinThreshold is 10 MB)

    🎯 #18: Kafka Streaming with Exactly-Once

    stream_df = spark.readStream.format("kafka")... # your stream config

    # Ensure checkpointing + idempotent sink (like Delta)
    stream_df.writeStream \
    .format("delta") \
    .option("checkpointLocation", "s3://chk/") \
    .outputMode("append") \
    .start("s3://delta/output")

    🎯 #19: Daily Activity Flag

from pyspark.sql.functions import explode, sequence, lit, col, to_date, coalesce

    # List of active dates per user
    df = df.withColumn("date", to_date("date"))

    # Generate full month range
full_dates = spark.range(1).select(explode(sequence(to_date(lit("2024-01-01")), to_date(lit("2024-01-31")))).alias("date"))

    # Cross join users with date range
    users = df.select("user_id").distinct()
    calendar = users.crossJoin(full_dates)

# Left join against the activity dates and flag each calendar day
activity = df.select("user_id", "date").distinct().withColumn("active", lit(1))
result = calendar.join(activity, ["user_id", "date"], "left") \
    .withColumn("active_flag", coalesce(col("active"), lit(0))) \
    .drop("active")

    🎯 #20: Script Fails on Cluster but Not Locally – Possible Causes

    1. Missing files/resources on cluster (e.g., JARs, dependencies).
    2. Schema inference issues (e.g., inferSchema=True works locally but fails on cluster scale).
    3. Cluster memory limits (OOM).
    4. Code depends on local paths not accessible in distributed mode.
5. Local mode runs everything in a single JVM, while the cluster performs a real distributed shuffle, so some problems only surface at scale.
6. Version mismatch (e.g., Spark/Python/PyArrow) – see the sanity-check sketch below.
    7. Missing permissions on HDFS/S3.
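A quick sanity-check sketch for causes 1 and 6 (the file names are placeholders, not from the original post):

import sys

# Compare versions on the driver; a mismatched executor Python is a frequent culprit
print("Spark version:", spark.version)
print("Driver Python:", sys.version)
print("spark.pyspark.python:", spark.conf.get("spark.pyspark.python", "not set"))

# Ship local modules/resources so executors can see them (placeholder paths)
spark.sparkContext.addPyFile("utils.py")
spark.sparkContext.addFile("config.yaml")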

    in reply to: SET 1 #6499
    lochan2014
    Keymaster

    Absolutely! Here’s a crisp, sharp snippet-style answer for each of the commonly asked questions:

### 🔹 **How do you handle corrupt records while reading JSON/CSV?**

    # For JSON
df = spark.read.option("badRecordsPath", "s3://path/to/badRecords").json("s3://path/to/json")

    # For CSV
df = spark.read.option("mode", "PERMISSIVE") \
    .option("columnNameOfCorruptRecord", "_corrupt_record") \
    .csv("s3://path/to/csv")

    # Options: PERMISSIVE (default), DROPMALFORMED, FAILFAST
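To actually inspect what failed in PERMISSIVE mode, a small follow-up sketch (Spark disallows querying only the corrupt-record column on the raw file source, so the DataFrame is cached first):

from pyspark.sql.functions import col

df.cache()  # needed before filtering on the corrupt-record column alone
bad_rows = df.filter(col("_corrupt_record").isNotNull())
bad_rows.show(truncate=False)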

### 🔹 **How do you handle duplicate rows in a DataFrame?**

    # Remove exact duplicates
    df = df.dropDuplicates()

    # Remove duplicates based on specific columns
df = df.dropDuplicates(["col1", "col2"])

### 🔹 **How to handle nulls before aggregation?**

    # Replace nulls before groupBy
df = df.fillna(0)  # or df.fillna({"col1": 0, "col2": "unknown"})

    # Then safely aggregate
df.groupBy("group_col").agg({"metric_col": "sum"})

### 🔹 **How do you read only specific columns from a Parquet file?**

df = spark.read.parquet("s3://path/to/data").select("col1", "col2")

### 🔹 **How do you optimize wide transformations like joins and groupBy?**

    # Use broadcast join if one side is small
    from pyspark.sql.functions import broadcast
df = large_df.join(broadcast(small_df), "key")

    # Repartition on group key to reduce shuffle pressure
df = df.repartition("group_key")

    # Cache intermediate results if reused
    df.cache()

### 🔹 **How do you write partitioned Parquet files with overwrite behavior?**

df.write.mode("overwrite") \
    .partitionBy("year", "month") \
    .parquet("s3://path/output/")

### 🔹 **How do you check for skew in Spark join keys?**

df.groupBy("join_key").count().orderBy("count", ascending=False).show(10)

    Would you like a few more on file formats, cache vs persist, or checkpointing?

    in reply to: SET 1 #6497
    lochan2014
    Keymaster

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col, lit, when

spark = SparkSession.builder.appName("NullHandling").getOrCreate()

    # Step 1: Read CSV with header and infer schema
df = spark.read.option("header", True).option("inferSchema", True).csv("path/to/file.csv")

    # Step 2: Dynamically replace nulls based on column data types
for column, dtype in df.dtypes:
    if dtype in ["int", "bigint", "double", "float"]:
        df = df.withColumn(column, when(col(column).isNull(), lit(0)).otherwise(col(column)))
    elif dtype == "string":
        df = df.withColumn(column, when(col(column).isNull(), lit("missing")).otherwise(col(column)))
    elif dtype == "boolean":
        df = df.withColumn(column, when(col(column).isNull(), lit(False)).otherwise(col(column)))
    # Add other type-specific defaults as needed

    # Final cleaned DataFrame
    df.show()

### ✅ Summary:

    * Automatically detects column types.
    * Fills nulls with type-appropriate defaults:
    0 for numbers, "missing" for strings, False for booleans.
    * Avoids hardcoding column names.

    in reply to: SET 1 #6496
    lochan2014
    Keymaster

    # REASON: Join operations can cause executor Out Of Memory (OOM) errors when:
    # – One side of the join (especially in a shuffle join) is too large to fit in executor memory.
    # – Skewed keys cause data to pile up in specific partitions.
    # – Spark performs a wide transformation and shuffles too much data.

    # HOW TO AVOID OOM IN JOINS:

    # 1. Use Broadcast Join (if one side is small enough)
    from pyspark.sql.functions import broadcast

result = large_df.join(broadcast(small_df), "join_key")

    # 2. Repartition by join key to evenly distribute data
df1 = df1.repartition("join_key")
df2 = df2.repartition("join_key")

    # 3. Filter unnecessary columns before join to reduce memory footprint
df1_trimmed = df1.select("join_key", "needed_col1")
df2_trimmed = df2.select("join_key", "needed_col2")

# 4. Handle skew by salting: salt the skewed (large) side randomly, and replicate
#    the other side across every salt value so matching keys still line up
from pyspark.sql.functions import rand, concat_ws, explode, array, lit

df1 = df1.withColumn("salted_key", concat_ws("_", df1["join_key"], (rand() * 10).cast("int")))
df2 = df2.withColumn("salt", explode(array(*[lit(i) for i in range(10)]))) \
    .withColumn("salted_key", concat_ws("_", df2["join_key"], df2["salt"].cast("string")))

# Then join on salted_key

    # 5. Use spark.sql.autoBroadcastJoinThreshold wisely
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 50 * 1024 * 1024)  # 50MB

    # 6. Monitor partition sizes and optimize memory settings
    # Avoid shuffling massive partitions; tune spark.sql.shuffle.partitions

    # 7. Cache intermediate DataFrames if reused
    df1.cache()

    **Summary:**
    OOM during joins typically occurs due to shuffling large or skewed data. Mitigate it using broadcast joins, repartitioning, filtering columns, salting for skew, tuning configs, and caching when needed.

    in reply to: SET 1 #6495
    lochan2014
    Keymaster

    # Write DataFrame to Parquet while controlling file size

    # Step 1: Estimate desired file size per file (in bytes)
    # For example, target ~128MB per file
    target_file_size = 128 * 1024 * 1024 # 128 MB

    # Step 2: Use df.rdd.getNumPartitions() and data size to tune partitions
    # Alternatively, use coalesce/repartition if size is known or can be estimated
    # Suppose you want 10 files around 128MB each
    df = df.repartition(10)

    # Step 3: Write with compression to reduce actual storage size
df.write \
    .mode("overwrite") \
    .option("compression", "snappy") \
    .parquet("s3://your-bucket/target-path/")

    # Optional: Use maxRecordsPerFile to cap records per file if you know average row size
df.write \
    .mode("overwrite") \
    .option("compression", "snappy") \
    .option("maxRecordsPerFile", 1_000_000) \
    .parquet("s3://your-bucket/target-path/")

    **Key options used:**

* .repartition(n) to control the number of output files (see the sizing sketch below).
* .option("maxRecordsPerFile", n) to cap the number of rows per file.
* Compression (snappy) to keep files compact while remaining fast to read.
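If the total output size can be estimated, the repartition count above can be derived rather than hard-coded. A rough sketch (estimated_total_bytes is an assumed input, not something the original snippet computes):

import math

estimated_total_bytes = 1280 * 1024 * 1024   # assumption: ~1.25 GB of output data
target_file_size = 128 * 1024 * 1024         # ~128 MB per file, as above

num_files = max(1, math.ceil(estimated_total_bytes / target_file_size))
df = df.repartition(num_files)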

    in reply to: SET 1 #6494
    lochan2014
    Keymaster

    from pyspark.sql import SparkSession
    from pyspark.sql.utils import AnalysisException

spark = SparkSession.builder \
    .appName("SchemaMismatchHandler") \
    .getOrCreate()

    from pyspark.sql.functions import lit

    # Example file paths
file_paths = ["s3://bucket/data1.json", "s3://bucket/data2.json", "s3://bucket/data3.json"]

    # Step 1: Read all files separately and collect schemas
    dfs = []
    all_fields = set()

for path in file_paths:
    try:
        df = spark.read.json(path)
        dfs.append(df)
        all_fields.update(df.schema.fieldNames())
    except AnalysisException as e:
        print(f"Failed to read {path}: {e}")

    # Step 2: Align all DataFrames to a common schema
def align_schema(df, all_columns):
    for col_name in all_columns:
        if col_name not in df.columns:
            df = df.withColumn(col_name, lit(None))
    return df.select(sorted(all_columns))  # Sort for consistent column order

    # Step 3: Apply schema alignment and union all
    aligned_dfs = [align_schema(df, all_fields) for df in dfs]
    final_df = aligned_dfs[0]
for df in aligned_dfs[1:]:
    final_df = final_df.unionByName(df)

    # final_df is now a unified DataFrame with aligned schema
    final_df.show()

    This handles schema mismatches by:

    * Reading each file independently.
    * Building a union of all column names.
    * Padding missing columns with null.
    * Ensuring consistent column order before unionByName.

    in reply to: SET 1 #6493
    lochan2014
    Keymaster

    To **optimize reading only specific partitions** using spark.read.parquet(), you can leverage **partition pruning**, which allows Spark to skip reading unnecessary files and only load relevant partitions.

### ✅ Here’s how:

### 💡 **Assume your dataset is partitioned like this:**

/data/events/year=2023/month=01/
/data/events/year=2023/month=02/
/data/events/year=2024/month=01/

### ✅ Option 1: **Partition Pruning via Filter**

    If the Parquet data is partitioned by year and month, Spark will **automatically prune partitions** when you filter on those columns **after reading**:

df = spark.read.parquet("/data/events")
df_filtered = df.filter((df.year == 2023) & (df.month == 1))

➡️ Spark reads **only the matching folders**, using the directory structure rather than scanning the full dataset.

### ✅ Option 2: **Partition Directory Path Filtering (Manual)**

    You can also directly specify the partition path to **only read that portion** of the dataset:

df = spark.read.parquet("/data/events/year=2023/month=01")

➡️ This skips the full directory scan and reads only data from the specified partition path.

### ✅ Option 3: **Passing multiple paths**

    If you want to read a few selected partitions:

df = spark.read.parquet(
    "/data/events/year=2023/month=01",
    "/data/events/year=2024/month=01"
)

➡️ Only the specified paths are read, which is efficient when you know the exact partitions to load.

### 🛠️ Best Practices

    * **Use filter on partition columns early** (for lazy pruning).
    * Always **partition your data** based on query patterns (e.g., time-based like year/month/day).
    * Avoid filtering on non-partitioned columns if performance is a concern during reads.
    * **Don’t cast partition columns**, as it can disable pruning (e.g., df.filter(df.year.cast("string") == "2023") may break pruning).

### 🔍 Check if pruning is applied:

    You can verify partition pruning by enabling physical plan explain:

df_filtered.explain(True)

Look for PartitionFilters: in the FileScan node of the plan and confirm that only the necessary partitions are being read.
