DATE & TIMESTAMP FUNCTIONS (COMPLETE LIST with CODE)
🔹 Basic Date Conversion
from pyspark.sql.functions import to_date, to_timestamp, col
df = df.withColumn("dob_date", to_date(col("dob"), "yyyy-MM-dd"))
df = df.withColumn("event_ts", to_timestamp(col("ts_string"), "yyyy-MM-dd HH:mm:ss"))
🔹 Current Date/Time
from pyspark.sql.functions import current_date, current_timestamp
df = df.withColumn("load_date", current_date())
df = df.withColumn("load_ts", current_timestamp())
🔹 Extract Date Parts
from pyspark.sql.functions import year, month, dayofmonth, dayofweek, dayofyear, weekofyear, quarter
df = df.withColumn("year", year(col("dob_date")))
df = df.withColumn("month", month(col("dob_date")))
df = df.withColumn("day", dayofmonth(col("dob_date")))
df = df.withColumn("week", weekofyear(col("dob_date")))
df = df.withColumn("quarter", quarter(col("dob_date")))
🔹 Date Difference & Add/Subtract
from pyspark.sql.functions import datediff, date_add, date_sub
df = df.withColumn("days_since_dob", datediff(current_date(), col("dob_date")))
df = df.withColumn("next_week", date_add(col("dob_date"), 7))
df = df.withColumn("prev_week", date_sub(col("dob_date"), 7))
🔹 Format Dates
from pyspark.sql.functions import date_format
df = df.withColumn("pretty_dob", date_format(col("dob_date"), "dd-MMM-yyyy"))
🔹 Truncate Dates
from pyspark.sql.functions import trunc
df = df.withColumn("month_trunc", trunc(col("dob_date"), "MM")) # returns first day of month
🔹 Between Dates
df.filter((col("dob_date") >= "1990-01-01") & (col("dob_date") <= "2000-12-31")).show()
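The same range filter can also be written with the between() column method (equivalent sketch):
df.filter(col("dob_date").between("1990-01-01", "2000-12-31")).show()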
NULL HANDLING & COALESCE - COMPLETE
🔹 Detecting Nulls
df.filter(col("name").isNull()).show()
df.filter(col("name").isNotNull()).show()
🔹 Fill NULLs (fillna)
# Fill all string columns with "unknown"
df.fillna("unknown").show()
# Fill all numeric columns with 0
df.fillna(0).show()
# Fill selected columns
df.fillna({"age": 0, "name": "NA"}).show()
🔹 Drop NULL Rows (dropna)
df.dropna().show() # Drops rows with any NULL
df.dropna("all").show() # Drops rows with all NULLs
df.dropna(thresh=2).show() # Keep rows with at least 2 non-null values
df.dropna(subset=["age", "name"]).show()
🔹 Replace Specific Values (Even Non-null)
df.replace(["NA", "N/A", "null"], None).show()
🔹 Coalesce (Non-null fallback)
from pyspark.sql.functions import coalesce
df = df.withColumn("final_name", coalesce(col("nickname"), col("name")))
🔹 NVL / NVL2 Equivalent
Spark SQL accepts NVL/NVL2 directly; in the DataFrame API, coalesce covers the same use case.
-- SQL style
SELECT COALESCE(name, 'Unknown') FROM table
-- PySpark
from pyspark.sql.functions import coalesce, lit
df.select(coalesce(col("name"), lit("Unknown"))).show()
🔹 Null-Safe Equality (<=> / eqNullSafe)
df.filter(col("name").eqNullSafe(col("alias"))) # True if both are NULL or both are equal
STRING FUNCTIONS - COMPLETE WITH PYSPARK CODE
🔹 Case Transformations
from pyspark.sql.functions import upper, lower, initcap
df.withColumn("upper_name", upper(col("name"))).show()
df.withColumn("lower_name", lower(col("name"))).show()
df.withColumn("capitalized", initcap(col("name"))).show()
🔹 Trim, Ltrim, Rtrim
from pyspark.sql.functions import trim, ltrim, rtrim
df.withColumn("clean_name", trim(col("name"))).show()
🔹 Substring
from pyspark.sql.functions import substring
df.withColumn("first_3", substring(col("name"), 1, 3)).show()
🔹 Length, Contains, Startswith, Endswith
from pyspark.sql.functions import length
df.filter(col("name").contains("ar")).show()
df.filter(col("name").startswith("A")).show()
df.filter(col("name").endswith("e")).show()
df.withColumn("name_len", length(col("name"))).show()
🔹 Replace & Translate
from pyspark.sql.functions import translate, regexp_replace
df.withColumn("no_spaces", regexp_replace(col("name"), " ", "")).show()
df.withColumn("trans_name", translate(col("name"), "aeiou", "12345")).show()
🔹 Split & Explode
from pyspark.sql.functions import split, explode
df = df.withColumn("name_parts", split(col("name"), " "))
df.select(explode(col("name_parts")).alias("each_word")).show()
AGGREGATION FUNCTIONS - COMPLETE
from pyspark.sql.functions import count, countDistinct, avg, sum, max, min, stddev
df.groupBy("gender").agg(
count("*").alias("total"),
countDistinct("name").alias("unique_names"),
avg("salary").alias("avg_salary"),
sum("salary").alias("total_salary"),
stddev("salary").alias("std_dev")
).show()
ARRAY & STRUCT FUNCTIONS - COMPLETE
🔹 Create & Access Arrays
from pyspark.sql.functions import array
df = df.withColumn("my_array", array("id", "age"))
🔹 Explode Arrays
from pyspark.sql.functions import explode
df.select(explode(col("my_array")).alias("value")).show()
🔹 Create Struct & Access Field
from pyspark.sql.functions import struct
df = df.withColumn("profile", struct("name", "age"))
df.select(col("profile.name")).show()
MAP FUNCTIONS - COMPLETE
from pyspark.sql.functions import create_map, lit
df = df.withColumn("map_col", create_map(lit("name"), col("name"), lit("age"), col("age")))
WINDOW FUNCTIONS - COMPLETE
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, rank, dense_rank, lag, lead
win_spec = Window.partitionBy("department").orderBy(col("salary").desc())
df.withColumn("row_number", row_number().over(win_spec)).show()
df.withColumn("rank", rank().over(win_spec)).show()
df.withColumn("dense_rank", dense_rank().over(win_spec)).show()
df.withColumn("lag_salary", lag("salary", 1).over(win_spec)).show()
df.withColumn("lead_salary", lead("salary", 1).over(win_spec)).show()
UDF vs Built-in - FULL EXAMPLE
🔹 Normal Python UDF
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
def make_label(name):
return "Mr. " + name
label_udf = udf(make_label, StringType())
df.withColumn("labelled", label_udf(col("name"))).show()
🔹 Pandas UDF (More Efficient for Batch)
from pyspark.sql.functions import pandas_udf
import pandas as pd
@pandas_udf(StringType())
def upper_pandas(name: pd.Series) -> pd.Series:
return name.str.upper()
df.withColumn("name_upper", upper_pandas(col("name"))).show()
BROADCAST JOIN - FULL
from pyspark.sql.functions import broadcast
# Broadcast smaller lookup table
df_big.join(broadcast(df_small), "id").show()
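Spark also broadcasts automatically when the smaller side is below spark.sql.autoBroadcastJoinThreshold (10 MB by default); the values below are only illustrative:
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 50 * 1024 * 1024)  # raise the auto-broadcast limit to ~50 MB
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)                # -1 disables automatic broadcast joins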
REPARTITION VS COALESCE - CODE DEMO
# repartition() - full shuffle (expensive); can increase or decrease the partition count
df.repartition(10).write.mode("overwrite").csv("new_path/")
# coalesce() - avoids a full shuffle (cheaper); can only reduce the partition count
df.coalesce(1).write.mode("overwrite").csv("single_file/")
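A quick way to verify the effect is to check the partition count via the underlying RDD (sketch):
print(df.rdd.getNumPartitions())                   # current partition count
print(df.repartition(10).rdd.getNumPartitions())   # 10 after repartition
print(df.coalesce(1).rdd.getNumPartitions())       # 1 after coalesce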
ALL JOINS IN PYSPARK - COMPLETE WITH EXAMPLES
We’ll use this setup:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
spark = SparkSession.builder.getOrCreate()
df1 = spark.createDataFrame([(1, "Alice"), (2, "Bob"), (3, "Charlie")], ["id", "name"])
df2 = spark.createDataFrame([(1, "HR"), (2, "IT"), (4, "Finance")], ["id", "dept"])
1️⃣ Inner Join (common ids only)
df1.join(df2, on="id", how="inner").show()
2️⃣ Left Outer Join (all from left, null from right if no match)
df1.join(df2, on="id", how="left").show()
3️⃣ Right Outer Join (all from right, null from left if no match)
df1.join(df2, on="id", how="right").show()
4️⃣ Full Outer Join
df1.join(df2, on="id", how="outer").show()
5️⃣ Left Semi Join (like a filter: returns rows from df1 that have a match in df2)
df1.join(df2, on="id", how="left_semi").show()
6️⃣ Left Anti Join (returns rows from df1 that do not have a match in df2)
df1.join(df2, on="id", how="left_anti").show()
7️⃣ Cross Join (Cartesian Product)
df1.crossJoin(df2).show()
8️⃣ Self Join Example
df_self = df1.alias("a").join(df1.alias("b"), col("a.id") < col("b.id"))
df_self.select("a.id", "a.name", "b.name").show()
CASE-WHEN / IF-ELSE LOGIC - FULL EXAMPLES
🔹 Using when() / otherwise()
from pyspark.sql.functions import when
df = df.withColumn("age_group", when(col("age") < 18, "Minor")
.when(col("age") < 60, "Adult")
.otherwise("Senior"))
df.select("name", "age", "age_group").show()
🔹 Using expr() / selectExpr() for SQL-like expressions
from pyspark.sql.functions import expr
df.selectExpr("name", "CASE WHEN age < 18 THEN 'Minor' WHEN age < 60 THEN 'Adult' ELSE 'Senior' END AS age_group").show()
DSL-style Chaining - Realistic Flow
(df.filter(col("age") > 25)
.withColumn("age_plus_10", col("age") + 10)
.orderBy(col("age").desc())
.dropDuplicates(["name"])
.show())
SPARK SQL MODE vs DATAFRAME API
🔹 Register as Temp View
df.createOrReplaceTempView("people")
🔹 SQL Mode
spark.sql("SELECT name, age FROM people WHERE age > 25").show()
🔹 Same in DataFrame API
df.select("name", "age").where(col("age") > 25).show()
When to Use What?
Feature | Use SQL | Use DataFrame API |
---|---|---|
Familiar SQL users | Easy to adopt | Might feel verbose |
Complex joins | More readable | Readable with practice |
UDF integration | Hard to use in SQL | Clean & flexible |
Code reuse | SQL strings can't chain | Python chaining, functions |
Spark UI optimizations | Both give the same plans | Both go through Catalyst |
Bonus: selectExpr() - SQL inside the DataFrame API
df.selectExpr("name", "age + 10 as age_plus_10", "upper(name) as name_upper").show()
SECTION 1: DELTA LAKE - COMPLETE GUIDE
Delta Lake adds ACID, versioning, schema evolution, merge (UPSERT) to Apache Spark.
🔹 1. Create Delta Table from DataFrame
df.write.format("delta").mode("overwrite").save("/tmp/delta/customers")
🔹 2. Read Delta Table
df = spark.read.format("delta").load("/tmp/delta/customers")
🔹 3. Time Travel (Versioning)
# Read older version
df_v1 = spark.read.format("delta").option("versionAsOf", 1).load("/tmp/delta/customers")
# Read by timestamp
df_ts = spark.read.format("delta").option("timestampAsOf", "2024-01-01T12:00:00").load("/tmp/delta/customers")
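To find out which versions and timestamps exist, Delta keeps a table history (sketch using the SQL form on the same path):
spark.sql("DESCRIBE HISTORY delta.`/tmp/delta/customers`").show(truncate=False)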
🔹 4. Merge (UPSERT)
from delta.tables import DeltaTable
from pyspark.sql.functions import expr
delta_table = DeltaTable.forPath(spark, "/tmp/delta/customers")
updates_df = spark.createDataFrame([(1, "Alice Updated"), (4, "New User")], ["id", "name"])
delta_table.alias("target").merge(
updates_df.alias("updates"),
"target.id = updates.id"
).whenMatchedUpdate(set={"name": "updates.name"}
).whenNotMatchedInsert(values={"id": "updates.id", "name": "updates.name"}
).execute()
🔹 5. Schema Evolution
# Add new column in input data
new_data = spark.createDataFrame([(5, "Zara", "Female")], ["id", "name", "gender"])
new_data.write.option("mergeSchema", "true").format("delta").mode("append").save("/tmp/delta/customers")
🔹 6. Z-Ordering (Optimize read performance on columns)
spark.sql("OPTIMIZE delta.`/tmp/delta/customers` ZORDER BY (id)")
🔹 7. Vacuum (Clean up old versions)
spark.sql("VACUUM delta.`/tmp/delta/customers` RETAIN 168 HOURS") # Keep last 7 days
SECTION 2: MINI PROJECT FLOW - DATA PIPELINE END-TO-END
Project: Build a customer purchase dashboard table from raw logs
🔹 Step 1: Ingest Raw CSV
df_customers = spark.read.option("header", True).csv("/data/customers.csv")
df_orders = spark.read.option("header", True).csv("/data/orders.csv")
🔹 Step 2: Clean & Typecast
from pyspark.sql.functions import to_date
df_customers = df_customers.withColumn("dob", to_date("dob", "yyyy-MM-dd"))
df_orders = df_orders.withColumn("order_date", to_date("order_date", "yyyy-MM-dd"))
df_orders = df_orders.withColumn("amount", df_orders["amount"].cast("double"))
🔹 Step 3: Join
df_joined = df_customers.join(df_orders, on="customer_id", how="inner")
🔹 Step 4: Aggregations
from pyspark.sql.functions import sum, count
agg_df = df_joined.groupBy("customer_id", "name").agg(
count("*").alias("total_orders"),
sum("amount").alias("total_spent")
)
🔹 Step 5: Add Business Flags
from pyspark.sql.functions import when
agg_df = agg_df.withColumn("customer_segment", when(col("total_spent") > 10000, "Premium").otherwise("Regular"))
🔹 Step 6: Save to Delta Table
agg_df.write.format("delta").mode("overwrite").save("/delta/dashboard/customer_metrics")
SECTION 3: SKEW JOIN + OPTIMIZATION DEEP DIVE
🔹 1. Detect Data Skew
df.groupBy("join_key").count().orderBy("count", ascending=False).show()
🔹 2. Solution: Broadcast Join
from pyspark.sql.functions import broadcast
df_large.join(broadcast(df_small), on="id").show()
🔹 3. Solution: Salting
Step 1: Add a random salt to the large (skewed) DF
from pyspark.sql.functions import expr
df_large_salted = df_large.withColumn("salt", expr("CAST(rand() * 5 AS INT)"))
df_large_salted = df_large_salted.withColumn("join_key_salted", expr("CONCAT(join_key, '_', salt)"))
Step 2: Replicate the small DF once per salt value
from pyspark.sql.functions import explode, array, lit
df_small_salted = df_small.withColumn("salt", explode(array([lit(i) for i in range(5)])))
df_small_salted = df_small_salted.withColumn("join_key_salted", expr("CONCAT(join_key, '_', salt)"))
Step 3: Join on the salted key
df_large_salted.join(df_small_salted, on="join_key_salted").show()
🔹 4. Explain Plan + AQE
spark.conf.set("spark.sql.adaptive.enabled", True)
df.join(df2, "id").explain(True)
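Since Spark 3.x, AQE can also split skewed partitions at join time; a sketch of the relevant settings (the factor and threshold values are illustrative, not tuned recommendations):
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", True)
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", 5)
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256MB")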
SECTION 4: INTERVIEW Q&A - PYSPARK DATAFRAME
# | Question | Short Answer |
---|---|---|
1 | Difference between repartition() and coalesce() | repartition() reshuffles; coalesce() merges without shuffle |
2 | What is Catalyst Optimizer? | Internal optimizer that rewrites query plans |
3 | Difference: narrow vs wide transformation | Narrow: map, filter; Wide: join, groupBy (shuffle involved) |
4 | Lazy evaluation? | Transformations are not executed until an action is called |
5 | Explain withColumn() vs select() | withColumn() adds or replaces one column; select() projects chosen columns/expressions; both return a new DataFrame |
6 | What triggers execution in Spark? | Actions: show() , collect() , write() |
7 | How to handle skewed joins? | Broadcast join, salting, bucketing |
8 | Broadcast join use case? | Small dimension table joined with large fact table |
9 | Fill NULL in selected columns? | fillna({"col1": val1, "col2": val2}) |
10 | When to use dropDuplicates() ? | Remove duplicate rows based on all/specific columns |
11 | UDF vs Built-in functions? | UDFs are slower, not optimized by Catalyst |
12 | Can you read JSON with nested structure? | Yes. Use df.select("a.b", "a.c") or explode() for arrays |
13 | Convert String to Date in Spark? | to_date(col("col"), "yyyy-MM-dd") |
14 | Create single output file from Spark? | repartition(1).write... |
15 | Use-case of selectExpr() ? | SQL-like expressions inside DataFrame API |
16 | What is Z-Ordering in Delta? | Column-aware sorting to speed up queries |
17 | What happens on schema mismatch in Delta? | Fails unless mergeSchema=true |
18 | How to monitor Spark jobs? | Spark UI: stages, DAG, storage, SQL tabs |
19 | Partition vs Bucketing? | Partitioning splits data into directories by column value; bucketing hashes keys into a fixed number of buckets (see the sketch after this table) |
20 | Use-case of window() functions? | Rank rows, calculate running totals, compare across rows |
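To illustrate Q19, a minimal sketch of a partitioned vs a bucketed write (the country and customer_id columns, output path, and table name are placeholders):
# Partitioning: one sub-directory per distinct value of the partition column
df.write.partitionBy("country").mode("overwrite").parquet("/tmp/output/partitioned_customers")
# Bucketing: rows hashed into a fixed number of buckets; requires saveAsTable (metastore-backed)
df.write.bucketBy(8, "customer_id").sortBy("customer_id").mode("overwrite").saveAsTable("bucketed_customers")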