Interview Coding Questions Solved in PySpark SQL, DataFrame API, and Python Pandas

Employees Earning More than Their Managers – how to solve it in PySpark SQL, the DataFrame API, and Python Pandas?

Here’s how to solve the classic problem:

“Find employees who earn more than their managers”

We’ll solve it in three ways:

  1. ✅ PySpark SQL
  2. ✅ PySpark DataFrame API
  3. ✅ Pandas (Python)

Assume your data is in a table or DataFrame with this schema:

+----+----------+--------+-----------+
| id | name     | salary | managerId |
+----+----------+--------+-----------+
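
The examples below assume an existing Spark session and a DataFrame named emp_df with this schema. Here’s a minimal sketch (the names and salaries are made-up sample values) so the snippets can be run end to end:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("employees_vs_managers").getOrCreate()

# Hypothetical sample rows; managerId is None for the top-level manager
emp_data = [
    (1, "Alice", 50000, 3),
    (2, "Bob", 40000, 3),
    (3, "Charlie", 45000, None),
]
emp_df = spark.createDataFrame(emp_data, ["id", "name", "salary", "managerId"])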

✅ 1. PySpark SQL Approach

🔹 Setup

emp_df.createOrReplaceTempView("employees")

🔹 Query

SELECT e.name AS employee_name, e.salary, m.name AS manager_name, m.salary AS manager_salary
FROM employees e
JOIN employees m
  ON e.managerId = m.id
WHERE e.salary > m.salary

🔹 Run the Query

spark.sql("""
    SELECT e.name AS employee_name, e.salary, m.name AS manager_name, m.salary AS manager_salary
    FROM employees e
    JOIN employees m
      ON e.managerId = m.id
    WHERE e.salary > m.salary
""").show()

✅ 2. PySpark DataFrame API Approach

from pyspark.sql.functions import col

e = emp_df.alias("e")
m = emp_df.alias("m")

result = (
    e.join(m, col("e.managerId") == col("m.id"))
     .where(col("e.salary") > col("m.salary"))
     .select(
         col("e.name").alias("employee_name"),
         col("e.salary").alias("employee_salary"),
         col("m.name").alias("manager_name"),
         col("m.salary").alias("manager_salary")
     )
)

result.show()

✅ 3. Pandas Approach

🔹 Sample Data

import pandas as pd

data = [
    {"id": 1, "name": "Alice", "salary": 50000, "managerId": 3},
    {"id": 2, "name": "Bob", "salary": 40000, "managerId": 3},
    {"id": 3, "name": "Charlie", "salary": 45000, "managerId": None},
]

df = pd.DataFrame(data)

🔹 Self-Join on managerId

# Inner merge (the default) drops employees whose managerId is null
merged = df.merge(df, left_on="managerId", right_on="id", suffixes=("_emp", "_mgr"))
result = merged[merged["salary_emp"] > merged["salary_mgr"]][
    ["name_emp", "salary_emp", "name_mgr", "salary_mgr"]
]

result.columns = ["employee_name", "employee_salary", "manager_name", "manager_salary"]
print(result)

✅ Sample Output

+---------------+-----------------+--------------+----------------+
| employee_name | employee_salary | manager_name | manager_salary |
+---------------+-----------------+--------------+----------------+
| Alice         | 50000           | Charlie      | 45000          |
+---------------+-----------------+--------------+----------------+

To find the second-highest salary for each department – in Spark SQL, PySpark DataFrame API, and Python Pandas

Here’s how to find the second-highest salary in each department using:

  • βœ… Spark SQL
  • βœ… PySpark DataFrame API
  • βœ… Python Pandas

🧱 Sample Schema

Assume the following columns:

+----+----------+--------+------------+
| id | name     | salary | department |
+----+----------+--------+------------+
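
The Spark SQL and DataFrame API examples below assume a DataFrame named df with this schema. A minimal sketch, reusing the same sample rows shown in the Pandas section further down (and the spark session from earlier):

sample_rows = [
    (1, "Alice", 70000, "HR"),
    (2, "Bob", 50000, "HR"),
    (3, "Charlie", 60000, "HR"),
    (4, "David", 80000, "IT"),
    (5, "Eve", 70000, "IT"),
]
df = spark.createDataFrame(sample_rows, ["id", "name", "salary", "department"])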

✅ 1. Spark SQL

📌 Register Temp Table

df.createOrReplaceTempView("employees")

📌 SQL Query using DENSE_RANK()

SELECT *
FROM (
    SELECT *,
           DENSE_RANK() OVER (PARTITION BY department ORDER BY salary DESC) as rank
    FROM employees
) ranked
WHERE rank = 2

📌 Run in PySpark

spark.sql("""
    SELECT *
    FROM (
        SELECT *,
               DENSE_RANK() OVER (PARTITION BY department ORDER BY salary DESC) as rank
        FROM employees
    ) ranked
    WHERE rank = 2
""").show()

✅ 2. PySpark DataFrame API

from pyspark.sql.window import Window
from pyspark.sql.functions import dense_rank, col

windowSpec = Window.partitionBy("department").orderBy(col("salary").desc())

df_with_rank = df.withColumn("rank", dense_rank().over(windowSpec))

second_highest_df = df_with_rank.filter(col("rank") == 2)
second_highest_df.show()

✅ 3. Python Pandas

import pandas as pd

data = [
    {"id": 1, "name": "Alice", "salary": 70000, "department": "HR"},
    {"id": 2, "name": "Bob", "salary": 50000, "department": "HR"},
    {"id": 3, "name": "Charlie", "salary": 60000, "department": "HR"},
    {"id": 4, "name": "David", "salary": 80000, "department": "IT"},
    {"id": 5, "name": "Eve", "salary": 70000, "department": "IT"},
]

df = pd.DataFrame(data)

df["rank"] = df.groupby("department")["salary"].rank(method="dense", ascending=False)

second_highest = df[df["rank"] == 2]
print(second_highest)

✅ Output Example

+----+---------+--------+------------+------+
| id | name    | salary | department | rank |
+----+---------+--------+------------+------+
| 3  | Charlie | 60000  | HR         | 2    |
| 5  | Eve     | 70000  | IT         | 2    |
+----+---------+--------+------------+------+

To find the Nth-highest instead of the second-highest

To find the Nth-highest salary in each department in:

  • βœ… PySpark SQL
  • βœ… PySpark DataFrame API
  • βœ… Pandas

we generalize the logic used for 2nd-highest by using a parameterized rank (like DENSE_RANK() or ROW_NUMBER()).


🧱 Sample DataFrame Schema

+----+----------+--------+------------+
| id | name     | salary | department |
+----+----------+--------+------------+

✅ 1. PySpark SQL: Nth-highest salary

🔹 Replace the hard-coded 2 with a variable n, injected into the query via an f-string

n = 3  # Example: 3rd highest

query = f"""
    SELECT *
    FROM (
        SELECT *,
               DENSE_RANK() OVER (PARTITION BY department ORDER BY salary DESC) as rnk
        FROM employees
    ) ranked
    WHERE rnk = {n}
"""

df.createOrReplaceTempView("employees")
spark.sql(query).show()

✅ 2. PySpark DataFrame API: Nth-highest salary

from pyspark.sql.window import Window
from pyspark.sql.functions import dense_rank, col

n = 3  # Example: 3rd highest

window_spec = Window.partitionBy("department").orderBy(col("salary").desc())

df_with_rank = df.withColumn("rnk", dense_rank().over(window_spec))

nth_highest_df = df_with_rank.filter(col("rnk") == n)
nth_highest_df.show()

✅ 3. Pandas: Nth-highest salary

import pandas as pd

data = [
    {"id": 1, "name": "Alice", "salary": 70000, "department": "HR"},
    {"id": 2, "name": "Bob", "salary": 50000, "department": "HR"},
    {"id": 3, "name": "Charlie", "salary": 60000, "department": "HR"},
    {"id": 4, "name": "David", "salary": 80000, "department": "IT"},
    {"id": 5, "name": "Eve", "salary": 70000, "department": "IT"},
    {"id": 6, "name": "Frank", "salary": 60000, "department": "IT"},
]

df = pd.DataFrame(data)
n = 3  # Example: 3rd highest

df["rnk"] = df.groupby("department")["salary"].rank(method="dense", ascending=False)

nth_highest = df[df["rnk"] == n]
print(nth_highest)

⚠️ Choose Between:

  • DENSE_RANK() – if you want ties to be ranked the same (e.g., 70k, 70k = rank 1)
  • ROW_NUMBER() – if you want unique ordering even for duplicate salaries

🧪 Output Example (3rd-highest)

+-------+--------+------------+-----+
| name  | salary | department | rnk |
+-------+--------+------------+-----+
| Bob   | 50000  | HR         | 3   |
| Frank | 60000  | IT         | 3   |
+-------+--------+------------+-----+

✅ BONUS: Reusable PySpark Function

def get_nth_highest_salary(df, n, col_dept="department", col_salary="salary"):
    from pyspark.sql.window import Window
    from pyspark.sql.functions import dense_rank, col

    window_spec = Window.partitionBy(col_dept).orderBy(col(col_salary).desc())
    return (
        df.withColumn("rank", dense_rank().over(window_spec))
          .filter(col("rank") == n)
    )
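
Usage (a sketch with the sample df from above, asking for the 3rd-highest salary per department):

get_nth_highest_salary(df, 3).show()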

To calculate the following metrics:

  1. Total number of visits
  2. Total number of unique users
  3. Total number of page views
  4. Average time spent on each page
  5. Top 3 pages with the most page views

To calculate key web analytics metrics using:

  • βœ… PySpark SQL
  • βœ… PySpark DataFrame API
  • βœ… Pandas

we assume the dataset has the following schema:


🧱 Sample Schema

+------------+--------+---------+-----------------+----------------+
| timestamp  | userId | pageUrl | sessionId       | timeSpentSecs  |
+------------+--------+---------+-----------------+----------------+

Where:

  • timestamp: datetime of the visit
  • userId: unique user identifier
  • pageUrl: page visited
  • sessionId: unique per visit
  • timeSpentSecs: time spent on the page

✅ METRICS TO CALCULATE

  1. Total number of visits → count distinct of sessionId
  2. Total number of unique users → count distinct of userId
  3. Total number of page views → count of all rows
  4. Average time spent on each page → avg of timeSpentSecs grouped by pageUrl
  5. Top 3 pages with most views → count grouped by pageUrl and top 3
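
The snippets that follow assume a DataFrame named df with this schema. A minimal sketch with made-up rows, just so the code is runnable:

web_rows = [
    ("2024-06-10 10:00:00", "u1", "/home", "s1", 40),
    ("2024-06-10 10:01:00", "u1", "/about", "s1", 25),
    ("2024-06-10 11:00:00", "u2", "/home", "s2", 50),
    ("2024-06-10 11:30:00", "u3", "/products", "s3", 30),
]
df = spark.createDataFrame(
    web_rows, ["timestamp", "userId", "pageUrl", "sessionId", "timeSpentSecs"]
)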

✅ 1. PySpark SQL

df.createOrReplaceTempView("web_logs")

spark.sql("""
SELECT
    COUNT(DISTINCT sessionId) AS total_visits,
    COUNT(DISTINCT userId) AS unique_users,
    COUNT(*) AS total_page_views
FROM web_logs
""").show()

spark.sql("""
SELECT pageUrl,
       ROUND(AVG(timeSpentSecs), 2) AS avg_time_spent
FROM web_logs
GROUP BY pageUrl
""").show()

spark.sql("""
SELECT pageUrl,
       COUNT(*) AS views
FROM web_logs
GROUP BY pageUrl
ORDER BY views DESC
LIMIT 3
""").show()

✅ 2. PySpark DataFrame API

from pyspark.sql.functions import countDistinct, avg, count, round

# Total visits, users, page views
df.agg(
    countDistinct("sessionId").alias("total_visits"),
    countDistinct("userId").alias("unique_users"),
    count("*").alias("total_page_views")
).show()

# Average time per page
df.groupBy("pageUrl") \
  .agg(round(avg("timeSpentSecs"), 2).alias("avg_time_spent")) \
  .show()

# Top 3 pages by views
df.groupBy("pageUrl") \
  .count() \
  .withColumnRenamed("count", "views") \
  .orderBy("views", ascending=False) \
  .limit(3) \
  .show()

✅ 3. Pandas

import pandas as pd

# Sample DataFrame
# df = pd.DataFrame(...)

# Metrics
total_visits = df['sessionId'].nunique()
unique_users = df['userId'].nunique()
total_page_views = len(df)

print("Total Visits:", total_visits)
print("Unique Users:", unique_users)
print("Total Page Views:", total_page_views)

# Avg time spent on each page
avg_time = df.groupby("pageUrl")["timeSpentSecs"].mean().round(2)
print("Average Time per Page:")
print(avg_time)

# Top 3 pages with most views
top_pages = df["pageUrl"].value_counts().head(3)
print("Top 3 Pages by Views:")
print(top_pages)

✅ Sample Output

Total Visits: 9341
Unique Users: 4120
Total Page Views: 13240

Average Time per Page:
/home      45.32
/about     32.55
/contact   29.00

Top 3 Pages by Views:
/home       2400
/about      1890
/products   1675

✅ Bonus: Adding Time & Device Dimensions

Let’s extend the analytics by calculating:

  1. 📅 Grouped by Day
  2. 📱 Grouped by Device Type (e.g., mobile, desktop, tablet)
  3. 🔁 Bounce Rate

✅ Assumed Extended Schema:

+------------+--------+---------+------------+----------------+-------------+
| timestamp  | userId | pageUrl | sessionId  | timeSpentSecs  | deviceType  |
+------------+--------+---------+------------+----------------+-------------+

We’ll need:

  • timestamp β†’ extract date
  • sessionId β†’ to group visits
  • pageUrl β†’ to track bounces
  • deviceType β†’ for breakdown
  • timeSpentSecs β†’ optional for bounce logic (if single page not enough)

✅ 1. PySpark Grouped by Day & Device Type

from pyspark.sql.functions import to_date, countDistinct, count, avg, col, round

df = df.withColumn("visit_date", to_date("timestamp"))

# Basic grouped metrics
df.groupBy("visit_date", "deviceType").agg(
    countDistinct("sessionId").alias("total_visits"),
    countDistinct("userId").alias("unique_users"),
    count("*").alias("total_page_views"),
    round(avg("timeSpentSecs"), 2).alias("avg_time_spent")
).orderBy("visit_date", "deviceType").show()

✅ 2. PySpark Bounce Rate Calculation

📌 Definition:

A bounce is when a session includes only 1 page view.

So to calculate bounce rate:

  • Count how many sessions have only 1 pageUrl
  • Divide by total sessions
# Pages per session
from pyspark.sql.functions import count as _count

pages_per_session = df.groupBy("sessionId").agg(_count("pageUrl").alias("page_views"))

# Mark bounces
bounces_df = pages_per_session.filter(col("page_views") == 1)

# Bounce Rate
total_sessions = df.select("sessionId").distinct().count()
bounces = bounces_df.count()

# Note: the builtin round() was shadowed by pyspark.sql.functions.round above,
# so format the percentage with an f-string instead
bounce_rate = (bounces / total_sessions) * 100
print(f"Bounce Rate: {bounce_rate:.2f}%")

📌 Bounce Rate by Device

from pyspark.sql.functions import countDistinct

# Join back session-device mapping
session_device = df.select("sessionId", "deviceType").distinct()

bounces_with_device = bounces_df.join(session_device, "sessionId")

bounces_with_device.groupBy("deviceType").agg(
    count("*").alias("bounces")
).join(
    df.select("sessionId", "deviceType").distinct()
      .groupBy("deviceType")
      .agg(count("*").alias("total_sessions")),
    "deviceType"
).withColumn(
    "bounce_rate", round((col("bounces") / col("total_sessions")) * 100, 2)
).select("deviceType", "bounce_rate").show()

✅ 3. Pandas Version

import pandas as pd

df['visit_date'] = pd.to_datetime(df['timestamp']).dt.date

# Group by day + device
metrics = df.groupby(['visit_date', 'deviceType']).agg({
    'sessionId': pd.Series.nunique,
    'userId': pd.Series.nunique,
    'pageUrl': 'count',
    'timeSpentSecs': 'mean'
}).rename(columns={
    'sessionId': 'total_visits',
    'userId': 'unique_users',
    'pageUrl': 'page_views',
    'timeSpentSecs': 'avg_time_spent'
}).round(2)

print(metrics)

# Bounce Rate
pages_per_session = df.groupby("sessionId")["pageUrl"].count().reset_index(name="page_views")
bounces = pages_per_session[pages_per_session["page_views"] == 1]

bounce_rate = round((len(bounces) / df["sessionId"].nunique()) * 100, 2)
print(f"Bounce Rate: {bounce_rate}%")

# Bounce Rate by Device
session_device = df.groupby("sessionId")["deviceType"].first().reset_index()

bounces_with_device = bounces.merge(session_device, on="sessionId")
total_sessions_by_device = session_device.groupby("deviceType")["sessionId"].count()
bounces_by_device = bounces_with_device.groupby("deviceType")["sessionId"].count()

bounce_rate_by_device = ((bounces_by_device / total_sessions_by_device) * 100).round(2)
print("Bounce Rate by Device:")
print(bounce_rate_by_device)

📈 Example Output

+------------+------------+--------------+--------------+------------+----------------+
| visit_date | deviceType | total_visits | unique_users | page_views | avg_time_spent |
+------------+------------+--------------+--------------+------------+----------------+
| 2024-06-10 | mobile     | 2340         | 1200         | 3200       | 45.21          |
+------------+------------+--------------+--------------+------------+----------------+

Bounce Rate: 33.2%
Bounce Rate by Device:

mobile: 35.6%
desktop: 28.1%
tablet: 40.9%