🔍 What Does collect_list() Do in Spark SQL?

collect_list() is an aggregation function in Spark SQL and PySpark.

It collects all the values of a column (within each group, if grouped) into a single array. Duplicates are preserved, but the order of the collected elements is not deterministic.


✅ Syntax

SELECT collect_list(column_name) FROM your_table;

In PySpark:

from pyspark.sql import functions as F

df.groupBy("some_column").agg(F.collect_list("value_column"))

🧾 Example

Input table:

| category | value |
|----------|-------|
| A        | x     |
| A        | y     |
| A        | x     |
| B        | z     |
| B        | y     |

Query:

SELECT category, collect_list(value) AS value_list
FROM your_table
GROUP BY category;

Output:

| category | value_list |
|----------|------------|
| A        | [x, y, x]  |
| B        | [z, y]     |
  • ✅ Duplicates are preserved
  • ⚠️ Order is not guaranteed (unless explicitly sorted via sort_array() later)

🔄 Difference Between collect_list() and collect_set()

| Function       | Duplicates | Output Order      |
|----------------|------------|-------------------|
| collect_list() | ✅ Yes     | ❌ Not guaranteed |
| collect_set()  | ❌ No      | ❌ Not guaranteed |

If you want a distinct sorted list, combine:

sort_array(collect_set(value_column))
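
In PySpark, a minimal sketch of the same idea (reusing the placeholder column names from the syntax example above):

from pyspark.sql import functions as F

# Distinct values per group, sorted ascending inside the resulting array
df.groupBy("some_column").agg(
    F.sort_array(F.collect_set("value_column")).alias("distinct_sorted_values")
)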

📌 Common Use Cases

| Use Case            | Description                                        |
|---------------------|----------------------------------------------------|
| Grouped logs        | Collect all log messages for a session             |
| Building pairs      | Combine row values into arrays                     |
| Pre-join packing    | Pack rows before sending to UDF                    |
| Feature engineering | List of all transactions, behaviors, tags per user |

Below are a few “recipes” showing how to collect values only for the columns you care about.
Pick the pattern that fits your goal:

| Goal | Best Pattern | Spark SQL | PySpark DataFrame |
|------|--------------|-----------|-------------------|
| 📥 Collect a single column’s values across rows (classic use) | collect_list(col) | collect_list(colA) | F.collect_list("colA") |
| 📦 Collect several columns together as one object per row | collect_list(struct(...)) | collect_list(struct(colA, colB)) | F.collect_list(F.struct("colA","colB")) |
| 🗂 Collect each column separately (two independent arrays) | multiple collect_list calls | see example ② | see example ② |
| 🏷 Collect key/value pairs (column‑name → value) | collect_list(named_struct(...)) or map_from_arrays | see example ③ | see example ③ |
| 🧾 Collect within each row (no grouping → pack selected columns) | array(colA, colB, colC) | array(colA, colB, colC) | F.array("colA","colB","colC") |

① Collect several columns together (one array of structs)

“Give me, per customer, every (txn_id, amount) pair they made.”

SELECT
  customer_id,
  collect_list(struct(txn_id, amount))  AS txn_hist
FROM bank_txns
GROUP BY customer_id;

Result sample (JSON view):

{
  "customer_id": 42,
  "txn_hist": [
    {"txn_id": "T100", "amount": 500},
    {"txn_id": "T101", "amount": 200}
  ]
}

PySpark equivalent

hist = (df
    .groupBy("customer_id")
    .agg(F.collect_list(F.struct("txn_id", "amount")).alias("txn_hist"))
)

✔️ All values stay aligned (txn_id with its matching amount) because they sit in the same struct.


② Collect each column separately

“Per branch, list all distinct account numbers and all distinct currencies in two arrays.”

SELECT
  branch_id,
  collect_set(account_no) AS accts,
  collect_set(currency)   AS ccy
FROM bank_txns
GROUP BY branch_id;

PySpark

df_sets = (df
    .groupBy("branch_id")
    .agg(
        F.collect_set("account_no").alias("accts"),
        F.collect_set("currency").alias("ccy")
    )
)

Result:

| branch_id | accts              | ccy        |
|-----------|--------------------|------------|
| 007       | [ACC3, ACC1, ACC2] | [USD, INR] |

③ Collect dynamic key/value pairs (named_struct / map)

“Per customer, pack KYC attributes we care about into one array of key=value structs.”

SELECT
  cust_id,
  collect_list(
      named_struct('key','city',      'val', city)
  ) || collect_list(
      named_struct('key','risk_flag', 'val', risk_flag)
  ) AS kyc_blob
FROM customers
GROUP BY cust_id;

Or build a MapType instead of an array of structs:

SELECT
  cust_id,
  map_from_arrays(                -- Spark SQL 2.4+
      array('city','risk_flag'),
      array(city , risk_flag)
  ) AS kyc_map
FROM customers;

Result:

{
  "cust_id": 99,
  "kyc_map": {"city":"Delhi", "risk_flag":"LOW"}
}
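
A PySpark sketch of the same map-building idea, assuming a customers_df DataFrame with the same cust_id, city, and risk_flag columns (F.create_map is an equivalent alternative):

from pyspark.sql import functions as F

kyc = customers_df.select(
    "cust_id",
    F.map_from_arrays(                                  # Spark 2.4+
        F.array(F.lit("city"), F.lit("risk_flag")),     # keys
        F.array("city", "risk_flag")                    # values (must share one type)
    ).alias("kyc_map")
)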

④ Pack selected columns inside each row (no collect_list)

If you don’t need grouping—just want a tidy array/struct per row:

SELECT
  account_no,
  array(balance, overdraft_limit, interest_rate) AS metrics
FROM accounts;

PySpark:

df_with_array = df.select(
    "account_no",
    F.array("balance","overdraft_limit","interest_rate").alias("metrics")
)

🔑 Tips & Gotchas

  1. Order isn’t guaranteed with collect_list / collect_set.
    Use sort_array(...) afterwards if deterministic order matters: sort_array(collect_list(colA))
  2. collect_set removes duplicates, collect_list keeps them.
  3. For very wide arrays, consider exploding later: SELECT cust_id, txn.* FROM ( ...collect_list(struct(...)) ) t LATERAL VIEW explode(txn_hist) expl AS txn; (see the PySpark sketch after this list)
  4. If you’ll query deeply nested results in SQL, store them in Delta—Spark SQL can still push predicates down into structs/arrays.
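
A PySpark sketch of tip 3, reusing the hist DataFrame (with its txn_hist column) built in example ① above:

from pyspark.sql import functions as F

# One output row per collected struct; "txn.*" unpacks the struct fields back into columns
exploded = (hist
    .select("customer_id", F.explode("txn_hist").alias("txn"))
    .select("customer_id", "txn.*")
)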

Use sort_array() (SQL) or array_sort() / sort_array() (PySpark) to sort the elements inside the array after you have built it.


Spark SQL

SELECT
    account_no,
    balance,
    overdraft_limit,
    interest_rate,

    -- Ascending sort  (default)
    sort_array(array(balance, overdraft_limit, interest_rate))      AS metrics_asc,

    -- Descending sort
    sort_array(array(balance, overdraft_limit, interest_rate), false) AS metrics_desc
FROM accounts;

Notes

| Point            | Detail                                                                 |
|------------------|------------------------------------------------------------------------|
| Function         | sort_array(<array_expr> [, ascending_boolean])                          |
| Default order    | Ascending (true)                                                        |
| Descending       | Pass false as the second argument                                       |
| Type requirement | All elements must share the same data type (e.g. all DOUBLE).           |
| NULL handling    | NULLs come first in ascending order, last in descending.                |
| Related function | array_sort() is similar but not an exact synonym: it places NULLs last and, without a comparator, only sorts ascending. |

PySpark DataFrame API

from pyspark.sql import functions as F

df_sorted = (df
    .withColumn("metrics_asc",
        F.sort_array(F.array("balance", "overdraft_limit", "interest_rate"))
    )
    .withColumn("metrics_desc",
        F.sort_array(
            F.array("balance", "overdraft_limit", "interest_rate"),
            asc=False            # descending
        )
    )
)

Example Result for one row

| balance | overdraft_limit | interest_rate | metrics_asc      | metrics_desc     |
|---------|-----------------|---------------|------------------|------------------|
| 1000    | 500             | 3.5           | [3.5, 500, 1000] | [1000, 500, 3.5] |

If you need stable field names instead of positions

You can keep the original array unsorted and add a separate sorted one, or explode the array later:

SELECT
  account_no,
  EXPLODE(sort_array(array(balance, overdraft_limit, interest_rate))) AS metric_value
FROM accounts;

This produces one row per metric, already ordered.
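
A PySpark sketch of the same explode-after-sort pattern (same accounts columns as above):

from pyspark.sql import functions as F

df_metric_rows = df.select(
    "account_no",
    F.explode(
        F.sort_array(F.array("balance", "overdraft_limit", "interest_rate"))
    ).alias("metric_value")
)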


Instead of packing values into a plain array, you can also pack them into a struct (row) object, or even into an array of structs.

A struct in Spark SQL / PySpark is the same as a nested row: each field keeps its own name and type.


1️⃣ Build one struct per row (no aggregation)

SELECT
    account_no,
    struct(
        balance          AS bal,
        overdraft_limit  AS od_limit,
        interest_rate    AS int_rate
    ) AS metrics_struct
FROM accounts;

Result sneak‑peek:

| account_no | metrics_struct                            |
|------------|-------------------------------------------|
| 1001       | {bal: 1000, od_limit: 500, int_rate: 3.5} |

PySpark equivalent

from pyspark.sql import functions as F

df_struct = (df
    .withColumn(
        "metrics_struct",
        F.struct(
            F.col("balance").alias("bal"),
            F.col("overdraft_limit").alias("od_limit"),
            F.col("interest_rate").alias("int_rate")
        )
    )
)

Why use a struct?

  • Field names stay intact (bal, od_limit, int_rate) → easier downstream SQL: metrics_struct.bal.

2️⃣ Aggregate into an array of structs (collect rows together)

SELECT
    customer_id,
    collect_list(
        struct(txn_id, amount, txn_ts)         -- one struct per transaction
    ) AS txn_hist
FROM bank_txns
GROUP BY customer_id;

Results in one row per customer, with a JSON‑like list:

{
  "customer_id": 42,
  "txn_hist": [
    {"txn_id":"T100","amount":500,"txn_ts":"2025‑07‑05"},
    {"txn_id":"T101","amount":200,"txn_ts":"2025‑07‑04"}
  ]
}
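
A PySpark sketch of the same aggregation, assuming a bank_txns_df DataFrame with the same columns:

from pyspark.sql import functions as F

txn_hist_df = (bank_txns_df
    .groupBy("customer_id")
    .agg(F.collect_list(F.struct("txn_id", "amount", "txn_ts")).alias("txn_hist"))
)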

Sorting an array‑of‑structs

Spark 3.4+ lets you sort by a struct field using a lambda comparator; the comparator must return a negative integer, zero, or a positive integer:

SELECT
    customer_id,
    array_sort(
        collect_list(struct(txn_id, amount, txn_ts)),
        (left, right) -> CASE WHEN left.txn_ts < right.txn_ts THEN -1
                              WHEN left.txn_ts > right.txn_ts THEN 1
                              ELSE 0
                         END                                 -- oldest-first
    ) AS txn_hist_sorted
FROM bank_txns
GROUP BY customer_id;

(For earlier Spark versions you’d explode, order by, and collect_list again.)
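
One simple alternative for older versions, sketched here in PySpark, avoids the comparator entirely: put the sort key first inside the struct, since sort_array orders structs by their fields from left to right.

from pyspark.sql import functions as F

# txn_ts is the first struct field, so sort_array effectively sorts by timestamp
hist_sorted = (bank_txns_df
    .groupBy("customer_id")
    .agg(
        F.sort_array(
            F.collect_list(F.struct("txn_ts", "txn_id", "amount"))
        ).alias("txn_hist_sorted")
    )
)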


3️⃣ Mixing arrays and structs

You can keep both: a quick‑to‑scan numeric array and a named struct:

SELECT
    account_no,

    /* purely positional metrics array */
    sort_array(array(balance, overdraft_limit, interest_rate))        AS metrics_array,

    /* self‑describing struct */
    struct(balance, overdraft_limit, interest_rate)                   AS metrics_struct
FROM accounts;
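
A PySpark sketch of the same pairing (same accounts columns as before):

from pyspark.sql import functions as F

df_mixed = df.select(
    "account_no",
    # positional metrics array, sorted ascending
    F.sort_array(F.array("balance", "overdraft_limit", "interest_rate")).alias("metrics_array"),
    # self-describing struct that keeps field names
    F.struct("balance", "overdraft_limit", "interest_rate").alias("metrics_struct"),
)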

4️⃣ Accessing nested values later

-- get only interest_rate out of the struct
SELECT account_no, metrics_struct.interest_rate
FROM accounts_with_struct;

-- explode an array of structs, then filter
SELECT customer_id, txn.*
FROM customers c
LATERAL VIEW explode(c.txn_hist) t AS txn
WHERE txn.amount > 1000;
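
In PySpark, the same two accesses look roughly like this (df_struct and txn_hist_df are the DataFrame names assumed in the sketches above):

from pyspark.sql import functions as F

# Pull a single field out of a struct column
rates = df_struct.select("account_no", "metrics_struct.int_rate")

# Explode an array of structs, then filter on a nested field
big_txns = (txn_hist_df
    .select("customer_id", F.explode("txn_hist").alias("txn"))
    .where(F.col("txn.amount") > 1000)
    .select("customer_id", "txn.*")
)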

🔑 When to choose struct over array

| Need | Choose |
|------|--------|
| Self‑documenting field names | Struct |
| Numeric vector / ML feature array | Array |
| Must preserve 1‑to‑1 alignment of several columns when aggregating | Array of structs |
| Very wide, anonymous data you’ll explode quickly | Array |

Both struct and array<struct> are row objects under the hood (Spark’s Row / StructType), so packing specific columns as struct(s) is fully supported and often clearer than a bare array.
