Here are complex, hands-on Spark SQL join questions of the kind asked in interviews. Each comes with sample data, a query, and the expected output so you can test real-world logic.


✅ Setup: Sample DataFrames

🔹 Employee Table (emp)

+------+--------+-------+------+
|emp_id|emp_name|dept_id|salary|
+------+--------+-------+------+
|   101|   Alice|      1| 50000|
|   102|     Bob|      2| 60000|
|   103|   Carol|      1| 55000|
|   104|   David|      3| 40000|
|   105|     Eve|   null| 70000|
+------+--------+-------+------+

🔹 Department Table (dept)

+-------+-----------+
|dept_id|  dept_name|
+-------+-----------+
|      1|         HR|
|      2|Engineering|
|      3|      Sales|
|      4|    Finance|
+-------+-----------+

🧠 1. Find all employees, including those without a department. Show department name as Unknown if not available.

🧩 Query:

SELECT 
  e.emp_id, 
  e.emp_name, 
  e.salary, 
  COALESCE(d.dept_name, 'Unknown') AS dept_name
FROM emp e
LEFT JOIN dept d
ON e.dept_id = d.dept_id;
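
🎯 Expected Output (Eve has no department, so she shows as Unknown; row order may vary):

+------+--------+------+-----------+
|emp_id|emp_name|salary|  dept_name|
+------+--------+------+-----------+
|   101|   Alice| 50000|         HR|
|   102|     Bob| 60000|Engineering|
|   103|   Carol| 55000|         HR|
|   104|   David| 40000|      Sales|
|   105|     Eve| 70000|    Unknown|
+------+--------+------+-----------+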

🧠 2. Find departments that have no employees assigned.

🧩 Query:

SELECT 
  d.dept_id, 
  d.dept_name
FROM dept d
LEFT ANTI JOIN emp e
ON d.dept_id = e.dept_id;
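
🎯 Expected Output (only Finance has no employees):

+-------+---------+
|dept_id|dept_name|
+-------+---------+
|      4|  Finance|
+-------+---------+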

🧠 3. Find the top-paid employee in each department (with dept name).

🧩 Query:

WITH ranked AS (
  SELECT 
    e.*, 
    d.dept_name,
    RANK() OVER (PARTITION BY e.dept_id ORDER BY e.salary DESC) AS rnk
  FROM emp e
  LEFT JOIN dept d ON e.dept_id = d.dept_id
)
SELECT emp_id, emp_name, dept_name, salary
FROM ranked
WHERE rnk = 1;
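
🎯 Expected Output (note: Eve's null dept_id forms its own window partition, so she surfaces with a null dept_name):

+------+--------+-----------+------+
|emp_id|emp_name|  dept_name|salary|
+------+--------+-----------+------+
|   103|   Carol|         HR| 55000|
|   102|     Bob|Engineering| 60000|
|   104|   David|      Sales| 40000|
|   105|     Eve|       null| 70000|
+------+--------+-----------+------+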

🧠 4. Find employees who earn more than the average salary of their department.

🧩 Query:

WITH avg_sal AS (
  SELECT dept_id, AVG(salary) AS avg_dept_salary
  FROM emp
  GROUP BY dept_id
)
SELECT 
  e.emp_id, 
  e.emp_name, 
  e.salary,
  a.avg_dept_salary
FROM emp e
JOIN avg_sal a
ON e.dept_id = a.dept_id
WHERE e.salary > a.avg_dept_salary;
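
🎯 Expected Output (dept 1 averages 52500, so only Carol qualifies; Eve's null dept_id never matches the join):

+------+--------+------+---------------+
|emp_id|emp_name|salary|avg_dept_salary|
+------+--------+------+---------------+
|   103|   Carol| 55000|        52500.0|
+------+--------+------+---------------+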

🧠 5. Self-join to find all employee pairs from the same department (excluding self-pairing).

🧩 Query:

SELECT 
  e1.emp_name AS emp1, 
  e2.emp_name AS emp2, 
  d.dept_name
FROM emp e1
JOIN emp e2 
  ON e1.dept_id = e2.dept_id AND e1.emp_id < e2.emp_id
JOIN dept d 
  ON e1.dept_id = d.dept_id;
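
🎯 Expected Output (only HR has two employees, so there is exactly one pair):

+-----+-----+---------+
| emp1| emp2|dept_name|
+-----+-----+---------+
|Alice|Carol|       HR|
+-----+-----+---------+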

🧠 6. Identify departments with more than 1 employee. Show department name and count.

🧩 Query:

SELECT 
  d.dept_name,
  COUNT(e.emp_id) AS emp_count
FROM dept d
JOIN emp e ON d.dept_id = e.dept_id
GROUP BY d.dept_name
HAVING COUNT(e.emp_id) > 1;
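
🎯 Expected Output:

+---------+---------+
|dept_name|emp_count|
+---------+---------+
|       HR|        2|
+---------+---------+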

🧠 7. Perform a full outer join between emp and dept. Show unmatched rows too.

🧩 Query:

SELECT 
  e.emp_id, 
  e.emp_name, 
  e.dept_id AS emp_dept_id,
  d.dept_id AS dept_dept_id,
  d.dept_name
FROM emp e
FULL OUTER JOIN dept d
ON e.dept_id = d.dept_id;
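
🎯 Expected Output (Eve and Finance each survive with nulls on the unmatched side):

+------+--------+-----------+------------+-----------+
|emp_id|emp_name|emp_dept_id|dept_dept_id|  dept_name|
+------+--------+-----------+------------+-----------+
|   101|   Alice|          1|           1|         HR|
|   102|     Bob|          2|           2|Engineering|
|   103|   Carol|          1|           1|         HR|
|   104|   David|          3|           3|      Sales|
|   105|     Eve|       null|        null|       null|
|  null|    null|       null|           4|    Finance|
+------+--------+-----------+------------+-----------+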

💡 Bonus: Create Views and Run Spark SQL

Register temp views in PySpark:

emp_df.createOrReplaceTempView("emp")
dept_df.createOrReplaceTempView("dept")

spark.sql("""
  -- paste any of the above queries here
""").show()

Now let's turn the same SQL join examples into full PySpark DataFrame API code, with:

  1. ✅ Sample data loading script
  2. 🔁 Equivalent DataFrame join logic for each query
  3. ⚡ Performance tips on joins and partitioning

✅ 1. Data Loading: Sample emp and dept DataFrames

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, coalesce, avg, rank, count, lit
from pyspark.sql.window import Window

spark = SparkSession.builder.getOrCreate()

# Employee Data
emp_data = [
    (101, "Alice", 1, 50000),
    (102, "Bob", 2, 60000),
    (103, "Carol", 1, 55000),
    (104, "David", 3, 40000),
    (105, "Eve", None, 70000)
]
emp_cols = ["emp_id", "emp_name", "dept_id", "salary"]
emp_df = spark.createDataFrame(emp_data, emp_cols)

# Department Data
dept_data = [
    (1, "HR"),
    (2, "Engineering"),
    (3, "Sales"),
    (4, "Finance")
]
dept_cols = ["dept_id", "dept_name"]
dept_df = spark.createDataFrame(dept_data, dept_cols)

🔁 2. Queries in PySpark DataFrame API

🧠 Query 1: Employees with dept name (Unknown if missing)

emp_df.join(dept_df, on="dept_id", how="left") \
    .select("emp_id", "emp_name", "salary", coalesce("dept_name", lit("Unknown")).alias("dept_name")) \
    .show()

🧠 Query 2: Departments with no employees (LEFT ANTI JOIN)

dept_df.join(emp_df, on="dept_id", how="left_anti").show()

🧠 Query 3: Top paid employee per department

window_spec = Window.partitionBy("dept_id").orderBy(col("salary").desc())

emp_df.join(dept_df, "dept_id", "left") \
    .withColumn("rnk", rank().over(window_spec)) \
    .filter(col("rnk") == 1) \
    .select("emp_id", "emp_name", "dept_name", "salary") \
    .show()

🧠 Query 4: Employees earning more than dept average

avg_sal_df = emp_df.groupBy("dept_id").agg(avg("salary").alias("avg_salary"))

emp_df.join(avg_sal_df, "dept_id") \
    .filter(col("salary") > col("avg_salary")) \
    .select("emp_id", "emp_name", "salary", "avg_salary") \
    .show()

🧠 Query 5: Employee pairs from the same dept (Self Join)

emp_df.alias("e1") \
    .join(emp_df.alias("e2"),
          (col("e1.dept_id") == col("e2.dept_id")) & (col("e1.emp_id") < col("e2.emp_id"))) \
    .join(dept_df, col("e1.dept_id") == dept_df.dept_id) \
    .select(col("e1.emp_name").alias("emp1"),
            col("e2.emp_name").alias("emp2"),
            col("dept_name")) \
    .show()

🧠 Query 6: Departments with more than 1 employee

emp_df.join(dept_df, "dept_id") \
    .groupBy("dept_name") \
    .agg(count("emp_id").alias("emp_count")) \
    .filter(col("emp_count") > 1) \
    .show()

🧠 Query 7: Full outer join showing unmatched rows too

emp_df.alias("e") \
    .join(dept_df.alias("d"), col("e.dept_id") == col("d.dept_id"), how="outer") \
    .select("emp_id", "emp_name", col("e.dept_id").alias("emp_dept_id"),
            col("d.dept_id").alias("dept_dept_id"), "dept_name") \
    .show()

⚡ Performance Tips on Joins in PySpark

Join strategies, when to use them, and tips:

- Broadcast Join: when one table is small (under spark.sql.autoBroadcastJoinThreshold, 10 MB by default). Use broadcast(df) to force the small table to be broadcast.
- Shuffle Hash Join: when one side is much smaller than the other but still too large to broadcast. Repartition large tables on the join key for better parallelism.
- Sort Merge Join: Spark's default strategy when both sides are large. It requires shuffling and sorting both sides, but scales best for big data.
- Skew Join: when one join key value is far more frequent than the rest. Use salting (see the salting sketch below) or skew hints to spread the hot key.
- Partition Pruning: when filtering on a partitioned column. Push filters down early so Spark avoids loading the full dataset.

🔧 How to Broadcast Join:

from pyspark.sql.functions import broadcast
emp_df.join(broadcast(dept_df), "dept_id").show()
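
The 10 MB default comes from the spark.sql.autoBroadcastJoinThreshold setting; you can raise it, or set it to -1 to disable automatic broadcasting:

# Raise the auto-broadcast threshold to 50 MB
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 50 * 1024 * 1024)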

🔧 How to Repartition Before Join:

emp_df.repartition("dept_id") \
    .join(dept_df.repartition("dept_id"), "dept_id") \
    .show()
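
🔧 How to Salt a Skewed Join (sketch):

A minimal salting sketch, assuming emp_df were heavily skewed on dept_id. The num_salts value and the salt / salted_key column names are illustrative, not part of the original example:

from pyspark.sql.functions import array, col, concat_ws, explode, floor, lit, rand

num_salts = 8  # illustrative fan-out factor

# Skewed (large) side: append a random salt 0..num_salts-1 to the join key
emp_salted = emp_df \
    .withColumn("salt", floor(rand() * num_salts).cast("int")) \
    .withColumn("salted_key", concat_ws("_", col("dept_id").cast("string"), col("salt").cast("string")))

# Small side: replicate each row once per salt so every salted key has a match
dept_salted = dept_df \
    .withColumn("salt", explode(array(*[lit(i) for i in range(num_salts)]))) \
    .withColumn("salted_key", concat_ws("_", col("dept_id").cast("string"), col("salt").cast("string")))

# The hot key is now spread across num_salts partitions
emp_salted.join(dept_salted.select("salted_key", "dept_name"), "salted_key") \
    .select("emp_id", "emp_name", "dept_name") \
    .show()

On Spark 3.x, enabling AQE (spark.sql.adaptive.enabled together with spark.sql.adaptive.skewJoin.enabled) lets Spark split skewed partitions automatically, often removing the need for manual salting.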
