Apache Spark RDDs: Comprehensive Tutorial
Table of Contents
- Introduction to RDDs
- RDD Lineage
- Creating RDDs
- Transformations and Actions
- Pair RDDs
- Advanced RDD Operations
- RDD Persistence
- RDDs vs DataFrames
- When to Convert DataFrames to RDDs
- Interview Questions
- Complex Coding Problems
Introduction to RDDs
Resilient Distributed Datasets (RDDs) are the fundamental data structure of Spark. They are:
- Immutable: Cannot be modified after creation
- Distributed: Data is partitioned across cluster
- Resilient: Can recompute lost partitions due to node failures
- Lazy evaluated: Computed only when actions are called
Key Characteristics:
- In-memory computation
- Fault tolerance through lineage
- Parallel operations
- Ability to use many data sources
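To make the laziness and immutability points concrete, here is a minimal sketch (assuming an active SparkContext named sc, as used throughout this tutorial):
# Transformations build new RDDs; the source RDD is never modified
nums = sc.parallelize([1, 2, 3, 4])
doubled = nums.map(lambda x: x * 2)   # no job runs yet (lazy evaluation)
print(doubled.collect())              # the action triggers execution: [2, 4, 6, 8]
print(nums.collect())                 # original RDD is unchanged: [1, 2, 3, 4]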
RDD Lineage
RDD lineage is the graph of all the parent RDDs of an RDD. It is built up as transformations are applied, and Spark uses it to recompute lost partitions after a failure.
# Example showing lineage
rdd1 = sc.parallelize([1, 2, 3, 4])
rdd2 = rdd1.map(lambda x: x*2)
rdd3 = rdd2.filter(lambda x: x > 4)
result = rdd3.reduce(lambda a, b: a + b)  # reduce is an action, so it returns a value, not an RDD
# To see the lineage (toDebugString() returns bytes in PySpark)
print(rdd3.toDebugString().decode())
Output would show the transformation steps:
(2) PythonRDD[3] at RDD at PythonRDD.scala:53 []
| MapPartitionsRDD[2] at map at <stdin>:1 []
| ParallelCollectionRDD[0] at parallelize at <stdin>:1 []
Creating RDDs
From collections:
data = [1, 2, 3, 4, 5]
rdd = sc.parallelize(data)
From external datasets:
# Text file
rdd = sc.textFile("hdfs://path/to/file.txt")
# Whole text files (returns filename, content pairs)
rdd = sc.wholeTextFiles("hdfs://path/to/directory")
From DataFrames:
df = spark.createDataFrame([(1, "A"), (2, "B")], ["id", "value"])
rdd = df.rdd # Convert to RDD of Rows
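You can also control partitioning when an RDD is created; a small sketch (the partition counts here are arbitrary):
# Explicit partition counts at creation time
rdd = sc.parallelize(data, 4)        # 4 partitions
print(rdd.getNumPartitions())        # 4
rdd = sc.textFile("hdfs://path/to/file.txt", minPartitions=8)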
Transformations and Actions
Common Transformations:
Normal RDDs:
# Map
rdd.map(lambda x: x*2)
# Filter
rdd.filter(lambda x: x > 10)
# Distinct
rdd.distinct()
# FlatMap
rdd.flatMap(lambda x: x.split(" "))
# Sample
rdd.sample(withReplacement=False, fraction=0.5)
# Union
rdd.union(other_rdd)
# Intersection
rdd.intersection(other_rdd)
# Cartesian
rdd.cartesian(other_rdd)
Pair RDDs (covered in next section):
# reduceByKey
# groupByKey
# join, etc.
Common Actions:
# Collect (be careful with large datasets)
rdd.collect()
# Count
rdd.count()
# First
rdd.first()
# Take
rdd.take(5)
# Reduce
rdd.reduce(lambda a, b: a + b)
# Aggregate - e.g. compute (sum, count) in a single pass
rdd.aggregate((0, 0),
              lambda acc, value: (acc[0] + value, acc[1] + 1),
              lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1]))
# Save
rdd.saveAsTextFile("output_path")
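Putting the two together: transformations only describe the computation, and the action at the end triggers it. A minimal sketch:
# Nothing executes until count() is called
rdd = sc.parallelize(range(1, 11))
even_tens = rdd.filter(lambda x: x % 2 == 0).map(lambda x: x * 10)
print(even_tens.count())    # 5 -- the action runs the whole chain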
Pair RDDs
Pair RDDs are RDDs where each element is a key-value pair. They enable operations that act on each key.
Creating Pair RDDs:
# From a normal RDD
pair_rdd = rdd.map(lambda x: (x, 1))
# From a list of tuples
data = [("a", 1), ("b", 2)]
pair_rdd = sc.parallelize(data)
Key Transformations on Pair RDDs:
# reduceByKey - merge values for each key
pair_rdd.reduceByKey(lambda a, b: a + b)
# groupByKey - group values by key
pair_rdd.groupByKey()
# sortByKey - return RDD sorted by key
pair_rdd.sortByKey()
# join - inner join between two pair RDDs
rdd1.join(rdd2)
# leftOuterJoin / rightOuterJoin
rdd1.leftOuterJoin(rdd2)
# cogroup - group data from both RDDs sharing the same key
rdd1.cogroup(rdd2)
# subtractByKey - remove elements with a key present in other RDD
rdd1.subtractByKey(rdd2)
# keys - return RDD of just the keys
pair_rdd.keys()
# values - return RDD of just the values
pair_rdd.values()
Example: Word Count
text_rdd = sc.textFile("hdfs://path/to/textfile.txt")
words = text_rdd.flatMap(lambda line: line.split(" "))
word_counts = words.map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)
word_counts.collect()
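A closely related pattern (and a common interview question, see below) is the average value per key; a sketch using mapValues and reduceByKey:
pairs = sc.parallelize([("a", 1), ("a", 3), ("b", 4)])
averages = pairs.mapValues(lambda v: (v, 1)) \
    .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1])) \
    .mapValues(lambda s: s[0] / s[1])
print(averages.collect())   # [('a', 2.0), ('b', 4.0)] (order may vary)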
Advanced RDD Operations
Custom Partitioning:
# Partition by a custom function (partitionBy requires a pair RDD, so key the data first)
rdd = sc.parallelize(range(100)).map(lambda x: (x, x))
rdd = rdd.partitionBy(4, lambda key: key % 4)
# Check partitions
rdd.glom().collect()  # Shows the data held in each partition
Accumulators:
# Create accumulator
accum = sc.accumulator(0)
# Use in transformations
rdd = sc.parallelize([1, 2, 3, 4])
rdd.foreach(lambda x: accum.add(x))
print(accum.value) # 10
Broadcast Variables:
# Create broadcast variable
broadcast_var = sc.broadcast([1, 2, 3])
# Use in transformations
rdd = sc.parallelize([1, 2, 3])
rdd.map(lambda x: x + broadcast_var.value[0]).collect()
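A typical use is shipping a small lookup table to every executor once instead of joining against it; a sketch with a made-up lookup dict:
lookup = sc.broadcast({1: "one", 2: "two", 3: "three"})
rdd = sc.parallelize([1, 2, 3])
named = rdd.map(lambda x: (x, lookup.value[x]))
print(named.collect())   # [(1, 'one'), (2, 'two'), (3, 'three')]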
RDD Persistence
Caching RDDs in memory or on disk for reuse:
from pyspark import StorageLevel
rdd.persist(StorageLevel.MEMORY_ONLY)  # Default level for RDD.persist()
rdd.persist(StorageLevel.MEMORY_AND_DISK)
rdd.persist(StorageLevel.DISK_ONLY)
# The MEMORY_ONLY_SER / MEMORY_AND_DISK_SER levels belong to the Scala/Java API;
# recent PySpark releases do not expose them because Python data is always stored serialized.
# Unpersist
rdd.unpersist()
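Persistence only pays off when an RDD is reused by more than one action; a sketch reusing the placeholder HDFS path from earlier:
from pyspark import StorageLevel
parsed = sc.textFile("hdfs://path/to/file.txt").map(lambda line: line.split(","))
parsed.persist(StorageLevel.MEMORY_AND_DISK)
print(parsed.count())    # first action computes and caches the RDD
print(parsed.take(5))    # second action reads from the cache
parsed.unpersist()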
RDDs vs DataFrames
| Feature | RDD | DataFrame |
| --- | --- | --- |
| Data Structure | Unstructured | Structured (named columns) |
| Optimization | No built-in optimization | Catalyst optimizer |
| Serialization | Java/Python objects | Binary format (Tungsten) |
| API | Functional (map, reduce) | SQL-like |
| Schema | No schema | Has schema |
| Performance | Slower | Faster |
| Use Case | Low-level, complex logic | Structured data operations |
In PySpark, DataFrames and RDDs (Resilient Distributed Datasets) are both fundamental data structures, but they differ in several key aspects:
1. Abstraction Level
- RDD (Resilient Distributed Dataset)
- A low-level, immutable distributed collection of objects (rows) with no schema.
- Requires manual optimization (e.g., partitioning, caching).
- Operations are performed using functions (e.g., map, reduce, filter).
- DataFrame
- A higher-level abstraction built on top of RDDs, organized into named columns (like a table in SQL).
- Has a schema (explicit structure with data types).
- Optimized using Spark’s Catalyst optimizer and Tungsten execution engine.
- Operations are performed using DataFrame API or SQL queries.
2. Performance Optimization
- RDD
- No built-in optimization (relies on manual tuning).
- Slower for structured data due to lack of schema awareness.
- DataFrame
- Uses Catalyst Optimizer for query planning (predicate pushdown, join optimizations).
- Tungsten improves memory/CPU efficiency via binary storage and code generation.
- Generally faster than RDDs for structured operations.
3. Ease of Use
- RDD
- Requires writing more code for transformations (e.g., map, reduceByKey).
- No built-in SQL support.
- DataFrame
- Provides a declarative API (similar to Pandas/SQL).
- Supports SQL queries (spark.sql("SELECT * FROM table")).
- Easier for aggregations, filtering, and joins.
4. Schema Handling
- RDD
- Schema must be inferred or handled manually (e.g., parsing strings into objects).
- DataFrame
- Explicit schema (inferred from data or defined explicitly).
- Enables better error checking (e.g., type mismatches fail early).
5. Use Cases
- Use RDDs when:
- You need fine-grained control over low-level operations.
- Working with unstructured data (e.g., text, raw logs).
- Implementing custom algorithms not expressible in SQL/DataFrame API.
- Use DataFrames when:
- Working with structured/semi-structured data (CSV, JSON, Parquet).
- Needing SQL-like operations (joins, groupBy, aggregations).
- Prioritizing performance and ease of use.
Example Comparison
RDD (Functional Style)
rdd = sc.parallelize([(1, "Alice"), (2, "Bob")])
filtered_rdd = rdd.filter(lambda x: x[0] > 1) # Manual lambda
DataFrame (Declarative Style)
df = spark.createDataFrame([(1, "Alice"), (2, "Bob")], ["id", "name"])
filtered_df = df.filter(df.id > 1) # SQL-like syntax
Key Takeaway
- RDDs offer flexibility but require more manual effort.
- DataFrames provide optimizations, schema support, and a simpler API for structured data.
In modern PySpark, DataFrames (and Datasets) are preferred for most use cases due to their performance benefits and ease of use. RDDs are still useful for niche scenarios requiring low-level control.
When to Convert DataFrames to RDDs
While DataFrames are generally preferred, you might need RDDs when:
- Need low-level control: Implementing custom algorithms that require fine-grained control over data
- Non-tabular data: Working with complex data structures that don’t fit well in rows/columns
- Custom serialization: When you need specific serialization not supported by DataFrames
- Legacy code: Integrating with existing RDD-based code
- Advanced partitioning: Need custom partitioning beyond what DataFrames offer
Conversion Examples:
DataFrame to RDD:
df = spark.createDataFrame([(1, "A"), (2, "B")], ["id", "value"])
rdd = df.rdd # Returns RDD of Row objects
# Convert Row objects to tuples
rdd_as_tuples = rdd.map(lambda row: (row.id, row.value))
RDD to DataFrame:
rdd = sc.parallelize([(1, "A"), (2, "B")])
df = spark.createDataFrame(rdd, ["id", "value"])
# With schema
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("value", StringType(), True)
])
df = spark.createDataFrame(rdd, schema)
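As a shorter alternative (assuming an active SparkSession has already been created), an RDD of tuples can also be converted with toDF:
df = rdd.toDF(["id", "value"])
df.printSchema()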
Interview Questions
Conceptual Questions:
- What is an RDD and what are its key characteristics?
- Explain the difference between transformations and actions in Spark.
- What is RDD lineage and why is it important?
- How does Spark achieve fault tolerance with RDDs?
- Compare and contrast RDDs with DataFrames.
- What are the different persistence levels available for RDDs?
- Explain narrow vs wide transformations in Spark.
- What are accumulators and broadcast variables in Spark?
- How does partitioning work in Spark RDDs?
- When would you prefer RDDs over DataFrames?
Coding Questions:
- Implement a word count program using RDDs.
- How would you find the average value per key in a pair RDD?
- Given two RDDs, how would you perform an inner join?
- Write code to remove duplicate entries from an RDD.
- How would you implement a moving average using RDDs?
Complex Coding Problems
Problem 1: Secondary Sort
Implement a secondary sort where you first sort by key and then by value.
data = [("a", 3), ("b", 2), ("a", 1), ("b", 4), ("a", 2)]
# Solution
rdd = sc.parallelize(data)
# Create composite key and sort
sorted_rdd = rdd.map(lambda x: ((x[0], x[1]), x)) \
    .sortByKey() \
    .map(lambda x: x[1])
print(sorted_rdd.collect())
# [('a', 1), ('a', 2), ('a', 3), ('b', 2), ('b', 4)]
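The composite-key approach above sorts the whole RDD globally. For larger datasets, one common alternative (not part of the original example) is repartitionAndSortWithinPartitions, which routes records to partitions by the natural key and sorts within each partition:
# Sketch: partition on the natural key, sort by (key, value) within each partition
pairs = sc.parallelize(data).map(lambda x: ((x[0], x[1]), None))
per_partition_sorted = pairs.repartitionAndSortWithinPartitions(
    2, lambda composite_key: hash(composite_key[0])) \
    .map(lambda kv: kv[0])
print(per_partition_sorted.glom().collect())   # each partition is sorted by (key, value)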
Problem 2: Inverted Index
Create an inverted index (word → list of documents containing it) from document data.
documents = [
(1, "hello world"),
(2, "hello spark"),
(3, "spark is fast")
]
# Solution
rdd = sc.parallelize(documents)
inverted_index = rdd.flatMap(lambda doc: [(word, doc[0]) for word in doc[1].split()]) \
    .groupByKey() \
    .mapValues(list)
print(inverted_index.collect())
# [('world', [1]), ('hello', [1, 2]), ('spark', [2, 3]), ('is', [3]), ('fast', [3])] (order may vary)
Problem 3: PageRank Algorithm
Implement a simplified PageRank algorithm using RDDs.
# Sample graph data: (page, [links])
graph = [
("A", ["B", "C"]),
("B", ["C"]),
("C", ["A"]),
("D", ["C"])
]
# Parallelize the graph once and cache it, since it is reused every iteration
links = sc.parallelize(graph).cache()
# Initialize ranks
ranks = sc.parallelize([("A", 1.0), ("B", 1.0), ("C", 1.0), ("D", 1.0)])
# Number of iterations
iterations = 10
# PageRank implementation (simplified: pages with no inbound links, like "D",
# receive no contributions and drop out of the ranks after the first iteration)
for i in range(iterations):
    # Join to get (page, (neighbors, rank)) and spread each rank over its out-links
    contribs = links.join(ranks) \
        .flatMap(lambda x: [(dest, x[1][1] / len(x[1][0])) for dest in x[1][0]])
    # Sum contributions per page and apply the damping factor
    ranks = contribs.reduceByKey(lambda x, y: x + y) \
        .mapValues(lambda rank: 0.15 + 0.85 * rank)
# Collect results
print(ranks.collect())
Problem 4: Handling Skewed Joins
Implement a solution for handling skewed data in RDD joins.
# Sample skewed data
left = sc.parallelize([(1, "A"), (1, "B"), (1, "C"), (2, "D"), (3, "E")] * 100)
right = sc.parallelize([(1, "X"), (2, "Y"), (4, "Z")])
# Solution: Salting technique
import random
# Add a random salt to each key on the skewed (left) side
salt_size = 10
salted_left = left.map(
    lambda x: (str(x[0]) + "_" + str(random.randint(0, salt_size - 1)), x[1]))
# Replicate every right-side record once per salt value; flatMap avoids the
# late-binding closure bug of building up unions of maps inside a loop
salted_right = right.flatMap(
    lambda x: [(str(x[0]) + "_" + str(i), x[1]) for i in range(salt_size)])
# Perform the join on the salted keys, then strip the salt from the key
result = salted_left.join(salted_right) \
    .map(lambda x: (int(x[0].split("_")[0]), x[1])) \
    .distinct()  # only to collapse the repeated demo rows; a plain join keeps duplicates
print(result.collect())
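If the smaller side fits comfortably in memory, an alternative to salting (not shown above) is to skip the shuffle entirely with a broadcast map-side join:
# Broadcast the small side as a dict and join map-side (inner-join semantics)
right_map = sc.broadcast(dict(right.collect()))
joined = left.map(lambda kv: (kv[0], (kv[1], right_map.value.get(kv[0])))) \
    .filter(lambda kv: kv[1][1] is not None)
print(joined.take(5))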
Problem 5: Time Series Analysis
Calculate moving average for time series data using RDDs.
# Sample time series data (timestamp, value)
data = [(1, 10), (2, 20), (3, 30), (4, 40), (5, 50), (6, 60)]
# Window size for moving average
window_size = 3
# Solution
rdd = sc.parallelize(data)
n = rdd.count()  # number of points, used to drop windows that run past the end
# Each point contributes to the trailing windows ending at its own index
# and the next (window_size - 1) indices
moving_avg = rdd.sortByKey() \
    .zipWithIndex() \
    .flatMap(lambda x: [(x[1] + i, (x[0][1], 1)) for i in range(window_size)]) \
    .filter(lambda kv: kv[0] < n) \
    .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1])) \
    .mapValues(lambda s: s[0] / s[1]) \
    .sortByKey()
print(moving_avg.collect())
# [(0, 10.0), (1, 15.0), (2, 20.0), (3, 30.0), (4, 40.0), (5, 50.0)]
This comprehensive tutorial covers RDD fundamentals, operations, advanced use cases, and practical interview problems. While DataFrames/Datasets are now preferred for most use cases, understanding RDDs is crucial for low-level Spark programming and optimization.