Spark DataFrame - Complete Tutorial


📅 DATE & TIMESTAMP FUNCTIONS (COMPLETE LIST with CODE)

🔹 Basic Date Conversion

from pyspark.sql.functions import to_date, to_timestamp, col

df = df.withColumn("dob_date", to_date(col("dob"), "yyyy-MM-dd"))
df = df.withColumn("event_ts", to_timestamp(col("ts_string"), "yyyy-MM-dd HH:mm:ss"))

🔹 Current Date/Time

from pyspark.sql.functions import current_date, current_timestamp

df = df.withColumn("load_date", current_date())
df = df.withColumn("load_ts", current_timestamp())

🔹 Extract Date Parts

from pyspark.sql.functions import year, month, dayofmonth, dayofweek, dayofyear, weekofyear, quarter

df = df.withColumn("year", year(col("dob_date")))
df = df.withColumn("month", month(col("dob_date")))
df = df.withColumn("day", dayofmonth(col("dob_date")))
df = df.withColumn("week", weekofyear(col("dob_date")))
df = df.withColumn("quarter", quarter(col("dob_date")))

🔹 Date Difference & Add/Subtract

from pyspark.sql.functions import datediff, date_add, date_sub

df = df.withColumn("days_since_dob", datediff(current_date(), col("dob_date")))
df = df.withColumn("next_week", date_add(col("dob_date"), 7))
df = df.withColumn("prev_week", date_sub(col("dob_date"), 7))

🔹 Format Dates

from pyspark.sql.functions import date_format

df = df.withColumn("pretty_dob", date_format(col("dob_date"), "dd-MMM-yyyy"))

🔹 Truncate Dates

from pyspark.sql.functions import trunc

df = df.withColumn("month_trunc", trunc(col("dob_date"), "MM"))  # returns first day of month

🔹 Between Dates

df.filter((col("dob_date") >= "1990-01-01") & (col("dob_date") <= "2000-12-31")).show()

🧹 NULL HANDLING & COALESCE - COMPLETE


🔹 Detecting Nulls

df.filter(col("name").isNull()).show()
df.filter(col("name").isNotNull()).show()

🔹 Fill NULLs (fillna)

# Fill all string columns with "unknown"
df.fillna("unknown").show()

# Fill all numeric columns with 0
df.fillna(0).show()

# Fill selected columns
df.fillna({"age": 0, "name": "NA"}).show()

🔹 Drop NULL Rows (dropna)

df.dropna().show()                    # Drops rows with any NULL
df.dropna("all").show()              # Drops rows with all NULLs
df.dropna(thresh=2).show()           # Keep rows with at least 2 non-null values
df.dropna(subset=["age", "name"]).show()

🔹 Replace Specific Values (Even Non-null)

df.replace(["NA", "N/A", "null"], None).show()

🔹 Coalesce (Non-null fallback)

from pyspark.sql.functions import coalesce

df = df.withColumn("final_name", coalesce(col("nickname"), col("name")))

🔹 NVL / NVL2 Equivalent

Spark SQL accepts NVL/NVL2 directly; in the DataFrame API, coalesce covers the same use case.

-- SQL style
SELECT COALESCE(name, 'Unknown') FROM table

# PySpark equivalent
from pyspark.sql.functions import coalesce, lit

df.select(coalesce(col("name"), lit("Unknown")).alias("name")).show()

🔹 Null-Safe Equality (<=>)

In SQL, <=> is the null-safe equality operator; the DataFrame API equivalent is eqNullSafe().

df.filter(col("name").eqNullSafe(col("alias"))).show()  # True if both are NULL or both values are equal



🔠 STRING FUNCTIONS - COMPLETE WITH PYSPARK CODE

🔹 Case Transformations

from pyspark.sql.functions import upper, lower, initcap

df.withColumn("upper_name", upper(col("name"))).show()
df.withColumn("lower_name", lower(col("name"))).show()
df.withColumn("capitalized", initcap(col("name"))).show()

🔹 Trim, Ltrim, Rtrim

from pyspark.sql.functions import trim, ltrim, rtrim

df.withColumn("clean_name", trim(col("name"))).show()

🔹 Substring

from pyspark.sql.functions import substring

df.withColumn("first_3", substring(col("name"), 1, 3)).show()

🔹 Length, Contains, Startswith, Endswith

from pyspark.sql.functions import length

df.filter(col("name").contains("ar")).show()
df.filter(col("name").startswith("A")).show()
df.filter(col("name").endswith("e")).show()
df.withColumn("name_len", length(col("name"))).show()

🔹 Replace & Translate

from pyspark.sql.functions import translate, regexp_replace

df.withColumn("no_spaces", regexp_replace(col("name"), " ", "")).show()
df.withColumn("trans_name", translate(col("name"), "aeiou", "12345")).show()

🔹 Split & Explode

from pyspark.sql.functions import split, explode

df = df.withColumn("name_parts", split(col("name"), " "))
df.select(explode(col("name_parts")).alias("each_word")).show()

🧮 AGGREGATION FUNCTIONS - COMPLETE

from pyspark.sql.functions import count, countDistinct, avg, sum, max, min, stddev

df.groupBy("gender").agg(
    count("*").alias("total"),
    countDistinct("name").alias("unique_names"),
    avg("salary").alias("avg_salary"),
    sum("salary").alias("total_salary"),
    stddev("salary").alias("std_dev")
).show()

🧱 ARRAY & STRUCT FUNCTIONS - COMPLETE

🔹 Create & Access Arrays

from pyspark.sql.functions import array

df = df.withColumn("my_array", array("id", "age"))

🔹 Explode Arrays

from pyspark.sql.functions import explode

df.select(explode(col("my_array")).alias("value")).show()

🔹 Create Struct & Access Field

from pyspark.sql.functions import struct

df = df.withColumn("profile", struct("name", "age"))
df.select(col("profile.name")).show()

๐Ÿ—บ๏ธ MAP FUNCTIONS โ€” COMPLETE

from pyspark.sql.functions import create_map, lit

df = df.withColumn("map_col", create_map(lit("name"), col("name"), lit("age"), col("age")))

🪟 WINDOW FUNCTIONS - COMPLETE

from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, rank, dense_rank, lag, lead

win_spec = Window.partitionBy("department").orderBy(col("salary").desc())

df.withColumn("row_number", row_number().over(win_spec)).show()
df.withColumn("rank", rank().over(win_spec)).show()
df.withColumn("dense_rank", dense_rank().over(win_spec)).show()
df.withColumn("lag_salary", lag("salary", 1).over(win_spec)).show()
df.withColumn("lead_salary", lead("salary", 1).over(win_spec)).show()

🧠 UDF vs Built-in - FULL EXAMPLE

🔹 Normal Python UDF

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def make_label(name):
    return "Mr. " + name

label_udf = udf(make_label, StringType())
df.withColumn("labelled", label_udf(col("name"))).show()

🔹 Pandas UDF (More Efficient for Batch)

from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import StringType
import pandas as pd

@pandas_udf(StringType())
def upper_pandas(name: pd.Series) -> pd.Series:
    return name.str.upper()

df.withColumn("name_upper", upper_pandas(col("name"))).show()

📦 BROADCAST JOIN - FULL

from pyspark.sql.functions import broadcast

# Broadcast smaller lookup table
df_big.join(broadcast(df_small), "id").show()

🔄 REPARTITION VS COALESCE - CODE DEMO

# Repartition - full shuffle; can increase or decrease the number of partitions
df.repartition(10).write.mode("overwrite").csv("new_path/")

# Coalesce - no shuffle; can only reduce the number of partitions
df.coalesce(1).write.mode("overwrite").csv("single_file/")
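To verify what either call actually did, check the partition count before writing; a quick sketch assuming df is already loaded:

# Current number of partitions
print(df.rdd.getNumPartitions())

# After repartition vs coalesce
print(df.repartition(10).rdd.getNumPartitions())  # 10
print(df.coalesce(1).rdd.getNumPartitions())      # 1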



🔗 ALL JOINS IN PYSPARK - COMPLETE WITH EXAMPLES

We’ll use this setup:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.getOrCreate()

df1 = spark.createDataFrame([(1, "Alice"), (2, "Bob"), (3, "Charlie")], ["id", "name"])
df2 = spark.createDataFrame([(1, "HR"), (2, "IT"), (4, "Finance")], ["id", "dept"])

1๏ธโƒฃ Inner Join (common ids only)

df1.join(df2, on="id", how="inner").show()

2๏ธโƒฃ Left Outer Join (all from left, null from right if no match)

df1.join(df2, on="id", how="left").show()

3๏ธโƒฃ Right Outer Join (all from right, null from left if no match)

df1.join(df2, on="id", how="right").show()

4๏ธโƒฃ Full Outer Join

df1.join(df2, on="id", how="outer").show()

5๏ธโƒฃ Left Semi Join

(like filter: returns rows from df1 that have match in df2)

df1.join(df2, on="id", how="left_semi").show()

6๏ธโƒฃ Left Anti Join

(returns rows in df1 that do not have match in df2)

df1.join(df2, on="id", how="left_anti").show()

7๏ธโƒฃ Cross Join (Cartesian Product)

df1.crossJoin(df2).show()

8๏ธโƒฃ Self Join Example

df_self = df1.alias("a").join(df1.alias("b"), col("a.id") < col("b.id"))
df_self.select("a.id", "a.name", "b.name").show()

โš–๏ธ CASE-WHEN / IF-ELSE LOGIC โ€” FULL EXAMPLES

๐Ÿ”น Using when(), otherwise()

from pyspark.sql.functions import when

df = df.withColumn("age_group", when(col("age") < 18, "Minor")
                                 .when(col("age") < 60, "Adult")
                                 .otherwise("Senior"))
df.select("name", "age", "age_group").show()

🔹 Using expr() / selectExpr() for SQL-like expressions

from pyspark.sql.functions import expr

df = df.withColumn("age_group", expr("CASE WHEN age < 18 THEN 'Minor' WHEN age < 60 THEN 'Adult' ELSE 'Senior' END"))
df.selectExpr("name", "CASE WHEN age < 18 THEN 'Minor' WHEN age < 60 THEN 'Adult' ELSE 'Senior' END AS age_group").show()

🧱 DSL-style Chaining - Realistic Flow

(df.filter(col("age") > 25)
   .withColumn("age_plus_10", col("age") + 10)
   .orderBy(col("age").desc())
   .dropDuplicates(["name"])
   .show())

🆚 SPARK SQL MODE vs DATAFRAME API

🔹 Register as Temp View

df.createOrReplaceTempView("people")

🔹 SQL Mode

spark.sql("SELECT name, age FROM people WHERE age > 25").show()

🔹 Same in DataFrame API

df.select("name", "age").where(col("age") > 25).show()

✅ When to Use What?

Feature | SQL mode | DataFrame API
Familiar SQL users | ✅ Easy to adopt | ❌ Might feel verbose
Complex joins | ✅ More readable | ✅ With practice
UDF integration | ❌ Hard to use in SQL | ✅ Clean & flexible
Code reuse | ❌ SQL strings can't chain | ✅ Python chaining, functions
Optimization | ✅ Both give the same plans | ✅ Both go through Catalyst
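To make the "UDF integration" row concrete: a Python UDF can also be exposed to SQL mode by registering it. A minimal sketch, reusing make_label from the UDF section and the people temp view registered above:

from pyspark.sql.types import StringType

# Register the Python function so it is callable from SQL (assumes make_label is defined as above)
spark.udf.register("make_label", make_label, StringType())

spark.sql("SELECT make_label(name) AS labelled FROM people").show()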

🧨 Bonus: selectExpr() - SQL inside the DataFrame API

df.selectExpr("name", "age + 10 as age_plus_10", "upper(name) as name_upper").show()

The sections below cover Delta Lake, an end-to-end mini project flow, a skew join and optimization deep dive, and an interview Q&A set.


🧊 SECTION 1: DELTA LAKE - COMPLETE GUIDE

Delta Lake adds ACID transactions, versioning (time travel), schema evolution, and MERGE (UPSERT) support to Apache Spark.
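The snippets in this section assume a Delta-enabled SparkSession. A minimal sketch of how such a session is typically configured with the open-source delta-spark package (installing the package itself is assumed):

from pyspark.sql import SparkSession

# Enable Delta Lake on a plain Spark installation (delta-spark must be on the classpath)
spark = (
    SparkSession.builder
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate()
)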


🔹 1. Create Delta Table from DataFrame

df.write.format("delta").mode("overwrite").save("/tmp/delta/customers")

🔹 2. Read Delta Table

df = spark.read.format("delta").load("/tmp/delta/customers")

🔹 3. Time Travel (Versioning)

# Read older version
df_v1 = spark.read.format("delta").option("versionAsOf", 1).load("/tmp/delta/customers")

# Read by timestamp
df_ts = spark.read.format("delta").option("timestampAsOf", "2024-01-01T12:00:00").load("/tmp/delta/customers")
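To see which versions and timestamps are available for time travel, the table history can be inspected; a small sketch against the same path:

# List commit versions, timestamps and operations for the Delta table
spark.sql("DESCRIBE HISTORY delta.`/tmp/delta/customers`").show(truncate=False)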

🔹 4. Merge (UPSERT)

from delta.tables import DeltaTable

delta_table = DeltaTable.forPath(spark, "/tmp/delta/customers")

updates_df = spark.createDataFrame([(1, "Alice Updated"), (4, "New User")], ["id", "name"])

(delta_table.alias("target")
    .merge(updates_df.alias("updates"), "target.id = updates.id")
    .whenMatchedUpdate(set={"name": "updates.name"})
    .whenNotMatchedInsert(values={"id": "updates.id", "name": "updates.name"})
    .execute())

🔹 5. Schema Evolution

# Add new column in input data
new_data = spark.createDataFrame([(5, "Zara", "Female")], ["id", "name", "gender"])

new_data.write.option("mergeSchema", "true").format("delta").mode("append").save("/tmp/delta/customers")

🔹 6. Z-Ordering (Optimize read performance on columns)

spark.sql("OPTIMIZE delta.`/tmp/delta/customers` ZORDER BY (id)")

🔹 7. Vacuum (Clean up old versions)

spark.sql("VACUUM delta.`/tmp/delta/customers` RETAIN 168 HOURS")  # Keep last 7 days

🚀 SECTION 2: MINI PROJECT FLOW - DATA PIPELINE END-TO-END

Project: Build a customer purchase dashboard table from raw logs


🔹 Step 1: Ingest Raw CSV

df_customers = spark.read.option("header", True).csv("/data/customers.csv")
df_orders = spark.read.option("header", True).csv("/data/orders.csv")

🔹 Step 2: Clean & Typecast

from pyspark.sql.functions import to_date

df_customers = df_customers.withColumn("dob", to_date("dob", "yyyy-MM-dd"))
df_orders = df_orders.withColumn("order_date", to_date("order_date", "yyyy-MM-dd"))
df_orders = df_orders.withColumn("amount", df_orders["amount"].cast("double"))

🔹 Step 3: Join

df_joined = df_customers.join(df_orders, on="customer_id", how="inner")

🔹 Step 4: Aggregations

from pyspark.sql.functions import sum, count

agg_df = df_joined.groupBy("customer_id", "name").agg(
    count("*").alias("total_orders"),
    sum("amount").alias("total_spent")
)

🔹 Step 5: Add Business Flags

from pyspark.sql.functions import when, col

agg_df = agg_df.withColumn("customer_segment", when(col("total_spent") > 10000, "Premium").otherwise("Regular"))

🔹 Step 6: Save to Delta Table

agg_df.write.format("delta").mode("overwrite").save("/delta/dashboard/customer_metrics")

โš ๏ธ SECTION 3: SKEW JOIN + OPTIMIZATION DEEP DIVE


๐Ÿ”น 1. Detect Data Skew

df.groupBy("join_key").count().orderBy("count", ascending=False).show()

🔹 2. Solution: Broadcast Join

from pyspark.sql.functions import broadcast

df_large.join(broadcast(df_small), on="id").show()

🔹 3. Solution: Salting

Step 1: Add a random salt to the large (skewed) DataFrame

from pyspark.sql.functions import expr

df_large_salted = df_large.withColumn("salt", expr("CAST(rand() * 5 AS INT)"))
df_large_salted = df_large_salted.withColumn("join_key_salted", expr("CONCAT(join_key, '_', salt)"))

Step 2: Replicate the small DataFrame once per salt value

from pyspark.sql.functions import explode, array, lit

df_small_salted = df_small.withColumn("salt", explode(array(*[lit(i) for i in range(5)])))
df_small_salted = df_small_salted.withColumn("join_key_salted", expr("CONCAT(join_key, '_', salt)"))

Step 3: Join on the salted key

df_large_salted.join(df_small_salted, on="join_key_salted").show()

🔹 4. Explain Plan + AQE

spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")  # let AQE split skewed partitions automatically
df.join(df2, "id").explain(True)

🧠 SECTION 4: INTERVIEW Q&A - PYSPARK DATAFRAME


# | Question | Short Answer
1 | Difference between repartition() and coalesce()? | repartition() does a full shuffle and can increase or decrease partitions; coalesce() merges partitions without a shuffle
2 | What is the Catalyst Optimizer? | Spark's internal optimizer that rewrites query plans
3 | Narrow vs wide transformation? | Narrow: map, filter (no shuffle); Wide: join, groupBy (shuffle involved)
4 | What is lazy evaluation? | Transformations are not executed until an action is called
5 | withColumn() vs select()? | withColumn() returns a new DataFrame with a column added or replaced; select() projects a chosen set of columns/expressions
6 | What triggers execution in Spark? | Actions: show(), collect(), write()
7 | How to handle skewed joins? | Broadcast join, salting, bucketing
8 | Broadcast join use case? | Small dimension table joined with a large fact table
9 | Fill NULLs in selected columns? | fillna({"col1": val1, "col2": val2})
10 | When to use dropDuplicates()? | Remove duplicate rows based on all or specific columns
11 | UDF vs built-in functions? | UDFs are slower and not optimized by Catalyst
12 | Can you read JSON with nested structure? | Yes; use df.select("a.b", "a.c") or explode() for arrays
13 | Convert string to date in Spark? | to_date(col("col"), "yyyy-MM-dd")
14 | Create a single output file from Spark? | coalesce(1) (or repartition(1)) before write
15 | Use case of selectExpr()? | SQL-like expressions inside the DataFrame API
16 | What is Z-Ordering in Delta? | Column-aware data clustering that speeds up selective queries
17 | What happens on schema mismatch in Delta? | The write fails unless mergeSchema=true is set
18 | How to monitor Spark jobs? | Spark UI: Jobs, Stages, Storage, and SQL tabs plus the DAG view
19 | Partition vs bucketing? | Partitioning splits data into directories by column value; bucketing hashes keys into a fixed number of buckets
20 | Use case of window functions? | Rank rows, calculate running totals, compare values across rows
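To make Q19 concrete, a minimal write-side sketch of partitioning vs bucketing; the country and customer_id columns, output path, and table name are illustrative:

# Partitioning: one sub-directory per distinct value of the partition column
df.write.mode("overwrite").partitionBy("country").parquet("/tmp/partitioned_output")

# Bucketing: rows are hashed into a fixed number of buckets; only supported with saveAsTable
(df.write.mode("overwrite")
   .bucketBy(8, "customer_id")
   .sortBy("customer_id")
   .saveAsTable("bucketed_customers"))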
