Let’s walk through the major PySpark data structures and types commonly used in transformations and aggregations, especially:

  • Row
  • StructType / StructField
  • struct()
  • ArrayType / array()
  • MapType / create_map()
  • collect_list(), collect_set()
  • Nested combinations: Array of Structs, Map of Arrays, etc.

🧱 1. Row — Spark’s Internal Data Holder

  • The most basic Spark element.
  • Spark represents each record internally as a Row object (like a named tuple).

Example:

from pyspark.sql import Row

r = Row(name="Alice", age=30)
print(r.name)  # → "Alice"

Used when creating small DataFrames manually.
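
For instance, a few Row objects can seed a quick test DataFrame; a minimal sketch, assuming an active SparkSession named spark:

from pyspark.sql import Row

# Each Row becomes one record in the resulting DataFrame
people = [Row(name="Alice", age=30), Row(name="Bob", age=25)]
df_people = spark.createDataFrame(people)
df_people.show()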


🏗 2. StructType / StructField — Schema Definition Objects

  • Used to define schema when creating DataFrames from raw data (like JSON or CSV).
  • StructType is a list of StructFields.

Example:

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True)
])

Used with:

df = spark.read.schema(schema).json("people.json")
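
The same schema object also works for in-memory data; a quick sketch, reusing the spark session:

rows = [("Alice", 30), ("Bob", 25)]
df_typed = spark.createDataFrame(rows, schema)
df_typed.printSchema()  # name: string, age: integer, exactly as declared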

🧱 3. struct() — Row-like object inside a column

  • Combines multiple columns into a named nested object.
  • Becomes StructType inside a column.

Example:

from pyspark.sql.functions import struct

df = df.withColumn("contact", struct("email", "phone"))
df.printSchema()

Output schema:

root
 |-- contact: struct (nullable = true)
 |    |-- email: string (nullable = true)
 |    |-- phone: string (nullable = true)

Access in SQL: contact.email
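
The same field is reachable through the DataFrame API with dot notation; a small sketch, assuming the contact column built above:

from pyspark.sql import functions as F

# Pull individual fields back out of the struct column
df.select(F.col("contact.email"), F.col("contact.phone")).show()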


📚 4. array() — Creates ArrayType column

  • Combines columns into an ordered array (like a Python list).
  • All elements must be of the same type.

Example:

from pyspark.sql.functions import array

df = df.withColumn("scores", array("math_score", "eng_score", "sci_score"))

Schema:

 |-- scores: array (nullable = true)
 |    |-- element: integer (containsNull = true)

Access with index: scores[0]
Use explode() to flatten.
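
For example, explode() produces one output row per array element; a sketch using the scores column from above:

from pyspark.sql import functions as F

# One row per score; any other selected columns repeat for each element
df.select(F.explode("scores").alias("score")).show()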


🗺 5. create_map() — Key-Value Data in One Column (MapType)

  • Creates a column with a MapType value. (Note: the builder is create_map(); pyspark.sql.functions has no plain map() function.)
  • Keys must be unique and share one type; values must share one type too.

Example:

from pyspark.sql import functions as F

df = df.withColumn("details_map", F.create_map(
    F.lit("city"), F.col("city"),
    F.lit("state"), F.col("state")
))

Schema:

 |-- details_map: map (nullable = false)
 |    |-- key: string
 |    |-- value: string

Access like: details_map['city']
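
Besides bracket lookup, map_keys() and map_values() expose the whole map; a sketch using details_map from above:

from pyspark.sql import functions as F

df.select(
    F.col("details_map")["city"].alias("city"),  # value lookup by key
    F.map_keys("details_map").alias("keys"),     # array of keys
    F.map_values("details_map").alias("values")  # array of values
).show(truncate=False)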


🧺 6. collect_list() and collect_set() — Grouped Aggregates

🔁 collect_list(column)

  • Collects column values into an array.
  • Duplicates allowed.
  • Order is not guaranteed.

Example:

from pyspark.sql.functions import collect_list

df.groupBy("customer_id").agg(collect_list("txn_id"))

📦 Output type: ArrayType
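
Since the order of collected elements depends on partitioning and shuffle, a common pattern is to sort the result explicitly; a minimal sketch using sort_array():

from pyspark.sql import functions as F

# Sort the collected array so the output is deterministic
df.groupBy("customer_id").agg(
    F.sort_array(F.collect_list("txn_id")).alias("txn_ids_sorted")
)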


🔂 collect_set(column)

  • Same as collect_list(), but removes duplicates.
  • Order is likewise not guaranteed.

Example:

from pyspark.sql.functions import collect_set

df.groupBy("customer_id").agg(collect_set("txn_id"))

📦 Output type: ArrayType — but unique values only.


🧬 7. Nested Structures: Combine Them!

✅ Array of Structs

df = df.groupBy("cust_id").agg(
    collect_list(struct("txn_id", "amount", "date")).alias("transactions")
)

Schema:

transactions: array<struct<txn_id:string, amount:double, date:string>>
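
To flatten such a history back into one row per transaction, combine explode() with dot access; a sketch, assuming the transactions column built above:

from pyspark.sql import functions as F

# One row per struct in the array, then dot into its fields
flat = df.select("cust_id", F.explode("transactions").alias("txn"))
flat.select("cust_id", "txn.txn_id", "txn.amount", "txn.date").show()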

✅ Map of Arrays

df = df.withColumn("multi_map", map(
    F.lit("debit"), array("d1", "d2"),
    F.lit("credit"), array("c1", "c2")
))

Schema:

multi_map: map<string, array<string>>
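
Lookups chain naturally: first by key, then by array index; a sketch using multi_map from above:

from pyspark.sql import functions as F

# "debit" selects the inner array; [0] indexes into it
df.select(F.col("multi_map")["debit"][0].alias("first_debit")).show()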

📌 Summary Table

| Type           | PySpark Function | Output Type         | Description                    |
|----------------|------------------|---------------------|--------------------------------|
| Row            | Row()            | Internal row object | One record of data             |
| Struct         | struct()         | StructType          | Named group of fields          |
| Array          | array()          | ArrayType           | List of same-typed values      |
| Map            | create_map()     | MapType             | Key-value pair column          |
| collect_list() | collect_list()   | ArrayType           | Grouped list (with duplicates) |
| collect_set()  | collect_set()    | ArrayType           | Grouped list (no duplicates)   |
| StructType     | StructType()     | Schema object       | Defines schema of a DataFrame  |

📊 Visual Schema Example

root
 |-- customer_id: string
 |-- transactions: array
 |    |-- element: struct
 |    |    |-- txn_id: string
 |    |    |-- amount: double
 |    |    |-- date: string
 |-- profile: struct
 |    |-- city: string
 |    |-- country: string
 |-- preferences: map<string, string>

🛠 When to Use What?

| Use Case                           | Use                            |
|------------------------------------|--------------------------------|
| Group many columns into one        | struct()                       |
| Model many values of the same type | array()                        |
| Collect grouped values             | collect_list() / collect_set() |
| Key-value pair fields              | create_map()                   |
| Define a complex schema            | StructType (manually)          |

🧪 8. Full Demo Script

# PySpark Demo: Core Data Structures and Transformations
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType

spark = SparkSession.builder.appName("PySparkDataStructuresDemo").getOrCreate()

# ---------------------------
# Sample Raw Data
# ---------------------------
data = [
    ("Alice", 1000, 500, 3.5, "Delhi", "IN"),
    ("Bob", 2000, 300, 4.0, "Mumbai", "IN"),
    ("Alice", 1500, 400, 3.8, "Delhi", "IN"),
    ("Charlie", 2500, 600, 4.2, "New York", "US")
]

columns = ["name", "balance", "overdraft", "interest", "city", "country"]
df = spark.createDataFrame(data, columns)
df.show(truncate=False)

# ---------------------------
# 1. Struct Column
# ---------------------------
df_struct = df.withColumn("metrics_struct", F.struct("balance", "overdraft", "interest"))
df_struct.select("name", "metrics_struct").show(truncate=False)

# ---------------------------
# 2. Array Column
# ---------------------------
df_array = df.withColumn("metrics_array", F.array("balance", "overdraft", "interest"))
df_array.select("name", "metrics_array").show(truncate=False)

# ---------------------------
# 3. Map Column
# ---------------------------
df_map = df.withColumn("location_map", F.create_map(F.lit("city"), F.col("city"), F.lit("country"), F.col("country")))
df_map.select("name", "location_map").show(truncate=False)

# ---------------------------
# 4. collect_list() and collect_set()
# ---------------------------
collect_df = df.groupBy("name").agg(
    F.collect_list("city").alias("city_list"),
    F.collect_set("city").alias("city_set")
)
collect_df.show(truncate=False)

# ---------------------------
# 5. Array of Structs (Grouped)
# ---------------------------
df_struct_arr = df.groupBy("name").agg(
    F.collect_list(F.struct("balance", "overdraft", "interest")).alias("metrics_hist")
)
df_struct_arr.show(truncate=False)

# ---------------------------
# 6. Sorting Array
# ---------------------------
df_sorted_array = df.withColumn("sorted_metrics", F.sort_array(F.array("balance", "overdraft", "interest")))
df_sorted_array.select("name", "sorted_metrics").show(truncate=False)

# ---------------------------
# 7. Manual Schema with StructType
# ---------------------------
manual_data = [("A1", 25), ("A2", 30)]
schema = StructType([
    StructField("id", StringType(), True),
    StructField("age", IntegerType(), True)
])
df_manual = spark.createDataFrame(manual_data, schema)
df_manual.printSchema()
df_manual.show()

# Stop the Spark session
spark.stop()
