Great topic! Parallel processing is essential for optimizing performance in Python—especially in data engineering and PySpark pipelines where you’re often handling:
- Large datasets
- Batch API calls
- File ingestion
- Transform-heavy workloads
Let’s break it down with ✅ why, 🚀 techniques, 🧰 use cases, and 🔧 code examples.
✅ Why Parallel Processing in Python?
| Problem Area | Parallelism Benefit |
|---|---|
| Processing large files | Split work across threads/processes |
| Batch API calls | Send multiple calls simultaneously |
| CPU-heavy computations | Utilize multiple cores |
| Data ingestion pipelines | Read/write files, databases, and cloud storage faster |
🚀 Techniques: Overview with Use Case Matching
| Method | Use Case | How It Works |
|---|---|---|
| threading | I/O-bound tasks (e.g. web scraping, API calls) | Shared memory, multiple threads |
| multiprocessing | CPU-bound tasks (e.g. image/video processing) | Separate memory, multiple processes |
| concurrent.futures | Cleaner syntax for threads/processes | ThreadPoolExecutor / ProcessPoolExecutor |
| asyncio | High-performance async I/O (quick sketch below) | Cooperative multitasking (event loop) |
| PySpark parallelism | Distributed big data tasks | Cluster-level distributed compute |
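asyncio is the only row above without a dedicated section later, so here is a minimal sketch of the event-loop model; the URLs are placeholders and asyncio.sleep stands in for a real non-blocking I/O call.

import asyncio

async def fetch(url):
    await asyncio.sleep(1)  # stands in for a non-blocking call (e.g. an async HTTP client)
    return f"Fetched {url}"

async def main():
    # All three coroutines wait concurrently on a single thread's event loop
    results = await asyncio.gather(*(fetch(u) for u in ["u1", "u2", "u3"]))
    print(results)

asyncio.run(main())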
🔧 THREADING — For I/O-bound (APIs, file reads)
import threading
import time

def download_file(url):
    print(f"Downloading {url}")
    time.sleep(2)  # simulates a blocking I/O wait (network/disk)
    print(f"Done {url}")

urls = ["file1", "file2", "file3"]
threads = [threading.Thread(target=download_file, args=(url,)) for url in urls]

for t in threads:
    t.start()  # start all threads; the 2-second waits overlap
for t in threads:
    t.join()   # wait for every thread to finish
🔧 MULTIPROCESSING — For CPU-bound (data crunching)
from multiprocessing import Pool

def square(n):
    return n * n

if __name__ == "__main__":  # guard required when processes are started via spawn (Windows/macOS)
    with Pool(4) as p:
        results = p.map(square, [1, 2, 3, 4, 5])
    print(results)  # [1, 4, 9, 16, 25]
🔧 CONCURRENT.FUTURES — Unified API
from concurrent.futures import ThreadPoolExecutor

def fetch_data(url):
    return f"Fetched from {url}"

with ThreadPoolExecutor(max_workers=3) as executor:
    # map() preserves input order and yields results as they complete
    results = executor.map(fetch_data, ["u1", "u2", "u3"])
    for result in results:
        print(result)
You can replace ThreadPoolExecutor with ProcessPoolExecutor for CPU-bound tasks.
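For example, a minimal CPU-bound sketch; the heavy_compute function and its inputs are illustrative, not part of the original examples.

from concurrent.futures import ProcessPoolExecutor

def heavy_compute(n):
    # Pure-Python CPU work: benefits from separate processes, not threads
    return sum(i * i for i in range(n))

if __name__ == "__main__":  # guard needed because worker processes re-import this module
    with ProcessPoolExecutor(max_workers=4) as executor:
        results = list(executor.map(heavy_compute, [100_000, 200_000, 300_000]))
    print(results)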
🧰 Real-World Use Cases in Data Engineering + PySpark
1. ✅ Parallel file ingestion
from concurrent.futures import ThreadPoolExecutor
from functools import reduce
from pyspark.sql import DataFrame

def load_to_df(file_path):
    return spark.read.csv(file_path, header=True)

paths = ["/data/file1.csv", "/data/file2.csv", "/data/file3.csv"]

with ThreadPoolExecutor(max_workers=4) as executor:
    dfs = list(executor.map(load_to_df, paths))

df_all = reduce(DataFrame.unionByName, dfs)
2. ✅ Batch API calls (e.g., metadata enrichment)
import requests
from concurrent.futures import ThreadPoolExecutor

def fetch_metadata(record_id):
    response = requests.get(f"http://api/metadata/{record_id}")
    return response.json()

with ThreadPoolExecutor(max_workers=10) as executor:
    metadata = list(executor.map(fetch_metadata, id_list))
3. ✅ Using PySpark’s Internal Parallelism
PySpark is natively parallel. But when combined with external Python functions, you can:
a. Use mapPartitions() (or foreachPartition() for side-effect-only work) to batch API calls per partition
def enrich_partition(iterator):
    # call_api is a placeholder for your enrichment call; a single connection/session
    # can be reused for the whole partition
    for row in iterator:
        yield call_api(row)

df.rdd.mapPartitions(enrich_partition).toDF().show()
b. Broadcast variables to avoid re-computation
from pyspark.sql import functions as F

lookup = {"A": 1, "B": 2}
broadcasted = spark.sparkContext.broadcast(lookup)  # shipped to each executor once
df = df.withColumn("code", F.udf(lambda x: broadcasted.value.get(x))(F.col("key")))
⚠️ Caution: GIL (Global Interpreter Lock)
- Python’s GIL allows only one thread to execute Python bytecode at a time.
- Therefore, threading ≠ true parallelism: good for I/O, bad for CPU.
- multiprocessing bypasses the GIL by creating separate processes.
- PySpark itself is not limited by the GIL, because Spark jobs run in the JVM across distributed nodes.
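To make the first two points concrete, here is an illustrative micro-benchmark (not from the original post) that runs the same pure-Python, CPU-bound function under a thread pool and a process pool; exact timings depend on your machine, but the thread pool typically shows little or no speedup.

import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def cpu_task(n):
    # Pure-Python loop: holds the GIL while it runs
    return sum(i * i for i in range(n))

def timed(executor_cls):
    with executor_cls(max_workers=4) as ex:
        start = time.perf_counter()
        list(ex.map(cpu_task, [5_000_000] * 4))
        return time.perf_counter() - start

if __name__ == "__main__":
    print("threads:  ", timed(ThreadPoolExecutor))   # roughly serial time
    print("processes:", timed(ProcessPoolExecutor))  # scales with available cores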
🧠 When to Use What (Decision Tree)
Task type?
├── I/O-bound
│   ├── A few blocking calls → threading
│   └── Many small I/O tasks → concurrent.futures (ThreadPoolExecutor)
└── CPU-bound
    ├── Raw worker processes → multiprocessing
    └── Pooled, higher-level API → concurrent.futures (ProcessPoolExecutor)
🛠 Summary Table
| Tool/Method | I/O-bound | CPU-bound | Easy Syntax | PySpark-Compatible |
|---|---|---|---|---|
| threading | ✅ | ❌ | 😐 | Not really |
| multiprocessing | ❌ | ✅ | 😐 | ❌ (not Spark-safe) |
| concurrent.futures | ✅/❌ | ✅/❌ | ✅ | Partially |
| asyncio | ✅ | ❌ | ⚠️ Complex | ❌ |
| PySpark (RDD/DataFrame) | ✅✅ | ✅✅ | ✅ | ✅✅✅ |
A caveat about the parallel file-ingestion pattern above: in the code using ThreadPoolExecutor, the Python threads are subject to Python’s GIL (Global Interpreter Lock), and even though they are dispatched “in parallel”, the actual Spark file reads are executed serially from Python.
❌ Why ThreadPoolExecutor doesn’t help with Spark reads:
- Each spark.read.csv() is a JVM call, not async or non-blocking I/O.
- PySpark is not thread-safe in this pattern; multiple Spark jobs cannot safely be driven from parallel Python threads.
- So you’re just submitting one read at a time via the driver (Python process), even if threaded.
✅ Better Approach: Let Spark Read Files in Parallel Natively
Instead of parallelizing with threads, let Spark do the parallelism it was built for.
🔥 ✅ Use this idiomatic PySpark pattern:
df_all = spark.read.option("header", True).csv(paths)
✅ paths can be:
- A list of file paths
- A directory path
- A glob path like "/data/*.csv"
✔ What happens here:
- Spark splits the read operation into partitions.
- Spark will distribute reading multiple files in parallel across executors.
- It uses the JVM’s I/O threads, not Python threads, so the GIL is not a problem.
🧪 Example with directory globbing:
df_all = spark.read.option("header", True).csv("/data/file*.csv")
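To see how much read parallelism you actually got, you can inspect the partition count of the resulting DataFrame (an illustrative check, not part of the original example):

print(df_all.rdd.getNumPartitions())  # one or more partitions per input file, depending on size and splittability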
🛠️ Notes on Spark’s File Parallelism:
| Feature | Description |
|---|---|
| Multi-file read | Spark reads multiple files in parallel |
| Partitioned reads | Large files in splittable formats are also split into chunks |
| Supported formats | CSV, Parquet, ORC, JSON |
| Performance optimization | Use .repartition() / .coalesce() as needed (sketch below) |
| Avoid Python thread-based reads | Always let the JVM/Spark handle parallelism |
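As a quick sketch of the “performance optimization” row, assuming df_all from the glob example above:

df_compact = df_all.coalesce(8)       # merge into fewer partitions without a full shuffle (e.g. before writing out)
df_balanced = df_all.repartition(64)  # full shuffle into evenly sized partitions for heavy downstream transforms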
🧠 Summary
| Method | Parallel Read? | Recommended? | Notes |
|---|---|---|---|
| ThreadPoolExecutor + spark.read | ❌ (serial via driver) | ❌ | Not Spark-safe |
| spark.read.csv(paths) | ✅ | ✅✅✅ | Spark parallelizes across workers |
| spark.read.csv("*.csv") | ✅ | ✅✅✅ | Clean and scalable |