Here’s a detailed and interview-optimized answer sheet for each of your 8 questions (Q1–Q8), covering PySpark coding, Data Quality (DQ), SCD, optimization, and Spark architecture (AQE) — exactly how you’d want to respond in a technical interview:
✅ Q1. Extract dates from lines using string methods (not regex) and return those with year > 2018
lines = [
    "Transaction on 2022-04-15 completed",
    "Last login was 2017-10-30",
    "User joined on 2019-01-05"
]
result = []
for line in lines:
    parts = line.split()
    for part in parts:
        # A date token looks like yyyy-mm-dd: 10 characters, contains hyphens, starts with a 4-digit year
        if '-' in part and len(part) == 10 and part[:4].isdigit():
            year = int(part[:4])
            if year > 2018:
                result.append(part)
print(result)  # ['2022-04-15', '2019-01-05']
✅ Q2. PySpark DQ check: Ensure the date column is in yyyy-MM-dd format
from pyspark.sql.functions import to_date, col
# Add a column to check validity
df = df.withColumn("valid_date", to_date(col("date"), "yyyy-MM-dd"))
# Filter invalid rows
invalid_dates = df.filter(col("valid_date").isNull())
🔎 Use this to report invalid rows or to fail the pipeline when bad data is found.
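For the "fail the pipeline" path, a minimal sketch building on df and invalid_dates above (the error message and fail-on-any-bad-row rule are illustrative assumptions):
# Count the rows whose date did not parse as yyyy-MM-dd
invalid_count = invalid_dates.count()
# Fail fast if any invalid dates are present
if invalid_count > 0:
    raise ValueError(f"DQ check failed: {invalid_count} rows have a malformed date column")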
✅ Q3. PySpark DQ check: Ensure employee_name contains only alphabetic characters
from pyspark.sql.functions import col
# Keep only alphabetic names
clean_df = df.filter(col("employee_name").rlike("^[A-Za-z]+$"))
🔍 Discards: vivek123, vivek&, john_doe1
✅ Q4. Implement SCD Type 1 and SCD Type 2 in PySpark
🔹 SCD Type 1 (overwrite old record)
# Keep target rows whose key is absent from the source (left_anti),
# then take the source version for everything else (updates + new inserts)
updated_df = target_df.join(source_df, "id", "left_anti") \
    .unionByName(source_df)
# overwrite the entire dimension table; Type 1 keeps no history
updated_df.write.mode("overwrite").saveAsTable("dim_table")
🔸 SCD Type 2 (track history using start_date, end_date, is_current)
from pyspark.sql.functions import current_date, lit, col
# Step 1: Expire current records whose tracked attribute changed in the source
expired = target_df.alias("tgt") \
    .join(source_df.alias("src"), "id") \
    .filter(col("tgt.column1") != col("src.column1")) \
    .select("tgt.*") \
    .withColumn("end_date", current_date()) \
    .withColumn("is_current", lit(False))
# Step 2: Build the new current versions of the records
new_records = source_df.withColumn("start_date", current_date()) \
    .withColumn("end_date", lit(None).cast("date")) \
    .withColumn("is_current", lit(True))
# Step 3: Union and write
# (in production the old current rows are replaced via a MERGE rather than a plain append)
final_df = expired.unionByName(new_records)
final_df.write.mode("append").saveAsTable("dim_table")
✅ Q5. PySpark Optimization Techniques
| Technique | Explanation |
|---|---|
| Partitioning | Use .repartition() to increase parallelism and .coalesce() to reduce small-file writes |
| Broadcast joins | Broadcast small dimension tables to avoid a shuffle |
| Predicate pushdown | Enabled by default when reading Parquet/ORC |
| Cache / persist | Store reused DataFrames in memory with .cache() / .persist() |
| Avoid wide transformations | Minimize shuffle-heavy ops like groupBy on large, skewed data |
| Use DataFrames over RDDs | Catalyst and Tungsten optimize the DataFrame API |
| Enable AQE | Dynamic optimization at runtime |
| Use efficient file formats | Prefer Parquet/ORC with Snappy compression |
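A few of these techniques in code; a minimal sketch, assuming an active SparkSession named spark, a large fact_df, and a small dim_df (table, column, and path names are illustrative):
from pyspark.sql.functions import broadcast
# Enable AQE so shuffle partitions and join strategies adapt at runtime
spark.conf.set("spark.sql.adaptive.enabled", "true")
# Broadcast the small dimension table to avoid shuffling the large fact table
joined = fact_df.join(broadcast(dim_df), "customer_id")
# Cache a DataFrame that is reused by several downstream actions
joined.cache()
# Reduce the number of output files before writing (coalesce avoids a full shuffle)
joined.coalesce(8).write.mode("overwrite").parquet("/path/to/output")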
✅ Q6. Using REST APIs in Databricks
| Use Case | Method |
|---|---|
| Calling external APIs from notebooks | Use the requests Python module |
import requests
response = requests.get("https://api.example.com/data")
data = response.json()
| Use Case | Method |
|---|---|
| Triggering Databricks jobs from external tools | Use the Databricks REST API |
- Requires a Bearer token
- Sample:
curl -X POST https://<workspace-url>/api/2.1/jobs/run-now \
  -H "Authorization: Bearer <TOKEN>" \
  -d '{ "job_id": 123 }'
Docs: https://docs.databricks.com/api
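The same job trigger from Python with requests; a minimal sketch where the workspace URL, token, and job ID are placeholders to substitute:
import requests
# Placeholders: your workspace URL, a valid personal access token, and the real job ID
url = "https://<workspace-url>/api/2.1/jobs/run-now"
headers = {"Authorization": "Bearer <TOKEN>"}
payload = {"job_id": 123}
response = requests.post(url, headers=headers, json=payload)
response.raise_for_status()
print(response.json())  # run metadata for the triggered run (e.g., run_id)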
✅ Q7. What is Spark Adaptive Query Execution (AQE)?
| Feature | Explanation |
|---|---|
| What | Runtime query re-optimization feature in Spark 3.x+ |
| How to enable | spark.sql.adaptive.enabled = true (enabled by default since Spark 3.2) |
| When triggered | At stage boundaries during execution, using runtime shuffle statistics to re-plan the remaining query |
Benefits:
- Dynamically coalesces the number of shuffle partitions
- Switches the join strategy at runtime (e.g., sort-merge → broadcast)
- Splits skewed partitions to handle skewed joins better
📌 Use in dynamic, unpredictable workloads (e.g., joins, groupBy on varying data volumes)
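A minimal configuration sketch using standard Spark 3.x AQE settings (assumes an active SparkSession named spark; the values shown are the defaults in recent versions):
# Turn AQE on (default true from Spark 3.2 onward)
spark.conf.set("spark.sql.adaptive.enabled", "true")
# Coalesce small shuffle partitions after a shuffle
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
# Split heavily skewed partitions during sort-merge joins
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")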
✅ Q8. How do you handle nulls and duplicates in Spark?
🔹 Null Handling
df = df.na.fill({"column1": "NA"}) # Replace nulls
df = df.dropna(subset=["column1"]) # Drop rows with nulls
🔹 Duplicate Handling
df = df.dropDuplicates(["id"])  # Keeps one arbitrary row per id; use a window + row_number() for a deterministic pick
🔍 In interviews:
“We apply null checks in ETL pipelines using dropna, fillna, and custom rules via when/otherwise. For deduplication, we use dropDuplicates on natural keys. For CDC, we use row hashing + windowing to isolate true deltas.”
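A sketch of the custom rules and the hash + window deduplication mentioned above (the column names, ordering column, and hash choice are illustrative assumptions):
from pyspark.sql.functions import when, col, sha2, concat_ws, row_number
from pyspark.sql.window import Window
# Custom null rule: default missing salaries to 0 only for contractors (illustrative rule)
df = df.withColumn(
    "salary",
    when(col("salary").isNull() & (col("emp_type") == "contractor"), 0).otherwise(col("salary"))
)
# Row hash over the tracked columns to detect true changes (CDC)
df = df.withColumn("row_hash", sha2(concat_ws("||", col("name"), col("salary")), 256))
# Deterministic dedup: keep the latest record per natural key
w = Window.partitionBy("id").orderBy(col("updated_at").desc())
latest = df.withColumn("rn", row_number().over(w)).filter(col("rn") == 1).drop("rn")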