🔍 What Does collect_list() Do in Spark SQL?
collect_list() is an aggregation function in Spark SQL and PySpark.
It collects all values of a column (within each group, if grouped) into a single array. Duplicates are preserved, but the order of elements is not deterministic.
✅ Syntax
SELECT collect_list(column_name) FROM your_table;
In PySpark:
from pyspark.sql import functions as F
df.groupBy("some_column").agg(F.collect_list("value_column"))
🧾 Example
Input table:
category | value |
---|---|
A | x |
A | y |
A | x |
B | z |
B | y |
Query:
SELECT category, collect_list(value) AS value_list
FROM your_table
GROUP BY category;
Output:
category | value_list |
---|---|
A | [x, y, x] |
B | [z, y] |
- ✅ Duplicates are preserved
- ⚠️ Order is not guaranteed (unless you sort explicitly later, e.g. with sort_array())
🔄 Difference Between collect_list() and collect_set()
Function | Duplicates | Output Order |
---|---|---|
collect_list() | ✅ Yes | ❌ Not guaranteed |
collect_set() | ❌ No | ❌ Not guaranteed |
If you want a distinct sorted list, combine:
sort_array(collect_set(value_column))
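In PySpark, the same distinct-plus-sorted combination might look like the sketch below, reusing the df, some_column and value_column names from the syntax example above:
from pyspark.sql import functions as F

# distinct values per group, sorted ascending inside the array
df.groupBy("some_column").agg(
    F.sort_array(F.collect_set("value_column")).alias("distinct_sorted_values")
)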
📌 Common Use Cases
Use Case | Description |
---|---|
Grouped logs | Collect all log messages for a session |
Building pairs | Combine row values into arrays |
Pre-join packing | Pack rows before sending to UDF |
Feature engineering | List of all transactions, behaviors, tags per user |
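For instance, the grouped-logs case might look like the sketch below; logs_df, session_id and log_message are illustrative names, not columns from the examples above.
from pyspark.sql import functions as F

# one array of log messages per session (column names are hypothetical)
session_logs = logs_df.groupBy("session_id").agg(
    F.collect_list("log_message").alias("messages")
)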
Below are a few “recipes” showing how to collect values only for the columns you care about.
Pick the pattern that fits your goal:
Goal | Best Pattern | Spark SQL | PySpark DataFrame |
---|---|---|---|
📥 Collect a single column’s values across rows (classic use) | collect_list(col) | collect_list(colA) | F.collect_list("colA") |
📦 Collect several columns together as one object per row | collect_list(struct(...)) | collect_list(struct(colA , colB)) | F.collect_list(F.struct("colA","colB")) |
🗂 Collect each column separately (two independent arrays) | multiple collect_list calls | see example ② | see example ② |
🏷 Collect key/value pairs (column‑name → value) | collect_list(named_struct(...)) or map_from_arrays | see example ③ | see example ③ |
🧾 Collect within each row (no grouping → pack selected columns) | array(colA , colB , colC) | array(colA,colB,colC) | F.array("colA","colB","colC") |
① Collect several columns together (one array of structs)
“Give me, per customer, every (txn_id, amount) pair they made.”
SELECT
customer_id,
collect_list(struct(txn_id, amount)) AS txn_hist
FROM bank_txns
GROUP BY customer_id;
Result sample (JSON view):
{
"customer_id": 42,
"txn_hist": [
{"txn_id": "T100", "amount": 500},
{"txn_id": "T101", "amount": 200}
]
}
PySpark equivalent
hist = (df
.groupBy("customer_id")
.agg(F.collect_list(F.struct("txn_id", "amount")).alias("txn_hist"))
)
✔️ All values stay aligned (each txn_id with its matching amount) because they sit in the same struct.
② Collect each column separately
“Per branch, list all distinct account numbers and all distinct currencies in two arrays.”
SELECT
branch_id,
collect_set(account_no) AS accts,
collect_set(currency) AS ccy
FROM bank_txns
GROUP BY branch_id;
PySpark
df_sets = (df
.groupBy("branch_id")
.agg(
F.collect_set("account_no").alias("accts"),
F.collect_set("currency").alias("ccy")
)
)
Result:
branch_id | accts | ccy |
---|---|---|
007 | [ACC3, ACC1, ACC2] | [USD, INR] |
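If you want the two arrays in a stable order, you can wrap each collect_set in sort_array; a small sketch building on the PySpark query above:
df_sets_sorted = (df
    .groupBy("branch_id")
    .agg(
        F.sort_array(F.collect_set("account_no")).alias("accts"),
        F.sort_array(F.collect_set("currency")).alias("ccy")
    )
)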
③ Collect dynamic key/value pairs (named_struct / map)
“Per customer, pack the KYC attributes we care about into one array of key=value structs.”
SELECT
cust_id,
collect_list(
named_struct('key','city', 'val', city)
) || collect_list(
named_struct('key','risk_flag', 'val', risk_flag)
) AS kyc_blob
FROM customers
GROUP BY cust_id;
Or build a MapType instead of an array of structs:
SELECT
cust_id,
map_from_arrays( -- Spark SQL 2.4+
array('city','risk_flag'),
array(city , risk_flag)
) AS kyc_map
FROM customers;
Result:
{
"cust_id": 99,
"kyc_map": {"city":"Delhi", "risk_flag":"LOW"}
}
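A PySpark sketch of the same map-building idea, assuming customers_df holds the customers table (F.map_from_arrays is the DataFrame-side counterpart of map_from_arrays):
from pyspark.sql import functions as F

kyc = customers_df.select(
    "cust_id",
    F.map_from_arrays(
        F.array(F.lit("city"), F.lit("risk_flag")),   # keys
        F.array("city", "risk_flag")                  # values
    ).alias("kyc_map")
)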
④ Pack selected columns inside each row (no collect_list)
If you don’t need grouping—just want a tidy array/struct per row:
SELECT
account_no,
array(balance, overdraft_limit, interest_rate) AS metrics
FROM accounts;
PySpark:
df_with_array = df.select(
"account_no",
F.array("balance","overdraft_limit","interest_rate").alias("metrics")
)
🔑 Tips & Gotchas
- Order isn’t guaranteed with collect_list / collect_set. Use sort_array(...) afterwards if deterministic order matters: sort_array(collect_list(colA))
- collect_set removes duplicates; collect_list keeps them.
- For very wide arrays, consider exploding later (the PySpark sketch after this list shows the DataFrame side):
SELECT cust_id, txn.*
FROM ( ...collect_list(struct(...)) ) t
LATERAL VIEW explode(txn_hist) expl AS txn;
- If you’ll query deeply nested results in SQL, store them in Delta; Spark SQL can still push predicates down into structs/arrays.
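The DataFrame-side version of that explode step could look like this sketch, reusing the hist frame (customer_id, txn_hist) built in recipe ①:
exploded = (hist
    .select("customer_id", F.explode("txn_hist").alias("txn"))
    .select("customer_id", "txn.*")   # flatten the struct fields back into columns
)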
Use sort_array() (Spark SQL) or F.sort_array() / F.array_sort() (PySpark) to sort the elements inside the array after you have built it.
Spark SQL
SELECT
account_no,
balance,
overdraft_limit,
interest_rate,
-- Ascending sort (default)
sort_array(array(balance, overdraft_limit, interest_rate)) AS metrics_asc,
-- Descending sort
sort_array(array(balance, overdraft_limit, interest_rate), false) AS metrics_desc
FROM accounts;
Notes
Point | Detail |
---|---|
Function | sort_array(<array_expr> [, ascending_boolean]) |
Default order | Ascending (true ) |
Descending | Pass false as the second argument: sort_array(arr, false) |
Type requirement | All elements must share the same data type (e.g. all DOUBLE ). |
NULL handling | NULL s come first in ascending order, last in descending. |
Related function | array_sort() also sorts ascending but places NULLs last, so it is not an exact synonym; newer Spark versions let it take a custom comparator (see below). |
PySpark DataFrame API
from pyspark.sql import functions as F
df_sorted = (df
.withColumn("metrics_asc",
F.sort_array(F.array("balance", "overdraft_limit", "interest_rate"))
)
.withColumn("metrics_desc",
F.sort_array(
F.array("balance", "overdraft_limit", "interest_rate"),
asc=False # descending
)
)
)
Example Result for one row
balance | overdraft_limit | interest_rate | metrics_asc | metrics_desc |
---|---|---|---|---|
1000 | 500 | 3.5 | [3.5, 500, 1000] | [1000, 500, 3.5] |
If you need stable field names instead of positions
You can keep the original array unsorted and add a separate sorted one, or explode the array later:
SELECT
account_no,
EXPLODE(sort_array(array(balance, overdraft_limit, interest_rate))) AS metric_value
FROM accounts;
This produces one row per metric, already ordered.
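A PySpark sketch of the same one-row-per-metric output, assuming df holds the accounts table:
metric_rows = df.select(
    "account_no",
    F.explode(
        F.sort_array(F.array("balance", "overdraft_limit", "interest_rate"))
    ).alias("metric_value")
)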
Instead of packing values into a plain array, you can pack them into a struct (row) object, or even into an array of structs.
A struct in Spark SQL / PySpark is the same as a nested row: each field keeps its own name and type.
1️⃣ Build one struct per row (no aggregation)
SELECT
account_no,
struct(
balance AS bal,
overdraft_limit AS od_limit,
interest_rate AS int_rate
) AS metrics_struct
FROM accounts;
Result sneak‑peek:
account_no | metrics_struct |
---|---|
1001 | {bal: 1000, od_limit: 500, int_rate: 3.5} |
PySpark equivalent
from pyspark.sql import functions as F
df_struct = (df
.withColumn(
"metrics_struct",
F.struct(
F.col("balance").alias("bal"),
F.col("overdraft_limit").alias("od_limit"),
F.col("interest_rate").alias("int_rate")
)
)
)
Why use a struct?
- Field names stay intact (bal, od_limit, int_rate), which makes downstream SQL easier: metrics_struct.bal (see the one-liner after this list).
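For example, pulling one field back out in PySpark is just a dotted column reference (using the df_struct frame built above):
# read a single named field out of the struct column
balances = df_struct.select("account_no", F.col("metrics_struct.bal").alias("bal"))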
2️⃣ Aggregate into an array of structs (collect rows together)
SELECT
customer_id,
collect_list(
struct(txn_id, amount, txn_ts) -- one struct per transaction
) AS txn_hist
FROM bank_txns
GROUP BY customer_id;
Results in one row per customer, with a JSON‑like list:
{
"customer_id": 42,
"txn_hist": [
{"txn_id":"T100","amount":500,"txn_ts":"2025‑07‑05"},
{"txn_id":"T101","amount":200,"txn_ts":"2025‑07‑04"}
]
}
Sorting an array‑of‑structs
Spark 3.4+ lets you sort by a field using a lambda comparator:
SELECT
  customer_id,
  array_sort(
    collect_list(struct(txn_id, amount, txn_ts)),
    (left, right) -> CASE WHEN left.txn_ts < right.txn_ts THEN -1
                          WHEN left.txn_ts > right.txn_ts THEN 1
                          ELSE 0 END   -- oldest-first
  ) AS txn_hist_sorted
FROM bank_txns
GROUP BY customer_id;
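In the DataFrame API (PySpark 3.4+), F.array_sort accepts a comparator that must return an integer column; a sketch of the same oldest-first sort, assuming df holds the bank_txns rows:
from pyspark.sql import functions as F

txn_sorted = (df
    .groupBy("customer_id")
    .agg(
        F.array_sort(
            F.collect_list(F.struct("txn_id", "amount", "txn_ts")),
            # comparator must return -1 / 0 / 1; oldest txn_ts first
            lambda l, r: F.when(l["txn_ts"] < r["txn_ts"], -1)
                          .when(l["txn_ts"] > r["txn_ts"], 1)
                          .otherwise(0)
        ).alias("txn_hist_sorted")
    )
)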
(For earlier Spark versions you’d explode, order by, and collect_list again; a window-based alternative is sketched below.)
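One such workaround is a window ordered by txn_ts: collect_list over an unbounded ordered frame gathers the structs in that order. A sketch, again assuming df holds the bank_txns rows:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

w = (Window.partitionBy("customer_id")
           .orderBy("txn_ts")
           .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing))

txn_hist_sorted = (df
    .withColumn("txn_hist",
                F.collect_list(F.struct("txn_id", "amount", "txn_ts")).over(w))
    .select("customer_id", "txn_hist")
    .dropDuplicates(["customer_id"])   # keep one row per customer
)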
3️⃣ Mixing arrays and structs
You can keep both: a quick‑to‑scan numeric array and a named struct:
SELECT
account_no,
/* purely positional metrics array */
sort_array(array(balance, overdraft_limit, interest_rate)) AS metrics_array,
/* self‑describing struct */
struct(balance, overdraft_limit, interest_rate) AS metrics_struct
FROM accounts;
4️⃣ Accessing nested values later
-- get only interest_rate out of the struct
SELECT account_no, metrics_struct.interest_rate
FROM accounts_with_struct;
-- explode an array of structs, then filter
SELECT customer_id, txn.*
FROM customers c
LATERAL VIEW explode(c.txn_hist) t AS txn
WHERE txn.amount > 1000;
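Roughly equivalent DataFrame calls, reusing df_struct (fields bal / od_limit / int_rate) and the hist frame from recipe ①:
# pick one field out of a struct column
rates = df_struct.select("account_no", F.col("metrics_struct.int_rate"))

# explode an array of structs, then filter on a struct field
big_txns = (hist
    .select("customer_id", F.explode("txn_hist").alias("txn"))
    .filter(F.col("txn.amount") > 1000)
    .select("customer_id", "txn.*")
)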
🔑 When to choose struct over array
Need | Choose |
---|---|
Self‑documenting field names | Struct |
Numeric vector / ML feature array | Array |
Must preserve 1‑to‑1 alignment of several columns when aggregating | Array of structs |
Very wide, anonymous data you’ll explode quickly | Array |
A struct is Spark’s Row / StructType under the hood, and an array of structs is just ArrayType(StructType), so packing specific columns as structs is fully supported and often clearer than a bare array.