# PySpark All-In-One Coding Revision Notebook for Data Engineering Interviews
# Covers: Data Reading, RDDs, DataFrames, Null Handling, Joins, Aggregations, Windows, Caching, Partitions, File Ops
# Aim: Complete revision in 30–60 minutes with hands-on patterns + interview questions
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window
from pyspark import StorageLevel
spark = SparkSession.builder \
.appName("FullPySparkInterviewPrep") \
.master("local[*]") \
.config("spark.sql.shuffle.partitions", "4") \
.getOrCreate()
sc = spark.sparkContext
# ====================================
# ✅ DATA READING & RDD / DATAFRAME CREATION
# ====================================
# Reading data from CSV, JSON, Parquet, and Text
csv_df = spark.read.option("header", True).csv("/path/to/sample.csv")
json_df = spark.read.option("multiline", True).json("/path/to/sample.json")
parquet_df = spark.read.parquet("/path/to/sample.parquet")
text_rdd = sc.textFile("/path/to/sample.txt")
# From list to RDD and DataFrame
sample_list = [(1, "Alice"), (2, "Bob"), (3, "Charlie")]
rdd_from_list = sc.parallelize(sample_list)
df_from_list = spark.createDataFrame(sample_list, ["id", "name"])
# Show examples
df_from_list.show()
csv_df.show(1)
json_df.printSchema()
parquet_df.select("column_name").show()
# Sample RDD Transformation
text_rdd.flatMap(lambda x: x.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b).collect()
# ❓ Practice Q: Read a CSV, filter nulls, and count rows
# df = spark.read.option("header", True).csv("/path/to/file.csv")
# df.filter(col("some_column").isNotNull()).count()
# ====================================
# ✅ JSON FLATTENING + SCHEMA HANDLING
# ====================================
# Example Nested JSON Schema
data = [
{
"id": 1,
"name": {"first": "Alice", "last": "Smith"},
"contact": {"email": "alice@example.com", "phone": "1234567890"}
},
{
"id": 2,
"name": {"first": "Bob", "last": "Jones"},
"contact": {"email": "bob@example.com"}
}
]
schema = StructType([
StructField("id", IntegerType(), True),
StructField("name", StructType([
StructField("first", StringType(), True),
StructField("last", StringType(), True)
])),
StructField("contact", StructType([
StructField("email", StringType(), True),
StructField("phone", StringType(), True)
]))
])
json_nested_df = spark.createDataFrame(data, schema)
# Flatten nested fields
json_flat = json_nested_df.select(
col("id"),
col("name.first").alias("first_name"),
col("name.last").alias("last_name"),
col("contact.email"),
col("contact.phone")
)
json_flat.show(truncate=False)
# Schema Inference
# JSON infers the schema by default; supply an explicit one with spark.read.json("path", schema=my_schema)
# For CSV, enable inference with .option("inferSchema", True) (otherwise every column is read as string)
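# A minimal sketch of both approaches on a CSV (hypothetical path "/path/to/people.csv"):
# an explicit schema avoids the extra pass over the data that inferSchema triggers and catches type drift early.
people_schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True)
])
# typed_df = spark.read.option("header", True).schema(people_schema).csv("/path/to/people.csv")
# inferred_df = spark.read.option("header", True).option("inferSchema", True).csv("/path/to/people.csv")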
# Schema Merging
# When reading partitioned Parquet files with schema evolution
# spark.read.option("mergeSchema", "true").parquet("/path/with/schema/variations")
# Example:
# df1 = spark.createDataFrame([(1, "A")], ["id", "val"])
# df2 = spark.createDataFrame([(2, "B", 100)], ["id", "val", "score"])
# df1.write.parquet("/tmp/schema_merge/part1")
# df2.write.parquet("/tmp/schema_merge/part2")
# merged_df = spark.read.option("mergeSchema", True).parquet("/tmp/schema_merge")
# merged_df.printSchema()
# ====================================
# ✅ RDD SECTION
# ====================================
data = ["apple", "banana", "carrot", "banana"]
rdd1 = sc.parallelize(data)
rdd2 = sc.parallelize(["banana", "dragonfruit", "apple"])
print("RDD Union:", rdd1.union(rdd2).distinct().collect())
print("RDD Intersect:", rdd1.intersection(rdd2).collect())
print("RDD Subtract:", rdd1.subtract(rdd2).collect())
rdd = sc.parallelize(["user1", "user2", "user1", "user3"])
rdd_distinct = rdd.distinct()
print(rdd_distinct.collect())
# Output: ['user1', 'user2', 'user3']
rdd = sc.parallelize(["1,John,30", "2,Alice,25", "3,Bob,40"])
rdd_transformed = rdd.map(lambda x: (x.split(",")[1], int(x.split(",")[2])))
print(rdd_transformed.collect())
# Output: [('John', 30), ('Alice', 25), ('Bob', 40)]
rdd = sc.parallelize(["INFO: System started", "ERROR: Disk failure", "INFO: User login"])
rdd_errors = rdd.filter(lambda x: "ERROR" in x)
print(rdd_errors.collect())
# Output: ['ERROR: Disk failure']
rdd = sc.parallelize(["Hello World", "Spark is fast"])
rdd_flat = rdd.flatMap(lambda x: x.split(" "))
print(rdd_flat.collect())
# Output: ['Hello', 'World', 'Spark', 'is', 'fast']
rdd = sc.parallelize([("Alice", 85), ("Bob", 90), ("Alice", 95)])
rdd_grouped = rdd.groupByKey().mapValues(list)
print(rdd_grouped.collect())
# Output: [('Alice', [85, 95]), ('Bob', [90])]
rdd = sc.parallelize([("apple", 5), ("banana", 10), ("apple", 7)])
rdd_reduced = rdd.reduceByKey(lambda x, y: x + y)
print(rdd_reduced.collect())
# Output: [('apple', 12), ('banana', 10)]
rdd = sc.parallelize([(85, "Alice"), (90, "Bob"), (75, "Charlie")])
rdd_sorted = rdd.sortByKey(ascending=False)
print(rdd_sorted.collect())
# Output: [(90, 'Bob'), (85, 'Alice'), (75, 'Charlie')]
rdd = sc.parallelize([("Alice", 85), ("Bob", 90)])
rdd_scaled = rdd.mapValues(lambda x: "A" if x > 80 else "B")
print(rdd_scaled.collect())
# Output: [('Alice', 'A'), ('Bob', 'A')]
rdd_names = sc.parallelize([("101", "Alice"), ("102", "Bob")])
rdd_scores = sc.parallelize([("101", 85), ("102", 90)])
rdd_joined = rdd_names.join(rdd_scores)
print(rdd_joined.collect())
# Output: [('101', ('Alice', 85)), ('102', ('Bob', 90))]
rdd = sc.parallelize(range(100), numSlices=10)
rdd_coalesced = rdd.coalesce(5)
print(rdd_coalesced.getNumPartitions())
# Output: 5
rdd = sc.parallelize(range(100), numSlices=5)
rdd_repartitioned = rdd.repartition(10)
print(rdd_repartitioned.getNumPartitions())
# Output: 10
rdd1 = sc.parallelize([1, 2, 3])
rdd2 = sc.parallelize(["Apple", "Banana", "Cherry"])
rdd_zipped = rdd1.zip(rdd2)
print(rdd_zipped.collect())
# Output: [(1, 'Apple'), (2, 'Banana'), (3, 'Cherry')]
# ❓ Practice Q: Word Count
text_rdd = sc.parallelize(["hello spark", "hello world"])
word_count = text_rdd.flatMap(lambda line: line.split()).map(lambda word: (word, 1)).reduceByKey(lambda x, y: x + y)
print("Word Count:", word_count.collect())
# ====================================
# ✅ DATAFRAME CORE OPS
# ====================================
df = spark.createDataFrame([(1, "Alice", 29), (2, "Bob", None), (3, None, 32)], ["id", "name", "age"])
df.show()
df.show(truncate=False)
df.show(n=5, vertical=True)
# 🔹 Handling Nulls
df.dropna().show()
df.dropna(how="any").show()
df.fillna({"name": "Unknown", "age": 0}).show()
df.na.fill("NA").show()  # note: na.replace() cannot target nulls; use fill()/fillna() (here only the string column is filled)
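# For contrast, na.replace() swaps concrete (non-null) values; a small sketch on this toy df:
df.na.replace("Alice", "Alicia", subset=["name"]).show()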
# ❓ Practice Q2: Fill missing names with "Unknown" and calculate avg age
filled_df = df.fillna({"name": "Unknown"})
filled_df.groupBy("name").agg(avg("age")).show()
# 🔹 Column Operations (Rename, Add, Drop)
df_renamed = df.withColumnRenamed("name", "full_name")
df_added = df.withColumn("age_plus_10", col("age") + 10)
df_dropped = df.drop("age")
df_dynamic = df.select([col(c).alias(f"col_{i}") for i, c in enumerate(df.columns)])
print("Renamed:"); df_renamed.show()
print("Added:"); df_added.show()
print("Dropped:"); df_dropped.show()
print("Dynamic Rename:"); df_dynamic.show()
# ❓ Practice Q3: Dynamically rename all columns to snake_case
snake_df = df.select([col(c).alias(c.lower().replace(" ", "_")) for c in df.columns])
snake_df.printSchema()
# 🔹 Array/Map/Struct Ops
complex_df = spark.createDataFrame([(1, ["a", "b"], {"x": 1}), (2, ["c"], {"y": 2})], ["id", "arr", "kv"])
complex_df.select(size("arr"), explode("arr"), col("kv.x")).show()
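# A few more collection helpers that come up in interviews (sketch on the same complex_df):
complex_df.select(
    map_keys("kv").alias("keys"),               # keys of the MapType column
    map_values("kv").alias("values"),           # values of the MapType column
    array_contains("arr", "a").alias("has_a")   # membership test on the ArrayType column
).show()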
# ====================================
# ✅ JOINS + NULL-SAFE
# ====================================
df1 = spark.createDataFrame([(1, "A"), (2, "B"), (3, None)], ["id", "val"])
df2 = spark.createDataFrame([(2, "B"), (3, "C"), (4, None)], ["id", "val"])
df1.join(df2, on="id", how="inner").show()
df1.join(df2, df1["val"].eqNullSafe(df2["val"]), how="inner").show()  # null-safe equality (SQL <=>); also matches the two null vals
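# Quick reference for the other join types interviewers ask about (same df1/df2):
df1.join(df2, "id", "left").show()        # all df1 rows, matching df2 rows or nulls
df1.join(df2, "id", "full_outer").show()  # union of keys from both sides
df1.join(df2, "id", "left_semi").show()   # df1 rows with a match in df2 (df1 columns only)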
# ❓ Practice Q4: Get rows in df1 not present in df2
result = df1.join(df2, on="id", how="left_anti")
result.show()
# ====================================
# ✅ WINDOW FUNCTIONS (FULL)
# ====================================
wdf = spark.createDataFrame([
(1, "HR", 100), (2, "HR", 200), (3, "IT", 300),
(4, "HR", 150), (5, "IT", 100)
], ["id", "dept", "salary"])
win_spec = Window.partitionBy("dept").orderBy(col("salary").desc())
win_spec_rows = Window.partitionBy("dept").orderBy("salary").rowsBetween(Window.unboundedPreceding, Window.currentRow)  # ordered so the running_* aggregates are deterministic
wdf.select(
"*",
row_number().over(win_spec).alias("row_number"),
rank().over(win_spec).alias("rank"),
dense_rank().over(win_spec).alias("dense_rank"),
ntile(2).over(win_spec).alias("ntile"),
lag("salary", 1).over(win_spec).alias("lag"),
lead("salary", 1).over(win_spec).alias("lead"),
first("salary").over(win_spec).alias("first"),
last("salary").over(win_spec).alias("last"),
avg("salary").over(win_spec_rows).alias("running_avg"),
sum("salary").over(win_spec_rows).alias("running_sum"),
min("salary").over(win_spec_rows).alias("running_min"),
max("salary").over(win_spec_rows).alias("running_max")
).show()
# ❓ Practice Q5: Running average and salary rank by department
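# One possible answer sketch, reusing win_spec (rank) and win_spec_rows (running average):
wdf.select(
    "*",
    rank().over(win_spec).alias("salary_rank"),
    avg("salary").over(win_spec_rows).alias("running_avg")
).show()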
# ====================================
# ✅ AGGREGATIONS
# ====================================
ag_df = wdf.groupBy("dept").agg(count("id"), avg("salary"), max("salary"), collect_list("salary"))
ag_df.show()
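# Named aggregates and distinct counts are also common asks; a short sketch on the same wdf:
wdf.groupBy("dept").agg(
    countDistinct("id").alias("emp_count"),
    sum("salary").alias("total_salary"),
    avg("salary").alias("avg_salary")
).show()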
# ====================================
# ✅ PARTITIONING / PERSISTENCE
# ====================================
df = df.repartition(4)
print("Partitions:", df.rdd.getNumPartitions())
df = df.coalesce(2)
df.cache().count()  # cache() uses the default storage level; count() is the action that materializes it
df.unpersist()  # release the cached data before persisting with an explicit storage level
df.persist(StorageLevel.MEMORY_AND_DISK)
spark.sparkContext.setCheckpointDir("/tmp/checkpoints")
df_cp = df.checkpoint(); df_cp.count()
# ====================================
# ✅ FILE FORMATS, BUCKETING, WRITING
# ====================================
wdf.write.mode("overwrite").parquet("/tmp/wdf")
spark.read.parquet("/tmp/wdf").explain(True)
bucket_df = spark.createDataFrame([(1, "A"), (2, "B"), (3, "C")], ["id", "name"])
bucket_df.write.bucketBy(2, "id").sortBy("id").mode("overwrite").saveAsTable("bucket_table")
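# Why bucketing matters: two tables bucketed on the same key into the same number of buckets can be
# joined without shuffling on that key. Note bucketBy() only works with saveAsTable(), not path-based save().
# Sketch (the second table name "bucket_table_2" is illustrative):
# bucket_df.write.bucketBy(2, "id").sortBy("id").mode("overwrite").saveAsTable("bucket_table_2")
# spark.table("bucket_table").join(spark.table("bucket_table_2"), "id").explain()  # look for the missing Exchange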
# ====================================
# ✅ ADVANCED PRACTICE INTERVIEW QUESTIONS
# ====================================
df_dupe = spark.createDataFrame([(1, "X"), (1, "X"), (2, "Y")], ["id", "val"])
df_dupe.groupBy("id", "val").count().filter("count > 1").show()
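# Follow-up that usually comes next: removing the duplicates once detected
df_dupe.dropDuplicates(["id", "val"]).show()  # keeps one row per (id, val) combination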
# Q: How to detect skewed joins or fix them?
# ✅ Detect skew via per-key counts or uneven task times/sizes in the Spark UI; fix it by broadcasting the small side or salting the hot keys
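# A minimal sketch of both fixes on toy data (the hot key "hot" and salt factor of 4 are illustrative):
big = spark.createDataFrame([("hot", i) for i in range(100)] + [("cold", 1)], ["key", "val"])
small = spark.createDataFrame([("hot", "H"), ("cold", "C")], ["key", "label"])
# 1) Broadcast the small side so the skewed key never shuffles
big.join(broadcast(small), "key").explain()
# 2) Salting: spread the hot key across N sub-keys and replicate the small side per salt value
N = 4
salted_big = big.withColumn("salt", (rand() * N).cast("int"))
salted_small = small.crossJoin(spark.range(N).withColumnRenamed("id", "salt"))
salted_big.join(salted_small, ["key", "salt"]).drop("salt").show()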
# Q: Difference between repartition and coalesce?
# ✅ repartition() triggers a full shuffle and can increase or decrease the partition count; coalesce() only merges existing partitions to reduce the count and avoids a full shuffle
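# The difference is visible in the physical plan (sketch on the small wdf defined above):
wdf.repartition(8).explain()  # Exchange (shuffle) in the plan
wdf.coalesce(2).explain()     # Coalesce operator, no Exchange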
# Q: What happens when you cache a DF but never trigger action?
# ✅ Lazy evaluation: caching only happens when an action forces execution
# ✅ Stop Spark
spark.stop()