from pyspark.sql import SparkSession
from pyspark.sql.functions import lit
from pyspark.sql.utils import AnalysisException

spark = SparkSession.builder \
    .appName("SchemaMismatchHandler") \
    .getOrCreate()

# Example file paths
file_paths = ["s3://bucket/data1.json", "s3://bucket/data2.json", "s3://bucket/data3.json"]

# Step 1: Read each file separately and collect the union of all field names
dfs = []
all_fields = set()
for path in file_paths:
    try:
        df = spark.read.json(path)
        dfs.append(df)
        all_fields.update(df.schema.fieldNames())
    except AnalysisException as e:
        print(f"Failed to read {path}: {e}")

# Step 2: Align a DataFrame to the common schema by padding missing columns with null
def align_schema(df, all_columns):
    for col in all_columns:
        if col not in df.columns:
            df = df.withColumn(col, lit(None))
    return df.select(sorted(all_columns))  # Sort for a consistent column order

# Step 3: Align every DataFrame and union them all
# Note: assumes at least one file was read successfully, so aligned_dfs is non-empty
aligned_dfs = [align_schema(df, all_fields) for df in dfs]
final_df = aligned_dfs[0]
for df in aligned_dfs[1:]:
    final_df = final_df.unionByName(df)

# final_df is now a unified DataFrame with an aligned schema
final_df.show()
```
This handles schema mismatches by:
* Reading each file independently, so one unreadable file doesn't abort the whole job.
* Building the union of all column names across the files.
* Padding missing columns with `null`.
* Sorting columns into a consistent order before `unionByName`.
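If you're on Spark 3.1 or later, `unionByName` accepts an `allowMissingColumns` flag that does the null-padding for you, so the manual `align_schema` step isn't needed. A minimal sketch under that assumption, reusing the example bucket paths from above:

```
from functools import reduce

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("SchemaMismatchHandler").getOrCreate()

file_paths = ["s3://bucket/data1.json", "s3://bucket/data2.json", "s3://bucket/data3.json"]

# Read each file; unionByName(..., allowMissingColumns=True) fills
# columns absent on either side with nulls (Spark 3.1+)
dfs = [spark.read.json(path) for path in file_paths]
final_df = reduce(
    lambda left, right: left.unionByName(right, allowMissingColumns=True),
    dfs,
)
final_df.show()
```

For JSON specifically, passing the whole list to a single `spark.read.json(file_paths)` call also works: the reader infers one schema across all files and fills missing fields with null, though you lose the per-file error handling shown in the original loop.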