Step-by-Step Roadmap tailored to the Data Engineer target stack (SQL, PySpark, Python, AWS S3, Hadoop, Bitbucket)

Perfect! Let's continue with advanced and interesting SQL questions, tailored to your target stack (PySpark, Hadoop, AWS) and resume-worthy scenarios.

We’ll follow this flow:

Today’s Focus: Advanced SQL Scenarios (with PySpark and Big Data Angle)

Topic | Challenge Style
1. Ranking, Deduplication | ROW_NUMBER, DENSE_RANK
2. Complex Joins + Filters | Multi-condition joins
3. Aggregations Over Groups | Rolling sums, group-wise max/min
4. Conditional Aggregation | CASE WHEN inside aggregation
5. SCD (Slowly Changing Dimension) | Resume-worthy transformation logic
6. Nested Window Functions | LEAD/LAG + partitioning
7. Percentile, Median, NTILE | Real-world analytics style

🧠 Challenge 1: Remove Duplicates Keeping the Latest Record

Table: user_activity(user_id, login_time, source)

Question:

Return the latest login record for each user.

✅ SQL Version:

SELECT user_id, login_time, source
FROM (
  SELECT *,
         ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY login_time DESC) as rn
  FROM user_activity
) t
WHERE rn = 1;

🔁 PySpark Version:

from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, col

window_spec = Window.partitionBy("user_id").orderBy(col("login_time").desc())

df_latest = df.withColumn("rn", row_number().over(window_spec)) \
              .filter("rn = 1") \
              .drop("rn")

🧠 Challenge 2: Users Who Booked in Last 7 Days but Not in Previous 7

Table: bookings(user_id, booking_date)

Goal: Identify users active this week but not in the previous week.

✅ SQL Version:

WITH last_week AS (
  SELECT DISTINCT user_id FROM bookings
  WHERE booking_date BETWEEN current_date - INTERVAL 14 DAY AND current_date - INTERVAL 8 DAY
),
this_week AS (
  SELECT DISTINCT user_id FROM bookings
  WHERE booking_date BETWEEN current_date - INTERVAL 7 DAY AND current_date
)
SELECT user_id FROM this_week
WHERE user_id NOT IN (SELECT user_id FROM last_week);

🔁 PySpark Version:

from pyspark.sql.functions import col, current_date, date_sub

this_week = df.filter((col("booking_date") >= date_sub(current_date(), 7)) &
                      (col("booking_date") <= current_date()))
last_week = df.filter((col("booking_date") >= date_sub(current_date(), 14)) &
                      (col("booking_date") <= date_sub(current_date(), 8)))

this_week_users = this_week.select("user_id").distinct()
last_week_users = last_week.select("user_id").distinct()

new_users = this_week_users.join(last_week_users, "user_id", "left_anti")

🧠 Challenge 3: Rolling Sum Over 3 Previous Orders

Table: orders(user_id, order_date, amount)

Goal: Calculate rolling sum of current and 2 previous orders for each user (by order_date).

✅ SQL:

SELECT user_id, order_date, amount,
       SUM(amount) OVER (
         PARTITION BY user_id ORDER BY order_date
         ROWS BETWEEN 2 PRECEDING AND CURRENT ROW
       ) AS rolling_sum
FROM orders;

🔁 PySpark:

from pyspark.sql.functions import sum as _sum
from pyspark.sql.window import Window

win = Window.partitionBy("user_id").orderBy("order_date").rowsBetween(-2, 0)

df_roll = df.withColumn("rolling_sum", _sum("amount").over(win))

🧠 Challenge 4: Find Top 2 Products Per Category by Revenue

Table: sales(product_id, category, revenue)

✅ SQL:

SELECT * FROM (
  SELECT *, 
         DENSE_RANK() OVER (PARTITION BY category ORDER BY revenue DESC) as rnk
  FROM sales
) t
WHERE rnk <= 2;

🔁 PySpark:

from pyspark.sql.functions import dense_rank, col
from pyspark.sql.window import Window

window_spec = Window.partitionBy("category").orderBy(col("revenue").desc())

df_top2 = df.withColumn("rnk", dense_rank().over(window_spec)) \
            .filter("rnk <= 2")

Great choice! These three topics are high-value, frequently asked in interviews, and perfect for both SQL and PySpark.


🔍 Deep Dive: Advanced SQL & PySpark Aggregation Patterns


✅ 1. CASE-WHEN Inside Aggregations

This is used to calculate conditional metrics (e.g., count only “active” users or sum revenue from a specific country).

🧠 Use Case:

Count total users and number of users from “India”.

✅ SQL:

SELECT 
  COUNT(*) AS total_users,
  COUNT(CASE WHEN country = 'India' THEN 1 END) AS india_users
FROM users;

🔁 PySpark:

from pyspark.sql.functions import count, when, col

df.groupBy().agg(
    count("*").alias("total_users"),
    count(when(col("country") == "India", True)).alias("india_users")
)

🧠 Another Use Case:

Average salary by department, only for employees older than 30.

✅ SQL:

SELECT department,
       AVG(CASE WHEN age > 30 THEN salary ELSE NULL END) AS avg_salary_30plus
FROM employees
GROUP BY department;

🔁 PySpark:

from pyspark.sql.functions import avg, when, col

df.groupBy("department").agg(
    avg(when(col("age") > 30, col("salary"))).alias("avg_salary_30plus")
)

✅ 2. LEAD / LAG + Change Detection

Used to track changes, detect status shifts, or compare rows in time.

🧠 Use Case:

Detect when a user’s status changed (e.g., active → inactive).

Table: user_status(user_id, status, update_date)

✅ SQL:

SELECT *,
       LAG(status) OVER (PARTITION BY user_id ORDER BY update_date) AS prev_status
FROM user_status;

Add change detection:

SELECT *,
       CASE WHEN status != LAG(status) OVER (PARTITION BY user_id ORDER BY update_date)
            THEN 1 ELSE 0 END AS status_changed
FROM user_status;

🔁 PySpark:

from pyspark.sql.window import Window
from pyspark.sql.functions import lag, when, col

w = Window.partitionBy("user_id").orderBy("update_date")

df_changes = df.withColumn("prev_status", lag("status").over(w)) \
               .withColumn("status_changed", when(col("status") != col("prev_status"), 1).otherwise(0))

✅ 3. Percentiles, Quartiles, NTILE – Analytics-style

🧠 Use Case:

Assign users into 4 quartiles based on their total spend.

✅ SQL with NTILE:

SELECT user_id, total_spend,
       NTILE(4) OVER (ORDER BY total_spend DESC) AS quartile
FROM user_spend;

🔁 PySpark:

from pyspark.sql.functions import ntile, col
from pyspark.sql.window import Window

window_spec = Window.orderBy(col("total_spend").desc())

df_quartiles = df.withColumn("quartile", ntile(4).over(window_spec))

🧠 Use Case: Calculate Median/Percentile

✅ SQL (PostgreSQL or Presto-style):

SELECT PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY total_spend) AS median_spend
FROM user_spend;

🔁 PySpark:

df.approxQuantile("total_spend", [0.25, 0.5, 0.75], 0.01)
# returns [Q1, Median, Q3]

For grouped median:

from pyspark.sql.functions import expr

df.groupBy("country").agg(
    expr("percentile_approx(total_spend, 0.5)").alias("median_spend")
)

🚀 What’s Next?

Would you like to:

  1. Practice these via 4–5 real-world challenge questions?
  2. Move to PySpark-S3-Bitbucket pipeline challenges?
  3. Continue SQL deep dive with Time-based Analysis, Grouping Sets, or Recursive CTEs?

Let me know, and I’ll queue it up.

Perfect plan. Yes, we can do both in parallel: we'll solve the 5 real-world SQL + PySpark problems (Stage 1) while also preparing the structure for the S3 + Spark mini-project (Stage 2).


✅ Stage 1: Real-World SQL + PySpark Challenge Set


🔹 Challenge 1: CASE-WHEN + Aggregation

Problem:
From transactions(user_id, amount, country), find for each user:

  • Total transactions
  • Total amount from India
  • Total amount from US

✅ SQL:

SELECT 
  user_id,
  COUNT(*) AS total_txn,
  SUM(CASE WHEN country = 'India' THEN amount ELSE 0 END) AS india_amount,
  SUM(CASE WHEN country = 'US' THEN amount ELSE 0 END) AS us_amount
FROM transactions
GROUP BY user_id;

🔁 PySpark:

from pyspark.sql.functions import count, when, col, sum as _sum

df.groupBy("user_id").agg(
    count("*").alias("total_txn"),
    _sum(when(col("country") == "India", col("amount")).otherwise(0)).alias("india_amount"),
    _sum(when(col("country") == "US", col("amount")).otherwise(0)).alias("us_amount")
)

🔹 Challenge 2: LAG / Change Detection

Problem:
From employee_status(emp_id, status, update_date), detect when an employee moved from 'probation' → 'confirmed'.


✅ SQL:

SELECT emp_id, update_date AS status_change_date
FROM (
  SELECT *,
         LAG(status) OVER (PARTITION BY emp_id ORDER BY update_date) AS prev_status
  FROM employee_status
) t
WHERE prev_status = 'probation' AND status = 'confirmed';

🔁 PySpark:

from pyspark.sql.window import Window
from pyspark.sql.functions import lag, col

w = Window.partitionBy("emp_id").orderBy("update_date")

df_status = df.withColumn("prev_status", lag("status").over(w)) \
              .filter((col("prev_status") == "probation") & (col("status") == "confirmed")) \
              .select("emp_id", col("update_date").alias("status_change_date"))

🔹 Challenge 3: NTILE Percentile Banding

Problem:
From user_scores(user_id, score), assign users into NTILE(10) bands based on score.


✅ SQL:

SELECT user_id, score,
       NTILE(10) OVER (ORDER BY score DESC) AS percentile_rank
FROM user_scores;

🔁 PySpark:

from pyspark.sql.functions import ntile, col
from pyspark.sql.window import Window

w = Window.orderBy(col("score").desc())

df_ntile = df.withColumn("percentile_rank", ntile(10).over(w))

🔹 Challenge 4: LEAD + Gap Detection

Problem:
From subscriptions(user_id, start_date, end_date), detect gaps > 30 days between consecutive subscriptions.


✅ SQL:

SELECT user_id, end_date, next_start_date, DATEDIFF(next_start_date, end_date) AS gap_days
FROM (
  SELECT *,
         LEAD(start_date) OVER (PARTITION BY user_id ORDER BY start_date) AS next_start_date
  FROM subscriptions
) t
WHERE DATEDIFF(next_start_date, end_date) > 30;

🔁 PySpark:

from pyspark.sql.functions import lead, datediff, col
from pyspark.sql.window import Window

w = Window.partitionBy("user_id").orderBy("start_date")

df_gap = df.withColumn("next_start_date", lead("start_date").over(w)) \
           .withColumn("gap_days", datediff(col("next_start_date"), col("end_date"))) \
           .filter(col("gap_days") > 30) \
           .select("user_id", "end_date", "next_start_date", "gap_days")

🔹 Challenge 5: Grouped Median Using percentile_approx

Problem:
From sales(region, revenue), compute median revenue per region.


✅ SQL (Hive/Presto-style):

SELECT region,
       percentile_approx(revenue, 0.5) AS median_revenue
FROM sales
GROUP BY region;

🔁 PySpark:

df.groupBy("region").agg(
    expr("percentile_approx(revenue, 0.5)").alias("median_revenue")
)

🧩 All 5 Challenges Completed!

Now moving to…


🚀 Stage 2: PySpark + AWS S3 + Bitbucket Mini Project Setup

⚙ Project Plan:

Step | Task
✅ 1 | Ingest CSV/Parquet data from S3
✅ 2 | Transform with PySpark (apply logic from Stage 1!)
✅ 3 | Write results back to S3
✅ 4 | Use Git + Bitbucket for version control
✅ 5 | Optional: Add Airflow or Bash script for scheduling
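
To make steps 1-3 concrete, here is a minimal sketch of the end-to-end job. The bucket names and paths are placeholder assumptions, the transformation simply reuses the dedup logic from Challenge 1, and it assumes the cluster already has the S3A connector and AWS credentials configured.

from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, col

spark = SparkSession.builder.appName("s3-mini-pipeline").getOrCreate()

# Step 1: ingest raw CSV from S3 (hypothetical input path)
df = spark.read.csv("s3a://my-raw-bucket/user_activity/", header=True, inferSchema=True)

# Step 2: transform - reuse the keep-latest-record-per-user logic from Challenge 1
w = Window.partitionBy("user_id").orderBy(col("login_time").desc())
df_latest = (df.withColumn("rn", row_number().over(w))
               .filter(col("rn") == 1)
               .drop("rn"))

# Step 3: write the result back to S3 as Parquet (hypothetical output path)
df_latest.write.mode("overwrite").parquet("s3a://my-curated-bucket/user_activity_latest/")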

📁 Folder Structure:

project-spark-s3/
├── src/
│   ├── ingest.py         # S3 read logic
│   ├── transform.py      # SQL logic using DataFrames
│   ├── write.py          # Write to S3
├── conf/
│   └── config.json       # S3 paths, credentials, options
├── test/
│   └── test_cases.py     # Optional test logic
├── notebooks/
│   └── demo.ipynb        # PySpark + S3 demo
├── README.md
├── .gitignore
