Yes: schema enforcement absolutely applies when reading CSV or Excel files in PySpark, just as it does for Parquet, JSON, and other formats.
What is Schema Enforcement in PySpark?
Schema Enforcement means:
- You define the expected data types and structure (columns, types) manually.
- Spark rejects or fails rows that don't conform to that schema, depending on how you set the mode option.
Supported File Formats

| Format | Supports Schema Enforcement? | Notes |
|---|---|---|
| CSV | Yes | You must provide a schema manually; otherwise, all columns default to StringType |
| Excel | Yes (via third-party libraries) | Not natively supported; use com.crealytics.spark.excel or Pandas for reading |
| JSON | Yes | Schema can be inferred or defined; nested structures supported |
| Parquet | Strongly enforced | Schema is embedded in the file; a mismatch causes failure unless handled |
| Delta | Strongly enforced | Best schema management (auto-merge, evolution, enforcement) |
Example: Schema Enforcement in CSV Reading
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
schema = StructType([
StructField("id", IntegerType(), True),
StructField("name", StringType(), True),
StructField("age", IntegerType(), True)
])
df = spark.read.csv("people.csv", header=True, schema=schema, mode="FAILFAST")
mode="FAILFAST"
If a row has "age" as "twenty" (invalid for IntegerType), Spark will throw an error and stop.
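For contrast, here is a minimal sketch (the file contents below are hypothetical, not part of the original example) of how the other modes treat the same bad row:

# people.csv (hypothetical contents):
# id,name,age
# 1,Alice,30
# 2,Bob,twenty        <- "twenty" cannot be parsed as IntegerType
df_drop = spark.read.csv("people.csv", header=True, schema=schema, mode="DROPMALFORMED")  # bad row silently dropped
df_perm = spark.read.csv("people.csv", header=True, schema=schema, mode="PERMISSIVE")     # bad row kept, age becomes null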
Without a Schema:
df = spark.read.csv("people.csv", header=True, inferSchema=True)
Spark will try to infer, but:
- It’s slow on large files.
- Not always accurate.
- Can misinterpret data types (e.g., nulls, integers with leading zeroes).
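One concrete example of the last point (zips.csv and the zip_code column are hypothetical names used only for illustration):

# zips.csv (hypothetical contents):
# zip_code,city
# 01234,Springfield
df_inferred = spark.read.csv("zips.csv", header=True, inferSchema=True)
df_inferred.show()   # zip_code inferred as an integer: the leading zero is lost, 01234 -> 1234
df_strings = spark.read.csv("zips.csv", header=True)
df_strings.show()    # zip_code stays StringType: "01234" is preserved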
What About Excel?
To read Excel with schema enforcement, use the crealytics Excel Spark package:
df = spark.read.format("com.crealytics.spark.excel") \
.option("header", "true") \
.option("inferSchema", "false") \
.schema(my_schema) \
.load("file.xlsx")
Install with:
--packages com.crealytics:spark-excel_2.12:0.13.5
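The same package can also be requested from Python when building the session; this is a sketch using the spark.jars.packages config (the app name is arbitrary, and the version should match your environment):

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("ExcelReader") \
    .config("spark.jars.packages", "com.crealytics:spark-excel_2.12:0.13.5") \
    .getOrCreate()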
Summary

| Question | Answer |
|---|---|
| Can schema enforcement be used with CSV? | Yes, and it's highly recommended |
| What if I don't define a schema? | Spark uses inferSchema (less reliable) or defaults to StringType |
| Can it fail on mismatched data? | Yes, with FAILFAST (bad rows are silently dropped with DROPMALFORMED) |
| What about Excel? | Yes, but with third-party connectors like spark-excel |
Yes, you're absolutely right to clarify this.
Default Behavior of spark.read.csv("file_name")
If you do not specify a schema and you do not enable inferSchema, then:
df = spark.read.csv("file_name.csv", header=True)
All columns will be read as StringType, regardless of whether the values look numeric, boolean, etc.
Why?
Because spark.read.csv() by default:
- Assumes all fields are strings
- Treats values like "123" or "true" as strings ("123" ≠ 123)
Example
CSV File Content:
id,name,age
1,Alice,30
2,Bob,28
Without schema or inferSchema:
df = spark.read.csv("file.csv", header=True)
df.printSchema()
Output:
root
|-- id: string (nullable = true)
|-- name: string (nullable = true)
|-- age: string (nullable = true)
So all columns are StringType.
To fix this, use one of the following two options:
Option 1: Enable Inference
df = spark.read.csv("file.csv", header=True, inferSchema=True)
Option 2: Define a Schema Manually
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
schema = StructType([
StructField("id", IntegerType(), True),
StructField("name", StringType(), True),
StructField("age", IntegerType(), True)
])
df = spark.read.csv("file.csv", header=True, schema=schema)
Bonus Tip: Check Data Types After Load
df.dtypes
# [('id', 'int'), ('name', 'string'), ('age', 'int')] # only if schema or inferSchema was used
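If a file has already been loaded with every column as StringType, another option (not covered above, shown here only as a sketch) is to cast the columns afterwards:

from pyspark.sql.functions import col

# df_raw: a DataFrame read without schema or inferSchema, so every column is StringType
df_raw = spark.read.csv("file.csv", header=True)
df_typed = df_raw \
    .withColumn("id", col("id").cast("int")) \
    .withColumn("age", col("age").cast("int"))
print(df_typed.dtypes)   # [('id', 'int'), ('name', 'string'), ('age', 'int')]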
Summary

| Condition | Behavior |
|---|---|
| No schema, no inferSchema | All columns = StringType |
| inferSchema=True | Spark will guess data types (not always perfect) |
| Manual schema provided | Strong schema enforcement, best for production |
Great question, and this is exactly the kind of subtle PySpark behavior that can be confusing!
Let's clarify what actually happens with:
df = spark.read.csv("data.csv", header=True)
df_filtered = df.filter(df["price"] > 100)
What's Happening Here?
When you do:
df = spark.read.csv("data.csv", header=True)
- You did not define a schema
- You did not set inferSchema=True
- So all columns, including "price", are read as StringType
That means:
df.printSchema()
Will show:
|-- price: string (nullable = true)
So Why Doesn't This Code Throw an Error?
df_filtered = df.filter(df["price"] > 100)
Because PySpark coerces the comparison: it implicitly converts "price" (a string) to a number when comparing it with 100. But this can lead to silent bugs, like:
- "99" < 100 behaves as expected after the implicit cast
- "abc" fails to cast and is silently filtered out
- "100.0" may still be treated as a string elsewhere (e.g., when sorting) and ordered incorrectly
Best Practice
Always define the schema or use inferSchema=True if you want to do numeric operations, filtering, or aggregations.
df = spark.read.csv("data.csv", header=True, inferSchema=True)
# OR
df = spark.read.csv("data.csv", header=True, schema=your_schema)
Then:
df.printSchema()
Would show:
|-- price: double (nullable = true)
And your filter will behave correctly:
df_filtered = df.filter(df["price"] > 100)
What Can Go Wrong Without a Schema?
1. Lexicographic Comparison
When both sides are strings, the comparison is character by character: "900" > "1000" evaluates to True (because "9" > "1").
So a filter like:
df.filter(df["price"] > "100")
may return wrong rows unless "price" is really a number.
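A small sketch of the difference (assuming price was loaded as StringType, as above):

df.filter(df["price"] > "1000").show()               # string vs string: lexicographic, "900" passes
df.filter(df["price"].cast("double") > 1000).show()  # explicit cast: numeric comparison, "900" is excluded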
Conclusion

| Case | price Column Type | Filter Behavior |
|---|---|---|
| No schema, no inferSchema | StringType | Implicit cast (risky, may misbehave) |
| inferSchema=True | DoubleType | Safe |
| Manual schema | DoubleType | Recommended |
Absolutely! Here's a robust PySpark code template to read CSV files with:
- Manual schema enforcement
- Error handling (FAILFAST, DROPMALFORMED, PERMISSIVE)
- Null value handling
- Logging the schema and a sample preview
PySpark CSV Reader with Schema Enforcement
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType
# Initialize Spark Session
spark = SparkSession.builder \
.appName("CSV Reader with Schema Enforcement") \
.getOrCreate()
# Define schema explicitly (update as per your CSV)
schema = StructType([
StructField("id", IntegerType(), True),
StructField("name", StringType(), True),
StructField("price", DoubleType(), True),
StructField("category", StringType(), True)
])
# File path
file_path = "path/to/your/data.csv"
# Read CSV with schema and error handling
try:
    # mode options: PERMISSIVE | DROPMALFORMED | FAILFAST
    df = spark.read.format("csv") \
        .option("header", True) \
        .option("mode", "FAILFAST") \
        .option("nullValue", "NA") \
        .schema(schema) \
        .load(file_path)
    print("File loaded successfully with enforced schema.")
    df.printSchema()
    df.show(5)
except Exception as e:
    print("Failed to load CSV file. Error:")
    print(e)
Mode Options for .option("mode", ...)

| Mode | Behavior |
|---|---|
| "PERMISSIVE" (default) | Malformed rows are kept, with unparseable fields set to null |
| "DROPMALFORMED" | Malformed rows are dropped |
| "FAILFAST" | Job fails immediately on bad data (strictest) |
Optional Settings You Can Add
.option("delimiter", ",") # For custom delimiters like "|"
.option("quote", "\"") # Handle quoted fields
.option("escape", "\\") # Escape characters
.option("multiLine", "true") # For fields that span multiple lines
.option("encoding", "UTF-8") # For non-ASCII files
.option("ignoreLeadingWhiteSpace", "true")
.option("ignoreTrailingWhiteSpace", "true")
Bonus: Check Schema Types After Read
print(df.dtypes) # [('id', 'int'), ('name', 'string'), ('price', 'double'), ...]
Good Practices
- Always use .schema() in production pipelines
- Clean or validate your input files manually if malformed rows are common
- Add a .count() after read to confirm row loading
- Log invalid rows if you're using PERMISSIVE (see the sketch below)
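One way to follow that last tip is PERMISSIVE mode combined with the columnNameOfCorruptRecord option. This is only a sketch: the extra _corrupt_record field (name chosen here, any name works) must be added to the schema for Spark to populate it.

from pyspark.sql.types import StructField, StringType

# Extend the schema defined above with a column that captures the raw malformed line
schema_with_corrupt = schema.add(StructField("_corrupt_record", StringType(), True))

df = spark.read.format("csv") \
    .option("header", True) \
    .option("mode", "PERMISSIVE") \
    .option("columnNameOfCorruptRecord", "_corrupt_record") \
    .schema(schema_with_corrupt) \
    .load(file_path)

bad_rows = df.filter(df["_corrupt_record"].isNotNull())   # rows Spark could not fully parse
bad_rows.show()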
Here is your robust PySpark CSV reading template: production-ready, with schema enforcement, error handling, logging, flexible options, and safe defaults.
Full PySpark CSV Reading Template (Best Practices)
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType
import logging
# ----------------------------
# 1. Spark Session
# ----------------------------
spark = SparkSession.builder \
.appName("Robust CSV Reader") \
.getOrCreate()
# ----------------------------
# 2. Logging Setup (optional)
# ----------------------------
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("CSVReader")
# ----------------------------
# 3. Define File Path
# ----------------------------
file_path = "path/to/your/file.csv" # Change this
# ----------------------------
# 4. Define Schema Explicitly
# ----------------------------
schema = StructType([
StructField("id", IntegerType(), True),
StructField("name", StringType(), True),
StructField("price", DoubleType(), True),
StructField("category", StringType(), True),
StructField("quantity", IntegerType(), True)
])
# ----------------------------
# 5. Read CSV with Best Practices
# ----------------------------
try:
    # mode options: FAILFAST | DROPMALFORMED | PERMISSIVE
    df = spark.read.format("csv") \
        .option("header", True) \
        .option("inferSchema", False) \
        .option("mode", "FAILFAST") \
        .option("nullValue", "NA") \
        .option("delimiter", ",") \
        .option("ignoreLeadingWhiteSpace", True) \
        .option("ignoreTrailingWhiteSpace", True) \
        .schema(schema) \
        .load(file_path)

    logger.info("CSV loaded successfully with schema enforcement")
    df.printSchema()
    df.show(5)
    logger.info(f"Total rows loaded: {df.count()}")
except Exception as e:
    logger.error("Error while loading CSV:")
    logger.error(e)
Optional Parameters (use when needed)
.option("quote", "\"") # For quoted text values
.option("escape", "\\") # Escape character
.option("multiLine", True) # Handle multi-line cells
.option("encoding", "UTF-8") # File encoding
.option("timestampFormat", "yyyy-MM-dd HH:mm:ss") # For timestamp parsing
Modes Comparison

| Mode | Behavior |
|---|---|
| PERMISSIVE (default) | Malformed rows → null values |
| DROPMALFORMED | Drops invalid rows |
| FAILFAST | Immediately stops on bad data; recommended for schema testing |
Pro Tips
- Always use .schema() in production ETL.
- Avoid inferSchema=True for large datasets; it's expensive.
- Add .count() or .summary() after read for sanity checks.
- Wrap in try/except to catch schema mismatch or corruption.
- Save cleaned output using .write.parquet() or .write.format("delta") (a Delta sketch follows the example below).
Example Save Output
df_cleaned.write.mode("overwrite").parquet("/cleaned/output/path")
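If the Delta option from the last pro tip is preferred instead, a minimal sketch looks like this (it assumes the Delta Lake package is available on the cluster, and the output path is a placeholder):

df_cleaned.write.format("delta").mode("overwrite").save("/cleaned/output/delta_path")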
Here's a robust PySpark template that covers both of your requests:
Smart CSV Loader for PySpark
Features:
- Auto schema detection (inferSchema=True)
- Fallback to manual schema if inference fails
- Handles multiple CSV files
- Logs and isolates bad/malformed files
- Saves good data as Parquet
Template Code
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType
import os
import logging
# ----------------------------
# 1. Spark Session
# ----------------------------
spark = SparkSession.builder \
.appName("Smart CSV Loader with Fallback & Logging") \
.getOrCreate()
# ----------------------------
# 2. Logging Setup
# ----------------------------
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("SmartCSVLoader")
# ----------------------------
# 3. File Directory
# ----------------------------
input_dir = "path/to/csv/folder"
output_dir = "path/to/cleaned/output"
bad_data_dir = "path/to/bad_files"
# ----------------------------
# 4. Manual Fallback Schema
# ----------------------------
fallback_schema = StructType([
StructField("id", IntegerType(), True),
StructField("name", StringType(), True),
StructField("price", DoubleType(), True),
StructField("category", StringType(), True),
StructField("quantity", IntegerType(), True)
])
# ----------------------------
# 5. Helper Function
# ----------------------------
def load_csv_with_fallback(file_path):
    try:
        logger.info(f"Trying to infer schema for: {file_path}")
        df = spark.read.format("csv") \
            .option("header", True) \
            .option("inferSchema", True) \
            .option("mode", "FAILFAST") \
            .load(file_path)
        logger.info("Schema inferred successfully")
        return df, True
    except Exception as infer_err:
        logger.warning(f"Schema inference failed for {file_path} ({infer_err}). Falling back to manual schema.")
        try:
            df = spark.read.format("csv") \
                .option("header", True) \
                .option("inferSchema", False) \
                .option("mode", "DROPMALFORMED") \
                .schema(fallback_schema) \
                .load(file_path)
            return df, True
        except Exception as fallback_err:
            logger.error(f"Failed to load file even with fallback schema: {file_path} ({fallback_err})")
            return None, False
# ----------------------------
# 6. Process All CSV Files
# ----------------------------
for file in os.listdir(input_dir):
    if file.endswith(".csv"):
        full_path = os.path.join(input_dir, file)
        logger.info(f"Processing file: {file}")
        df, success = load_csv_with_fallback(full_path)
        if success:
            out_path = os.path.join(output_dir, f"{os.path.splitext(file)[0]}.parquet")
            df.write.mode("overwrite").parquet(out_path)
            logger.info(f"Saved cleaned data to {out_path}")
        else:
            os.rename(full_path, os.path.join(bad_data_dir, file))
            logger.warning(f"Moved bad file to: {bad_data_dir}")
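One assumption worth making explicit (not part of the original template): the loop uses the local filesystem via os, so the quarantine folder should exist before it runs. A minimal addition, placed before the loop:

os.makedirs(output_dir, exist_ok=True)    # parent folder for the Parquet outputs
os.makedirs(bad_data_dir, exist_ok=True)  # os.rename above fails if this folder does not exist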
Folder Structure Example
input_dir/
├── good1.csv
├── good2.csv
└── bad.csv        ← malformed rows or bad structure
output_dir/
├── good1.parquet
└── good2.parquet
bad_data_dir/
└── bad.csv        ← automatically moved here
Summary

| Feature | Covered? |
|---|---|
| Auto schema inference | Yes |
| Fallback to manual schema | Yes |
| Multiple file processing | Yes |
| Logs & error tracking | Yes |
| Moves bad files separately | Yes |
| Saves clean data as Parquet | Yes |