Spark DataFrame - Complete Tutorial



🧱 1. Create SparkSession - Variants & Configurations

from pyspark.sql import SparkSession

# Basic
spark = SparkSession.builder.appName("SparkDF_CheatSheet").getOrCreate()

# With configs
spark = SparkSession.builder \
    .appName("MyApp") \
    .config("spark.sql.shuffle.partitions", "10") \
    .config("spark.executor.memory", "2g") \
    .getOrCreate()

# Check Spark version
print(spark.version)

🪒 2. Create DataFrame - Different Sources

From list of tuples

data = [(1, "Alice", 25), (2, "Bob", 30)]
columns = ["id", "name", "age"]
df = spark.createDataFrame(data, columns)

From Pandas

import pandas as pd
pdf = pd.DataFrame(data, columns=columns)
df = spark.createDataFrame(pdf)

From RDD

rdd = spark.sparkContext.parallelize(data)
df = rdd.toDF(columns)

📄 3. Read External Files - All Options

# CSV
df_csv = spark.read.format("csv") \
    .option("header", True) \
    .option("inferSchema", True) \
    .option("delimiter", ",") \
    .load("file.csv")

# JSON (multiline)
df_json = spark.read.option("multiline", True).json("file.json")

# Parquet
df_parquet = spark.read.parquet("file.parquet")

# ORC
df_orc = spark.read.orc("file.orc")

# AVRO
df_avro = spark.read.format("avro").load("file.avro")

# JDBC
df_jdbc = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:mysql://localhost:3306/db") \
    .option("dbtable", "tablename") \
    .option("user", "root") \
    .option("password", "root") \
    .load()

Read Modes - Error Handling

# FailFast
spark.read.option("mode", "FAILFAST")...

# DropMalformed
spark.read.option("mode", "DROPMALFORMED")...

# Permissive (default)
spark.read.option("mode", "PERMISSIVE")...

📐 4. Schema Handling

Define schema

from pyspark.sql.types import StructType, StructField, IntegerType, StringType

schema = StructType([
    StructField("id", IntegerType(), False),
    StructField("name", StringType(), True)
])
df = spark.read.schema(schema).csv("file.csv")

Infer schema manually

df = spark.read.option("inferSchema", True).option("header", True).csv("file.csv")

📊 5. Inspect & Explore

df.printSchema()
df.columns
df.dtypes
df.describe().show()
df.summary().show()  # count, mean, stddev, min, 25%, 50%, 75%, max
df.head(3)
df.take(2)
df.limit(5).show()
df.first()
df.count()

🔎 6. Select, Filter, Where, Aliasing, expr()

from pyspark.sql.functions import col

df.select("name", "age").show()
df.select(col("name").alias("cust_name")).show()
df.selectExpr("name", "age + 10 as age_10").show()

df.filter(col("age") > 25).show()
df.where("age < 30").show()

🎨 7. Add, Rename, Drop Columns

df = df.withColumn("age_plus_5", col("age") + 5)

from pyspark.sql.functions import lit
df = df.withColumn("constant_col", lit("static_value"))

df = df.withColumnRenamed("name", "full_name")

df = df.drop("constant_col")
df = df.drop(*["col1", "col2"])

📦 8. Distinct, Sort, Sample, Limit

df.distinct().show()

df.orderBy("age").show()
df.orderBy(col("age").desc()).show()

df.sort("id").show()
df.sortWithinPartitions("id").show()

df.sample(False, 0.3, seed=42).show()  # random sample
df.limit(3).show()

🔗 9. Joins - Deep Options

df1.join(df2, on="id", how="inner").show()
df1.join(df2, on="id", how="left").show()
df1.join(df2, on="id", how="right").show()
df1.join(df2, on="id", how="outer").show()
df1.join(df2, on="id", how="left_semi").show()
df1.join(df2, on="id", how="left_anti").show()
df1.crossJoin(df2).show()

# Multiple keys
df1.join(df2, on=["id", "dept"], how="inner")

➕ 10. Union, Intersect, Subtract, DropDuplicates

df.union(df2).show()
df.unionByName(df2).show()

df.intersect(df2).show()
df.subtract(df2).show()

df.dropDuplicates().show()
df.dropDuplicates(["id", "name"]).show()

📈 11. GroupBy, Aggregations, Pivot, Rollup, Cube

from pyspark.sql.functions import count, avg, sum, max, min

df.groupBy("age").agg(count("*"), avg("id")).show()
df.groupBy("gender").count().show()

# Multiple aggregations
df.groupBy("dept").agg(
    sum("salary").alias("total_salary"),
    avg("salary").alias("avg_salary")
).show()

# Pivot
df.groupBy("id").pivot("month").sum("amount").show()

# Rollup
df.rollup("dept", "gender").agg(sum("salary")).show()

# Cube
df.cube("dept", "gender").agg(sum("salary")).show()

🔁 12. Window Functions - All Variants

from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, rank, dense_rank, lead, lag, ntile, sum, avg

windowSpec = Window.partitionBy("dept").orderBy(col("salary").desc())

df.withColumn("row_number", row_number().over(windowSpec)).show()
df.withColumn("rank", rank().over(windowSpec)).show()
df.withColumn("dense_rank", dense_rank().over(windowSpec)).show()
df.withColumn("ntile_3", ntile(3).over(windowSpec)).show()
df.withColumn("lead_val", lead("salary", 1).over(windowSpec)).show()
df.withColumn("lag_val", lag("salary", 1).over(windowSpec)).show()

# Running totals
df.withColumn("running_sum", sum("salary").over(windowSpec)).show()
df.withColumn("avg_salary", avg("salary").over(windowSpec)).show()





🧬 SECTION 13: DATA TYPES HANDLING - CASTING, CONVERSIONS, TRICKS

🔹 Inspect Data Types

df.printSchema()
df.dtypes
df.schema.fields

🔹 Cast Columns to Specific Type

df = df.withColumn("salary", col("salary").cast("double"))
df = df.withColumn("is_active", col("is_active").cast("boolean"))

🔹 Complex Types: Arrays, Structs, Maps

from pyspark.sql.functions import array, struct, create_map, lit

df = df.withColumn("arr_col", array("col1", "col2"))
df = df.withColumn("struct_col", struct("id", "name"))
df = df.withColumn("map_col", create_map(lit("key1"), col("col1"), lit("key2"), col("col2")))

🔹 Convert Date Strings to DateType

from pyspark.sql.functions import to_date, to_timestamp

df = df.withColumn("join_date", to_date("join_date_str", "yyyy-MM-dd"))
df = df.withColumn("login_ts", to_timestamp("login_str", "yyyy-MM-dd HH:mm:ss"))

🔹 Convert Boolean/Yes-No Strings

from pyspark.sql.functions import when

df = df.withColumn("is_verified",
                   when(col("status") == "yes", True)
                   .when(col("status") == "no", False)
                   .otherwise(None))

🧹 SECTION 14: NULL HANDLING - FILL, DROP, REPLACE, COALESCE, NVL


🔹 Detect Nulls

df.filter(col("age").isNull()).show()
df.filter(col("name").isNotNull()).show()

🔹 Fill NULLs

df.fillna("NA").show()
df.fillna(0).show()
df.fillna({"name": "Unknown", "age": 0}).show()

🔹 Drop NULLs

df.dropna().show()
df.dropna(how="any").show()
df.dropna(how="all").show()
df.dropna(thresh=2).show()  # At least 2 non-null values
df.dropna(subset=["name", "age"]).show()

🔹 Replace Specific Values (Even if NOT null)

df.replace(["NA", "N/A", "null"], None).show()
df.replace({"status": {"Y": "Yes", "N": "No"}}).show()

🔹 Coalesce (return first non-null)

from pyspark.sql.functions import coalesce

df = df.withColumn("fallback_name", coalesce(col("nickname"), col("name"), lit("Guest")))

🔹 Null-safe equality (<=>)

df.filter(col("a") <=> col("b")).show()  # True if both NULL or both same

🧠 SECTION 15: UDFs & PANDAS_UDFs


🔹 Standard Python UDF

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def greet(name): return "Hello, " + name

greet_udf = udf(greet, StringType())
df.withColumn("greeting", greet_udf(col("name"))).show()

🔹 pandas_udf - Better performance (vectorized)

from pyspark.sql.functions import pandas_udf
import pandas as pd

@pandas_udf("string")
def upper_case(s: pd.Series) -> pd.Series:
    return s.str.upper()

df.withColumn("upper_name", upper_case("name")).show()

❗ Notes:

  • Avoid UDFs unless absolutely necessary; built-in functions are faster and Catalyst-optimized (see the sketch below)
  • If a custom function is unavoidable, prefer pandas_udf: it processes data in vectorized batches of pandas Series (requires PyArrow) and is usually much faster than a row-at-a-time UDF
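
For comparison, a sketch of the same greeting built only from built-in functions, which Catalyst can optimize:

from pyspark.sql.functions import concat, lit, col

df.withColumn("greeting", concat(lit("Hello, "), col("name"))).show()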

🧪 SECTION 16: ADVANCED COLUMN EXPRESSIONS


🔹 expr() - SQL inside DataFrame API

from pyspark.sql.functions import expr

df.select("id", expr("age + 10 as future_age"), expr("upper(name) as NAME")).show()
df.selectExpr("id", "age + 10 as future_age", "upper(name) as NAME").show()

🔹 when().otherwise() - IF-ELSE

from pyspark.sql.functions import when

df = df.withColumn("category", when(col("age") < 18, "Child")
                                 .when(col("age") < 60, "Adult")
                                 .otherwise("Senior"))

🔹 isin() - Like SQL IN

df.filter(col("country").isin("India", "USA")).show()

🔹 like(), rlike() - Pattern Matching

df.filter(col("email").like("%@gmail.com")).show()
df.filter(col("email").rlike(".*@.*\\.com")).show()

🔹 alias() + withColumnRenamed()

df.select(col("name").alias("customer_name"))
df.withColumnRenamed("name", "cust_name")

🔹 lit() - Literal values

from pyspark.sql.functions import lit

df.withColumn("country", lit("India")).show()

🛠️ SECTION 17: SAVE/WRITE - FORMAT, MODES, PARTITIONS


🔹 Write Modes

# Modes: append, overwrite, ignore, errorifexists
df.write.mode("overwrite").csv("/tmp/output")

🔹 Save Formats

df.write.format("csv").save("/path")
df.write.format("parquet").save("/path")
df.write.format("json").save("/path")
df.write.format("delta").save("/path")

🔹 Save with Partitioning

df.write.partitionBy("country").parquet("/partitioned_output")

🔹 Write Single File Output

df.coalesce(1).write.option("header", True).csv("/single_file_output")

🔹 Save with Compression

df.write.option("compression", "gzip").json("/compressed_output")

🔍 SECTION 18: EXPLAIN PLAN, AQE, UI MONITORING


🔹 Basic & Extended Plan

df.explain()
df.explain(True)  # Extended plan
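
In Spark 3.0+ explain() also accepts a mode string; a quick sketch:

df.explain(mode="formatted")  # readable operator tree with per-node details
df.explain(mode="cost")       # logical plan with statistics where available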

🔹 Enable AQE (Adaptive Query Execution)

spark.conf.set("spark.sql.adaptive.enabled", True)

🔹 Spark UI - Where to check (the UI address can be printed as in the sketch below):

  • Stages tab: parallelism, skew
  • SQL tab: physical & logical plans
  • Storage tab: persisted/cached DataFrames
  • Executors tab: memory stats, task metrics
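
A small sketch for locating the UI of the current application (assumes the driver's UI port is reachable):

print(spark.sparkContext.uiWebUrl)  # e.g. http://<driver-host>:4040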



📊 SECTION 19: PERFORMANCE OPTIMIZATION - CACHE, REPARTITION, BROADCAST, SHUFFLE


🔹 Caching and Persisting

# Cache in memory
df.cache()
df.count()  # Trigger action

# Persist to memory & disk
from pyspark import StorageLevel
df.persist(StorageLevel.MEMORY_AND_DISK)

# Unpersist
df.unpersist()

✅ Use caching only when you reuse the same DataFrame multiple times in your pipeline.
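
A minimal sketch of that reuse pattern (the filter condition is illustrative):

filtered = df.filter(col("age") > 25).cache()

filtered.count()                         # first action materializes the cache
filtered.groupBy("age").count().show()   # served from the cache, not recomputed
filtered.unpersist()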


🔹 Repartition vs Coalesce

# Increase partitions (full shuffle)
df = df.repartition(10)

# Decrease partitions (avoids a full shuffle by merging existing partitions)
df = df.coalesce(2)

🧠 Tip: Repartitioning on the join or grouping key can help when the DataFrame is reused across several joins or aggregations; the repartition is itself a shuffle, so it only pays off if it saves shuffles later.

df = df.repartition("join_key")

🔹 Broadcast Joins

from pyspark.sql.functions import broadcast

df_large.join(broadcast(df_small), "id").show()

✅ Best for small lookup tables (a few MB to a few tens of MB) joined with massive tables; the small table is copied to every executor so the large side avoids a shuffle.
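
Spark also broadcasts automatically below spark.sql.autoBroadcastJoinThreshold (10 MB by default); a sketch of adjusting it:

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 50 * 1024 * 1024)  # raise the limit to 50 MB
# spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)              # or disable automatic broadcast joins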


🔹 Shuffle Partition Tuning

spark.conf.set("spark.sql.shuffle.partitions", 50)  # Default = 200

👉 Reduce it if you have fewer cores or smaller data.
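
A quick sketch for checking current parallelism before tuning:

print(spark.sparkContext.defaultParallelism)  # cores available to the application
print(df.rdd.getNumPartitions())              # partitions of this DataFrame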


🔹 AQE (Adaptive Query Execution)

spark.conf.set("spark.sql.adaptive.enabled", True)
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", True)
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", True)

✅ Automatically optimizes skewed joins, coalesces small shuffle partitions, and adjusts query plans at runtime.


🔹 Join Strategies Overview

  • Broadcast Join: a small table joined with a large table
  • Sort-Merge Join: large tables with sortable/sorted join keys
  • Shuffle Hash Join: join keys are hash-distributable
  • Bucketed Join: tables pre-bucketed on the join keys

A strategy can also be requested explicitly with join hints, as sketched below.
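
The hint names below are the built-in ones in Spark 3.0+ (broadcast() is imported above); a sketch:

df1.join(broadcast(df2), "id")            # force a broadcast hash join
df1.join(df2.hint("merge"), "id")         # prefer a sort-merge join
df1.join(df2.hint("shuffle_hash"), "id")  # prefer a shuffle hash join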

🔹 Monitor Joins and Partitions

df.join(df2, "id").explain(True)

🧪 SECTION 20: DELTA LAKE DEEP DIVE - SCD, TRANSACTIONS, VERSIONING


🔹 Upsert Logic (SCD Type 1)

from delta.tables import DeltaTable

delta_table = DeltaTable.forPath(spark, "/delta/customer")
updates_df = spark.read.csv("/new/customer.csv", header=True)

delta_table.alias("target").merge(
    updates_df.alias("source"),
    "target.id = source.id"
).whenMatchedUpdateAll() \
 .whenNotMatchedInsertAll() \
 .execute()

🔹 Time Travel / Versioning

df_v1 = spark.read.format("delta").option("versionAsOf", 1).load("/delta/customer")
df_ts = spark.read.format("delta").option("timestampAsOf", "2025-06-01T15:00:00").load("/delta/customer")

🔹 SCD Type 2 Logic

from pyspark.sql.functions import current_timestamp, lit

df_new = df.withColumn("start_time", current_timestamp()) \
           .withColumn("end_time", lit(None).cast("timestamp")) \
           .withColumn("is_active", lit(True))

🔹 Z-Ordering (Multi-Dimensional Data Clustering)

spark.sql("OPTIMIZE delta.`/delta/customer` ZORDER BY (region)")

🔹 Schema Evolution

df_new.write.option("mergeSchema", "true").format("delta").mode("append").save("/delta/customer")

🔹 VACUUM - Clean Up Old Versions

spark.sql("VACUUM delta.`/delta/customer` RETAIN 168 HOURS")

📁 SECTION 21: PORTFOLIO PROJECT FLOW - END-TO-END DATA PIPELINE


✅ Use Case: E-commerce Sales Analytics Dashboard


🧩 Input Data:

  • /landing/orders.csv
  • /landing/customers.csv

🔁 ETL Flow:

  1. Ingest raw files
  2. Cleanse and cast columns
  3. Join orders and customers
  4. Aggregate total spending, order count
  5. Enrich with segments: HighValue, FrequentBuyer
  6. Write to Delta Lake (versioned dashboard)

🛠️ Code Skeleton:

from pyspark.sql.functions import col, to_date, count, sum, when

# 1. Read raw data
orders_df = spark.read.option("header", True).csv("/landing/orders.csv")
cust_df = spark.read.option("header", True).csv("/landing/customers.csv")

# 2. Data type conversion
orders_df = orders_df.withColumn("amount", col("amount").cast("double"))
orders_df = orders_df.withColumn("order_date", to_date("order_date"))

# 3. Join
combined_df = orders_df.join(cust_df, on="cust_id", how="left")

# 4. Aggregations
agg_df = combined_df.groupBy("cust_id").agg(
    count("*").alias("total_orders"),
    sum("amount").alias("total_spent")
)

# 5. Customer segments
agg_df = agg_df.withColumn("segment", when(col("total_spent") > 10000, "HighValue")
                                       .otherwise("Regular"))

# 6. Save as versioned dashboard
agg_df.write.format("delta").mode("overwrite").save("/dashboard/agg_customers")

🧠 SECTION 22: MOCK INTERVIEW Q&A - SPARK DATAFRAME


  1. Difference between cache and persist? persist() accepts any StorageLevel; cache() is persist() with the default level (MEMORY_AND_DISK for DataFrames, MEMORY_ONLY for RDDs).
  2. AQE - what problems does it solve? Skewed joins, too-small shuffle partitions, and plans adjusted at runtime.
  3. Partitioning vs bucketing? Partitioning is a directory/file-level split by column value; bucketing is a hash-based split into a fixed number of buckets.
  4. Best practice before a join? Repartition on the join key and broadcast the small DataFrame.
  5. How to detect skew in a column? groupBy + count() and look for dominant values (sketch below).
  6. Is a Spark DataFrame lazy or eager? Lazy; only actions trigger computation.
  7. Null handling tricks? fillna, dropna, coalesce, and eqNullSafe (<=>) for null-safe comparisons.
  8. How to overwrite partition data in Delta? mode("overwrite") combined with option("replaceWhere", "<predicate>") (sketch below).
  9. How does AQE change the join strategy? It can swap a sort-merge join for a broadcast join mid-execution.
  10. Is withColumn a transformation? Yes; it is lazy and returns a new DataFrame.
  11. How to optimize when writing large output? Partition the output, enable compression, and avoid producing many small files.
  12. What is Z-ordering? It physically co-locates related data within files for faster reads on filtered columns.
  13. Why avoid UDFs? Catalyst cannot optimize them, so they are often slow.
  14. Catalyst Optimizer? The component that turns logical plans into optimized physical plans.
  15. Tungsten Engine? Spark's memory-efficient execution backend.
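
Sketches for Q5 and Q8 (column, DataFrame, and path names are illustrative):

# Q5: detect skew - a few dominant keys means a skewed join/group column
df.groupBy("join_key").agg(count("*").alias("cnt")) \
  .orderBy(col("cnt").desc()) \
  .show(10)

# Q8: overwrite only the matching partition data in a Delta table
df_updates.write.format("delta") \
    .mode("overwrite") \
    .option("replaceWhere", "country = 'IN'") \
    .save("/delta/sales")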


