Great question — understanding SparkSession
vs SparkContext
is essential, especially when dealing with RDDs, DataFrames, or any Spark internals.
🔍 TL;DR Difference
Feature | SparkContext | SparkSession (since Spark 2.0+) |
---|---|---|
Purpose | Low-level entry point to Spark functionality | Unified entry point to Spark: SQL, Streaming, Hive, RDD |
API Focus | RDDs only | DataFrames, Datasets, SQL, RDDs |
Usage (Modern) | Used through SparkSession.sparkContext | Recommended for all modern Spark apps |
Standalone? | Used directly in early Spark apps | Wraps SparkContext, SQLContext, HiveContext, etc. |
🧠 Internal Creation: How They Work Together
✅ 1. SparkSession includes SparkContext
When you create a SparkSession, it internally creates a SparkContext (or reuses an existing one):

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("MyApp") \
    .getOrCreate()

sc = spark.sparkContext  # <- This is your SparkContext
So:
- SparkSession is a wrapper/factory for SparkContext, SQLContext, HiveContext, etc.
- SparkContext is still used under the hood, especially when working with RDDs.
🧱 Dependency Graph
SparkSession
 |
 +-- sparkContext -> instance of SparkContext
 +-- sqlContext   -> for SQL/DataFrame APIs
 +-- catalog

RDD API <--- uses ---> SparkContext
DF API  <--- uses ---> SQLContext & SparkContext
🌀 RDD Context
If you need to work with RDDs, you still use the SparkContext:

rdd = spark.sparkContext.parallelize([1, 2, 3])
rdd.map(lambda x: x * 2).collect()

Even in a DataFrame-based app, RDD operations are routed through sparkContext, which runs in the driver and coordinates with the cluster manager.
🎯 Real Use Case Difference
Task | Which One? | Why? |
---|---|---|
RDD transformation | SparkContext | RDDs are SparkContext-based |
DataFrame read/write | SparkSession | Unified entry point for IO |
SQL Queries | SparkSession | Has .sql() method |
Streaming (Structured) | SparkSession | Handles streaming DFs |
Spark on Hive Tables | SparkSession | Has Hive catalog support |
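A short sketch showing those SparkSession entry points side by side (the file path, column name, and view name below are placeholders):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("EntryPoints").getOrCreate()  # add .enableHiveSupport() for Hive tables

df = spark.read.json("events.json")              # DataFrame read (placeholder path)
df.createOrReplaceTempView("events")
counts = spark.sql("SELECT id, COUNT(*) AS n FROM events GROUP BY id")  # SQL via .sql()
rdd = spark.sparkContext.parallelize([1, 2, 3])  # RDD work still goes through sparkContext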
🛠️ Creation Example
Older (Pre-Spark 2.0)
from pyspark import SparkContext
sc = SparkContext(appName="OldApp")
rdd = sc.parallelize([1, 2, 3])
Modern (Spark 2.0+)
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("ModernApp").getOrCreate()
rdd = spark.sparkContext.parallelize([1, 2, 3])
⚠️ Note
- You cannot have more than one active SparkContext per JVM.
- But you can call SparkSession.builder.getOrCreate() multiple times; it reuses the existing SparkContext.
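A small sketch of that reuse behavior:

from pyspark.sql import SparkSession

spark1 = SparkSession.builder.appName("First").getOrCreate()
spark2 = SparkSession.builder.appName("Second").getOrCreate()  # reuses the running context
print(spark1.sparkContext is spark2.sparkContext)  # True: there is only one SparkContext per JVM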
📌 Summary
Concept | Explanation |
---|---|
SparkContext | Backbone of Spark, low-level API, needed for RDDs |
SparkSession | Higher-level abstraction that includes SparkContext |
RDD Dependency | All RDD operations go through sparkContext |
Internally | SparkSession → creates or wraps → SparkContext |
Introduction to RDDs
Resilient Distributed Datasets (RDDs) are the fundamental data structure of Spark. They are:
- Immutable: Cannot be modified after creation
- Distributed: Data is partitioned across cluster
- Resilient: Can recompute lost partitions due to node failures
- Lazy evaluated: Computed only when actions are called
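For example, transformations are only recorded; nothing runs until an action is called (a minimal sketch):

rdd = sc.parallelize(range(5))
doubled = rdd.map(lambda x: x * 2)  # transformation: recorded in the lineage, not executed
print(doubled.collect())            # action: triggers the computation -> [0, 2, 4, 6, 8]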
Key Characteristics:
- In-memory computation
- Fault tolerance through lineage
- Parallel operations
- Ability to use many data sources
RDD Lineage
RDD lineage is a graph of all the parent RDDs of an RDD. It’s built as a result of applying transformations to the RDD.
# Example showing lineage
rdd1 = sc.parallelize([1, 2, 3, 4])
rdd2 = rdd1.map(lambda x: x * 2)
rdd3 = rdd2.filter(lambda x: x > 4)
result = rdd3.reduce(lambda a, b: a + b)  # reduce is an action, so this triggers execution
# To see the lineage
print(rdd3.toDebugString().decode())
Output would show the transformation steps:
(2) PythonRDD[3] at RDD at PythonRDD.scala:53 []
| MapPartitionsRDD[2] at map at <stdin>:1 []
| ParallelCollectionRDD[0] at parallelize at <stdin>:1 []
Creating RDDs
From collections:
data = [1, 2, 3, 4, 5]
rdd = sc.parallelize(data)
From external datasets:
# Text file
rdd = sc.textFile("hdfs://path/to/file.txt")
# Whole text files (returns filename, content pairs)
rdd = sc.wholeTextFiles("hdfs://path/to/directory")
From DataFrames:
df = spark.createDataFrame([(1, "A"), (2, "B")], ["id", "value"])
rdd = df.rdd # Convert to RDD of Rows
Transformations and Actions
Common Transformations:
Normal RDDs:
# Map
rdd.map(lambda x: x*2)
# Filter
rdd.filter(lambda x: x > 10)
# Distinct
rdd.distinct()
# FlatMap
rdd.flatMap(lambda x: x.split(" "))
# Sample
rdd.sample(withReplacement=False, fraction=0.5)
# Union
rdd.union(other_rdd)
# Intersection
rdd.intersection(other_rdd)
# Cartesian
rdd.cartesian(other_rdd)
Pair RDDs (covered in next section):
# reduceByKey
# groupByKey
# join, etc.
Common Actions:
# Collect (be careful with large datasets)
rdd.collect()
# Count
rdd.count()
# First
rdd.first()
# Take
rdd.take(5)
# Reduce
rdd.reduce(lambda a, b: a + b)
# Aggregate
rdd.aggregate((0, 0),
lambda acc, value: (acc[0] + value, acc[1] + 1),
lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1]))
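# The aggregate above returns a (sum, count) tuple, so (assuming a numeric RDD)
# the mean is one division away:
sum_count = rdd.aggregate((0, 0),
    lambda acc, value: (acc[0] + value, acc[1] + 1),
    lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1]))
mean = sum_count[0] / sum_count[1]  # e.g. 2.0 for an RDD of [1, 2, 3]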
# Save
rdd.saveAsTextFile("output_path")
Pair RDDs
Pair RDDs are RDDs where each element is a key-value pair. They enable operations that act on each key.
Creating Pair RDDs:
# From a normal RDD
pair_rdd = rdd.map(lambda x: (x, 1))
# From a list of tuples
data = [("a", 1), ("b", 2)]
pair_rdd = sc.parallelize(data)
Key Transformations on Pair RDDs:
# reduceByKey - merge values for each key
pair_rdd.reduceByKey(lambda a, b: a + b)
# groupByKey - group values by key
pair_rdd.groupByKey()
# sortByKey - return RDD sorted by key
pair_rdd.sortByKey()
# join - inner join between two pair RDDs
rdd1.join(rdd2)
# leftOuterJoin / rightOuterJoin
rdd1.leftOuterJoin(rdd2)
# cogroup - group data from both RDDs sharing the same key
rdd1.cogroup(rdd2)
# subtractByKey - remove elements with a key present in other RDD
rdd1.subtractByKey(rdd2)
# keys - return RDD of just the keys
pair_rdd.keys()
# values - return RDD of just the values
pair_rdd.values()
Example: Word Count
text_rdd = sc.textFile("hdfs://path/to/textfile.txt")
words = text_rdd.flatMap(lambda line: line.split(" "))
word_counts = words.map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)
word_counts.collect()
Advanced RDD Operations
Custom Partitioning:
# partitionBy works on pair RDDs, so key the data first, then partition by a custom function
rdd = sc.parallelize(range(100)).map(lambda x: (x, x))
rdd = rdd.partitionBy(4, lambda key: key % 4)
# Check partitions
rdd.glom().collect() # Shows data in each partition
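To check the partition count and how evenly the keys were spread (a minimal sketch using standard RDD methods):

print(rdd.getNumPartitions())          # 4
print(rdd.glom().map(len).collect())   # [25, 25, 25, 25] for keys 0-99 partitioned by key % 4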
Accumulators:
# Create accumulator
accum = sc.accumulator(0)
# Use in transformations
rdd = sc.parallelize([1, 2, 3, 4])
rdd.foreach(lambda x: accum.add(x))
print(accum.value) # 10
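One caveat: updates made inside transformations (e.g. map) can be applied more than once if a task is retried, so accumulator values are only guaranteed exact when updated inside actions such as foreach. A sketch of a common pattern, with a hypothetical bad_records counter:

bad_records = sc.accumulator(0)

def parse(line):
    try:
        return int(line)
    except ValueError:
        bad_records.add(1)  # may be double-counted if the task is retried
        return 0

parsed = sc.parallelize(["1", "2", "oops", "4"]).map(parse)
parsed.count()              # an action must run before the accumulator is populated
print(bad_records.value)    # 1 (assuming no retries)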
Broadcast Variables:
# Create broadcast variable
broadcast_var = sc.broadcast([1, 2, 3])
# Use in transformations
rdd = sc.parallelize([1, 2, 3])
rdd.map(lambda x: x + broadcast_var.value[0]).collect()
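A more realistic use is shipping a small lookup table to every executor once, rather than capturing it in each task's closure (a sketch with a hypothetical country-code table):

country_lookup = sc.broadcast({"US": "United States", "DE": "Germany"})

events = sc.parallelize([("US", 3), ("DE", 5), ("FR", 2)])
named = events.map(lambda kv: (country_lookup.value.get(kv[0], "Unknown"), kv[1]))
print(named.collect())  # [('United States', 3), ('Germany', 5), ('Unknown', 2)]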
RDD Persistence
Caching RDDs in memory or on disk for reuse:

from pyspark import StorageLevel

rdd.persist(StorageLevel.MEMORY_ONLY)  # Default for RDD.persist()/cache()
rdd.persist(StorageLevel.MEMORY_AND_DISK)
rdd.persist(StorageLevel.DISK_ONLY)
# Note: the serialized (_SER) levels exist only in the Scala/Java API;
# PySpark data is always stored in serialized form
# Unpersist
rdd.unpersist()
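In day-to-day code you will usually just call cache(), which is shorthand for persist() with the default level (a minimal sketch):

squares = sc.parallelize(range(1000)).map(lambda x: x * x)
squares.cache()             # same as persist(StorageLevel.MEMORY_ONLY) for RDDs
squares.count()             # first action computes and caches the partitions
squares.sum()               # later actions reuse the cached data
print(squares.is_cached)    # True
squares.unpersist()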
RDDs vs DataFrames
Feature | RDD | DataFrame |
---|---|---|
Data Structure | Unstructured | Structured (named columns) |
Optimization | No built-in optimization | Catalyst optimizer |
Serialization | Java/Python objects | Binary format (Tungsten) |
API | Functional (map, reduce) | SQL-like |
Schema | No schema | Has schema |
Performance | Slower | Faster |
Use Case | Low-level, complex logic | Structured data operations |
In PySpark, DataFrames and RDDs (Resilient Distributed Datasets) are both fundamental data structures, but they differ in several key aspects:
1. Abstraction Level
- RDD (Resilient Distributed Dataset)
  - A low-level, immutable distributed collection of objects with no schema.
  - Requires manual optimization (e.g., partitioning, caching).
  - Operations are performed using functions (e.g., map, reduce, filter).
- DataFrame
  - A higher-level abstraction built on top of RDDs, organized into named columns (like a table in SQL).
  - Has a schema (explicit structure with data types).
  - Optimized using Spark’s Catalyst optimizer and Tungsten execution engine.
  - Operations are performed using the DataFrame API or SQL queries.
2. Performance Optimization
- RDD
  - No built-in optimization (relies on manual tuning).
  - Slower for structured data due to lack of schema awareness.
- DataFrame
  - Uses the Catalyst optimizer for query planning (predicate pushdown, join optimizations).
  - Tungsten improves memory/CPU efficiency via binary storage and code generation.
  - Generally faster than RDDs for structured operations.
3. Ease of Use
- RDD
  - Requires writing more code for transformations (e.g., map, reduceByKey).
  - No built-in SQL support.
- DataFrame
  - Provides a declarative API (similar to Pandas/SQL).
  - Supports SQL queries (spark.sql("SELECT * FROM table")).
  - Easier for aggregations, filtering, and joins.
4. Schema Handling
- RDD
  - Schema must be inferred or handled manually (e.g., parsing strings into objects).
- DataFrame
  - Explicit schema (inferred from data or defined explicitly).
  - Enables better error checking (e.g., type mismatches fail early).
5. Use Cases
- Use RDDs when:
  - You need fine-grained control over low-level operations.
  - Working with unstructured data (e.g., text, raw logs).
  - Implementing custom algorithms not expressible in the SQL/DataFrame API.
- Use DataFrames when:
  - Working with structured/semi-structured data (CSV, JSON, Parquet).
  - Needing SQL-like operations (joins, groupBy, aggregations).
  - Prioritizing performance and ease of use.
Example Comparison
RDD (Functional Style)
rdd = sc.parallelize([(1, "Alice"), (2, "Bob")])
filtered_rdd = rdd.filter(lambda x: x[0] > 1) # Manual lambda
DataFrame (Declarative Style)
df = spark.createDataFrame([(1, "Alice"), (2, "Bob")], ["id", "name"])
filtered_df = df.filter(df.id > 1) # SQL-like syntax
Key Takeaway
- RDDs offer flexibility but require more manual effort.
- DataFrames provide optimizations, schema support, and a simpler API for structured data.
In modern PySpark, DataFrames (and Datasets) are preferred for most use cases due to their performance benefits and ease of use. RDDs are still useful for niche scenarios requiring low-level control.
Let’s break down how an RDD in Apache Spark is transformed and executed, moving through the DAG (Directed Acyclic Graph), the DAG Scheduler, the Task Scheduler, YARN, and finally the Executors.
This is also where the concept of a job comes in. A job is the highest-level unit of work submitted to the Spark engine. Here’s how the process unfolds, from RDD creation to job execution:
1. RDD (Resilient Distributed Dataset)
- RDD Creation: You start by creating RDDs, typically by loading data from sources like HDFS, S3, or local files. RDDs can then undergo a series of transformations such as map, filter, and flatMap. These transformations define the data flow but don’t trigger any computation immediately.
2. Job Submission
- Triggering a Job: A job is triggered when an action (e.g., collect, count, saveAsTextFile) is called on an RDD or DataFrame. Actions force Spark to evaluate the transformations you’ve defined.
- Job Definition: When an action is called, Spark creates a job corresponding to that action. This job is a high-level operation that is broken into one or more stages.
3. DAG (Directed Acyclic Graph)
- DAG Construction: Once the job is defined, Spark constructs a DAG of stages. The DAG represents the logical execution plan, showing how transformations depend on each other and how they can be broken down into stages.
- Stages: Each stage in the DAG represents a group of tasks that can be executed together without requiring data to be shuffled. Stage boundaries are determined by shuffle operations such as reduceByKey or groupBy (see the sketch after the summary below).
4. DAG Scheduler
- Stage Scheduling: The DAG Scheduler breaks down the job into stages and schedules them for execution. Stages are executed sequentially if they depend on each other or in parallel if they are independent.
- Task Creation: Within each stage, the DAG Scheduler creates tasks, one for each partition of the data. The number of tasks equals the number of partitions in the RDD for that stage.
5. Task Scheduler
- Task Dispatching: The Task Scheduler is responsible for assigning tasks to available executors. It considers factors like data locality (to minimize data transfer) and resource availability when dispatching tasks.
- Task Execution: The Task Scheduler sends tasks to executors, which are processes running on worker nodes. Each task is a unit of execution corresponding to a partition of the data.
6. YARN (Yet Another Resource Negotiator)
- Resource Allocation: If Spark is running on YARN, it requests resources (CPU, memory) from YARN’s Resource Manager. YARN allocates containers on the cluster nodes, which are used to run executors.
- NodeManager: YARN’s NodeManagers manage the containers on each node, monitoring resource usage, health, and handling failures.
7. Executors
- Task Execution: Executors are the JVM processes running on worker nodes that execute the tasks. Each executor runs tasks in parallel (one task per CPU core) and uses the allocated memory to store data and intermediate results.
- In-Memory Computation: Executors perform computations on the data, typically in-memory, which allows Spark to efficiently handle iterative algorithms and interactive queries.
- Fault Tolerance: If a task fails (due to hardware issues, for example), the DAG Scheduler can reschedule the task on another executor using the lineage information of the RDD to recompute the data.
8. Job Execution Flow
- Stage Execution: The job execution starts with the first stage, which runs its tasks on the available executors. Once a stage completes, the results are either passed to the next stage or returned to the driver.
- Data Shuffling: If a stage boundary involves a shuffle (repartitioning of data across the cluster), Spark redistributes the data across executors, which can introduce network overhead.
- Final Stage and Action: The final stage of a job usually involves writing the results to storage (e.g., HDFS, S3) or returning them to the driver (in the case of actions like collect).
9. Completion and Result Handling
- Job Completion: Once all stages of a job are successfully completed, the job is marked as complete. The results of the action that triggered the job are either returned to the user (for actions like collect) or saved to storage.
- Driver Program: The Spark driver program monitors the execution of the job, collecting the results and handling any errors or retries if tasks or stages fail.
How These Components Work Together:
- Job Submission: When an action is triggered, Spark submits a job.
- DAG Construction: Spark constructs a DAG of stages based on the job’s transformations.
- DAG Scheduling: The DAG Scheduler breaks down the DAG into stages and creates tasks.
- Task Scheduling: The Task Scheduler sends tasks to executors based on data locality and resource availability.
- Resource Allocation (YARN): If using YARN, Spark requests resources and YARN allocates containers.
- Task Execution: Executors on worker nodes execute the tasks, processing data and returning results.
- Job Completion: The driver receives the final results, and the job is marked as complete.
Summary
- A job is a top-level Spark operation triggered by an action. Spark breaks the job down into stages and tasks using a DAG.
- The DAG Scheduler schedules these stages, while the Task Scheduler dispatches tasks to executors.
- YARN handles resource allocation, and executors perform the actual computations.
- Spark ensures fault tolerance through task retries and uses in-memory computation for speed.
- The job completes when all stages are successfully executed and results are returned or saved.
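To make the stage boundary concrete, here is a minimal sketch: the reduceByKey below introduces a shuffle, so the collect action submits a single job that runs as two stages, and toDebugString shows the shuffle dependency in the lineage:

words = sc.parallelize(["a", "b", "a", "c", "b", "a"])
counts = words.map(lambda w: (w, 1)).reduceByKey(lambda a, b: a + b)  # wide transformation -> shuffle
print(counts.toDebugString().decode())  # lineage includes a ShuffledRDD marking the stage boundary
counts.collect()  # action: one job, two stages (before and after the shuffle)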
When to Convert DataFrames to RDDs
While DataFrames are generally preferred, you might need RDDs when:
- Need low-level control: Implementing custom algorithms that require fine-grained control over data
- Non-tabular data: Working with complex data structures that don’t fit well in rows/columns
- Custom serialization: When you need specific serialization not supported by DataFrames
- Legacy code: Integrating with existing RDD-based code
- Advanced partitioning: Need custom partitioning beyond what DataFrames offer
Conversion Examples:
DataFrame to RDD:
df = spark.createDataFrame([(1, "A"), (2, "B")], ["id", "value"])
rdd = df.rdd # Returns RDD of Row objects
# Convert Row objects to tuples
rdd_as_tuples = rdd.map(lambda row: (row.id, row.value))
RDD to DataFrame:
rdd = sc.parallelize([(1, "A"), (2, "B")])
df = spark.createDataFrame(rdd, ["id", "value"])
# With schema
from pyspark.sql.types import *
schema = StructType([
StructField("id", IntegerType(), True),
StructField("value", StringType(), True)
])
df = spark.createDataFrame(rdd, schema)
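If a SparkSession is already active, the shorter toDF helper (added to RDDs by the session) does the same conversion:

df = rdd.toDF(["id", "value"])  # column names only; types are inferred from the data
df.printSchema()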
Interview Questions
Conceptual Questions:
- What is an RDD and what are its key characteristics?
- Explain the difference between transformations and actions in Spark.
- What is RDD lineage and why is it important?
- How does Spark achieve fault tolerance with RDDs?
- Compare and contrast RDDs with DataFrames.
- What are the different persistence levels available for RDDs?
- Explain narrow vs wide transformations in Spark.
- What are accumulators and broadcast variables in Spark?
- How does partitioning work in Spark RDDs?
- When would you prefer RDDs over DataFrames?
Coding Questions:
- Implement a word count program using RDDs.
- How would you find the average value per key in a pair RDD? (see the sketch after this list)
- Given two RDDs, how would you perform an inner join?
- Write code to remove duplicate entries from an RDD.
- How would you implement a moving average using RDDs?
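For reference, a sketch of one way to answer the average-per-key question, using the (sum, count) pattern:

pairs = sc.parallelize([("a", 1), ("a", 3), ("b", 10)])
averages = pairs.mapValues(lambda v: (v, 1)) \
    .reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1])) \
    .mapValues(lambda sc_pair: sc_pair[0] / sc_pair[1])
print(averages.collect())  # [('a', 2.0), ('b', 10.0)] (order may vary)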
Complex Coding Problems
Problem 1: Secondary Sort
Implement a secondary sort where you first sort by key and then by value.
data = [("a", 3), ("b", 2), ("a", 1), ("b", 4), ("a", 2)]
# Solution
rdd = sc.parallelize(data)
# Create composite key and sort
sorted_rdd = rdd.map(lambda x: ((x[0], x[1]), x)) \
.sortByKey() \
.map(lambda x: x[1])
print(sorted_rdd.collect())
# [('a', 1), ('a', 2), ('a', 3), ('b', 2), ('b', 4)]
Problem 2: Inverted Index
Create an inverted index (word → list of documents containing it) from document data.
documents = [
(1, "hello world"),
(2, "hello spark"),
(3, "spark is fast")
]
# Solution
rdd = sc.parallelize(documents)
inverted_index = rdd.flatMap(lambda doc: [(word, doc[0]) for word in doc[1].split()]) \
.groupByKey() \
.mapValues(list)
print(inverted_index.collect())
# [('world', [1]), ('hello', [1, 2]), ('spark', [2, 3]), ('is', [3]), ('fast', [3])]
Problem 3: PageRank Algorithm
Implement a simplified PageRank algorithm using RDDs.
# Sample graph data: (page, [links])
graph = [
("A", ["B", "C"]),
("B", ["C"]),
("C", ["A"]),
("D", ["C"])
]
# Create the links RDD once and cache it, since it is reused in every iteration
links = sc.parallelize(graph).cache()
# Initialize ranks
ranks = sc.parallelize([("A", 1.0), ("B", 1.0), ("C", 1.0), ("D", 1.0)])
# Number of iterations
iterations = 10
# PageRank implementation
for i in range(iterations):
    # Join ranks onto (page, neighbors) and spread each page's rank over its outgoing links
    contribs = links.join(ranks) \
        .flatMap(lambda x: [(dest, x[1][1] / len(x[1][0])) for dest in x[1][0]])
    # Sum contributions by page and apply the damping factor
    ranks = contribs.reduceByKey(lambda x, y: x + y) \
        .mapValues(lambda rank: 0.15 + 0.85 * rank)
# Collect results
print(ranks.collect())
Problem 4: Handling Skewed Joins
Implement a solution for handling skewed data in RDD joins.
# Sample skewed data
left = sc.parallelize([(1, "A"), (1, "B"), (1, "C"), (2, "D"), (3, "E")] * 100)
right = sc.parallelize([(1, "X"), (2, "Y"), (4, "Z")])
# Solution: salting technique
import random

salt_size = 10

# Add a random salt to each key on the large, skewed left side
salted_left = left.map(lambda x: (str(x[0]) + "_" + str(random.randint(0, salt_size - 1)), x[1]))

# Replicate the small right side once per salt value.
# Note: bind i as a default argument; the maps are evaluated lazily, so plain
# closures would all see the final value of i when the job actually runs.
salted_right = sc.parallelize([])
for i in range(salt_size):
    salted_right = salted_right.union(
        right.map(lambda x, i=i: (str(x[0]) + "_" + str(i), x[1]))
    )

# Perform the join and strip the salt back off
result = salted_left.join(salted_right) \
    .map(lambda x: (int(x[0].split("_")[0]), (x[1][0], x[1][1]))) \
    .distinct()  # the sample left RDD contains exact duplicates, so dedupe the output
print(result.collect())
Problem 5: Time Series Analysis
Calculate moving average for time series data using RDDs.
# Sample time series data (timestamp, value)
data = [(1, 10), (2, 20), (3, 30), (4, 40), (5, 50), (6, 60)]
# Window size for moving average
window_size = 3
# Solution
rdd = sc.parallelize(data)
n = rdd.count()  # number of points, used to drop window keys that run past the end
# Each value contributes to the trailing windows ending at its own position and the
# next (window_size - 1) positions; windows at the start are partial
moving_avg = rdd.sortByKey() \
    .zipWithIndex() \
    .flatMap(lambda x: [(x[1] + i, (x[0][1], 1)) for i in range(window_size)]) \
    .filter(lambda kv: kv[0] < n) \
    .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1])) \
    .mapValues(lambda s: s[0] / s[1]) \
    .sortByKey()
print(moving_avg.collect())
# [(0, 10.0), (1, 15.0), (2, 20.0), (3, 30.0), (4, 40.0), (5, 50.0)]
# (keys are 0-based positions in the sorted series, not the original timestamps)
This comprehensive tutorial covers RDD fundamentals, operations, advanced use cases, and practical interview problems. While DataFrames/Datasets are now preferred for most use cases, understanding RDDs is crucial for low-level Spark programming and optimization.