✅ Schema Enforcement in PySpark DataFrames
In PySpark, schema enforcement ensures that the structure (schema) of your DataFrame strictly matches the expected data types and field names during read and write operations.
🧠 What is Schema Enforcement?
Schema enforcement (also called schema-on-write) is the ability of Spark to validate incoming data against a predefined schema, and reject rows or throw errors when mismatches occur.
🔧 1. Enforcing Schema During DataFrame Read
🧪 Example:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True)
])
df = spark.read.schema(schema).json("people.json")
df.show()
✅ What happens here:
- Spark does not infer schema
- Spark expects the JSON to match the defined schema
- If the data contains "age": "twenty" (a string), Spark sets it to null or throws an error, depending on the file format and options
🔍 What If You Don’t Define Schema?
df = spark.read.json("people.json")
- Spark infers the schema
- No enforcement → can lead to inconsistent column types or ambiguous nested structures (see the comparison sketch below)
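To see what inference decided, you can compare the inferred schema against the explicit one above. A small sketch; the actual inferred types depend entirely on the contents of people.json:

```python
# Read the same file with schema inference (no enforcement)
df_inferred = spark.read.json("people.json")
df_inferred.printSchema()
# age may come back as long, string, or even a struct,
# depending on what the file happens to contain
```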
🛠️ 2. Enforcing Schema While Creating a DataFrame from Python Objects
data = [("Alice", 30), ("Bob", 28)]
schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True)
])
df = spark.createDataFrame(data, schema=schema)
If data contains a value of the wrong type (e.g., ("Alice", "thirty")), Spark will:
- Try to coerce the value, or
- Throw an error (typically a TypeError), depending on the verifySchema argument of createDataFrame (True by default), as sketched below
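A minimal sketch of the strict path; bad_data is made-up data for illustration, and classic PySpark surfaces the mismatch as a TypeError:

```python
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True)
])

bad_data = [("Alice", 30), ("Bob", "thirty")]  # "thirty" does not fit IntegerType

try:
    # verifySchema=True is the default, so the mismatch is caught up front
    df = spark.createDataFrame(bad_data, schema=schema)
    df.show()
except Exception as e:  # classic PySpark raises a TypeError here
    print(f"Schema verification failed: {e}")
```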
💾 3. Schema Enforcement on Write (with Delta Lake or Parquet)
Delta Example:
df.write.format("delta").mode("overwrite").save("/delta/users")
Then:
# Writing different schema (e.g., extra column)
df2.write.format("delta").mode("append").save("/delta/users")
📛 If df2 has a different schema (e.g., an extra column gender), this append fails unless you allow schema evolution.
Enabling schema evolution:
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", True)
🔐 4. Related Spark Configurations
| Config | Description |
| --- | --- |
| spark.sql.caseSensitive | Field name case enforcement |
| spark.sql.parquet.mergeSchema | Enable schema merging in Parquet |
| spark.databricks.delta.schema.autoMerge.enabled | Allow schema evolution on writes (Delta) |
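All three can be toggled at the session level; a quick sketch (the values shown are examples, not recommendations):

```python
# Make column-name matching case sensitive
spark.conf.set("spark.sql.caseSensitive", "true")

# Merge schemas across Parquet part files on read
spark.conf.set("spark.sql.parquet.mergeSchema", "true")

# Allow Delta writes to evolve the table schema automatically
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")
```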
✅ Summary Table
| Operation | Enforcement Applies? | Behavior |
| --- | --- | --- |
| read.json(..., schema) | ✅ Yes | Rejects/warns on mismatch |
| createDataFrame(data, schema) | ✅ Yes | Enforces casting and structure |
| write.format("delta") | ✅ Yes | Requires same schema unless auto-merge is on |
| write.csv() or write.parquet() | ⚠️ Partial | Appends with a different schema are not blocked; mismatches surface at read time (e.g., when Parquet schemas need merging) |
📘 Best Practices
- Always define schema explicitly for production pipelines.
- Validate schema before writing with .printSchema() or schema.json() (see the sketch below).
- Use Delta Lake for strong schema enforcement + evolution combo.
- Avoid relying on schema inference with complex or nested data.
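For the validation step above, a minimal sketch; expected_schema is an assumed name for whatever contract your pipeline defines, and df stands in for the DataFrame you are about to write:

```python
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# The schema downstream consumers expect (assumed contract)
expected_schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True)
])

df.printSchema()  # quick visual check in a notebook

# Programmatic check before writing; this is an exact match
# (field order and nullability included)
if df.schema != expected_schema:
    raise ValueError(
        f"Schema drift detected:\nexpected {expected_schema.json()}\ngot {df.schema.json()}"
    )
```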
✅ Read Modes in PySpark DataFrames (including Databricks)
When reading data in PySpark (whether on vanilla Spark or Databricks), read modes define how Spark handles malformed records, corrupt rows, or schema mismatches during file reading.
📘 The mode Parameter in DataFrameReader
Applies to reads such as:
.read.format("json") / .read.format("csv")
Parquet, ORC, and Delta handle malformed data differently; see the format-specific table further down.
🔧 Syntax:
df = spark.read.format("json").option("mode", "<mode>").load("path")
🧠 Available Read Modes in PySpark
| Mode | Description |
| --- | --- |
| "PERMISSIVE" (default) | Tries to parse all rows, sets corrupt or missing fields to null, and puts bad data in a special column (e.g., _corrupt_record for JSON) |
| "DROPMALFORMED" | Silently drops rows that don’t match the schema |
| "FAILFAST" | Throws an exception as soon as it encounters a malformed row |
| "IGNORE" | Skips corrupted data entirely; not a value of the mode option itself, but achieved for Parquet/ORC via spark.sql.files.ignoreCorruptFiles |
🧪 Example: Reading Malformed JSON
File (bad.json):
{"name": "Alice", "age": 30}
{"name": "Bob", "age": "thirty"} <-- Malformed
{"name": "Carol"}
Read in PERMISSIVE mode:
df = spark.read.option("mode", "PERMISSIVE").json("bad.json")
df.show()
✅ Output:
- Valid rows are parsed
- age becomes null where invalid
- A _corrupt_record column may be added (see the sketch below for how to keep it)
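To actually keep the raw bad record, include a string column named _corrupt_record in the explicit schema (or point the columnNameOfCorruptRecord option at a column of your choice). A sketch:

```python
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

schema_with_corrupt = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("_corrupt_record", StringType(), True)  # receives the raw malformed line
])

df = (spark.read
      .schema(schema_with_corrupt)
      .option("mode", "PERMISSIVE")
      .json("bad.json"))
df.show(truncate=False)
```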
Read in FAILFAST mode:
df = spark.read.schema(schema).option("mode", "FAILFAST").json("bad.json")
🚨 Fails immediately on second row.
Read in DROPMALFORMED mode:
df = spark.read.schema(schema).option("mode", "DROPMALFORMED").json("bad.json")
✅ Drops rows that cannot be parsed.
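A quick way to compare the three parser modes on the same file; a sketch that assumes the schema and bad.json from above:

```python
for mode in ["PERMISSIVE", "DROPMALFORMED", "FAILFAST"]:
    try:
        count = (spark.read
                 .schema(schema)
                 .option("mode", mode)
                 .json("bad.json")
                 .count())
        print(f"{mode}: {count} rows")
    except Exception as e:
        # FAILFAST aborts the read as soon as the malformed row is hit
        print(f"{mode}: failed ({type(e).__name__})")
```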
🧱 Parquet, ORC, and Delta Specifics
| Format | Mode Supported | Default Behavior |
| --- | --- | --- |
| CSV | PERMISSIVE, DROPMALFORMED, FAILFAST | PERMISSIVE |
| JSON | PERMISSIVE, DROPMALFORMED, FAILFAST | PERMISSIVE |
| Parquet | Corrupt-file skipping via spark.sql.files.ignoreCorruptFiles = true | Fails on corrupt files |
| Delta | Schema must match or be evolved (Delta has stronger enforcement) | Fails on mismatch unless autoMerge is on |
🛠️ In Databricks
Databricks supports the same modes and options, including:
- Auto-detect schema with .option("inferSchema", "true")
- Notebook-level configs to handle corrupt files:
spark.conf.set("spark.sql.files.ignoreCorruptFiles", "true")
✅ Summary Table
| Mode | Good For | Risk/Note |
| --- | --- | --- |
| PERMISSIVE | Default, safest; keeps bad records visible for review | May include invalid rows |
| DROPMALFORMED | Quick cleanup pipelines | May drop valid but slightly malformed data |
| FAILFAST | Strict schema enforcement | Fails fast; best for production |
| IGNORE | Corrupt file skipping (Parquet, ORC) | Doesn’t apply to JSON/CSV parsing |
🔄 Bonus Tip: Audit Malformed Rows
# Reuse the schema that includes a _corrupt_record column (see above)
df = spark.read.schema(schema_with_corrupt).option("mode", "PERMISSIVE").json("bad.json")
df.filter("_corrupt_record IS NOT NULL").show(truncate=False)
✅ Helps you isolate bad records for manual review or automated cleanup.
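If you want to persist those rows for later triage rather than just display them, a small follow-on sketch (the quarantine path is hypothetical):

```python
# Cache first: Spark disallows queries that reference only the corrupt-record
# column directly on raw JSON/CSV input, and caching is the documented workaround
df.cache()

bad_rows = df.filter("_corrupt_record IS NOT NULL")

# Quarantine the raw bad records so the main pipeline can continue
(bad_rows
 .select("_corrupt_record")
 .write
 .mode("overwrite")
 .text("/tmp/quarantine/bad_records"))  # hypothetical output path
```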