HintsToday

Hints and Answers for Everything

- HintsToday

Reply To: SET 1

#6494
lochan2014
Keymaster

    from pyspark.sql import SparkSession
    from pyspark.sql.utils import AnalysisException

    spark = SparkSession.builder \
    .appName(“SchemaMismatchHandler”) \
    .getOrCreate()

    from pyspark.sql.functions import lit

    # Example file paths
    file_paths = [“s3://bucket/data1.json”, “s3://bucket/data2.json”, “s3://bucket/data3.json”]

    # Step 1: Read all files separately and collect schemas
    dfs = []
    all_fields = set()

    for path in file_paths:
    try:
    df = spark.read.json(path)
    dfs.append(df)
    all_fields.update(df.schema.fieldNames())
    except AnalysisException as e:
    print(f”Failed to read {path}: {e}”)

    # Step 2: Align all DataFrames to a common schema
    def align_schema(df, all_columns):
    for col in all_columns:
    if col not in df.columns:
    df = df.withColumn(col, lit(None))
    return df.select(sorted(all_columns)) # Sort for consistent column order

    # Step 3: Apply schema alignment and union all
    aligned_dfs = [align_schema(df, all_fields) for df in dfs]
    final_df = aligned_dfs[0]
    for df in aligned_dfs[1:]:
    final_df = final_df.unionByName(df)

    # final_df is now a unified DataFrame with aligned schema
    final_df.show()

    This handles schema mismatches by:

    * Reading each file independently.
    * Building a union of all column names.
    * Padding missing columns with null.
    * Ensuring consistent column order before unionByName.

    Subscribe