Absolutely! Let’s walk through all major PySpark data structures and types that are commonly used in transformations and aggregations, especially:
- Row
- StructType / StructField
- struct()
- ArrayType / array()
- MapType / create_map()
- collect_list(), collect_set()
- Nested combinations: Array of Structs, Map of Arrays, etc.
🧱 1. Row — Spark’s Internal Data Holder
- The most basic Spark element.
- Spark represents each record internally as a Row object (like a named tuple).
Example:
from pyspark.sql import Row
r = Row(name="Alice", age=30)
print(r.name) # → "Alice"
Used when creating small DataFrames manually.
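For instance, a minimal sketch of that pattern (assuming an active SparkSession bound to spark):
from pyspark.sql import Row
people = [Row(name="Alice", age=30), Row(name="Bob", age=25)]
df = spark.createDataFrame(people)  # schema is inferred from the Row fields
df.printSchema()  # name: string, age: long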
🏗 2. StructType / StructField — Schema Definition Objects
- Used to define a schema when creating DataFrames from raw data (like JSON or CSV).
- A StructType is a list of StructFields.
Example:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True)
])
Used with:
df = spark.read.schema(schema).json("people.json")
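If you don’t have a people.json handy, the same schema can be exercised against in-memory data; a quick sketch:
df = spark.createDataFrame([("Alice", 30), ("Bob", 25)], schema)
df.printSchema()  # name: string (nullable), age: integer (nullable)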
🧱 3. struct() — Row-like object inside a column
- Combines multiple columns into a named nested object.
- Becomes a StructType inside a column.
Example:
from pyspark.sql.functions import struct
df = df.withColumn("contact", struct("email", "phone"))
df.printSchema()
Output schema:
root
|-- contact: struct (nullable = true)
| |-- email: string (nullable = true)
| |-- phone: string (nullable = true)
Access in SQL: contact.email
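The same field is reachable through the DataFrame API; a small sketch building on the contact column above:
from pyspark.sql import functions as F
df.select(F.col("contact.email")).show()  # dot notation reads one struct field
df.select("contact.*").show()  # expands every field of the struct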
📚 4. array() — Creates an ArrayType column
- Combines columns into an ordered array (like a Python list).
- All elements must be of the same type.
Example:
from pyspark.sql.functions import array
df = df.withColumn("scores", array("math_score", "eng_score", "sci_score"))
Schema:
|-- scores: array (nullable = true)
| |-- element: integer (containsNull = true)
Access with index: scores[0]
Use explode() to flatten.
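A short sketch of that flattening, assuming the scores column built above:
from pyspark.sql import functions as F
df.select(F.explode("scores").alias("score")).show()  # one row per array element
df.select(F.posexplode("scores")).show()  # also returns each element's position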
🗺 5. create_map() — Key-Value Data in One Column (MapType)
- Creates a column with a MapType value.
- Keys must be unique and of the same type; values must share one type too.
Example:
from pyspark.sql import functions as F

df = df.withColumn("details_map", F.create_map(
    F.lit("city"), F.col("city"),
    F.lit("state"), F.col("state")
))
Schema:
|-- details_map: map (nullable = false)
| |-- key: string
| |-- value: string
Access like: details_map['city']
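A quick sketch of reading the map back out, using the details_map column built above:
from pyspark.sql import functions as F
df.select(F.col("details_map")["city"].alias("city")).show()  # key lookup; missing keys yield null
df.select(F.map_keys("details_map"), F.map_values("details_map")).show()  # keys and values as arrays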
🧺 6. collect_list() and collect_set() — Grouped Aggregates
🔁 collect_list(column)
- Collects column values into an array.
- Duplicates allowed.
- Order is not guaranteed.
from pyspark.sql.functions import collect_list
df.groupBy("customer_id").agg(collect_list("txn_id"))
📦 Output type: ArrayType
🔂 collect_set(column)
- Same as above but removes duplicates.
from pyspark.sql.functions import collect_set
df.groupBy("customer_id").agg(collect_set("txn_id"))
📦 Output type: ArrayType — but unique values only.
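Because the collected order is not guaranteed, a common pattern is to sort the result explicitly; a sketch reusing the txn_id example above:
from pyspark.sql import functions as F
df.groupBy("customer_id").agg(
    F.array_sort(F.collect_list("txn_id")).alias("txn_ids_sorted"),
    F.collect_set("txn_id").alias("txn_ids_unique")
)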
🧬 7. Nested Structures: Combine Them!
✅ Array of Structs
df = df.groupBy("cust_id").agg(
    collect_list(struct("txn_id", "amount", "date")).alias("transactions")
)
Schema:
transactions: array<struct<txn_id:string, amount:double, date:string>>
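To flatten it back out, explode the array and then expand the struct; a minimal sketch against the transactions column above:
from pyspark.sql import functions as F
flat = df.select("cust_id", F.explode("transactions").alias("t"))
flat.select("cust_id", "t.txn_id", "t.amount", "t.date").show()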
✅ Map of Arrays
df = df.withColumn("multi_map", map(
F.lit("debit"), array("d1", "d2"),
F.lit("credit"), array("c1", "c2")
))
Schema:
multi_map: map<string, array<string>>
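Access chains the two lookups, first the map key, then the array index; a short sketch on multi_map:
from pyspark.sql import functions as F
df.select(F.col("multi_map")["debit"][0].alias("first_debit")).show()  # key lookup, then element 0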
📌 Summary Table
| Type | PySpark Function | Output Type | Description |
|---|---|---|---|
| Row | Row() | Internal row object | One record of data |
| Struct | struct() | StructType | Named group of fields |
| Array | array() | ArrayType | List of same-typed values |
| Map | create_map() | MapType | Key-value pair column |
| collect_list() | collect_list() | ArrayType | Grouped list (with duplicates) |
| collect_set() | collect_set() | ArrayType | Grouped list (no duplicates) |
| StructType | StructType() | Schema object | Defines schema of DataFrame |
📊 Visual Schema Example
root
|-- customer_id: string
|-- transactions: array
| |-- element: struct
| | |-- txn_id: string
| | |-- amount: double
| | |-- date: string
|-- profile: struct
| |-- city: string
| |-- country: string
|-- preferences: map<string, string>
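That schema can also be declared manually; a sketch combining the type classes from section 2 with ArrayType and MapType:
from pyspark.sql.types import (StructType, StructField, StringType,
                               DoubleType, ArrayType, MapType)

schema = StructType([
    StructField("customer_id", StringType()),
    StructField("transactions", ArrayType(StructType([
        StructField("txn_id", StringType()),
        StructField("amount", DoubleType()),
        StructField("date", StringType())
    ]))),
    StructField("profile", StructType([
        StructField("city", StringType()),
        StructField("country", StringType())
    ])),
    StructField("preferences", MapType(StringType(), StringType()))
])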
🛠 When to Use What?
| Use Case | Use |
|---|---|
| Group many columns into one | struct() |
| Model many values of the same type | array() |
| Collect grouped values | collect_list() / collect_set() |
| Key-value pair fields | create_map() |
| Define a complex schema | StructType manually |
# PySpark Demo: Core Data Structures and Transformations
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType
spark = SparkSession.builder.appName("PySparkDataStructuresDemo").getOrCreate()
# ---------------------------
# Sample Raw Data
# ---------------------------
data = [
    ("Alice", 1000, 500, 3.5, "Delhi", "IN"),
    ("Bob", 2000, 300, 4.0, "Mumbai", "IN"),
    ("Alice", 1500, 400, 3.8, "Delhi", "IN"),
    ("Charlie", 2500, 600, 4.2, "New York", "US")
]
columns = ["name", "balance", "overdraft", "interest", "city", "country"]
df = spark.createDataFrame(data, columns)
df.show(truncate=False)
# ---------------------------
# 1. Struct Column
# ---------------------------
df_struct = df.withColumn("metrics_struct", F.struct("balance", "overdraft", "interest"))
df_struct.select("name", "metrics_struct").show(truncate=False)
# ---------------------------
# 2. Array Column
# ---------------------------
df_array = df.withColumn("metrics_array", F.array("balance", "overdraft", "interest"))
df_array.select("name", "metrics_array").show(truncate=False)
# ---------------------------
# 3. Map Column
# ---------------------------
df_map = df.withColumn("location_map", F.create_map(F.lit("city"), F.col("city"), F.lit("country"), F.col("country")))
df_map.select("name", "location_map").show(truncate=False)
# ---------------------------
# 4. collect_list() and collect_set()
# ---------------------------
collect_df = df.groupBy("name").agg(
    F.collect_list("city").alias("city_list"),
    F.collect_set("city").alias("city_set")
)
collect_df.show(truncate=False)
# ---------------------------
# 5. Array of Structs (Grouped)
# ---------------------------
df_struct_arr = df.groupBy("name").agg(
    F.collect_list(F.struct("balance", "overdraft", "interest")).alias("metrics_hist")
)
df_struct_arr.show(truncate=False)
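# ---------------------------
# 5b. Exploding the Array of Structs back to rows (illustrative sketch)
# ---------------------------
df_flat = df_struct_arr.select("name", F.explode("metrics_hist").alias("m"))
df_flat.select("name", "m.balance", "m.overdraft", "m.interest").show(truncate=False)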
# ---------------------------
# 6. Sorting Array
# ---------------------------
df_sorted_array = df.withColumn("sorted_metrics", F.sort_array(F.array("balance", "overdraft", "interest")))
df_sorted_array.select("name", "sorted_metrics").show(truncate=False)
# ---------------------------
# 7. Manual Schema with StructType
# ---------------------------
manual_data = [("A1", 25), ("A2", 30)]
schema = StructType([
    StructField("id", StringType(), True),
    StructField("age", IntegerType(), True)
])
df_manual = spark.createDataFrame(manual_data, schema)
df_manual.printSchema()
df_manual.show()
# Stop the Spark session
spark.stop()