🔍 What Are Accumulators in PySpark?

Accumulators are write‑only shared variables that executors can only add to, while the driver can read their aggregated value after an action completes.

| Feature | Detail |
| --- | --- |
| Purpose | Collect side‑effect statistics (counters, sums) during distributed computation |
| Visibility | Executors can only add(); the driver reads the result (reliable only after an action) |
| Data types | PySpark: numeric accumulators via sc.accumulator(), custom types via AccumulatorParam. Scala/Java: LongAccumulator, DoubleAccumulator, CollectionAccumulator, custom via AccumulatorV2 |
| Fault tolerance | Each task keeps a local copy; values are merged on the driver, so they survive retries (a rerun task may add twice) |
| UI | Named JVM accumulators appear on the stage pages of the Spark UI; Python accumulators are not yet displayed there |

🏗️ How Accumulators Are Implemented (Under the Hood)

  1. Created on the driver, e.g. spark.sparkContext.accumulator(0) in PySpark (the equivalent Scala/Java call is sc.longAccumulator("bad_rows")).
  2. Serialized and sent to each executor with the task.
  3. Task‑local copy: each task gets its own copy of the accumulator and calls add() on it without locking.
  4. Task completion: executor sends the local delta back to the driver through the scheduler’s RPC.
  5. Driver merges deltas using the accumulator’s merge() (for numeric types, an addition).
  6. Spark UI & driver code can read the final value after the action (e.g., collect, count, write, …).

Important: If a task is retried Spark may add its contribution again, so accumulators are at‑least‑once; they should not drive program logic, only diagnostics.
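
To make these steps concrete, here is a minimal sketch of the lifecycle, assuming only a running SparkSession named spark and using PySpark’s plain sc.accumulator API:

acc = spark.sparkContext.accumulator(0)              # step 1: created on the driver

rdd = spark.sparkContext.parallelize(range(1000), 8)
rdd.foreach(lambda x: acc.add(1))                    # steps 2–4: each task adds to its local copy

print(acc.value)                                     # steps 5–6: deltas merged on the driver → 1000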


🛠️ Creating & Using Accumulators

1. Built‑in numeric accumulator

# PySpark's built‑in accumulator is created with sc.accumulator();
# sc.longAccumulator("bad_rows") exists only in the Scala/Java API.
bad_rows_acc = spark.sparkContext.accumulator(0)

def parse(line):
    try:
        return int(line)
    except ValueError:
        bad_rows_acc.add(1)
        return None        # or raise

rdd = spark.sparkContext.textFile("numbers.txt")
valid = rdd.map(parse).filter(lambda x: x is not None)
valid.sum()                # ← action triggers computation

print("Bad rows:", bad_rows_acc.value)   # safe to read here

2. Custom Accumulator (AccumulatorParam)

PySpark does not expose AccumulatorV2 (that is the Scala/Java API, whose contract is isZero/copy/reset/add/merge/value). In Python you subclass AccumulatorParam, implement zero() and addInPlace(), and pass an instance to sc.accumulator().

from pyspark.accumulators import AccumulatorParam
from pyspark.sql import SparkSession


class SetAccumulatorParam(AccumulatorParam):
    """Accumulates values into a Python set (e.g. distinct error codes)."""

    def zero(self, initial_value):
        # "Empty" value used to initialise each task's local copy
        return set()

    def addInPlace(self, acc1, acc2):
        # Merge two partial values (also called when a task adds a one-element set)
        acc1 |= acc2
        return acc1


spark = SparkSession.builder.getOrCreate()
unique_errs = spark.sparkContext.accumulator(set(), SetAccumulatorParam())

# add inside tasks …
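
For illustration, a hedged usage sketch (the log codes below are made up); each task adds a one‑element set so that addInPlace can union it into the running value:

logs = spark.sparkContext.parallelize(["OK", "E42", "OK", "E7", "E42"])
logs.filter(lambda code: code != "OK").foreach(lambda code: unique_errs.add({code}))

print(unique_errs.value)     # e.g. {'E42', 'E7'}, readable only on the driver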

📈 Typical Use‑Cases

| Category | Example |
| --- | --- |
| Data‑quality counters | Count malformed records, null columns, late events |
| ETL metrics | Track the number of rows skipped, converted, anonymized (see the sketch below) |
| Model training | Count label‑distribution skew, feature missingness |
| Debugging | Verify that a branch of logic executes the expected number of times |
| Performance tuning | Count heavy joins, fallback partitions, spills (with custom instrumented code) |
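
As an illustration of the ETL‑metrics row above, here is a hedged sketch that tracks two counters in a single pass; the DataFrame df and its email column are hypothetical:

skipped = spark.sparkContext.accumulator(0)
anonymized = spark.sparkContext.accumulator(0)

def scrub(row):
    if row.email is None:              # hypothetical column
        skipped.add(1)
        return None
    anonymized.add(1)
    d = row.asDict()
    d["email"] = "***"                 # mask PII before writing
    return d

clean = df.rdd.map(scrub).filter(lambda r: r is not None)
clean.count()                          # any action triggers the counters
print("skipped:", skipped.value, "anonymized:", anonymized.value)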

⚠️ Limitations & Gotchas

  1. Not idempotent: task retries may duplicate additions → use only for approximate or logging stats.
  2. Executors can’t read accumulator value — it’s write‑only on workers.
  3. Lazy eval: the value is only populated after an action; reading earlier returns the initial value, e.g. 0 (see the sketch after this list).
  4. No control‑flow: never make program decisions based on accumulator mid‑job (could be inconsistent).
  5. Streaming: accumulator values are not recovered from a checkpoint after a restart, so treat them as per‑run diagnostics in streaming jobs.
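
A short sketch of gotcha 3, assuming a running SparkSession named spark:

acc = spark.sparkContext.accumulator(0)
rdd = spark.sparkContext.parallelize(range(10)).map(lambda x: acc.add(1) or x)

print(acc.value)   # 0: the map is only a plan, no task has run yet
rdd.count()        # the action triggers the computation
print(acc.value)   # 10: the deltas have now been merged on the driver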

🪄 Best Practices

| Tip | Why |
| --- | --- |
| Give them descriptive variable names (named accumulators such as longAccumulator("bad_rows") exist only in the Scala/Java API) | Easier to trace in code, and named JVM accumulators are easy to spot in the Spark UI |
| Read after an action, not in the middle | Guarantees the aggregation is complete |
| For custom types, subclass AccumulatorParam (AccumulatorV2 is the Scala/Java equivalent) | Tells Spark how to zero and merge your type |
| Avoid large per‑task objects | Send small deltas; otherwise network overhead grows |
| Reset between jobs if reused | On the driver, set acc.value = 0 or create a fresh accumulator; PySpark accumulators have no reset() (see the sketch below) |
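
A small sketch of the first and last tips, assuming a SparkSession named spark; PySpark’s Accumulator lets the driver reassign .value:

late_events = spark.sparkContext.accumulator(0)    # descriptive variable name

spark.sparkContext.parallelize(range(5)).foreach(lambda x: late_events.add(1))
print(late_events.value)     # 5 after the first job

late_events.value = 0        # reset on the driver only (not allowed inside tasks)
spark.sparkContext.parallelize(range(3)).foreach(lambda x: late_events.add(1))
print(late_events.value)     # 3, not 8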

✅ Takeaway

Accumulators are simple, driver‑visible counters designed for instrumentation and diagnostics in PySpark:

Executors add values → Driver reads final metric.

Why do we need “special” counters at all?

| In a single‑machine Python script | In a Spark job |
| --- | --- |
| You can increment a normal count += 1 variable inside a for loop. | Your code is split across dozens or hundreds of JVM/Python worker processes. Each worker has its own copy of every variable, so a simple count += 1 only changes a local copy that the driver never sees. |

Accumulators exist to collect simple metrics coming from many distributed tasks back to the driver in one aggregated value.
They solve the “many workers → one total” problem without you having to collect() the whole dataset or run a second pass with groupBy()/count().
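
A minimal sketch of the “many workers → one total” problem and how an accumulator solves it, assuming a running SparkSession named spark:

rdd = spark.sparkContext.parallelize(range(100))

# Naive counter: each task mutates its own copy of the closure variable
count = 0
def broken_increment(x):
    global count
    count += 1               # changes a copy inside the executor process

rdd.foreach(broken_increment)
print(count)                 # still 0 on the driver

# Accumulator: executors add, the driver reads the merged total
acc = spark.sparkContext.accumulator(0)
rdd.foreach(lambda x: acc.add(1))
print(acc.value)             # 100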


Three ways to keep a counter in PySpark

| Method | Where it runs | How it works | Pros | Cons | Typical use |
| --- | --- | --- | --- | --- | --- |
| 1. DataFrame/RDD aggregation (df.filter(...).count()) | Executed on the cluster, result sent to the driver | Uses Spark’s built‑in shuffle & reduce | Accurate, fault‑tolerant | Triggers another full pass over the data | Business metrics you’ll query anyway |
| 2. Manual per‑partition return (mapPartitions → yield data plus a counter, then reduce) | Workers compute partial counts and return them as records | Pure functional, deterministic | No side effects; full control | You must change the data path (add extra columns / union an extra RDD) | When the counter is part of the pipeline output |
| 3. Accumulator (sc.accumulator) | Workers call acc.add(1); Spark merges on the driver after the action | Side‑effect counter, zero extra data shuffle | Almost zero overhead; named JVM accumulators also show in the Spark UI | “At‑least‑once” update → slight over‑count if tasks retry; write‑only from executors | Debugging, data‑quality tallies, progress metrics |

Key distinction

Accumulators are not meant to replace normal aggregations; they are a light‑weight tap for side metrics when you don’t want to alter the main data flow.


Minimal working examples

1. Classic aggregation (accurate but full extra pass)

errors = df.filter("status = 'ERROR'").count()        # action #1
processed = df.count()                                # action #2
print(errors, processed)

Two actions → two jobs.


2. MapPartitions pattern (brings counters back in data)

def tag_and_count(iterator):
    bad = 0
    for row in iterator:
        if row.status == "ERROR":
            bad += 1
            continue
        yield ("data", row)              # only good rows pass downstream
    yield ("counter", bad)               # one special record per partition

tagged = df.rdd.mapPartitions(tag_and_count).cache()   # cache, or the pass runs twice
cleaned = (tagged.filter(lambda t: t[0] == "data")
                 .map(lambda t: t[1])
                 .toDF(df.schema))
bad_rows = (tagged.filter(lambda t: t[0] == "counter")
                  .map(lambda t: t[1])
                  .sum())

Accurate, but you’ve polluted the data path with marker records, a cache, and extra filter passes.


3. Accumulator (side metric, one‑liner)

bad_acc = spark.sparkContext.accumulator(0)

def filter_good(row):
    if row.status == "ERROR":
        bad_acc.add(1)       # every executor just “fire‑and‑forgets”
        return False
    return True

# DataFrame.filter() expects a Column/SQL expression, so use the RDD API for a Python predicate
good_df = df.rdd.filter(filter_good).toDF(df.schema)
good_df.write.mode("overwrite").parquet("/data/good")   # single job

print("Bad rows seen:", bad_acc.value)   # safe AFTER the write

No extra shuffle and no marker rows in the data; the counter rides back to the driver as a side channel.


When not to use accumulators

  1. Driving control flow

     if bad_acc.value > 1000:                       # ← dangerous: might still be 0 mid‑job
         spark.sparkContext.cancelJobGroup(...)

     The value is only reliable after the action completes.
  2. Exact audit counts for financial/regulatory reporting
    Retries may double‑increment; use a deterministic aggregation instead.

When accumulators shine

| Use case | Why a normal aggregation is heavy / awkward |
| --- | --- |
| Counting malformed JSON lines while streaming a 200 GB file into Parquet | You’d have to read the file twice or cache it just for the error tally. |
| Tracking how many records hit the “fallback” code path inside a big ML feature‑engineering pipeline | You don’t want an extra column or a second pass. |
| Emitting simple health metrics so ops can watch a job’s progress | No extra job is needed; named JVM accumulators even surface in the Spark UI. |

Quick checklist for using accumulators safely

  1. Give them descriptive names, e.g. late_events = sc.accumulator(0) (named accumulators like longAccumulator("late_events") are Scala/Java only)
  2. Add, don’t read, in executors
  3. Read only after an action or at foreachBatch end
  4. Expect slight over‑count if tasks may retry
  5. Reset before the next independent job if reused (on the driver: acc.value = 0, or create a new accumulator)

TL;DR

  • Counters in distributed Spark require a mechanism that all workers can update and the driver can read.
  • Accumulators give you that with almost zero coding overhead, perfect for metrics & debugging, but they’re not a substitute for real aggregations when you need 100 % accuracy.

A common follow‑up question: is a monotonically increasing ID column the same thing as a counter? Let’s break it down.


🔹 Short answer:

No, a monotonically increasing column (like monotonically_increasing_id()) is not a counter in the same sense as an accumulator.

They serve very different purposes.


🧭 Comparison: Counter vs Monotonically Increasing ID

| Feature | monotonically_increasing_id() | Accumulator |
| --- | --- | --- |
| What is it? | A generated column that assigns a unique (but not strictly sequential) ID to each row | A shared variable updated by executors, aggregated on the driver |
| Scope | Row‑level; runs during transformations | Job‑level; updated by tasks while the job runs |
| Purpose | Add unique row IDs (for joins, surrogate keys) | Count metrics like bad rows, nulls, filtered items |
| Type | Column (LongType) | Driver‑side variable (e.g. sc.accumulator(0)) |
| Guaranteed contiguous or sequential? | ❌ No, values are increasing but not contiguous | ✅ Grows exactly by what you add() (barring task retries) |
| Used for control logic or metrics? | ❌ No, it is just a data transformation | ✅ Yes, intended for metrics/debug counters |

🔍 monotonically_increasing_id() in Detail

from pyspark.sql.functions import monotonically_increasing_id

df = df.withColumn("row_id", monotonically_increasing_id())

  • Spark creates a unique 64‑bit number per row, built from the partition ID (upper bits) and the record’s position within the partition (lower bits).
  • Not guaranteed to start from 0 or be sequential: row_id: 0, 1, 4294967296, 4294967297, ...

Useful for:

  • Creating row identifiers in otherwise unordered datasets.
  • Doing joins where unique keys are missing.

But not useful for:

  • Tallying number of errors, nulls, fallback cases
  • Tracking distributed metrics
  • Logging or UI metrics

🔍 Example Comparison

✅ Want to count rows with NULLs in column amount?

Accumulator:

nulls_acc = spark.sparkContext.accumulator(0)

def has_amount(row):
    if row.amount is None:
        nulls_acc.add(1)     # count the null on the executor
        return False
    return True

df_filtered = df.rdd.filter(has_amount).toDF(df.schema)   # DataFrame.filter() needs a Column, so go via the RDD
df_filtered.write.parquet("/cleaned")
print("Rows with nulls:", nulls_acc.value)

monotonically_increasing_id():

df = df.withColumn("row_id", monotonically_increasing_id())

This helps assign IDs but doesn’t count anything.


✅ Summary Table

| You want to… | Use |
| --- | --- |
| Track how many bad records occurred | Accumulator |
| Get a unique ID per row | monotonically_increasing_id() |
| Count how many rows matched a condition | Accumulator or df.filter(...).count() |
| Generate surrogate keys for a dimension table | monotonically_increasing_id() |
| Collect lightweight debugging metrics without an extra job | Accumulator |
