"Employees Earning More than Their Managers" in PySpark SQL, DataFrame API, and Python Pandas
Here's how to solve the classic problem:
"Find employees who earn more than their managers"
We'll solve it in three ways:
- ✅ PySpark SQL
- ✅ PySpark DataFrame API
- ✅ Pandas (Python)
Assume your data is in a table or DataFrame with this schema:
+----+----------+--------+-----------+
| id | name     | salary | managerId |
+----+----------+--------+-----------+
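The PySpark examples below assume a DataFrame named emp_df already exists. Here is a minimal sketch of how it could be created with sample rows (the data and SparkSession setup are illustrative assumptions, not part of the original problem):
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("employees_demo").getOrCreate()

# Illustrative sample rows matching the schema above (assumed data)
emp_df = spark.createDataFrame(
    [
        (1, "Alice", 50000, 3),
        (2, "Bob", 40000, 3),
        (3, "Charlie", 45000, None),
    ],
    ["id", "name", "salary", "managerId"],
)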
✅ 1. PySpark SQL Approach
🔹 Setup
emp_df.createOrReplaceTempView("employees")
🔹 Query
SELECT e.name AS employee_name,
       e.salary,
       m.name AS manager_name,
       m.salary AS manager_salary
FROM employees e
JOIN employees m
  ON e.managerId = m.id
WHERE e.salary > m.salary
🔹 Run the Query
spark.sql("""
    SELECT e.name AS employee_name,
           e.salary,
           m.name AS manager_name,
           m.salary AS manager_salary
    FROM employees e
    JOIN employees m
      ON e.managerId = m.id
    WHERE e.salary > m.salary
""").show()
✅ 2. PySpark DataFrame API Approach
from pyspark.sql.functions import col

e = emp_df.alias("e")
m = emp_df.alias("m")

result = (
    e.join(m, col("e.managerId") == col("m.id"))
     .where(col("e.salary") > col("m.salary"))
     .select(
         col("e.name").alias("employee_name"),
         col("e.salary").alias("employee_salary"),
         col("m.name").alias("manager_name"),
         col("m.salary").alias("manager_salary"),
     )
)
result.show()
✅ 3. Pandas Approach
🔹 Sample Data
import pandas as pd

data = [
    {"id": 1, "name": "Alice", "salary": 50000, "managerId": 3},
    {"id": 2, "name": "Bob", "salary": 40000, "managerId": 3},
    {"id": 3, "name": "Charlie", "salary": 45000, "managerId": None},
]
df = pd.DataFrame(data)
🔹 Self-Join on managerId
merged = df.merge(df, left_on="managerId", right_on="id", suffixes=("_emp", "_mgr"))
result = merged[merged["salary_emp"] > merged["salary_mgr"]][
    ["name_emp", "salary_emp", "name_mgr", "salary_mgr"]
]
result.columns = ["employee_name", "employee_salary", "manager_name", "manager_salary"]
print(result)
✅ Sample Output
employee_name  employee_salary  manager_name  manager_salary
Alice          50000            Charlie       45000
To find the second-highest salary for each department in Spark SQL, PySpark DataFrame API, and Python Pandas
Here’s how to find the second-highest salary in each department using:
- ✅ Spark SQL
- ✅ PySpark DataFrame API
- ✅ Python Pandas
🧱 Sample Schema
Assume the following columns:
+----+----------+--------+------------+
| id | name | salary | department |
+----+----------+--------+------------+
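The Spark examples below assume a DataFrame df with this schema is already loaded. A minimal sketch using the same sample rows as the Pandas example further down (illustrative data; spark is the assumed SparkSession):
# Illustrative sample rows matching the schema above (assumed data)
df = spark.createDataFrame(
    [
        (1, "Alice", 70000, "HR"),
        (2, "Bob", 50000, "HR"),
        (3, "Charlie", 60000, "HR"),
        (4, "David", 80000, "IT"),
        (5, "Eve", 70000, "IT"),
    ],
    ["id", "name", "salary", "department"],
)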
✅ 1. Spark SQL
🔹 Register Temp Table
df.createOrReplaceTempView("employees")
🔹 SQL Query using DENSE_RANK()
SELECT *
FROM (
SELECT *,
DENSE_RANK() OVER (PARTITION BY department ORDER BY salary DESC) as rank
FROM employees
) ranked
WHERE rank = 2
🔹 Run in PySpark
spark.sql("""
SELECT *
FROM (
SELECT *,
DENSE_RANK() OVER (PARTITION BY department ORDER BY salary DESC) as rank
FROM employees
) ranked
WHERE rank = 2
""").show()
✅ 2. PySpark DataFrame API
from pyspark.sql.window import Window
from pyspark.sql.functions import dense_rank, col
windowSpec = Window.partitionBy("department").orderBy(col("salary").desc())
df_with_rank = df.withColumn("rank", dense_rank().over(windowSpec))
second_highest_df = df_with_rank.filter(col("rank") == 2)
second_highest_df.show()
✅ 3. Python Pandas
import pandas as pd
data = [
{"id": 1, "name": "Alice", "salary": 70000, "department": "HR"},
{"id": 2, "name": "Bob", "salary": 50000, "department": "HR"},
{"id": 3, "name": "Charlie", "salary": 60000, "department": "HR"},
{"id": 4, "name": "David", "salary": 80000, "department": "IT"},
{"id": 5, "name": "Eve", "salary": 70000, "department": "IT"},
]
df = pd.DataFrame(data)
df["rank"] = df.groupby("department")["salary"].rank(method="dense", ascending=False)
second_highest = df[df["rank"] == 2]
print(second_highest)
✅ Output Example
| id | name    | salary | department | rank |
|----|---------|--------|------------|------|
| 3  | Charlie | 60000  | HR         | 2    |
| 5  | Eve     | 70000  | IT         | 2    |
To find Nth-highest instead of second-highest
To find the Nth-highest salary in each department in:
- ✅ PySpark SQL
- ✅ PySpark DataFrame API
- ✅ Pandas
we generalize the logic used for 2nd-highest by parameterizing the rank filter (using DENSE_RANK() or ROW_NUMBER()).
🧱 Sample DataFrame Schema
+----+----------+--------+------------+
| id | name | salary | department |
+----+----------+--------+------------+
✅ 1. PySpark SQL: Nth-highest salary
🔹 Replace the hard-coded 2 with a parameter n:
n = 3 # Example: 3rd highest
query = f"""
SELECT *
FROM (
SELECT *,
DENSE_RANK() OVER (PARTITION BY department ORDER BY salary DESC) as rnk
FROM employees
) ranked
WHERE rnk = {n}
"""
df.createOrReplaceTempView("employees")
spark.sql(query).show()
✅ 2. PySpark DataFrame API: Nth-highest salary
from pyspark.sql.window import Window
from pyspark.sql.functions import dense_rank, col
n = 3 # Example: 3rd highest
window_spec = Window.partitionBy("department").orderBy(col("salary").desc())
df_with_rank = df.withColumn("rnk", dense_rank().over(window_spec))
nth_highest_df = df_with_rank.filter(col("rnk") == n)
nth_highest_df.show()
✅ 3. Pandas: Nth-highest salary
import pandas as pd
data = [
{"id": 1, "name": "Alice", "salary": 70000, "department": "HR"},
{"id": 2, "name": "Bob", "salary": 50000, "department": "HR"},
{"id": 3, "name": "Charlie", "salary": 60000, "department": "HR"},
{"id": 4, "name": "David", "salary": 80000, "department": "IT"},
{"id": 5, "name": "Eve", "salary": 70000, "department": "IT"},
{"id": 6, "name": "Frank", "salary": 60000, "department": "IT"},
]
df = pd.DataFrame(data)
n = 3 # Example: 3rd highest
df["rnk"] = df.groupby("department")["salary"].rank(method="dense", ascending=False)
nth_highest = df[df["rnk"] == n]
print(nth_highest)
⚠️ Choose between (see the comparison sketch below):
- DENSE_RANK() → if you want ties to be ranked the same (e.g., 70k, 70k = rank 1)
- ROW_NUMBER() → if you want a unique ordering even for duplicate salaries
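A minimal sketch contrasting the two window functions on tied salaries (the tied_df rows are made-up illustrative data, and spark is the assumed SparkSession):
from pyspark.sql.window import Window
from pyspark.sql.functions import dense_rank, row_number, col

# Illustrative data with a salary tie inside one department (assumed, not from the examples above)
tied_df = spark.createDataFrame(
    [(1, "Ann", 70000, "IT"), (2, "Ben", 70000, "IT"), (3, "Cal", 60000, "IT")],
    ["id", "name", "salary", "department"],
)

w = Window.partitionBy("department").orderBy(col("salary").desc())

tied_df.select(
    "name",
    "salary",
    dense_rank().over(w).alias("dense_rank"),   # Ann = 1, Ben = 1, Cal = 2
    row_number().over(w).alias("row_number"),   # Ann = 1, Ben = 2, Cal = 3 (tie broken arbitrarily)
).show()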
🧪 Output Example (3rd-highest)
| name    | salary | department | rnk |
|---------|--------|------------|-----|
| Charlie | 60000  | HR         | 3   |
| Frank   | 60000  | IT         | 3   |
✅ BONUS: Reusable PySpark Function
def get_nth_highest_salary(df, n, col_dept="department", col_salary="salary"):
    from pyspark.sql.window import Window
    from pyspark.sql.functions import dense_rank, col

    window_spec = Window.partitionBy(col_dept).orderBy(col(col_salary).desc())
    return (
        df.withColumn("rank", dense_rank().over(window_spec))
          .filter(col("rank") == n)
    )
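For example, reusing the sample df from above (illustrative usage):
# 3rd-highest salary per department, using the helper's default column names
get_nth_highest_salary(df, 3).show()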
To calculate the following metrics:
- Total number of visits
- Total number of unique users
- Total number of page views
- Average time spent on each page
- Top 3 pages with the most page views
To calculate key web analytics metrics using:
- ✅ PySpark SQL
- ✅ PySpark DataFrame API
- ✅ Pandas
we assume the dataset has the following schema:
🧱 Sample Schema
+------------+--------+---------+-----------+---------------+
| timestamp  | userId | pageUrl | sessionId | timeSpentSecs |
+------------+--------+---------+-----------+---------------+
Where:
- timestamp: datetime of the visit
- userId: unique user identifier
- pageUrl: page visited
- sessionId: unique per visit
- timeSpentSecs: time spent on the page
✅ METRICS TO CALCULATE
- Total number of visits → count distinct of sessionId
- Total number of unique users → count distinct of userId
- Total number of page views → count of all rows
- Average time spent on each page → avg of timeSpentSecs grouped by pageUrl
- Top 3 pages with most views → count grouped by pageUrl, top 3 by views
✅ 1. PySpark SQL
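Before running the queries, here is a minimal sketch that creates a sample df of web-log events (the rows and SparkSession setup are illustrative assumptions):
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("web_metrics").getOrCreate()

# Illustrative web-log rows matching the schema above (assumed data)
df = spark.createDataFrame(
    [
        ("2024-06-10 09:01:00", "u1", "/home", "s1", 40),
        ("2024-06-10 09:02:00", "u1", "/about", "s1", 25),
        ("2024-06-10 10:15:00", "u2", "/home", "s2", 55),
        ("2024-06-11 11:30:00", "u3", "/contact", "s3", 30),
    ],
    ["timestamp", "userId", "pageUrl", "sessionId", "timeSpentSecs"],
)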
df.createOrReplaceTempView("web_logs")

spark.sql("""
    SELECT COUNT(DISTINCT sessionId) AS total_visits,
           COUNT(DISTINCT userId) AS unique_users,
           COUNT(*) AS total_page_views
    FROM web_logs
""").show()

spark.sql("""
    SELECT pageUrl,
           ROUND(AVG(timeSpentSecs), 2) AS avg_time_spent
    FROM web_logs
    GROUP BY pageUrl
""").show()

spark.sql("""
    SELECT pageUrl,
           COUNT(*) AS views
    FROM web_logs
    GROUP BY pageUrl
    ORDER BY views DESC
    LIMIT 3
""").show()
✅ 2. PySpark DataFrame API
from pyspark.sql.functions import countDistinct, avg, count, round

# Total visits, users, page views
df.agg(
    countDistinct("sessionId").alias("total_visits"),
    countDistinct("userId").alias("unique_users"),
    count("*").alias("total_page_views")
).show()

# Average time per page
df.groupBy("pageUrl") \
  .agg(round(avg("timeSpentSecs"), 2).alias("avg_time_spent")) \
  .show()

# Top 3 pages by views
df.groupBy("pageUrl") \
  .count() \
  .withColumnRenamed("count", "views") \
  .orderBy("views", ascending=False) \
  .limit(3) \
  .show()
✅ 3. Pandas
import pandas as pd

# Sample DataFrame
# df = pd.DataFrame(...)

# Metrics
total_visits = df['sessionId'].nunique()
unique_users = df['userId'].nunique()
total_page_views = len(df)

print("Total Visits:", total_visits)
print("Unique Users:", unique_users)
print("Total Page Views:", total_page_views)

# Avg time spent on each page
avg_time = df.groupby("pageUrl")["timeSpentSecs"].mean().round(2)
print("Average Time per Page:")
print(avg_time)

# Top 3 pages with most views
top_pages = df["pageUrl"].value_counts().head(3)
print("Top 3 Pages by Views:")
print(top_pages)
✅ Sample Output
Total Visits: 9341
Unique Users: 4120
Total Page Views: 13240

Average Time per Page:
/home      45.32
/about     32.55
/contact   29.00

Top 3 Pages by Views:
/home       2400
/about      1890
/products   1675
✅ Bonus
Let's extend the analytics by:
✅ Adding Time & Device Dimensions
And calculating:
- 📅 Grouped by Day
- 📱 Grouped by Device Type (e.g., mobile, desktop, tablet)
- 📉 Bounce Rate
✅ Assumed Extended Schema:
+------------+--------+---------+------------+----------------+-------------+
| timestamp | userId | pageUrl | sessionId | timeSpentSecs | deviceType |
+------------+--------+---------+------------+----------------+-------------+
We'll need (see the sample sketch below):
- timestamp → extract date
- sessionId → to group visits
- pageUrl → to track bounces
- deviceType → for breakdown
- timeSpentSecs → optional for bounce logic (if a single-page check is not enough)
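A minimal sketch of such a DataFrame with made-up rows (illustrative assumptions; spark is the SparkSession from earlier):
# Illustrative extended web-log rows (assumed data), now including deviceType
df = spark.createDataFrame(
    [
        ("2024-06-10 09:01:00", "u1", "/home", "s1", 40, "mobile"),
        ("2024-06-10 09:02:00", "u1", "/about", "s1", 25, "mobile"),
        ("2024-06-10 10:15:00", "u2", "/home", "s2", 55, "desktop"),
        ("2024-06-11 11:30:00", "u3", "/contact", "s3", 30, "tablet"),
    ],
    ["timestamp", "userId", "pageUrl", "sessionId", "timeSpentSecs", "deviceType"],
)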
✅ 1. PySpark Grouped by Day & Device Type
from pyspark.sql.functions import to_date, countDistinct, count, avg, col, round
df = df.withColumn("visit_date", to_date("timestamp"))
# Basic grouped metrics
df.groupBy("visit_date", "deviceType").agg(
countDistinct("sessionId").alias("total_visits"),
countDistinct("userId").alias("unique_users"),
count("*").alias("total_page_views"),
round(avg("timeSpentSecs"), 2).alias("avg_time_spent")
).orderBy("visit_date", "deviceType").show()
✅ 2. PySpark Bounce Rate Calculation
🔹 Definition:
A bounce is when a session includes only 1 page view.
So to calculate bounce rate:
- Count how many sessions have only 1 pageUrl
- Divide by total sessions
# Pages per session
from pyspark.sql.functions import count as _count
pages_per_session = df.groupBy("sessionId").agg(_count("pageUrl").alias("page_views"))
# Mark bounces
bounces_df = pages_per_session.filter(col("page_views") == 1)
# Bounce Rate
total_sessions = df.select("sessionId").distinct().count()
bounces = bounces_df.count()
bounce_rate = round((bounces / total_sessions) * 100, 2)
print(f"Bounce Rate: {bounce_rate}%")
🔹 Bounce Rate by Device
from pyspark.sql.functions import countDistinct
# Join back session-device mapping
session_device = df.select("sessionId", "deviceType").distinct()
bounces_with_device = bounces_df.join(session_device, "sessionId")
bounces_with_device.groupBy("deviceType").agg(
count("*").alias("bounces")
).join(
df.select("sessionId", "deviceType").distinct()
.groupBy("deviceType")
.agg(count("*").alias("total_sessions")),
"deviceType"
).withColumn(
"bounce_rate", round((col("bounces") / col("total_sessions")) * 100, 2)
).select("deviceType", "bounce_rate").show()
✅ 3. Pandas Version
import pandas as pd
df['visit_date'] = pd.to_datetime(df['timestamp']).dt.date
# Group by day + device
metrics = df.groupby(['visit_date', 'deviceType']).agg({
'sessionId': pd.Series.nunique,
'userId': pd.Series.nunique,
'pageUrl': 'count',
'timeSpentSecs': 'mean'
}).rename(columns={
'sessionId': 'total_visits',
'userId': 'unique_users',
'pageUrl': 'page_views',
'timeSpentSecs': 'avg_time_spent'
}).round(2)
print(metrics)
# Bounce Rate
pages_per_session = df.groupby("sessionId")["pageUrl"].count().reset_index(name="page_views")
bounces = pages_per_session[pages_per_session["page_views"] == 1]
bounce_rate = round((len(bounces) / df["sessionId"].nunique()) * 100, 2)
print(f"Bounce Rate: {bounce_rate}%")
# Bounce Rate by Device
session_device = df.groupby("sessionId")["deviceType"].first().reset_index()
bounces_with_device = bounces.merge(session_device, on="sessionId")
total_sessions_by_device = session_device.groupby("deviceType")["sessionId"].count()
bounces_by_device = bounces_with_device.groupby("deviceType")["sessionId"].count()
bounce_rate_by_device = ((bounces_by_device / total_sessions_by_device) * 100).round(2)
print("Bounce Rate by Device:")
print(bounce_rate_by_device)
📊 Example Output
| visit_date | deviceType | total_visits | unique_users | page_views | avg_time_spent |
|------------|------------|--------------|--------------|------------|----------------|
| 2024-06-10 | mobile     | 2340         | 1200         | 3200       | 45.21          |
Bounce Rate: 33.2%
Bounce Rate by Device:
mobile: 35.6%
desktop: 28.1%
tablet: 40.9%