Perfect! Let’s continue with advanced SQL questions tailored to your target stack (PySpark, Hadoop, AWS) and resume-worthy scenarios.

We’ll follow this flow:

Today’s Focus: Advanced SQL Scenarios (with PySpark and Big Data Angle)

| Topic | Challenge Style |
|---|---|
| 1. Ranking, Deduplication | ROW_NUMBER, DENSE_RANK |
| 2. Complex Joins + Filters | Multi-condition joins |
| 3. Aggregations Over Groups | Rolling sums, group-wise max/min |
| 4. Conditional Aggregation | CASE WHEN inside aggregation |
| 5. SCD (Slowly Changing Dimension) | Resume-worthy transformation logic |
| 6. Nested Window Functions | Use of LEAD/LAG + partitioning |
| 7. Percentile, Median, NTILE | Real-world analytics style |

🧠 Challenge 1: Remove Duplicates Keeping the Latest Record

Table: user_activity(user_id, login_time, source)

Question:

Return the latest login record for each user.

✅ SQL Version:

SELECT user_id, login_time, source
FROM (
  SELECT *,
         ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY login_time DESC) as rn
  FROM user_activity
) t
WHERE rn = 1;

🔁 PySpark Version:

from pyspark.sql.window import Window
from pyspark.sql.functions import col, row_number

window_spec = Window.partitionBy("user_id").orderBy(col("login_time").desc())

df_latest = df.withColumn("rn", row_number().over(window_spec)) \
              .filter("rn = 1") \
              .drop("rn")
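
To sanity-check the logic, here is a tiny illustrative run (the sample values are invented, and spark is assumed to be an active SparkSession):

# Hypothetical sample rows: two logins for user 1, one for user 2
data = [(1, "2024-01-01 08:00:00", "web"),
        (1, "2024-01-03 09:30:00", "mobile"),
        (2, "2024-01-02 10:15:00", "web")]
df = spark.createDataFrame(data, ["user_id", "login_time", "source"])

df.withColumn("rn", row_number().over(window_spec)) \
  .filter("rn = 1") \
  .drop("rn") \
  .show()
# Expected: user 1 keeps the 2024-01-03 mobile login; user 2 keeps its only row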

🧠 Challenge 2: Users Who Booked in Last 7 Days but Not in Previous 7

Table: bookings(user_id, booking_date)

Goal: Identify users active this week but not in the previous week.

✅ SQL Version:

WITH last_week AS (
  SELECT DISTINCT user_id FROM bookings
  WHERE booking_date BETWEEN current_date - INTERVAL 14 DAY AND current_date - INTERVAL 8 DAY
),
this_week AS (
  SELECT DISTINCT user_id FROM bookings
  WHERE booking_date BETWEEN current_date - INTERVAL 7 DAY AND current_date
)
SELECT user_id FROM this_week
WHERE user_id NOT IN (SELECT user_id FROM last_week);
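
Caveat: if the subquery can ever return a NULL user_id, NOT IN yields no rows at all; NOT EXISTS (or the anti join used in the PySpark version below) is the safer pattern.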

🔁 PySpark Version:

from pyspark.sql.functions import col, current_date, date_sub

this_week = df.filter((col("booking_date") >= date_sub(current_date(), 7)) &
                      (col("booking_date") <= current_date()))
last_week = df.filter((col("booking_date") >= date_sub(current_date(), 14)) &
                      (col("booking_date") <= date_sub(current_date(), 8)))

this_week_users = this_week.select("user_id").distinct()
last_week_users = last_week.select("user_id").distinct()

new_users = this_week_users.join(last_week_users, "user_id", "left_anti")
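
The left_anti join keeps only the this_week users with no match in last_week, mirroring NOT EXISTS semantics without the NULL pitfall of NOT IN.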

🧠 Challenge 3: Rolling Sum Over 3 Previous Orders

Table: orders(user_id, order_date, amount)

Goal: Calculate rolling sum of current and 2 previous orders for each user (by order_date).

✅ SQL:

SELECT user_id, order_date, amount,
       SUM(amount) OVER (
         PARTITION BY user_id ORDER BY order_date
         ROWS BETWEEN 2 PRECEDING AND CURRENT ROW
       ) AS rolling_sum
FROM orders;

🔁 PySpark:

from pyspark.sql.functions import sum as _sum
from pyspark.sql.window import Window

win = Window.partitionBy("user_id").orderBy("order_date").rowsBetween(-2, 0)

df_roll = df.withColumn("rolling_sum", _sum("amount").over(win))
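
For intuition: if one user's amounts are 10, 20, 30, 40 in date order, rowsBetween(-2, 0) yields rolling sums 10, 30, 60, 90; the first two frames are shorter because fewer than two preceding rows exist.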

🧠 Challenge 4: Find Top 2 Products Per Category by Revenue

Table: sales(product_id, category, revenue)

✅ SQL:

SELECT * FROM (
  SELECT *, 
         DENSE_RANK() OVER (PARTITION BY category ORDER BY revenue DESC) as rnk
  FROM sales
) t
WHERE rnk <= 2;

🔁 PySpark:

from pyspark.sql.window import Window
from pyspark.sql.functions import col, dense_rank

window_spec = Window.partitionBy("category").orderBy(col("revenue").desc())

df_top2 = df.withColumn("rnk", dense_rank().over(window_spec)) \
            .filter("rnk <= 2")
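
Note: DENSE_RANK can return more than two products per category when revenues tie; switch to ROW_NUMBER if you need exactly two rows.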

Great choice! These three topics are high-value, often asked in interviews, and are perfect for both SQL and PySpark.


🔍 Deep Dive: Advanced SQL & PySpark Aggregation Patterns


✅ 1. CASE-WHEN Inside Aggregations

This is used to calculate conditional metrics (e.g., count only “active” users or sum revenue from a specific country).

🧠 Use Case:

Count total users and number of users from “India”.

✅ SQL:

SELECT 
  COUNT(*) AS total_users,
  COUNT(CASE WHEN country = 'India' THEN 1 END) AS india_users
FROM users;

🔁 PySpark:

from pyspark.sql.functions import col, count, when

df.groupBy().agg(
    count("*").alias("total_users"),
    count(when(col("country") == "India", True)).alias("india_users")
)

🧠 Another Use Case:

Average salary by department, only for employees > 30 years old.

SELECT department,
       AVG(CASE WHEN age > 30 THEN salary ELSE NULL END) AS avg_salary_30plus
FROM employees
GROUP BY department;

🔁 PySpark:

from pyspark.sql.functions import avg, col, when

df.groupBy("department").agg(
    avg(when(col("age") > 30, col("salary"))).alias("avg_salary_30plus")
)

✅ 2. LEAD / LAG + Change Detection

Used to track changes, detect status shifts, or compare rows in time.

🧠 Use Case:

Detect when a user’s status changed (e.g., active → inactive).

Table: user_status(user_id, status, update_date)

✅ SQL:

SELECT *,
       LAG(status) OVER (PARTITION BY user_id ORDER BY update_date) AS prev_status
FROM user_status;

Add change detection:

SELECT *,
       CASE WHEN status != LAG(status) OVER (PARTITION BY user_id ORDER BY update_date)
            THEN 1 ELSE 0 END AS status_changed
FROM user_status;

🔁 PySpark:

from pyspark.sql.window import Window
from pyspark.sql.functions import col, lag, when

w = Window.partitionBy("user_id").orderBy("update_date")

df_changes = df.withColumn("prev_status", lag("status").over(w)) \
               .withColumn("status_changed", when(col("status") != col("prev_status"), 1).otherwise(0))
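# Note: the first row per user has prev_status = NULL; the inequality evaluates to NULL,
# so when(...) falls through to otherwise(0) and no change is flagged there.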

✅ 3. Percentiles, Quartiles, NTILE – Analytics-style

🧠 Use Case:

Assign users into 4 quartiles based on their total spend.

✅ SQL with NTILE:

SELECT user_id, total_spend,
       NTILE(4) OVER (ORDER BY total_spend DESC) AS quartile
FROM user_spend;

🔁 PySpark:

from pyspark.sql.window import Window
from pyspark.sql.functions import col, ntile

# No partitionBy: Spark pulls all rows into a single partition for this window
window_spec = Window.orderBy(col("total_spend").desc())

df_quartiles = df.withColumn("quartile", ntile(4).over(window_spec))

🧠 Use Case: Calculate Median/Percentile

✅ SQL (PostgreSQL or Presto-style):

SELECT PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY total_spend) AS median_spend
FROM user_spend;

🔁 PySpark:

df.approxQuantile("total_spend", [0.25, 0.5, 0.75], 0.01)
# returns [Q1, Median, Q3]
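
Note: approxQuantile is an action that returns a plain Python list rather than a DataFrame; the third argument is the relative error (0.0 gives exact quantiles at higher cost).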

For grouped median:

from pyspark.sql.functions import expr

df.groupBy("country").agg(
    expr("percentile_approx(total_spend, 0.5)").alias("median_spend")
)
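
percentile_approx also accepts an array of percentages, so several cut points come back in one pass (a sketch against the same df):

from pyspark.sql.functions import expr

# Q1, median, and Q3 per country in a single aggregation
df.groupBy("country").agg(
    expr("percentile_approx(total_spend, array(0.25, 0.5, 0.75))").alias("spend_quartiles")
)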

🚀 What’s Next?

Would you like to:

  1. Practice these via 4–5 real-world challenge questions?
  2. Move to PySpark-S3-Bitbucket pipeline challenges?
  3. Continue SQL deep dive with Time-based Analysis, Grouping Sets, or Recursive CTEs?

Let me know, and I’ll queue it up.

Perfect plan. Yes, we can do both in parallel: we’ll solve the 5 real-world SQL + PySpark problems (Stage 1) while also preparing the structure for the S3 + Spark mini-project (Stage 2).


✅ Stage 1: Real-World SQL + PySpark Challenge Set


🔹 Challenge 1: CASE-WHEN + Aggregation

Problem:
From transactions(user_id, amount, country), find for each user:

  • Total transactions
  • Total amount from India
  • Total amount from US

✅ SQL:

SELECT 
  user_id,
  COUNT(*) AS total_txn,
  SUM(CASE WHEN country = 'India' THEN amount ELSE 0 END) AS india_amount,
  SUM(CASE WHEN country = 'US' THEN amount ELSE 0 END) AS us_amount
FROM transactions
GROUP BY user_id;

🔁 PySpark:

from pyspark.sql.functions import col, count, when, sum as _sum

df.groupBy("user_id").agg(
    count("*").alias("total_txn"),
    _sum(when(col("country") == "India", col("amount")).otherwise(0)).alias("india_amount"),
    _sum(when(col("country") == "US", col("amount")).otherwise(0)).alias("us_amount")
)
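
A pivot is a handy alternative here: it builds the per-country columns without hand-writing one when() per country (total_txn would still be computed separately, and the generated column names differ slightly):

# Pivot on country, restricted to the two values of interest
df.groupBy("user_id").pivot("country", ["India", "US"]).sum("amount")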

🔹 Challenge 2: LAG / Change Detection

Problem:
From employee_status(emp_id, status, update_date), detect when an employee moved from 'probation' → 'confirmed'.


✅ SQL:

SELECT emp_id, update_date AS status_change_date
FROM (
  SELECT *,
         LAG(status) OVER (PARTITION BY emp_id ORDER BY update_date) AS prev_status
  FROM employee_status
) t
WHERE prev_status = 'probation' AND status = 'confirmed';

🔁 PySpark:

from pyspark.sql.window import Window
from pyspark.sql.functions import col, lag

w = Window.partitionBy("emp_id").orderBy("update_date")

df_status = df.withColumn("prev_status", lag("status").over(w)) \
              .filter((col("prev_status") == "probation") & (col("status") == "confirmed")) \
              .select("emp_id", col("update_date").alias("status_change_date"))

🔹 Challenge 3: NTILE Percentile Banding

Problem:
From user_scores(user_id, score), assign users into NTILE(10) bands based on score.


✅ SQL:

SELECT user_id, score,
       NTILE(10) OVER (ORDER BY score DESC) AS percentile_rank
FROM user_scores;

🔁 PySpark:

from pyspark.sql.functions import col, ntile
from pyspark.sql.window import Window

# Global window (no partitionBy): fine for small data, but it runs on one partition
w = Window.orderBy(col("score").desc())

df_ntile = df.withColumn("percentile_rank", ntile(10).over(w))

🔹 Challenge 4: LEAD + Gap Detection

Problem:
From subscriptions(user_id, start_date, end_date), detect gaps > 30 days between consecutive subscriptions.


✅ SQL:

SELECT user_id, end_date, next_start_date, DATEDIFF(next_start_date, end_date) AS gap_days
FROM (
  SELECT *,
         LEAD(start_date) OVER (PARTITION BY user_id ORDER BY start_date) AS next_start_date
  FROM subscriptions
) t
WHERE DATEDIFF(next_start_date, end_date) > 30;

🔁 PySpark:

from pyspark.sql.window import Window
from pyspark.sql.functions import col, lead, datediff

w = Window.partitionBy("user_id").orderBy("start_date")

df_gap = df.withColumn("next_start_date", lead("start_date").over(w)) \
           .withColumn("gap_days", datediff(col("next_start_date"), col("end_date"))) \
           .filter(col("gap_days") > 30) \
           .select("user_id", "end_date", "next_start_date", "gap_days")

🔹 Challenge 5: Grouped Median Using percentile_approx

Problem:
From sales(region, revenue), compute median revenue per region.


✅ SQL (Hive/Presto-style):

SELECT region,
       percentile_approx(revenue, 0.5) AS median_revenue
FROM sales
GROUP BY region;

🔁 PySpark:

from pyspark.sql.functions import expr

df.groupBy("region").agg(
    expr("percentile_approx(revenue, 0.5)").alias("median_revenue")
)

🧩 All 5 Challenges Completed!

Now moving to…


🚀 Stage 2: PySpark + AWS S3 + Bitbucket Mini Project Setup

⚙ Project Plan:

| Step | Task |
|---|---|
| ✅ 1 | Ingest CSV/Parquet data from S3 |
| ✅ 2 | Transform with PySpark (apply logic from Stage 1!) |
| ✅ 3 | Write results back to S3 |
| ✅ 4 | Use Git + Bitbucket for version control |
| ✅ 5 | Optional: Add Airflow or Bash script for scheduling |

📁 Folder Structure:

project-spark-s3/
├── src/
│   ├── ingest.py         # S3 read logic
│   ├── transform.py      # SQL logic using DataFrames
│   ├── write.py          # Write to S3
├── conf/
│   └── config.json       # S3 paths, credentials, options
├── test/
│   └── test_cases.py     # Optional test logic
├── notebooks/
│   └── demo.ipynb        # PySpark + S3 demo
├── README.md
├── .gitignore
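
To make Steps 1 and 3 concrete, here is a minimal sketch of what src/ingest.py could look like. The bucket, prefixes, and formats are placeholders (in the real project they would come from conf/config.json), and it assumes the cluster or spark-submit --packages provides the hadoop-aws S3A connector:

# src/ingest.py -- illustrative sketch; all paths and options are placeholders
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("project-spark-s3").getOrCreate()

def read_from_s3(path: str, fmt: str = "parquet"):
    """Read a CSV or Parquet dataset from S3 into a DataFrame."""
    reader = spark.read.format(fmt)
    if fmt == "csv":
        reader = reader.option("header", "true").option("inferSchema", "true")
    return reader.load(path)

def write_to_s3(df, path: str, fmt: str = "parquet", mode: str = "overwrite"):
    """Write a DataFrame back to S3, overwriting any existing output."""
    df.write.format(fmt).mode(mode).save(path)

if __name__ == "__main__":
    # Hypothetical bucket/prefix, for illustration only
    df = read_from_s3("s3a://my-bucket/raw/transactions/", fmt="csv")
    write_to_s3(df, "s3a://my-bucket/curated/transactions/")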