Perfect! Let’s continue with advanced SQL questions tailored to your target stack (PySpark, Hadoop, AWS) and resume-worthy scenarios.
We’ll follow this flow:
✅ Today’s Focus: Advanced SQL Scenarios (with PySpark and Big Data Angle)
| Topic | Challenge Style |
|---|---|
| 1. Ranking, Deduplication | ROW_NUMBER, DENSE_RANK |
| 2. Complex Joins + Filters | Multi-condition joins |
| 3. Aggregations Over Groups | Rolling sums, group-wise max/min |
| 4. Conditional Aggregation | CASE WHEN inside aggregation |
| 5. SCD (Slowly Changing Dimension) | Resume-worthy transformation logic |
| 6. Nested Window Functions | LEAD/LAG + partitioning |
| 7. Percentile, Median, NTILE | Real-world analytics style |
🧠 Challenge 1: Remove Duplicates Keeping the Latest Record
Table: user_activity(user_id, login_time, source)
Question:
Return the latest login record for each user.
✅ SQL Version:
SELECT user_id, login_time, source
FROM (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY login_time DESC) as rn
FROM user_activity
) t
WHERE rn = 1;
🔁 PySpark Version:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, col
window_spec = Window.partitionBy("user_id").orderBy(col("login_time").desc())
df_latest = df.withColumn("rn", row_number().over(window_spec)) \
.filter("rn = 1") \
.drop("rn")
🧠 Challenge 2: Users Who Booked in Last 7 Days but Not in Previous 7
Table: bookings(user_id, booking_date)
Goal: Identify users active this week but not in the previous week.
✅ SQL Version:
WITH last_week AS (
SELECT DISTINCT user_id FROM bookings
WHERE booking_date BETWEEN current_date - INTERVAL 14 DAY AND current_date - INTERVAL 8 DAY
),
this_week AS (
SELECT DISTINCT user_id FROM bookings
WHERE booking_date BETWEEN current_date - INTERVAL 7 DAY AND current_date
)
SELECT user_id FROM this_week
WHERE user_id NOT IN (SELECT user_id FROM last_week);
🔁 PySpark Version:
from pyspark.sql.functions import col, current_date, date_sub
this_week = df.filter((col("booking_date") >= date_sub(current_date(), 7)))
last_week = df.filter((col("booking_date") >= date_sub(current_date(), 14)) &
(col("booking_date") <= date_sub(current_date(), 8)))
this_week_users = this_week.select("user_id").distinct()
last_week_users = last_week.select("user_id").distinct()
new_users = this_week_users.join(last_week_users, "user_id", "left_anti")
🧠 Challenge 3: Rolling Sum Over 3 Previous Orders
Table: orders(user_id, order_date, amount)
Goal: Calculate rolling sum of current and 2 previous orders for each user (by order_date).
✅ SQL:
SELECT user_id, order_date, amount,
SUM(amount) OVER (
PARTITION BY user_id ORDER BY order_date
ROWS BETWEEN 2 PRECEDING AND CURRENT ROW
) AS rolling_sum
FROM orders;
🔁 PySpark:
from pyspark.sql.functions import sum as _sum
from pyspark.sql.window import Window
win = Window.partitionBy("user_id").orderBy("order_date").rowsBetween(-2, 0)
df_roll = df.withColumn("rolling_sum", _sum("amount").over(win))
🧠 Challenge 4: Find Top 2 Products Per Category by Revenue
Table: sales(product_id, category, revenue)
✅ SQL:
SELECT * FROM (
SELECT *,
DENSE_RANK() OVER (PARTITION BY category ORDER BY revenue DESC) as rnk
FROM sales
) t
WHERE rnk <= 2;
🔁 PySpark:
from pyspark.sql.functions import dense_rank, col
window_spec = Window.partitionBy("category").orderBy(col("revenue").desc())
df_top2 = df.withColumn("rnk", dense_rank().over(window_spec)) \
.filter("rnk <= 2")
Great choice! These three topics are high-value, often asked in interviews, and are perfect for both SQL and PySpark.
🔍 Deep Dive: Advanced SQL & PySpark Aggregation Patterns
✅ 1. CASE-WHEN Inside Aggregations
This is used to calculate conditional metrics (e.g., count only “active” users or sum revenue from a specific country).
🧠 Use Case:
Count total users and number of users from “India”.
✅ SQL:
SELECT
COUNT(*) AS total_users,
COUNT(CASE WHEN country = 'India' THEN 1 END) AS india_users
FROM users;
🔁 PySpark:
from pyspark.sql.functions import col, count, when
df.groupBy().agg(
count("*").alias("total_users"),
count(when(col("country") == "India", True)).alias("india_users")
)
🧠 Another Use Case:
Average salary by department, only for employees > 30 years old.
✅ SQL:
SELECT department,
AVG(CASE WHEN age > 30 THEN salary ELSE NULL END) AS avg_salary_30plus
FROM employees
GROUP BY department;
df.groupBy("department").agg(
avg(when(col("age") > 30, col("salary"))).alias("avg_salary_30plus")
)
✅ 2. LEAD / LAG + Change Detection
Used to track changes, detect status shifts, or compare rows in time.
🧠 Use Case:
Detect when a user’s status changed (e.g., active → inactive).
Table: user_status(user_id, status, update_date)
✅ SQL:
SELECT *,
LAG(status) OVER (PARTITION BY user_id ORDER BY update_date) AS prev_status
FROM user_status
Add change detection:
SELECT *,
CASE WHEN status != LAG(status) OVER (PARTITION BY user_id ORDER BY update_date)
THEN 1 ELSE 0 END AS status_changed
FROM user_status;
🔁 PySpark:
from pyspark.sql.window import Window
from pyspark.sql.functions import lag, when, col
w = Window.partitionBy("user_id").orderBy("update_date")
df_changes = df.withColumn("prev_status", lag("status").over(w)) \
.withColumn("status_changed", when(col("status") != col("prev_status"), 1).otherwise(0))
✅ 3. Percentiles, Quartiles, NTILE – Analytics-style
🧠 Use Case:
Assign users into 4 quartiles based on their total spend.
✅ SQL with NTILE:
SELECT user_id, total_spend,
NTILE(4) OVER (ORDER BY total_spend DESC) AS quartile
FROM user_spend;
🔁 PySpark:
from pyspark.sql.functions import ntile, col
window_spec = Window.orderBy(col("total_spend").desc())
df_quartiles = df.withColumn("quartile", ntile(4).over(window_spec))
🧠 Use Case: Calculate Median/Percentile
✅ SQL (PostgreSQL or Presto-style):
SELECT PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY total_spend) AS median_spend
FROM user_spend;
🔁 PySpark:
df.approxQuantile("total_spend", [0.25, 0.5, 0.75], 0.01)
# returns [Q1, Median, Q3]
For a grouped median (e.g., per country, assuming the table has a country column):
from pyspark.sql.functions import expr
df.groupBy("country").agg(
expr("percentile_approx(total_spend, 0.5)").alias("median_spend")
)
🚀 What’s Next?
Would you like to:
- Practice these via 4–5 real-world challenge questions?
- Move to PySpark-S3-Bitbucket pipeline challenges?
- Continue SQL deep dive with Time-based Analysis, Grouping Sets, or Recursive CTEs?
Let me know, and I’ll queue it up.
Perfect plan. We can do both in parallel: solve the 5 real-world SQL + PySpark problems (Stage 1) while preparing the structure for the S3 + Spark mini-project (Stage 2).
✅ Stage 1: Real-World SQL + PySpark Challenge Set
🔹 Challenge 1: CASE-WHEN + Aggregation
Problem:
From transactions(user_id, amount, country), find for each user:
- Total transactions
- Total amount from India
- Total amount from US
✅ SQL:
SELECT
user_id,
COUNT(*) AS total_txn,
SUM(CASE WHEN country = 'India' THEN amount ELSE 0 END) AS india_amount,
SUM(CASE WHEN country = 'US' THEN amount ELSE 0 END) AS us_amount
FROM transactions
GROUP BY user_id;
🔁 PySpark:
from pyspark.sql.functions import col, count, sum, when
df.groupBy("user_id").agg(
count("*").alias("total_txn"),
sum(when(col("country") == "India", col("amount")).otherwise(0)).alias("india_amount"),
sum(when(col("country") == "US", col("amount")).otherwise(0)).alias("us_amount")
)
🔹 Challenge 2: LAG / Change Detection
Problem:
From employee_status(emp_id, status, update_date), detect when an employee moved from 'probation' → 'confirmed'.
✅ SQL:
SELECT emp_id, update_date AS status_change_date
FROM (
SELECT *,
LAG(status) OVER (PARTITION BY emp_id ORDER BY update_date) AS prev_status
FROM employee_status
) t
WHERE prev_status = 'probation' AND status = 'confirmed';
🔁 PySpark:
from pyspark.sql.window import Window
from pyspark.sql.functions import lag, col
w = Window.partitionBy("emp_id").orderBy("update_date")
df_status = df.withColumn("prev_status", lag("status").over(w)) \
.filter((col("prev_status") == "probation") & (col("status") == "confirmed")) \
.select("emp_id", col("update_date").alias("status_change_date"))
🔹 Challenge 3: NTILE Percentile Banding
Problem:
From user_scores(user_id, score), assign users into NTILE(10) bands based on score.
✅ SQL:
SELECT user_id, score,
NTILE(10) OVER (ORDER BY score DESC) AS percentile_rank
FROM user_scores;
🔁 PySpark:
from pyspark.sql.functions import ntile, col
from pyspark.sql.window import Window
w = Window.orderBy(col("score").desc())
df_ntile = df.withColumn("percentile_rank", ntile(10).over(w))
🔹 Challenge 4: LEAD + Gap Detection
Problem:
From subscriptions(user_id, start_date, end_date), detect gaps of more than 30 days between consecutive subscriptions.
✅ SQL:
SELECT user_id, end_date, next_start_date, DATEDIFF(next_start_date, end_date) AS gap_days
FROM (
SELECT *,
LEAD(start_date) OVER (PARTITION BY user_id ORDER BY start_date) AS next_start_date
FROM subscriptions
) t
WHERE DATEDIFF(next_start_date, end_date) > 30;
🔁 PySpark:
from pyspark.sql.functions import lead, datediff, col
w = Window.partitionBy("user_id").orderBy("start_date")
df_gap = df.withColumn("next_start_date", lead("start_date").over(w)) \
.withColumn("gap_days", datediff(col("next_start_date"), col("end_date"))) \
.filter(col("gap_days") > 30) \
.select("user_id", "end_date", "next_start_date", "gap_days")
🔹 Challenge 5: Grouped Median Using percentile_approx
Problem:
From sales(region, revenue), compute median revenue per region.
✅ SQL (Hive/Presto-style):
SELECT region,
percentile_approx(revenue, 0.5) AS median_revenue
FROM sales
GROUP BY region;
🔁 PySpark:
df.groupBy("region").agg(
expr("percentile_approx(revenue, 0.5)").alias("median_revenue")
)
🧩 All 5 Challenges Completed!
Now moving to…
🚀 Stage 2: PySpark + AWS S3 + Bitbucket Mini Project Setup
⚙ Project Plan:
| Step | Task |
|---|---|
| ✅ 1 | Ingest CSV/Parquet data from S3 |
| ✅ 2 | Transform with PySpark (apply logic from Stage 1!) |
| ✅ 3 | Write results back to S3 |
| ✅ 4 | Use Git + Bitbucket for version control |
| ✅ 5 | Optional: Add Airflow or a Bash script for scheduling |
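To make steps 1–3 concrete, here is a minimal end-to-end sketch that reuses the Challenge 1 aggregation from Stage 1. The bucket names and S3 paths are hypothetical placeholders, and it assumes the cluster already has the S3A connector and AWS credentials configured.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, sum as _sum, when
spark = SparkSession.builder.appName("s3-transactions-pipeline").getOrCreate()
# Step 1: ingest raw CSV from S3 (hypothetical bucket/prefix)
df = spark.read.csv("s3a://my-raw-bucket/transactions/", header=True, inferSchema=True)
# Step 2: transform (per-user totals with conditional sums, Challenge 1 logic)
agg_df = df.groupBy("user_id").agg(
    count("*").alias("total_txn"),
    _sum(when(col("country") == "India", col("amount")).otherwise(0)).alias("india_amount"),
    _sum(when(col("country") == "US", col("amount")).otherwise(0)).alias("us_amount")
)
# Step 3: write the result back to S3 as Parquet
agg_df.write.mode("overwrite").parquet("s3a://my-curated-bucket/transactions_summary/")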
📁 Folder Structure:
project-spark-s3/
├── src/
│ ├── ingest.py # S3 read logic
│ ├── transform.py # SQL logic using DataFrames
│ ├── write.py # Write to S3
├── conf/
│ └── config.json # S3 paths, credentials, options
├── test/
│ └── test_cases.py # Optional test logic
├── notebooks/
│ └── demo.ipynb # PySpark + S3 demo
├── README.md
├── .gitignore
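As a starting point, src/ingest.py could look roughly like the sketch below. The config keys (input_path, format) are illustrative assumptions rather than a fixed schema; the real S3 paths would live in conf/config.json instead of being hard-coded.
# src/ingest.py (sketch; the config keys shown are illustrative assumptions)
import json
from pyspark.sql import SparkSession
def load_config(path="conf/config.json"):
    # expected shape, for example: {"input_path": "s3a://my-raw-bucket/transactions/", "format": "csv"}
    with open(path) as f:
        return json.load(f)
def ingest(spark, config):
    # read CSV or Parquet from the configured S3 location
    if config["format"] == "csv":
        return spark.read.csv(config["input_path"], header=True, inferSchema=True)
    return spark.read.parquet(config["input_path"])
if __name__ == "__main__":
    spark = SparkSession.builder.appName("ingest").getOrCreate()
    df = ingest(spark, load_config())
    df.show(5)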