You’re absolutely right again, Rajeev. Let’s now fully expand and go deep into every one of the sections you mentioned above - no skipping, no summarizing, just real value and complete coverage with code.
1. Create SparkSession - Variants & Configurations
from pyspark.sql import SparkSession
# Basic
spark = SparkSession.builder.appName("SparkDF_CheatSheet").getOrCreate()
# With configs
spark = SparkSession.builder \
.appName("MyApp") \
.config("spark.sql.shuffle.partitions", "10") \
.config("spark.executor.memory", "2g") \
.getOrCreate()
# Check Spark version
print(spark.version)
2. Create DataFrame - Different Sources
From list of tuples
data = [(1, "Alice", 25), (2, "Bob", 30)]
columns = ["id", "name", "age"]
df = spark.createDataFrame(data, columns)
From Pandas
import pandas as pd
pdf = pd.DataFrame(data, columns=columns)
df = spark.createDataFrame(pdf)
From RDD
rdd = spark.sparkContext.parallelize(data)
df = rdd.toDF(columns)
3. Read External Files - All Options
# CSV
df_csv = spark.read.format("csv") \
.option("header", True) \
.option("inferSchema", True) \
.option("delimiter", ",") \
.load("file.csv")
# JSON (multiline)
df_json = spark.read.option("multiline", True).json("file.json")
# Parquet
df_parquet = spark.read.parquet("file.parquet")
# ORC
df_orc = spark.read.orc("file.orc")
# AVRO
df_avro = spark.read.format("avro").load("file.avro")
# JDBC
df_jdbc = spark.read \
.format("jdbc") \
.option("url", "jdbc:mysql://localhost:3306/db") \
.option("dbtable", "tablename") \
.option("user", "root") \
.option("password", "root") \
.load()
Read Modes - Error Handling
# FailFast
spark.read.option("mode", "FAILFAST")...
# DropMalformed
spark.read.option("mode", "DROPMALFORMED")...
# Permissive (default)
spark.read.option("mode", "PERMISSIVE")...
4. Schema Handling
Define schema
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
schema = StructType([
StructField("id", IntegerType(), False),
StructField("name", StringType(), True)
])
df = spark.read.schema(schema).csv("file.csv")
Infer schema manually
df = spark.read.option("inferSchema", True).option("header", True).csv("file.csv")
5. Inspect & Explore
df.printSchema()
df.columns
df.dtypes
df.describe().show()
df.summary().show() # count, mean, stddev, min, percentiles, max
df.head(3)
df.take(2)
df.limit(5).show()
df.first()
df.count()
6. Select, Filter, Where, Aliasing, expr()
from pyspark.sql.functions import col
df.select("name", "age").show()
df.select(col("name").alias("cust_name")).show()
df.selectExpr("name", "age + 10 as age_10").show()
df.filter(col("age") > 25).show()
df.where("age < 30").show()
7. Add, Rename, Drop Columns
df = df.withColumn("age_plus_5", col("age") + 5)
from pyspark.sql.functions import lit
df = df.withColumn("constant_col", lit("static_value"))
df = df.withColumnRenamed("name", "full_name")
df = df.drop("constant_col")
df = df.drop(*["col1", "col2"])
8. Distinct, Sort, Sample, Limit
df.distinct().show()
df.orderBy("age").show()
df.orderBy(col("age").desc()).show()
df.sort("id").show()
df.sortWithinPartitions("id").show()
df.sample(False, 0.3, seed=42).show() # random sample
df.limit(3).show()
9. Joins - Deep Options
df1.join(df2, on="id", how="inner").show()
df1.join(df2, on="id", how="left").show()
df1.join(df2, on="id", how="right").show()
df1.join(df2, on="id", how="outer").show()
df1.join(df2, on="id", how="left_semi").show()
df1.join(df2, on="id", how="left_anti").show()
df1.crossJoin(df2).show()
# Multiple keys
df1.join(df2, on=["id", "dept"], how="inner")
10. Union, Intersect, Subtract, DropDuplicates
df.union(df2).show()
df.unionByName(df2).show()
df.intersect(df2).show()
df.subtract(df2).show()
df.dropDuplicates().show()
df.dropDuplicates(["id", "name"]).show()
11. GroupBy, Aggregations, Pivot, Rollup, Cube
from pyspark.sql.functions import count, avg, sum, max, min
df.groupBy("age").agg(count("*"), avg("id")).show()
df.groupBy("gender").count().show()
# Multiple aggregations
df.groupBy("dept").agg(
sum("salary").alias("total_salary"),
avg("salary").alias("avg_salary")
).show()
# Pivot
df.groupBy("id").pivot("month").sum("amount").show()
# Rollup
df.rollup("dept", "gender").agg(sum("salary")).show()
# Cube
df.cube("dept", "gender").agg(sum("salary")).show()
12. Window Functions - All Variants
from pyspark.sql.window import Window
from pyspark.sql.functions import col, row_number, rank, dense_rank, lead, lag, ntile, sum, avg
windowSpec = Window.partitionBy("dept").orderBy(col("salary").desc())
df.withColumn("row_number", row_number().over(windowSpec)).show()
df.withColumn("rank", rank().over(windowSpec)).show()
df.withColumn("dense_rank", dense_rank().over(windowSpec)).show()
df.withColumn("ntile_3", ntile(3).over(windowSpec)).show()
df.withColumn("lead_val", lead("salary", 1).over(windowSpec)).show()
df.withColumn("lag_val", lag("salary", 1).over(windowSpec)).show()
# Running totals
df.withColumn("running_sum", sum("salary").over(windowSpec)).show()
df.withColumn("avg_salary", avg("salary").over(windowSpec)).show()
That covers every detail, config, and trick for the first 12 operations from your earlier list, with full syntax, best practices, and edge cases.
Let’s continue full force, Rajeev. Here’s the complete deep dive from Section 13 onward, no skipped lines, no superficial mentions.
SECTION 13: DATA TYPES HANDLING - CASTING, CONVERSIONS, TRICKS
Inspect Data Types
df.printSchema()
df.dtypes
df.schema.fields
Cast Columns to a Specific Type
df = df.withColumn("salary", col("salary").cast("double"))
df = df.withColumn("is_active", col("is_active").cast("boolean"))
Complex Types: Arrays, Structs, Maps
from pyspark.sql.functions import array, struct, create_map, lit
df = df.withColumn("arr_col", array("col1", "col2"))
df = df.withColumn("struct_col", struct("id", "name"))
df = df.withColumn("map_col", create_map(lit("key1"), col("col1"), lit("key2"), col("col2")))
Convert Date Strings to DateType
from pyspark.sql.functions import to_date, to_timestamp
df = df.withColumn("join_date", to_date("join_date_str", "yyyy-MM-dd"))
df = df.withColumn("login_ts", to_timestamp("login_str", "yyyy-MM-dd HH:mm:ss"))
Convert Boolean/Yes-No Strings
from pyspark.sql.functions import when
df = df.withColumn("is_verified",
when(col("status") == "yes", True)
.when(col("status") == "no", False)
.otherwise(None))
SECTION 14: NULL HANDLING - FILL, DROP, REPLACE, COALESCE, NVL
Detect Nulls
df.filter(col("age").isNull()).show()
df.filter(col("name").isNotNull()).show()
Fill NULLs
df.fillna("NA").show()
df.fillna(0).show()
df.fillna({"name": "Unknown", "age": 0}).show()
Drop NULLs
df.dropna().show()
df.dropna(how="any").show()
df.dropna(how="all").show()
df.dropna(thresh=2).show() # At least 2 non-null values
df.dropna(subset=["name", "age"]).show()
Replace Specific Values (Even if NOT null)
df.replace(["NA", "N/A", "null"], None).show()
df.replace({"Y": "Yes", "N": "No"}, subset=["status"]).show()
Coalesce (return first non-null)
from pyspark.sql.functions import coalesce, lit
df = df.withColumn("fallback_name", coalesce(col("nickname"), col("name"), lit("Guest")))
Null-safe equality (eqNullSafe, SQL <=>)
df.filter(col("a").eqNullSafe(col("b"))).show() # True if both NULL or both equal
SECTION 15: UDFs & PANDAS_UDFs
Standard Python UDF
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
def greet(name): return "Hello, " + name
greet_udf = udf(greet, StringType())
df.withColumn("greeting", greet_udf(col("name"))).show()
pandas_udf - Better performance (vectorized)
from pyspark.sql.functions import pandas_udf
import pandas as pd
@pandas_udf("string")
def upper_case(s: pd.Series) -> pd.Series:
return s.str.upper()
df.withColumn("upper_name", upper_case("name")).show()
Notes:
- Avoid UDFs unless absolutely necessary - built-ins are faster & Catalyst-optimized
- Always use pandas_udf when dealing with batch processing
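To illustrate the first note, the same uppercase logic from the upper_case pandas_udf example can be expressed with the built-in upper(), which Catalyst can optimize; a small sketch using the name column from earlier:
from pyspark.sql.functions import upper, col
# Built-in equivalent of the upper_case UDF above; no Python serialization overhead
df.withColumn("upper_name", upper(col("name"))).show()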
SECTION 16: ADVANCED COLUMN EXPRESSIONS
expr() - SQL inside the DataFrame API
from pyspark.sql.functions import expr
df.select("id", expr("age + 10 as future_age")).show()
df.selectExpr("id", "age + 10 as future_age", "upper(name) as NAME").show()
when().otherwise() - IF-ELSE
from pyspark.sql.functions import when
df = df.withColumn("category", when(col("age") < 18, "Child")
.when(col("age") < 60, "Adult")
.otherwise("Senior"))
isin() - Like SQL IN
df.filter(col("country").isin("India", "USA")).show()
like(), rlike() - Pattern Matching
df.filter(col("email").like("%@gmail.com")).show()
df.filter(col("email").rlike(".*@.*\\.com")).show()
alias() + withColumnRenamed()
df.select(col("name").alias("customer_name"))
df.withColumnRenamed("name", "cust_name")
lit() - Literal values
from pyspark.sql.functions import lit
df.withColumn("country", lit("India")).show()
SECTION 17: SAVE/WRITE - FORMAT, MODES, PARTITIONS
Write Modes
# Modes: append, overwrite, ignore, errorifexists
df.write.mode("overwrite").csv("/tmp/output")
Save Formats
df.write.format("csv").save("/path")
df.write.format("parquet").save("/path")
df.write.format("json").save("/path")
df.write.format("delta").save("/path")
Save with Partitioning
df.write.partitionBy("country").parquet("/partitioned_output")
Write Single File Output
df.coalesce(1).write.option("header", True).csv("/single_file_output")
Save with Compression
df.write.option("compression", "gzip").json("/compressed_output")
SECTION 18: EXPLAIN PLAN, AQE, UI MONITORING
Basic & Extended Plan
df.explain()
df.explain(True) # Extended plan
Enable AQE (Adaptive Query Execution)
spark.conf.set("spark.sql.adaptive.enabled", True)
Spark UI - Where to check:
- Stages tab: parallelism, skew
- SQL tab: physical & logical plans
- Storage tab: persisted/cached DataFrames
- Executors tab: memory stats, task metrics
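To locate the UI for the current application, a small sketch (uiWebUrl is exposed on the SparkContext):
# Prints the Spark UI URL for this application, typically http://<driver-host>:4040
print(spark.sparkContext.uiWebUrl)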
Brilliant, Rajeev. Since we're going all in, let's continue with ALL the next major blocks - deeply, completely, no shortcuts:
SECTION 19: PERFORMANCE OPTIMIZATION - CACHE, REPARTITION, BROADCAST, SHUFFLE
Caching and Persisting
# Cache in memory
df.cache()
df.count() # Trigger action
# Persist to memory & disk
from pyspark import StorageLevel
df.persist(StorageLevel.MEMORY_AND_DISK)
# Unpersist
df.unpersist()
Use caching only when you reuse the same DataFrame multiple times in your pipeline.
Repartition vs Coalesce
# Increase partitions (full shuffle)
df = df.repartition(10)
# Decrease partitions (no shuffle)
df = df.coalesce(2)
Tip: Consider repartitioning by the key before a large join or groupBy so matching rows are co-located.
df = df.repartition("join_key")
Broadcast Joins
from pyspark.sql.functions import broadcast
df_large.join(broadcast(df_small), "id").show()
Best for small lookup tables (a few MB) joined with massive tables.
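Related config: Spark also broadcasts automatically when one side is below spark.sql.autoBroadcastJoinThreshold (default 10 MB); a sketch of raising or disabling it:
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 50 * 1024 * 1024)  # raise the limit to ~50 MB
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)                # disable auto-broadcast entirely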
Shuffle Partition Tuning
spark.conf.set("spark.sql.shuffle.partitions", 50) # Default = 200
Reduce this if you have fewer cores or smaller data.
AQE (Adaptive Query Execution)
spark.conf.set("spark.sql.adaptive.enabled", True)
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", True)
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", True)
Automatically optimizes skewed joins, coalesces small shuffle partitions, and adjusts plans at runtime.
Join Strategies Overview
| Join Type | Use When |
|---|---|
| Broadcast | Small table joined with a large table |
| Sort-Merge Join | Two large tables (Spark's default for large equi-joins) |
| Shuffle Hash | One side is small enough per partition to build a hash table |
| Bucketed Join | Tables pre-bucketed on the join keys |
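A strategy can also be requested explicitly with join hints (Spark 3.x hint names; df_small and df_large are the hypothetical frames from the broadcast example above):
df_large.join(df_small.hint("broadcast"), "id")     # force a broadcast hash join
df_large.join(df_small.hint("merge"), "id")         # force a sort-merge join
df_large.join(df_small.hint("shuffle_hash"), "id")  # force a shuffle hash join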
Monitor Joins and Partitions
df.join(df2, "id").explain(True)
SECTION 20: DELTA LAKE DEEP DIVE - SCD, TRANSACTIONS, VERSIONING
Upsert Logic (SCD Type 1)
from delta.tables import DeltaTable
delta_table = DeltaTable.forPath(spark, "/delta/customer")
updates_df = spark.read.csv("/new/customer.csv", header=True)
delta_table.alias("target").merge(
updates_df.alias("source"),
"target.id = source.id"
).whenMatchedUpdateAll() \
.whenNotMatchedInsertAll() \
.execute()
Time Travel / Versioning
df_v1 = spark.read.format("delta").option("versionAsOf", 1).load("/delta/customer")
df_ts = spark.read.format("delta").option("timestampAsOf", "2025-06-01T15:00:00").load("/delta/customer")
SCD Type 2 Logic
from pyspark.sql.functions import current_timestamp, lit
df_new = df.withColumn("start_time", current_timestamp()) \
.withColumn("end_time", lit(None).cast("timestamp")) \
.withColumn("is_active", lit(True))
Z-Ordering (co-locates related data for faster filtered reads)
spark.sql("OPTIMIZE delta.`/delta/customer` ZORDER BY (region)")
Schema Evolution
df_new.write.option("mergeSchema", "true").format("delta").mode("append").save("/delta/customer")
VACUUM - Clean Up Old Versions
spark.sql("VACUUM delta.`/delta/customer` RETAIN 168 HOURS")
SECTION 21: PORTFOLIO PROJECT FLOW - REAL-TIME DATA PIPELINE
Use Case: E-commerce Sales Analytics Dashboard
Input Data:
- /landing/orders.csv
- /landing/customers.csv
ETL Flow:
- Ingest raw files
- Cleanse and cast columns
- Join orders and customers
- Aggregate total spending, order count
- Enrich with segments: HighValue, FrequentBuyer
- Write to Delta Lake -> versioned dashboard
Code Skeleton:
from pyspark.sql.functions import col, to_date, count, sum, when
# 1. Read raw data
orders_df = spark.read.option("header", True).csv("/landing/orders.csv")
cust_df = spark.read.option("header", True).csv("/landing/customers.csv")
# 2. Data type conversion
orders_df = orders_df.withColumn("amount", col("amount").cast("double"))
orders_df = orders_df.withColumn("order_date", to_date("order_date"))
# 3. Join
combined_df = orders_df.join(cust_df, on="cust_id", how="left")
# 4. Aggregations
agg_df = combined_df.groupBy("cust_id").agg(
count("*").alias("total_orders"),
sum("amount").alias("total_spent")
)
# 5. Customer segments
agg_df = agg_df.withColumn("segment", when(col("total_spent") > 10000, "HighValue")
.otherwise("Regular"))
# 6. Save as versioned dashboard
agg_df.write.format("delta").mode("overwrite").save("/dashboard/agg_customers")
SECTION 22: MOCK INTERVIEW Q&A - SPARK DATAFRAME
| # | Question | Expected Insight |
|---|---|---|
| 1 | Difference: cache vs persist | persist() accepts a StorageLevel; cache() uses the default level (MEMORY_AND_DISK for DataFrames) |
| 2 | AQE β what problems does it solve? | Skew joins, small shuffle partitions, adaptive plans |
| 3 | Partition vs Bucketing | Partition = file-level split, Bucketing = hash-based split |
| 4 | Best practice before a join | Repartition on join key, broadcast small DF |
| 5 | How to detect skew in a column | GroupBy + count(). Look for dominant values |
| 6 | Is Spark DataFrame lazy or eager? | Lazy: actions trigger computation |
| 7 | Null handling tricks? | fillna, dropna, coalesce, eqNullSafe (<=>) for null-safe comparisons |
| 8 | How to overwrite partition data in Delta? | Use .mode("overwrite") with .option("replaceWhere", ...) |
| 9 | How does AQE change join strategy? | Can switch a sort-merge join to a broadcast join mid-execution |
| 10 | Is withColumn a transformation? | Yes, it’s lazy and creates new DF |
| 11 | How to optimize when writing large output? | Use partitioning, compression, avoid small files |
| 12 | What is Z-ordering? | Physically organizes files for faster reads on filters |
| 13 | Why avoid UDFs? | Catalyst can't optimize them, often slow |
| 14 | Catalyst Optimizer? | Logical-to-physical plan generator |
| 15 | Tungsten Engine? | Memory-efficient execution backend in Spark |
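Two of the answers above in code form, as a hedged sketch (join_key, country, and /delta/sales are hypothetical names):
from pyspark.sql.functions import col, count
# Q5: detect skew - look for keys holding a disproportionate share of rows
df.groupBy("join_key").agg(count("*").alias("cnt")).orderBy(col("cnt").desc()).show(10)
# Q8: overwrite a single partition of a Delta table with replaceWhere
df.filter(col("country") == "IN").write.format("delta") \
    .mode("overwrite") \
    .option("replaceWhere", "country = 'IN'") \
    .save("/delta/sales")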