Here’s a clear and concise breakdown of multiprocessing vs multithreading in Python, with differences, real-world data engineering use cases, and code illustrations.


🧠 Core Difference:

| Feature | Multithreading | Multiprocessing |
|---|---|---|
| Concurrency Type | I/O-bound | CPU-bound |
| Threads/Processes | Multiple threads in the same process (share memory) | Multiple processes (each with its own memory) |
| GIL Impact | Affected by Python's GIL (Global Interpreter Lock) | Bypasses the GIL; true parallelism |
| Memory Usage | Shared memory (less RAM used) | Separate memory (more RAM used) |
| Best For | I/O-bound tasks (API calls, file reads/writes) | CPU-bound tasks (data crunching, large transformations) |
| Stability | Less stable if a thread crashes | More isolated and fault-tolerant |

🔧 When to Use What?

| Task Type | Use |
|---|---|
| Reading many CSVs from disk | Multithreading (I/O-bound) |
| Fetching multiple APIs | Multithreading (network-bound) |
| Parsing XML/JSON while downloading | Multithreading (threads help with the I/O wait; the parsing itself is CPU-bound) |
| Large data transformations | Multiprocessing (CPU-intensive) |
| Complex calculations (e.g., ML) | Multiprocessing |
| Cleaning large 1–10 GB chunks | Multiprocessing |

🧪 Example: Multithreading (I/O-bound)

import requests
from concurrent.futures import ThreadPoolExecutor

urls = ["https://httpbin.org/delay/1"] * 5  # Simulates slow API

def fetch(url):
    response = requests.get(url)
    return url, response.status_code

with ThreadPoolExecutor(max_workers=5) as executor:
    results = executor.map(fetch, urls)

for r in results:
    print(r)

✅ This runs in ~1 second total (instead of 5 sec sequentially).


🧪 Example: Multiprocessing (CPU-bound)

from multiprocessing import Pool
import time

def square(n):
    time.sleep(1)   # stand-in for ~1 s of heavy work per item
    return n * n

numbers = [1, 2, 3, 4, 5]

if __name__ == "__main__":   # guard required when processes are spawned (Windows/macOS)
    with Pool(processes=5) as pool:
        results = pool.map(square, numbers)
    print(results)

✅ Also runs in ~1 second total (plus process start-up overhead), because each number is handled by a separate process. Note that `time.sleep` only simulates work here; with a genuinely CPU-bound function, threads would stay stuck at sequential speed while processes would still scale.


🧱 Real-World Use Case Comparison

| Scenario | Recommended | Why? |
|---|---|---|
| ETL: Fetch from 10 APIs | Multithreading | Low CPU, high wait time |
| ETL: Transform 5 large CSVs | Multiprocessing | High CPU, parallel processing |
| Web scraper fetching 100 URLs | Multithreading | Mostly network delay |
| ML training or parallel prediction | Multiprocessing | CPU-intensive work |
| Read/write to disk in batch | Multithreading | I/O-bound operation |
| Pandas chunked transform | Multiprocessing | Heavy CPU usage for large data |
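As a minimal sketch of the last row (the input file `big.csv`, its `value` column, and the `clean_chunk` transform are all assumptions for illustration), a chunked pandas clean-up can be fanned out across processes like this:

import pandas as pd
from multiprocessing import Pool

def clean_chunk(chunk: pd.DataFrame) -> pd.DataFrame:
    # hypothetical CPU-heavy cleaning step
    chunk["value"] = chunk["value"].abs() ** 0.5
    return chunk

if __name__ == "__main__":
    chunks = pd.read_csv("big.csv", chunksize=500_000)   # assumed input file with a 'value' column
    with Pool(processes=4) as pool:
        cleaned = pool.map(clean_chunk, chunks)          # each chunk is cleaned in its own process
    result = pd.concat(cleaned, ignore_index=True)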

🔐 Note on GIL (Global Interpreter Lock)

  • CPython has a GIL (Global Interpreter Lock) that prevents multiple native threads from executing Python bytecode at the same time.
  • Threading is therefore limited for CPU-bound pure-Python work, although it still helps when threads spend most of their time waiting on I/O (the GIL is released while blocking).
  • Multiprocessing creates separate interpreter processes, each with its own GIL, fully bypassing the lock and enabling true parallelism.
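A convenient consequence is that `concurrent.futures` exposes both models behind the same interface, so switching between threads and processes is essentially a one-line change. A minimal sketch (the `handle` function is a placeholder, not taken from the examples above):

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def handle(item):
    return item * 2        # placeholder task: I/O-bound -> prefer threads, CPU-bound -> prefer processes

items = range(10)

if __name__ == "__main__":
    with ThreadPoolExecutor(max_workers=8) as ex:       # threads: good for I/O-bound tasks
        io_results = list(ex.map(handle, items))

    with ProcessPoolExecutor(max_workers=4) as ex:      # processes: good for CPU-bound tasks
        cpu_results = list(ex.map(handle, items))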

✅ Summary Table

| Feature | Multithreading | Multiprocessing |
|---|---|---|
| Python GIL | Affected | Not affected |
| Memory | Shared | Separate |
| Speed for I/O | Fast (concurrent) | Slower, overkill |
| Speed for CPU | Slower (GIL bottleneck) | Much faster |
| Crashes | Affect the whole app | Isolated, safer |
| Use in ETL | API calls, disk read/write | Transform, validation, parsing |

Below is a fully in‑line Jupyter‑style notebook (Markdown + code cells) that you can copy‑paste straight into any .ipynb or run cell‑by‑cell in an interactive notebook.
It covers:

  • Retry decorator (with exponential back‑off option)
  • OOP loaders (CSVLoader, APILoader)
  • Multithreaded ETL for I/O‑bound work
  • Multiprocessing ETL for CPU‑bound work
  • Side‑by‑side timing comparison on the same CSV files

# Modular ETL Demo – Multithreading vs. Multiprocessing
Demonstrates core data‑engineering patterns in Python:

* OOP & composition  
* Custom exceptions & robust error handling  
* Reusable decorators (`@retry`)  
* Concurrency: **ThreadPoolExecutor** (I/O) vs. **multiprocessing.Pool** (CPU)  
# Cell 1 ─ Imports
import os, time, math, random, requests, csv, pathlib
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor
from multiprocessing import Pool, cpu_count
import pandas as pd
# Cell 2 ─ Retry decorator (with optional back‑off)
def retry(n=3, delay=1, backoff=1):
    """
    Retry a function up to *n* times.
    :param delay: initial wait in seconds
    :param backoff: back‑off multiplier (1 = constant delay)
    """
    def decorator(fn):
        def wrapper(*args, **kwargs):
            wait = delay
            for attempt in range(1, n + 1):
                try:
                    return fn(*args, **kwargs)
                except Exception as e:
                    if attempt == n:
                        raise
                    print(f"[retry] attempt {attempt} failed: {e} → waiting {wait}s")
                    time.sleep(wait)
                    wait *= backoff
        return wrapper
    return decorator
# Cell 3 ─ OOP: loaders & custom error
class DataLoadError(Exception):
    """Raised when a data source cannot be loaded."""
    pass

class DataLoader:
    def __init__(self, source):
        self.source = source
        
    def load(self):
        raise NotImplementedError
        
class CSVLoader(DataLoader):
    def load(self):
        if not self.source.endswith(".csv"):
            raise DataLoadError("Only CSV supported")
        return pd.read_csv(self.source)
    
class APILoader(DataLoader):
    @retry(n=2, delay=2)
    def load(self):
        print(f"Fetching {self.source}")
        r = requests.get(self.source, timeout=2)
        r.raise_for_status()
        return r.json()
# Cell 4 ─ Helper: generate sample CSVs (~10 MB total, adjustable)
def generate_csv_files(n_files=5, rows=200_000, out_dir="sample_csv"):
    Path(out_dir).mkdir(exist_ok=True)
    files = []
    for i in range(n_files):
        path = Path(out_dir) / f"data_{i}.csv"
        if path.exists():  # avoid regenerating
            files.append(str(path)); continue
        df = pd.DataFrame({
            "id": range(rows),
            "value": [random.random() for _ in range(rows)]
        })
        df.to_csv(path, index=False)
        files.append(str(path))
    return files

csv_files = generate_csv_files()
csv_files[:2]  # peek
# Cell 5 ─ Multithreaded ETL (I/O‑bound)
def thread_transform(df: pd.DataFrame) -> pd.DataFrame:
    """Light transform: compute mean & std of 'value'."""
    return pd.DataFrame({
        "mean": [df["value"].mean()],
        "std": [df["value"].std()]
    })

def threaded_etl(files, max_workers=4):
    def task(path):
        loader = CSVLoader(path)
        df = loader.load()
        return thread_transform(df)
    
    with ThreadPoolExecutor(max_workers=max_workers) as ex:
        results = list(ex.map(task, files))
    return pd.concat(results, ignore_index=True)
# Cell 6 ─ Multiprocessing ETL (CPU‑bound)
# Note: with the spawn start method (default on Windows/macOS), functions defined inside a
# notebook may not be importable by worker processes; run this cell from a .py script if Pool fails.
def cpu_heavy_transform(df: pd.DataFrame) -> pd.DataFrame:
    """Artificial CPU load: apply log‑sum‑exp row‑wise."""
    x = df["value"].values
    # expensive but pointless computation for demo
    lse = math.log(sum(math.exp(v) for v in x))
    return pd.DataFrame({"log_sum_exp": [lse]})

def process_file(path):
    loader = CSVLoader(path)
    df = loader.load()
    return cpu_heavy_transform(df)

def multiproc_etl(files, workers=None):
    workers = workers or min(cpu_count(), len(files))
    with Pool(processes=workers) as pool:
        results = pool.map(process_file, files)
    return pd.concat(results, ignore_index=True)
# Cell 7 ─ Timing helper
from contextlib import contextmanager

@contextmanager
def timer(label):
    start = time.perf_counter()
    yield
    dur = time.perf_counter() - start
    print(f"[{label}] {dur:.2f}s")
# Cell 8 ─ Run and compare
with timer("THREADING"):
    thread_out = threaded_etl(csv_files, max_workers=5)

with timer("MULTIPROCESSING"):
    mp_out = multiproc_etl(csv_files, workers=5)

display(thread_out.head())
display(mp_out.head())
## Observations

* **THREADING** excels when the bottleneck is disk/network I/O  
  (CSV read → light transform).  

* **MULTIPROCESSING** shines when each task is CPU‑intensive  
  (heavy math transform), bypassing the GIL for true parallelism.  

Tune `max_workers` / `workers` based on CPU cores, file count, and memory.  
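A rough starting heuristic (an assumption, not a rule) is to cap CPU-bound workers at the core count and let I/O-bound pools oversubscribe a little:

import os

cpu_workers = min(os.cpu_count() or 1, len(csv_files))   # CPU-bound: at most one process per core
io_workers = min(32, len(csv_files) * 2)                 # I/O-bound: threads can exceed the core count
print(cpu_workers, io_workers)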

Can the multithreading / multiprocessing patterns be “lifted” to a distributed PySpark environment?

Yes — but you need to translate where the parallelism happens and who owns what work. Below is a concise guide that maps each element of the earlier notebook to its Spark‑native equivalent, highlights the “gotchas,” and finishes with a minimal PySpark pipeline skeleton you can paste into a Databricks / EMR / local‐Spark notebook.


1. What changes and what stays the same?

| Earlier Pattern | Role in a single-node script | Spark-Native Equivalent | Key Adaptation Points |
|---|---|---|---|
| ThreadPool / multiprocessing | Gave parallelism on one machine | Spark executors & tasks | Don't spawn extra threads/processes inside executors; Spark already parallelises. |
| Pandas read_csv | I/O-bound file reads | spark.read.csv (wildcards / partition discovery) | Spark splits input files into many partitions automatically. |
| Custom loader classes (CSVLoader, APILoader) | Encapsulate source-specific logic | Keep them | They now run on the driver to configure Spark jobs, or on each executor via UDF / mapPartitions. |
| Retry decorator | Deals with flaky I/O | Still valuable for remote API fetches on the driver and per-record API calls via @pandas_udf or mapPartitions | Be sure the decorator is serialisable (pure-Python functions serialise fine). |
| CPU-heavy transform | Illustrates bypassing the GIL | Spark SQL / DataFrame API or vectorised pandas UDFs (Arrow) | Gives true distributed CPU use. |
| Timing helper | Measures local runtime | Spark UI / spark.time() or wrap actions in Python time.perf_counter() | Time actions, not lazy transformations. |

2. Architectural shift

┌──────── Driver ────────┐             ┌─ Executor 1 ─┐
│ • Defines ETL classes  │  Spark job  │Partition #1  │
│ • Reads configs        │  ⇩⇧ events  │Partition #2  │
│ • Kicks off actions    │────────────▶│ ...          │
│ • Runs small API loads │             └──────────────┘
└────────────────────────┘                 ⋮
                        ←──── multiple executors & tasks ────→
  • Driver (your notebook or Python script):
    • Houses global OOP orchestration, retry decorators, and external‑system calls that do not need cluster‑scale.
  • Executors:
    • Automatically handle data‑parallel work (CSV reading, DataFrame transformations, pandas UDFs).
    • Don’t start concurrent.futures pools inside them unless you really know what you’re doing; extra threads and processes contend with Spark’s own task slots and can starve resources.

3. Example: Sparkified ETL Skeleton

# Cell 0 – Spark session
from pyspark.sql import SparkSession
spark = (SparkSession.builder
         .appName("DistributedETL")
         .getOrCreate())

# -------------------------------------------
# Cell 1 – Retry decorator (unchanged)
import time, functools, requests
def retry(n=3, delay=1):
    def deco(fn):
        @functools.wraps(fn)
        def wrapper(*a, **kw):
            for i in range(1, n+1):
                try:
                    return fn(*a, **kw)
                except Exception as e:
                    if i == n: raise
                    time.sleep(delay)
        return wrapper
    return deco
# -------------------------------------------
# Cell 2 – OOP loaders (driver‑side)
class CSVLoader:
    def __init__(self, path_glob, schema=None):
        self.path_glob = path_glob
        self.schema = schema
    def load(self):
        reader = spark.read.option("header", True)
        if self.schema is not None:              # .schema(None) would raise a TypeError
            reader = reader.schema(self.schema)
        return reader.csv(self.path_glob)

class APILoader:
    def __init__(self, url):
        self.url = url
    @retry(n=2, delay=2)
    def load(self):
        r = requests.get(self.url, timeout=5)
        r.raise_for_status()
        return r.json()
# -------------------------------------------
# Cell 3 – CPU‑heavy transform using a pandas UDF
from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType
import numpy as np
import pandas as pd

# Vectorised batch transform: log-sum-exp of each batch, broadcast back to every row
@F.pandas_udf(DoubleType())
def log_sum_exp_udf(s: pd.Series) -> pd.Series:
    m = s.max()                                  # subtract the max for numerical stability
    lse = m + np.log(np.exp(s - m).sum())
    return pd.Series(lse, index=s.index)         # a Series-to-Series UDF must return one value per input row
# -------------------------------------------
# Cell 4 – End‑to‑end ETL
def distributed_etl(csv_glob, out_table):
    # 1. Ingest
    df = CSVLoader(csv_glob).load()          # distributed read
    # 2. Transform
    df2 = df.withColumn("lse", log_sum_exp_udf(F.col("value").cast("double")))  # cast needed: without a schema, CSV columns load as strings
    # 3. Write
    (df2.write.mode("overwrite")
              .partitionBy("year")           # example
              .format("delta")               # or parquet/csv/etc
              .saveAsTable(out_table))
    return df2

result_df = distributed_etl("/mnt/data/sample_csv/*.csv",
                            "analytics.facts_values")
result_df.show(3)

What this demonstrates

  1. Driver chooses file glob and schema.
  2. Spark reads each CSV file in parallel partitions.
  3. The pandas UDF executes the CPU‑heavy log_sum_exp on each partition → real parallel CPU across executors.
  4. Results are written back as a Delta table (or Parquet files).
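To see point 2 for yourself (a quick check, not part of the pipeline), you can ask Spark how many partitions it planned for the input:

df = CSVLoader("/mnt/data/sample_csv/*.csv").load()
print(df.rdd.getNumPartitions())   # number of parallel read tasks Spark will run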

4. Where you might still use native Python concurrency

| Need | Preferred Place | How |
|---|---|---|
| Calling a REST API per row in a dimension table (a few thousand rows) | mapPartitions + a small ThreadPool inside it | The partition boundary keeps thread pools contained. |
| High-fan-out web scraping before Spark ingest | Driver using ThreadPoolExecutor, then spark.createDataFrame() | Only thousands of small JSON objects; collect, then parallelise. |
| CPU-bound pure-Python algorithm Spark doesn't cover (e.g. a custom Cython lib) | rdd.mapPartitions(lambda it: expensive(it)) | Each partition is already handled by a separate executor process; no extra pool needed. |
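As a minimal sketch of the first row (the enrichment endpoint, the `id` column, and the small `dim_df` DataFrame are all hypothetical), a bounded per-partition thread pool might look like this:

from concurrent.futures import ThreadPoolExecutor
import requests

def enrich_partition(rows):
    rows = list(rows)                                   # materialise this partition's rows
    def call(row):
        # hypothetical lookup service
        r = requests.get(f"https://api.example.com/enrich/{row['id']}", timeout=5)
        r.raise_for_status()
        return {**row.asDict(), "enriched": r.json()}
    # one small thread pool per partition, created and torn down on the executor
    with ThreadPoolExecutor(max_workers=8) as ex:
        return list(ex.map(call, rows))

enriched = dim_df.rdd.mapPartitions(enrich_partition)   # dim_df: a small dimension DataFrame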

5. Common Pitfalls

  1. Nested multiprocessing inside Spark executors → often hangs or OOMs.
  2. Large Python objects in broadcast variables → serialisation overhead.
  3. Logging / retry code that isn’t serialisable (non‑global lambdas, open sockets).
  4. Ignoring data skew—one “giant” CSV or partition can bottleneck an otherwise parallel job.
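For the last pitfall, one simple mitigation (a sketch; the target partition count is only a starting heuristic) is to redistribute rows evenly before the expensive stage:

# Even out partition sizes before a CPU-heavy per-row transform (round-robin shuffle)
df = spark.read.csv("/mnt/data/sample_csv/*.csv", header=True)
balanced = df.repartition(4 * spark.sparkContext.defaultParallelism)
result = balanced.withColumn("lse", log_sum_exp_udf(F.col("value").cast("double")))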

Global Interpreter Lock (GIL) — in Plain English and Under‑the‑Hood

**What it is:** A mutual-exclusion (mutex) lock inside the CPython interpreter that ensures only one OS thread at a time executes Python bytecode.

**Why it exists:** CPython's memory model uses reference counting for garbage collection. Incrementing and decrementing those counters must be thread-safe, and the simplest guarantee is to wrap bytecode execution in a single lock.

**Practical impact:**
  • CPU-bound threads can't run in parallel on multiple cores.
  • I/O-bound threads often benefit anyway (the GIL is released while waiting on I/O).
  • True multi-core execution needs processes (multiprocessing, Spark, Ray) or native extensions that release the GIL.

1. Where the GIL Lives

┌────────────── CPython Runtime ──────────────┐
│   Thread‑1      Thread‑2      Thread‑3      │
│      ▼             ▼             ▼          │
│  ┌────── Global Interpreter Lock ────────┐  │
│  │  Only ONE thread at a time            │  │
│  │  executes Python byte‑code            │  │
│  └────────────────────────────────────────┘ │
└──────────────────────────────────────────────┘

2. When the GIL Is Held vs. Released

| State | GIL | Examples |
|---|---|---|
| Executing bytecode | 🔒 Held | Pure-Python loops, math, string ops |
| Blocking on I/O | 🔓 Released | socket.recv(), time.sleep(), file read/write |
| Inside C-extension code that calls Py_BEGIN_ALLOW_THREADS … Py_END_ALLOW_THREADS | 🔓 Released | NumPy vector ops, pandas C-level routines, requests.get() waiting on the network |

Result: Python threads can overlap I/O and C‑level numerics, but not CPU‑bound pure‑Python work.
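A minimal sketch of the I/O side (time.sleep stands in for any blocking call that releases the GIL): four threads that each wait one second finish in roughly one second of wall time, not four.

import threading, time

def blocking_io():
    time.sleep(1)               # the GIL is released while sleeping / waiting on I/O

start = time.perf_counter()
threads = [threading.Thread(target=blocking_io) for _ in range(4)]
[t.start() for t in threads]; [t.join() for t in threads]
print(f"4 I/O-bound threads: {time.perf_counter()-start:.2f}s")   # ≈ 1 s, not 4 s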


3. Demonstrating the Bottleneck (conceptual)

import threading, time, math

def cpu_bound(n=10_000_000):
    s = 0
    for i in range(n):           # pure-Python loop: the GIL is held the whole time
        s += math.sin(i)
    return s

start = time.perf_counter()
threads = [threading.Thread(target=cpu_bound) for _ in range(4)]
[t.start() for t in threads]; [t.join() for t in threads]
print(f"4 threads: {time.perf_counter()-start:.2f}s")   # ≈ same as running the 4 calls back to back

Running the work in four threads is no faster than running the same calls one after another, because only one thread can hold the GIL and execute Python bytecode at a time.
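For contrast, a hedged sketch of the same workload fanned out to processes (each with its own interpreter and its own GIL), which on a machine with 4+ cores should take roughly the time of a single call:

from multiprocessing import Pool
import time

if __name__ == "__main__":
    start = time.perf_counter()
    with Pool(processes=4) as pool:
        pool.map(cpu_bound, [10_000_000] * 4)    # reuses cpu_bound from the snippet above
    print(f"4 processes: {time.perf_counter()-start:.2f}s")   # ≈ one call's time on 4+ cores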


4. Ways to Work Around the GIL

| Strategy | How it helps | Typical Tools |
|---|---|---|
| Multiprocessing / joblib / Ray / Spark | Each child is a new interpreter process with its own GIL, giving true multi-core execution | multiprocessing.Pool, spark-submit, ray |
| Native extensions that release the GIL | Heavy work is done in C/C++/Fortran while the GIL is released | NumPy, pandas, TensorFlow, PyTorch |
| Cython "nogil" blocks / Rust + PyO3 | Manually drop the GIL inside compiled code regions | cython.parallel, maturin |
| Alternative interpreters | No GIL, or a different concurrency model | PyPy-STM (experimental), GraalPy, Jython (uses JVM threads) |
| Sub-interpreters with a per-interpreter GIL (PEP 684, Python 3.12) | Separate interpreters in one process, each with its own GIL; still early-stage | C API in 3.12; a stdlib `interpreters` module is proposed in PEP 734 |

5. When It Doesn’t Matter

  • I/O‑bound workloads (web scrapers, Kafka consumers, ETL reading S3): threads overlap network/disk waits; GIL is released while blocking.
  • Vectorised numeric workloads: NumPy, pandas, and many ML libs push work to BLAS/CUDA—which run outside the GIL.

6. Mental Model Cheat‑Sheet

Is the job CPU‑bound  ➔   use processes or Spark.
Is the job I/O‑bound ➔   threads are fine (GIL mostly released).
Is the job numeric C‑backed ➔   threads + NumPy OK.
Need micro‑parallel loops in pure Python ➔   Cython/Rust or rewrite algorithm.

Key Take‑Aways

  1. The GIL is implementation-specific (it exists in CPython); the interpreter, not the language spec, imposes it.
  2. It’s a trade‑off: simplicity & C‑extension speed vs multi‑core threading.
  3. Data engineers typically bypass it with multiprocessing, distributed engines (PySpark, Dask, Ray), or C‑accelerated libraries—so you can still scale Python pipelines to TB‑scale data.
