Here are Spark SQL join questions that are complex, interview-oriented, and hands-on, each with sample data to test real-world logic.
✅ Setup: Sample DataFrames
🔹 Employee Table (emp)
+------+--------+-------+------+
|emp_id|emp_name|dept_id|salary|
+------+--------+-------+------+
|   101|   Alice|      1| 50000|
|   102|     Bob|      2| 60000|
|   103|   Carol|      1| 55000|
|   104|   David|      3| 40000|
|   105|     Eve|   null| 70000|
+------+--------+-------+------+
🔹 Department Table (dept)
+-------+-----------+
|dept_id|  dept_name|
+-------+-----------+
|      1|         HR|
|      2|Engineering|
|      3|      Sales|
|      4|    Finance|
+-------+-----------+
🧠 1. Find all employees, including those without a department. Show the department name as Unknown if not available.
🧩 Query:
SELECT
e.emp_id,
e.emp_name,
e.salary,
COALESCE(d.dept_name, 'Unknown') AS dept_name
FROM emp e
LEFT JOIN dept d
ON e.dept_id = d.dept_id;
🧠 2. Find departments that have no employees assigned.
🧩 Query:
SELECT
d.dept_id,
d.dept_name
FROM dept d
LEFT ANTI JOIN emp e
ON d.dept_id = e.dept_id;
🧠 3. Find top paid employee in each department (with dept name).
🧩 Query:
WITH ranked AS (
SELECT
e.*,
d.dept_name,
RANK() OVER (PARTITION BY e.dept_id ORDER BY e.salary DESC) AS rnk
FROM emp e
LEFT JOIN dept d ON e.dept_id = d.dept_id
)
SELECT emp_id, emp_name, dept_name, salary
FROM ranked
WHERE rnk = 1;
🧠 4. Find employees who earn more than the average salary of their department.
🧩 Query:
WITH avg_sal AS (
SELECT dept_id, AVG(salary) AS avg_dept_salary
FROM emp
GROUP BY dept_id
)
SELECT
e.emp_id,
e.emp_name,
e.salary,
a.avg_dept_salary
FROM emp e
JOIN avg_sal a
ON e.dept_id = a.dept_id
WHERE e.salary > a.avg_dept_salary;
🧠 5. Self-join to find all employee pairs from the same department (excluding self-pairing).
🧩 Query:
SELECT
e1.emp_name AS emp1,
e2.emp_name AS emp2,
d.dept_name
FROM emp e1
JOIN emp e2
ON e1.dept_id = e2.dept_id AND e1.emp_id < e2.emp_id
JOIN dept d
ON e1.dept_id = d.dept_id;
🧠 6. Identify departments with more than 1 employee. Show department name and count.
🧩 Query:
SELECT
d.dept_name,
COUNT(e.emp_id) AS emp_count
FROM dept d
JOIN emp e ON d.dept_id = e.dept_id
GROUP BY d.dept_name
HAVING COUNT(e.emp_id) > 1;
🧠 7. Perform a full outer join between emp and dept. Show unmatched rows too.
🧩 Query:
SELECT
e.emp_id,
e.emp_name,
e.dept_id AS emp_dept_id,
d.dept_id AS dept_dept_id,
d.dept_name
FROM emp e
FULL OUTER JOIN dept d
ON e.dept_id = d.dept_id;
💡 Bonus: Create Views and Run Spark SQL
Register temp views in PySpark:
emp_df.createOrReplaceTempView("emp")
dept_df.createOrReplaceTempView("dept")
spark.sql("""
-- paste any of the above queries here
""").show()
Next, let’s turn the SQL join examples into full PySpark DataFrame API code, with:
- ✅ Sample data loading script
- 🔁 Equivalent DataFrame join logic for each query
- ⚡ Performance tips on joins and partitioning
✅ 1. Data Loading: Sample emp and dept DataFrames
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, coalesce, avg, rank, count, lit
from pyspark.sql.window import Window
spark = SparkSession.builder.getOrCreate()
# Employee Data
emp_data = [
(101, "Alice", 1, 50000),
(102, "Bob", 2, 60000),
(103, "Carol", 1, 55000),
(104, "David", 3, 40000),
(105, "Eve", None, 70000)
]
emp_cols = ["emp_id", "emp_name", "dept_id", "salary"]
emp_df = spark.createDataFrame(emp_data, emp_cols)
# Department Data
dept_data = [
(1, "HR"),
(2, "Engineering"),
(3, "Sales"),
(4, "Finance")
]
dept_cols = ["dept_id", "dept_name"]
dept_df = spark.createDataFrame(dept_data, dept_cols)
🔁 2. Queries in PySpark DataFrame API
🧠 Query 1: Employees with dept name (Unknown if missing)
emp_df.join(dept_df, on="dept_id", how="left") \
.select("emp_id", "emp_name", "salary", coalesce("dept_name", lit("Unknown")).alias("dept_name")) \
.show()
🧠 Query 2: Departments with no employees (LEFT ANTI JOIN)
dept_df.join(emp_df, on="dept_id", how="left_anti").show()
🧠 Query 3: Top paid employee per department
window_spec = Window.partitionBy("dept_id").orderBy(col("salary").desc())
emp_df.join(dept_df, "dept_id", "left") \
.withColumn("rnk", rank().over(window_spec)) \
.filter(col("rnk") == 1) \
.select("emp_id", "emp_name", "dept_name", "salary") \
.show()
🧠 Query 4: Employees earning more than dept average
avg_sal_df = emp_df.groupBy("dept_id").agg(avg("salary").alias("avg_salary"))
emp_df.join(avg_sal_df, "dept_id") \
.filter(col("salary") > col("avg_salary")) \
.select("emp_id", "emp_name", "salary", "avg_salary") \
.show()
🧠 Query 5: Employee pairs from the same dept (Self Join)
emp_df.alias("e1") \
.join(emp_df.alias("e2"),
(col("e1.dept_id") == col("e2.dept_id")) & (col("e1.emp_id") < col("e2.emp_id"))) \
.join(dept_df, col("e1.dept_id") == dept_df.dept_id) \
.select(col("e1.emp_name").alias("emp1"),
col("e2.emp_name").alias("emp2"),
col("dept_name")) \
.show()
🧠 Query 6: Departments with more than 1 employee
emp_df.join(dept_df, "dept_id") \
.groupBy("dept_name") \
.agg(count("emp_id").alias("emp_count")) \
.filter(col("emp_count") > 1) \
.show()
🧠 Query 7: Full outer join showing unmatched rows too
emp_df.alias("e") \
.join(dept_df.alias("d"), col("e.dept_id") == col("d.dept_id"), how="outer") \
.select("emp_id", "emp_name", col("e.dept_id").alias("emp_dept_id"),
col("d.dept_id").alias("dept_dept_id"), "dept_name") \
.show()
⚡ Performance Tips on Joins in PySpark
| Join Type | When to Use | Tips |
|---|---|---|
| Broadcast Join | One side is small (below `spark.sql.autoBroadcastJoinThreshold`, 10 MB by default) | Use `broadcast(df)` to force the small table to be broadcast |
| Shuffle Hash Join | One side fits in memory per partition but is too big to broadcast | Request via join hints; repartition on the join key for better parallelism |
| Sort Merge Join | Default for large-to-large joins (since Spark 2.3) | Requires shuffling and sorting but scales well for large data |
| Skew Join | One join key has very high frequency | Use salting or skew hints (AQE's `spark.sql.adaptive.skewJoin.enabled`) to mitigate bottlenecks |
| Partition Pruning | Filtering on a partitioned column | Push filters down early; avoid scanning the full dataset unnecessarily |
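The salting trick mentioned above can be sketched without Spark at all. Here is a minimal plain-Python model (all data and the bucket count are hypothetical) of the idea: extend the hot join key on the large side with a random salt, replicate the small side once per salt bucket, and the join result is unchanged while the hot key spreads across buckets:

```python
import random

# Hypothetical skewed data: dept_id 1 is the hot join key on the large side.
emp_rows = [(101, 1), (102, 1), (103, 1), (104, 2)]
dept_rows = [(1, "HR"), (2, "Engineering")]

SALT_BUCKETS = 3  # in Spark this would spread one hot key over 3 partitions

# Salt the large side: the join key becomes (dept_id, random_bucket).
salted_emp = [((dept_id, random.randrange(SALT_BUCKETS)), emp_id)
              for emp_id, dept_id in emp_rows]

# Replicate the small side once per bucket so every salted key still matches.
salted_dept = {(dept_id, bucket): name
               for dept_id, name in dept_rows
               for bucket in range(SALT_BUCKETS)}

# Joining on the salted key reproduces the unsalted join result exactly.
joined = sorted((emp_id, salted_dept[key]) for key, emp_id in salted_emp)
print(joined)  # [(101, 'HR'), (102, 'HR'), (103, 'HR'), (104, 'Engineering')]
```

In real PySpark the same shape is built with `rand()` to salt the fact table and `explode` of a literal bucket array to replicate the dimension table.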
🔧 How to Broadcast Join:
from pyspark.sql.functions import broadcast
emp_df.join(broadcast(dept_df), "dept_id").show()
🔧 How to Repartition Before Join:
emp_df.repartition("dept_id") \
.join(dept_df.repartition("dept_id"), "dept_id") \
.show()