Here's a detailed, interview-ready answer sheet for the eight questions (Q1–Q8), covering PySpark coding, Data Quality (DQ) checks, SCD, optimization, and Spark architecture (AQE), framed the way you'd answer them in a technical interview:


Q1. Extract dates from lines using string methods (not regex) and return those with year > 2018

lines = [
    "Transaction on 2022-04-15 completed",
    "Last login was 2017-10-30",
    "User joined on 2019-01-05"
]

result = []
for line in lines:
    for part in line.split():
        # A yyyy-MM-dd token is 10 characters long, contains dashes, and starts with a 4-digit year
        if '-' in part and len(part) == 10 and part[:4].isdigit():
            year = int(part[:4])
            if year > 2018:
                result.append(part)

print(result)  # ['2022-04-15', '2019-01-05']

Q2. PySpark DQ check: Ensure the date column is in yyyy-MM-dd format

from pyspark.sql.functions import to_date, col

# Add a column to check validity
df = df.withColumn("valid_date", to_date(col("date"), "yyyy-MM-dd"))

# Filter invalid rows
invalid_dates = df.filter(col("valid_date").isNull())

🔎 Use the invalid_dates DataFrame to report bad records or to fail the pipeline when its count is non-zero; to_date() returns NULL for values it cannot parse with the given pattern, which is exactly what isNull() catches.
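Note that to_date() checks parseability rather than the exact textual layout, so depending on parser settings a value such as 2022-4-5 may still pass. If the column must match yyyy-MM-dd character for character, a minimal sketch combining a shape check with the parse check (the is_valid_date column name is only an illustration):

from pyspark.sql.functions import col, to_date, when

dq_df = df.withColumn(
    "is_valid_date",
    when(
        col("date").rlike(r"^\d{4}-\d{2}-\d{2}$")            # right textual shape
        & to_date(col("date"), "yyyy-MM-dd").isNotNull(),    # parses as a real date
        True
    ).otherwise(False)
)

invalid_count = dq_df.filter(~col("is_valid_date")).count()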


Q3. PySpark DQ check: Ensure employee_name has only alphabets

from pyspark.sql.functions import col

# Keep only alphabetic names
clean_df = df.filter(col("employee_name").rlike("^[A-Za-z]+$"))

🔍 Discards values like vivek123, vivek& and john_doe1. Note that this strict pattern also rejects names containing spaces or hyphens (e.g. John Doe), so widen the character class if those are legitimate.
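For DQ reporting it is often better to flag invalid rows than to drop them silently; a minimal sketch of that variant (the is_valid_name column name is only an illustration):

from pyspark.sql.functions import col, when

# Flag rather than filter, so bad rows can be counted and reported upstream
flagged_df = df.withColumn(
    "is_valid_name",
    when(col("employee_name").rlike("^[A-Za-z]+$"), True).otherwise(False)
)

bad_names = flagged_df.filter(~col("is_valid_name"))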


Q4. Implement SCD Type 1 and SCD Type 2 in PySpark

🔹 SCD Type 1 (overwrite old record)

# Target rows whose key is absent from the source stay exactly as they are
unchanged_df = target_df.join(source_df, "id", "left_anti")

# Source rows overwrite matching target rows and bring in brand-new keys (no history kept)
updated_df = unchanged_df.unionByName(source_df)

# overwrite the entire dimension table
updated_df.write.mode("overwrite").saveAsTable("dim_table")
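On Databricks/Delta Lake, SCD Type 1 is usually expressed as a single MERGE (upsert) instead of a full overwrite; a minimal sketch, assuming dim_table is a Delta table, id is the business key, and spark is an active SparkSession:

from delta.tables import DeltaTable

dim = DeltaTable.forName(spark, "dim_table")

(dim.alias("tgt")
    .merge(source_df.alias("src"), "tgt.id = src.id")
    .whenMatchedUpdateAll()       # overwrite changed attributes in place
    .whenNotMatchedInsertAll()    # insert brand-new keys
    .execute())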

🔸 SCD Type 2 (track history using start_date, end_date, is_current)

from pyspark.sql.functions import col, current_date, lit

# Step 1: Expire the current version of records whose tracked attribute changed
expired = target_df.alias("tgt") \
    .join(source_df.alias("src"), col("tgt.id") == col("src.id")) \
    .filter(col("tgt.column1") != col("src.column1")) \
    .select("tgt.*") \
    .withColumn("end_date", current_date()) \
    .withColumn("is_current", lit(False))

# Step 2: Build the new current version of each incoming record
new_record = source_df.withColumn("start_date", current_date()) \
    .withColumn("end_date", lit(None).cast("date")) \
    .withColumn("is_current", lit(True))

# Step 3: Union and append to the dimension table
final_df = expired.unionByName(new_record)
final_df.write.mode("append").saveAsTable("dim_table")
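On Delta Lake, the expiry step is commonly done with a MERGE so only the affected rows are rewritten; a minimal sketch, assuming dim_table is a Delta table with the is_current / end_date columns used above:

from delta.tables import DeltaTable

dim = DeltaTable.forName(spark, "dim_table")

# Close out the current version of rows whose tracked attribute changed;
# the new versions are then appended as in Step 2/3 above
(dim.alias("tgt")
    .merge(source_df.alias("src"), "tgt.id = src.id AND tgt.is_current = true")
    .whenMatchedUpdate(
        condition="tgt.column1 <> src.column1",
        set={"end_date": "current_date()", "is_current": "false"})
    .execute())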

Q5. PySpark Optimization Techniques

  • Partitioning: Use .repartition() to increase parallelism and .coalesce() to reduce small-file writes
  • Broadcast joins: Broadcast small dimension tables to avoid a shuffle
  • Predicate pushdown: Enabled by default for Parquet/ORC readers
  • Cache / persist: Store reused DataFrames in memory with .cache() / .persist()
  • Avoid wide transformations: Minimize shuffle-heavy operations such as groupBy on large, skewed data
  • Use DataFrames over RDDs: Catalyst and Tungsten optimize the DataFrame API
  • Enable AQE: Re-optimizes the query plan dynamically at runtime
  • Use efficient file formats: Prefer Parquet/ORC with Snappy compression
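A minimal sketch applying a few of these techniques together (the paths, broadcast threshold and partition count are illustrative assumptions; spark is an active SparkSession):

from pyspark.sql.functions import broadcast

# Raise the auto-broadcast threshold so small dimensions are broadcast (50 MB here)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", str(50 * 1024 * 1024))

fact_df = spark.read.parquet("/data/fact")   # hypothetical input paths
dim_df = spark.read.parquet("/data/dim")

# Explicitly broadcast the small dimension table to avoid a shuffle join
joined = fact_df.join(broadcast(dim_df), "id")

# Cache a DataFrame that several downstream aggregations reuse
joined.cache()

# Reduce the number of output files, then write Parquet (Snappy is Spark's default codec)
joined.coalesce(50).write.mode("overwrite").parquet("/data/output")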

Q6. Using REST APIs in Databricks

🔹 Calling external APIs from notebooks: use the requests Python module

import requests

response = requests.get("https://api.example.com/data")
data = response.json()

🔹 Triggering Databricks jobs from external tools: use the Databricks REST API
  • Requires a Bearer token
  • Sample:

curl -X POST https://<workspace-url>/api/2.1/jobs/run-now \
  -H "Authorization: Bearer <TOKEN>" \
  -d '{ "job_id": 123 }'

Docs: https://docs.databricks.com/api
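The same job trigger can be issued from Python with requests; a minimal sketch (workspace URL, token and job_id are placeholders):

import requests

workspace_url = "https://<workspace-url>"   # placeholder workspace URL
token = "<TOKEN>"                           # placeholder personal access token

response = requests.post(
    f"{workspace_url}/api/2.1/jobs/run-now",
    headers={"Authorization": f"Bearer {token}"},
    json={"job_id": 123},
)
response.raise_for_status()
print(response.json())   # response describes the triggered run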


Q7. What is Spark Adaptive Query Execution (AQE)?

  • What: Runtime re-optimization feature available in Spark 3.x+
  • How to enable: spark.sql.adaptive.enabled = true (enabled by default since Spark 3.2)
  • When triggered: Between query stages, once shuffle statistics from completed stages are available at runtime
  • Benefits:
      • Dynamically coalesces shuffle partitions to the right number
      • Auto-switches join strategy (e.g. sort-merge → broadcast) based on actual data sizes
      • Splits skewed partitions to handle skewed joins better

📌 Use in dynamic, unpredictable workloads (e.g., joins, groupBy on varying data volumes)
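A minimal sketch of turning the relevant knobs on in a session (these are the standard Spark 3.x AQE configuration keys; spark is an active SparkSession):

# Enable AQE and its two most commonly used sub-features
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")  # merge small shuffle partitions after a stage
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")            # split skewed partitions during joins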


Q8. How do you handle nulls and duplicates in Spark?

🔹 Null Handling

df = df.na.fill({"column1": "NA"})  # Replace nulls
df = df.dropna(subset=["column1"])  # Drop rows with nulls

🔹 Duplicate Handling

df = df.dropDuplicates(["id"])  # Keeps one (arbitrary) row per id; ordering is not guaranteed

🔍 In interviews:

“We apply null checks in ETL pipelines using dropna, fillna, and custom rules via when/otherwise. For deduplication we use dropDuplicates on natural keys. For CDC we use row hashing plus window functions to isolate true deltas.”
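The window-function part of that answer typically looks like the sketch below, which keeps only the latest version of each id by a timestamp column (updated_at is an illustrative column name):

from pyspark.sql import Window
from pyspark.sql.functions import col, row_number

# Rank versions of each id by recency, then keep only the newest row
w = Window.partitionBy("id").orderBy(col("updated_at").desc())

latest_df = (
    df.withColumn("rn", row_number().over(w))
      .filter(col("rn") == 1)
      .drop("rn")
)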

