✅ Schema Enforcement in PySpark DataFrames

In PySpark, schema enforcement ensures that the structure (schema) of your DataFrame strictly matches the expected data types and field names during read and write operations.


🧠 What is Schema Enforcement?

Schema enforcement (also called schema-on-write) is the ability of Spark to validate incoming data against a predefined schema, and reject rows or throw errors when mismatches occur.


🔧 1. Enforcing Schema During DataFrame Read

🧪 Example:

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True)
])

df = spark.read.schema(schema).json("people.json")
df.show()

✅ What happens here:

  • Spark does not infer schema
  • Spark expects the JSON to match the defined schema
  • If the data contains "age": "twenty" (a string), Spark sets the field to null, drops the row, or fails the read, depending on the read mode and options (a quick check follows below)
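A quick way to confirm enforcement took effect is to inspect the resulting schema; a minimal check using the DataFrame created above:

# The schema is exactly what was declared, not inferred from the data
df.printSchema()
# root
#  |-- name: string (nullable = true)
#  |-- age: integer (nullable = true)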

🔍 What If You Don’t Define Schema?

df = spark.read.json("people.json")
  • Spark infers the schema
  • No enforcement → inferred types can drift from run to run (e.g., age inferred as string if even one record stores it as text), and nested data may be ambiguous; the sketch below compares the two
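A minimal comparison sketch, assuming the same people.json and the schema defined above:

# Inferred schema: types depend entirely on the data Spark happens to see
inferred_df = spark.read.json("people.json")
inferred_df.printSchema()

# Explicit schema: always name: string, age: integer, regardless of the file contents
explicit_df = spark.read.schema(schema).json("people.json")
explicit_df.printSchema()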

🛠️ 2. Enforcing Schema While Creating a DataFrame from Python Objects

data = [("Alice", 30), ("Bob", 28)]
schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True)
])

df = spark.createDataFrame(data, schema=schema)

If data has a wrong type (e.g., ("Alice", "thirty")), Spark will:

  • Verify each row against the schema (verifySchema=True by default) and
  • Raise a TypeError rather than silently cast the string to an integer, as the sketch below shows
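A minimal sketch of the failure mode (the bad_data values are made up for illustration):

bad_data = [("Alice", "thirty")]   # age should be an int

try:
    spark.createDataFrame(bad_data, schema=schema)
except TypeError as e:
    # e.g. "field age: IntegerType() can not accept object 'thirty' in type <class 'str'>"
    print(e)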

💾 3. Schema Enforcement on Write (with Delta Lake or Parquet)

Delta Example:

df.write.format("delta").mode("overwrite").save("/delta/users")

Then:

# Writing different schema (e.g., extra column)
df2.write.format("delta").mode("append").save("/delta/users")

📛 If df2 has a different schema (e.g., extra column gender), this fails unless you allow schema evolution.

Enabling schema evolution:

spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", True)

🔐 4. Related Spark Configurations

| Config | Description |
|---|---|
| spark.sql.caseSensitive | Controls whether column-name matching is case sensitive |
| spark.sql.parquet.mergeSchema | Enables schema merging when reading Parquet files |
| spark.databricks.delta.schema.autoMerge.enabled | Allows schema evolution on Delta writes |
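All three are session-level settings; a small sketch of how they are typically set (the values shown are illustrative, not recommended defaults):

spark.conf.set("spark.sql.caseSensitive", "true")                            # treat "Name" and "name" as different columns
spark.conf.set("spark.sql.parquet.mergeSchema", "true")                      # merge schemas across Parquet part-files on read
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")    # Delta schema evolution on write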

✅ Summary Table

| Operation | Enforcement Applies? | Behavior |
|---|---|---|
| read.json(..., schema) | ✅ Yes | Mismatched fields become null, rows are dropped, or the read fails, depending on the read mode |
| createDataFrame(data, schema) | ✅ Yes | Verifies types and structure; raises an error on mismatch |
| write.format("delta") | ✅ Yes | Requires the same schema unless schema evolution (autoMerge/mergeSchema) is enabled |
| write.csv() / write.parquet() | ⚠️ Limited | No write-time check against previously written data; Parquet stores a schema per file, CSV stores none, so mismatches surface at read time |

📘 Best Practices

  • Always define schema explicitly for production pipelines.
  • Validate the schema before writing, e.g., with .printSchema(), schema.json(), or an explicit comparison against an expected StructType (see the sketch after this list).
  • Use Delta Lake for strong schema enforcement + evolution combo.
  • Avoid relying on schema inference with complex or nested data.
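A minimal pre-write validation sketch (expected_schema is a hypothetical name for whatever contract your pipeline defines, and df is the DataFrame you are about to write; note that StructType equality also compares nullability):

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

expected_schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
])

# Fail loudly before writing if the DataFrame has drifted from the contract
if df.schema != expected_schema:
    raise ValueError(
        f"Schema drift detected:\n{df.schema.json()}\nexpected:\n{expected_schema.json()}"
    )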

✅ Read Modes in PySpark DataFrames (including Databricks)

When reading data in PySpark (whether on vanilla Spark or Databricks), read modes define how Spark handles malformed records, corrupt rows, or schema mismatches during file reading.


📘 mode Parameter in DataFrameReader

Applies to the text-based sources read through DataFrameReader, primarily JSON and CSV:

.read.format("json")  /  .read.format("csv")

(Parquet, ORC, and Delta handle corrupt data and schema mismatches through their own mechanisms, covered below.)

🔧 Syntax:

df = spark.read.format("json").option("mode", "<mode>").load("path")

🧠 Available Read Modes in PySpark

| Mode | Description |
|---|---|
| "PERMISSIVE" (default) | Tries to parse all rows, sets corrupt or missing fields to null, and records the raw bad data in a special column (e.g., _corrupt_record for JSON/CSV) |
| "DROPMALFORMED" | Silently drops rows that don't match the schema |
| "FAILFAST" | Throws an exception as soon as it encounters a malformed row |
| "IGNORE" | Not an actual value of the mode option; corrupt-file skipping is controlled by spark.sql.files.ignoreCorruptFiles (mainly relevant for Parquet/ORC) |

🧪 Example: Reading Malformed JSON

File (bad.json)

{"name": "Alice", "age": 30}
{"name": "Bob", "age": "thirty"}   <-- Malformed
{"name": "Carol"}

Read in PERMISSIVE mode (pass the name/age schema defined earlier; without an explicit schema, Spark would simply infer age as a string and nothing would be flagged):

df = spark.read.schema(schema).option("mode", "PERMISSIVE").json("bad.json")
df.show()

✅ Output:

  • Valid rows are parsed normally
  • Bob's age becomes null (older Spark versions may null out the whole malformed row); Carol's missing age is simply null
  • A _corrupt_record column appears automatically only when the schema is inferred; with an explicit schema you must include it yourself (see the sketch below)
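A minimal sketch of how to keep the raw bad records visible when passing an explicit schema (the column name is configurable via the columnNameOfCorruptRecord option or spark.sql.columnNameOfCorruptRecord):

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

schema_with_corrupt = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("_corrupt_record", StringType(), True),   # receives the raw text of bad rows
])

df = (spark.read
      .schema(schema_with_corrupt)
      .option("mode", "PERMISSIVE")
      .json("bad.json"))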

Read in FAILFAST mode:

df = spark.read.option("mode", "FAILFAST").json("bad.json")

🚨 Throws an exception as soon as Bob's row is read (because reads are lazy, the error typically surfaces when an action such as df.show() runs).


Read in DROPMALFORMED mode:

df = spark.read.option("mode", "DROPMALFORMED").json("bad.json")

✅ Drops rows that cannot be parsed against the schema (here, Bob's row; Carol's row is kept with age = null, since a missing field is not malformed).


🧱 Parquet, ORC, and Delta Specifics

| Format | Mode Supported | Default Behavior |
|---|---|---|
| CSV | All modes (PERMISSIVE, DROPMALFORMED, FAILFAST) | PERMISSIVE |
| JSON | All modes (PERMISSIVE, DROPMALFORMED, FAILFAST) | PERMISSIVE |
| Parquet | No mode option; corrupt files can be skipped with spark.sql.files.ignoreCorruptFiles = true | Fails on corrupt files |
| Delta | Schema must match or be explicitly evolved (Delta enforces schema on write) | Fails on mismatch unless autoMerge/schema evolution is enabled |

🛠️ In Databricks

Databricks also supports these modes, plus:

  • Auto-detect schema with .option("inferSchema", "true")
  • Notebook-level configs to handle corrupt files:
spark.conf.set("spark.sql.files.ignoreCorruptFiles", "true")

✅ Summary Table

| Mode | Good For | Risk/Note |
|---|---|---|
| PERMISSIVE | Default; safest; keeps bad records visible via the corrupt-record column | May include partially null or invalid rows |
| DROPMALFORMED | Quick cleanup pipelines | May silently drop data you wanted to keep |
| FAILFAST | Strict schema enforcement | Fails fast; useful when bad input should stop a production job |
| IGNORE (via spark.sql.files.ignoreCorruptFiles) | Skipping corrupt Parquet/ORC files | Not a mode option; does not apply to JSON/CSV parsing errors |

🔄 Bonus Tip: Audit Malformed Rows

df = spark.read.option("mode", "PERMISSIVE").json("bad.json")
df.filter("_corrupt_record IS NOT NULL").show()

✅ Helps you isolate bad records for manual review or automated cleanup.
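If the bad rows need more than a visual check, they can be quarantined for later reprocessing; a minimal sketch (the output path is just an example):

# Persist the bad records aside and keep only clean rows for downstream processing
bad_rows = df.filter("_corrupt_record IS NOT NULL")
bad_rows.write.mode("overwrite").json("/tmp/quarantine/bad_people")

clean_rows = df.filter("_corrupt_record IS NULL").drop("_corrupt_record")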

