Yes, schema enforcement absolutely applies when reading CSV or Excel files in PySpark, just as it does for Parquet, JSON, and other formats.


✅ What is Schema Enforcement in PySpark?

Schema Enforcement means:

  • You define the expected data types and structure (columns, types) manually.
  • Spark rejects or fails rows that don't conform to that schema, depending on the read mode you set.

πŸ“ Supported File Formats

| Format | Supports Schema Enforcement? | Notes |
|---|---|---|
| CSV | ✅ Yes | You must provide a schema manually; otherwise, all columns default to StringType |
| Excel | ✅ Yes (via third-party libs) | Not natively supported; use com.crealytics.spark.excel or Pandas for reading |
| JSON | ✅ Yes | Schema can be inferred or defined; nested structures supported |
| Parquet | ✅ Strongly enforced | Schema is embedded in the file; a mismatch causes failure unless handled |
| Delta | ✅ Strongly enforced | Best schema management (auto-merge, evolution, enforcement) |

🧪 Example: Schema Enforcement in CSV Reading

from pyspark.sql.types import StructType, StructField, IntegerType, StringType

schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True)
])

df = spark.read.csv("people.csv", header=True, schema=schema, mode="FAILFAST")

💥 mode="FAILFAST"

If a row has "age" as "twenty" (invalid for IntegerType), Spark will throw an error and stop.
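
Because the CSV read itself is lazy, the FAILFAST error usually surfaces only when an action runs. A minimal sketch of catching it (reusing the schema above and assuming a people.csv file):

try:
    df = spark.read.csv("people.csv", header=True, schema=schema, mode="FAILFAST")
    df.show()   # the action triggers parsing; a value like "twenty" raises here
except Exception as e:
    print(f"Malformed row detected: {e}")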


⚠️ Without Schema:

df = spark.read.csv("people.csv", header=True, inferSchema=True)

Spark will try to infer, but:

  • It's slow on large files.
  • Not always accurate.
  • Can misinterpret data types (e.g., nulls or integers with leading zeroes; see the sketch below).
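
For example, a hypothetical zips.csv holding ZIP codes with leading zeros gets inferred as integers, and the leading zeros are silently lost:

# zips.csv (hypothetical):
# zip
# 01234
# 02108

df = spark.read.csv("zips.csv", header=True, inferSchema=True)
df.printSchema()   # zip: integer
df.show()          # 1234 and 2108: the leading zeros are gone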

🧾 What About Excel?

To read Excel with schema enforcement, use the crealytics Excel Spark package:

df = spark.read.format("com.crealytics.spark.excel") \
    .option("header", "true") \
    .option("inferSchema", "false") \
    .schema(my_schema) \
    .load("file.xlsx")

Install with:

--packages com.crealytics:spark-excel_2.12:0.13.5
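
Alternatively (a sketch using the same package coordinates as above), the connector can be pulled in when the SparkSession is built via spark.jars.packages:

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("ExcelReader") \
    .config("spark.jars.packages", "com.crealytics:spark-excel_2.12:0.13.5") \
    .getOrCreate()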

🧠 Summary

| Question | Answer |
|---|---|
| Can schema enforcement be used with CSV? | ✅ Yes, and it's highly recommended |
| What if I don't define a schema? | Spark uses inferSchema (less reliable) or defaults every column to StringType |
| Can it fail on mismatched data? | ✅ Yes with FAILFAST (or silently drop bad rows with DROPMALFORMED) |
| What about Excel? | ✅ Yes, but via third-party connectors like spark-excel |

Yes, you're absolutely right to clarify this.

✅ Default Behavior of spark.read.csv("file_name")

If you do not specify a schema and you do not enable inferSchema, then:

df = spark.read.csv("file_name.csv", header=True)

➤ All columns will be read as StringType, regardless of whether the values look numeric, boolean, etc.


πŸ” Why?

Because spark.read.csv() by default:

  • Assumes all fields are strings
  • Treats values like "123" or "true" as strings ("123" ≠ 123)

🧪 Example

CSV File Content:

id,name,age
1,Alice,30
2,Bob,28

Without schema or inferSchema:

df = spark.read.csv("file.csv", header=True)
df.printSchema()

📄 Output:

root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- age: string (nullable = true)

So all are StringType.


✅ To fix this, use one of these two options:

Option 1: Enable Inference

df = spark.read.csv("file.csv", header=True, inferSchema=True)

Option 2: Define a Schema Manually

from pyspark.sql.types import StructType, StructField, IntegerType, StringType

schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True)
])

df = spark.read.csv("file.csv", header=True, schema=schema)

💡 Bonus Tip: Check Data Types After Load

df.dtypes
# [('id', 'int'), ('name', 'string'), ('age', 'int')]  # only if schema or inferSchema was used

🔚 Summary

| Condition | Behavior |
|---|---|
| No schema, no inferSchema | ❌ All columns = StringType |
| inferSchema=True | ✅ Spark guesses data types (not always perfect) |
| Manual schema provided | ✅ Strong schema enforcement, best for production |

Great question, and this is exactly the kind of subtle PySpark behavior that can be confusing!

Let's clarify what actually happens with:

df = spark.read.csv("data.csv", header=True)
df_filtered = df.filter(df["price"] > 100)

🧠 What's Happening Here?

When you do:

df = spark.read.csv("data.csv", header=True)
  • You did not define a schema
  • You did not set inferSchema=True
  • So all columns, including "price", are read as StringType

That means:

df.printSchema()

➜ Will show:

|-- price: string (nullable = true)

🧪 So Why Doesn't This Code Throw an Error?

df_filtered = df.filter(df["price"] > 100)

Because PySpark coerces the comparison: when a string column is compared with the numeric literal 100, Spark implicitly casts "price" to a numeric type. But this can lead to silent bugs, like:

  • "99" is cast to 99 and correctly excluded ✅
  • "abc" cannot be cast, becomes null, and is filtered out silently
  • "100.0" and similar values depend entirely on the implicit cast rules, so results can be surprising

A safer explicit-cast pattern is sketched after this list.

✅ Best Practice

Always define the schema or use inferSchema=True if you want to do numeric operations, filtering, or aggregations.

df = spark.read.csv("data.csv", header=True, inferSchema=True)
# OR
df = spark.read.csv("data.csv", header=True, schema=your_schema)

Then:

df.printSchema()

Would show:

|-- price: double (nullable = true)

And your filter will behave correctly:

df_filtered = df.filter(df["price"] > 100)

⚠️ What Can Go Wrong Without Schema?

1. Lexicographic Comparison

If both sides of the comparison are strings (for example, df["price"] > "100"), Spark compares the values character by character:

"900" > "1000"  → ✅ True (because "9" > "1")

So your filter:

df.filter(df["price"] > 100)

can still return wrong rows: with a numeric literal, Spark implicitly casts the string column, and anything that fails to cast is dropped silently. Either way, the result is only trustworthy when "price" is really a number.
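
To see the trap directly, here is a tiny sketch with made-up string prices; comparing against the string literal "100" keeps "99" even though 99 < 100 numerically:

from pyspark.sql import Row

demo = spark.createDataFrame([Row(price="99"), Row(price="250"), Row(price="1000")])

demo.filter(demo["price"] > "100").show()
# "99" survives the filter because "9" > "1": the comparison is character by character.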


✅ Conclusion

| Case | price Column Type | Filter Behavior |
|---|---|---|
| No schema, no inferSchema | StringType | Implicit cast (risky, may misbehave) |
| inferSchema=True | DoubleType | ✅ Safe |
| Manual schema | DoubleType | ✅ Recommended |

Absolutely! Here's a robust PySpark code template to read CSV files with:

  • ✅ Manual schema enforcement
  • ✅ Error handling (FAILFAST, DROPMALFORMED, PERMISSIVE)
  • ✅ Null value handling
  • ✅ Logging the schema and a sample preview

✅ PySpark CSV Reader with Schema Enforcement

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType

# Initialize Spark Session
spark = SparkSession.builder \
    .appName("CSV Reader with Schema Enforcement") \
    .getOrCreate()

# Define schema explicitly (update as per your CSV)
schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("price", DoubleType(), True),
    StructField("category", StringType(), True)
])

# File path
file_path = "path/to/your/data.csv"

# Read CSV with schema and error handling
try:
    # mode options: PERMISSIVE | DROPMALFORMED | FAILFAST
    df = spark.read.format("csv") \
        .option("header", True) \
        .option("mode", "FAILFAST") \
        .option("nullValue", "NA") \
        .schema(schema) \
        .load(file_path)

    print("βœ… File loaded successfully with enforced schema.")
    df.printSchema()
    df.show(5)

except Exception as e:
    print("❌ Failed to load CSV file. Error:")
    print(e)

πŸ” Mode Options for .option("mode", ...)

| Mode | Behavior |
|---|---|
| "PERMISSIVE" (default) | Malformed rows are kept, with nulls where parsing failed |
| "DROPMALFORMED" | Malformed rows are dropped |
| "FAILFAST" | Job fails immediately on bad data (strictest) |

πŸ› οΈ Optional Settings You Can Add

.option("delimiter", ",")          # For custom delimiters like "|"
.option("quote", "\"")             # Handle quoted fields
.option("escape", "\\")            # Escape characters
.option("multiLine", "true")       # For fields that span multiple lines
.option("encoding", "UTF-8")       # For non-ASCII files
.option("ignoreLeadingWhiteSpace", "true")
.option("ignoreTrailingWhiteSpace", "true")

🧪 Bonus: Check Schema Types After Read

print(df.dtypes)   # [('id', 'int'), ('name', 'string'), ('price', 'double'), ...]

✅ Good Practices

  • 🔒 Always use .schema() in production pipelines
  • 🧼 Clean or validate your input files manually if malformed rows are common
  • 🧪 Add a .count() after read to confirm row loading
  • 💡 Log invalid rows if you're using PERMISSIVE

Here is your robust PySpark CSV reading template: production-ready, with schema enforcement, error handling, logging, flexible options, and safe defaults.


✅ Full PySpark CSV Reading Template (Best Practices)

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType
import logging

# ----------------------------
# 1. Spark Session
# ----------------------------
spark = SparkSession.builder \
    .appName("Robust CSV Reader") \
    .getOrCreate()

# ----------------------------
# 2. Logging Setup (optional)
# ----------------------------
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("CSVReader")

# ----------------------------
# 3. Define File Path
# ----------------------------
file_path = "path/to/your/file.csv"  # Change this

# ----------------------------
# 4. Define Schema Explicitly
# ----------------------------
schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("price", DoubleType(), True),
    StructField("category", StringType(), True),
    StructField("quantity", IntegerType(), True)
])

# ----------------------------
# 5. Read CSV with Best Practices
# ----------------------------
try:
    # mode: FAILFAST (alternatives: DROPMALFORMED, PERMISSIVE)
    df = spark.read.format("csv") \
        .option("header", True) \
        .option("inferSchema", False) \
        .option("mode", "FAILFAST") \
        .option("nullValue", "NA") \
        .option("delimiter", ",") \
        .option("ignoreLeadingWhiteSpace", True) \
        .option("ignoreTrailingWhiteSpace", True) \
        .schema(schema) \
        .load(file_path)

    logger.info("βœ… CSV loaded successfully with schema enforcement")
    df.printSchema()
    df.show(5)

    logger.info(f"Total rows loaded: {df.count()}")

except Exception as e:
    logger.error("❌ Error while loading CSV:")
    logger.error(e)

⚠️ Optional Parameters (use when needed)

.option("quote", "\"")               # For quoted text values
.option("escape", "\\")              # Escape character
.option("multiLine", True)           # Handle multi-line cells
.option("encoding", "UTF-8")         # File encoding
.option("timestampFormat", "yyyy-MM-dd HH:mm:ss")  # For timestamp parsing

πŸ” Modes Comparison

| Mode | Behavior |
|---|---|
| PERMISSIVE (default) | Malformed rows → null values |
| DROPMALFORMED | Drops invalid rows |
| FAILFAST | Immediately stops on bad data; ✔️ recommended for schema testing |

💡 Pro Tips

  • ✅ Always use .schema() in production ETL.
  • ⚠️ Avoid inferSchema=True for large datasets; it's expensive.
  • 🧪 Add .count() or .summary() after read for sanity checks.
  • ✅ Wrap reads in try/except to catch schema mismatch or corruption.
  • 📦 Save cleaned output using .write.parquet() or .write.format("delta").

📥 Example Save Output

df_cleaned.write.mode("overwrite").parquet("/cleaned/output/path")

Here's a robust PySpark template that covers both your requests:


✅ Smart CSV Loader for PySpark

🚀 Features:

  • Auto schema detection (inferSchema=True)
  • Fallback to manual schema if inference fails
  • Handles multiple CSV files
  • Logs and isolates bad/malformed files
  • Saves good data as Parquet

🧾 Template Code

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType
import os
import logging

# ----------------------------
# 1. Spark Session
# ----------------------------
spark = SparkSession.builder \
    .appName("Smart CSV Loader with Fallback & Logging") \
    .getOrCreate()

# ----------------------------
# 2. Logging Setup
# ----------------------------
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("SmartCSVLoader")

# ----------------------------
# 3. File Directory
# ----------------------------
input_dir = "path/to/csv/folder"
output_dir = "path/to/cleaned/output"
bad_data_dir = "path/to/bad_files"

# ----------------------------
# 4. Manual Fallback Schema
# ----------------------------
fallback_schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("price", DoubleType(), True),
    StructField("category", StringType(), True),
    StructField("quantity", IntegerType(), True)
])

# ----------------------------
# 5. Helper Function
# ----------------------------
def load_csv_with_fallback(file_path):
    try:
        logger.info(f"πŸ” Trying to infer schema for: {file_path}")
        df = spark.read.format("csv") \
            .option("header", True) \
            .option("inferSchema", True) \
            .option("mode", "FAILFAST") \
            .load(file_path)

        logger.info("βœ… Schema inferred successfully")
        return df, True

    except Exception as e:
        logger.warning(f"⚠️ Schema inference failed for {file_path}. Falling back to manual schema.")
        try:
            df = spark.read.format("csv") \
                .option("header", True) \
                .option("inferSchema", False) \
                .option("mode", "DROPMALFORMED") \
                .schema(fallback_schema) \
                .load(file_path)
            return df, True
        except Exception as fallback_err:
            logger.error(f"❌ Failed to load file even with fallback schema: {file_path}")
            return None, False

# ----------------------------
# 6. Process All CSV Files
# ----------------------------
for file in os.listdir(input_dir):
    if file.endswith(".csv"):
        full_path = os.path.join(input_dir, file)
        logger.info(f"\nπŸ“‚ Processing file: {file}")
        
        df, success = load_csv_with_fallback(full_path)

        if success:
            out_path = os.path.join(output_dir, f"{os.path.splitext(file)[0]}.parquet")
            df.write.mode("overwrite").parquet(out_path)
            logger.info(f"βœ… Saved cleaned data to {out_path}")
        else:
            os.rename(full_path, os.path.join(bad_data_dir, file))
            logger.warning(f"🚫 Moved bad file to: {bad_data_dir}")

📂 Folder Structure Example

input_dir/
├── good1.csv
├── good2.csv
├── bad.csv        ← malformed rows or bad structure
output_dir/
├── good1.parquet
├── good2.parquet
bad_data_dir/
├── bad.csv        ← automatically moved here

🧠 Summary

| Feature | Covered? |
|---|---|
| Auto schema inference | ✅ |
| Fallback to manual schema | ✅ |
| Multiple file processing | ✅ |
| Logs & error tracking | ✅ |
| Moves bad files separately | ✅ |
| Saves clean data as Parquet | ✅ |
