# Define a function to apply to each row
def process_row(row):
    print(f"Name: {row['name']}, Score: {row['score']}")

# Apply the function using foreach
df.foreach(process_row)
My question is: it looks like the process function is applied to each element on the driver side. Is there a way to make this loop execute on the distributed (executor) side?
This is a very common point of confusion in PySpark, so it is worth being precise.

❗ df.foreach(process_row) actually runs on the executors, not the driver.
- In the DataFrame API, foreach() is documented as a shorthand for df.rdd.foreach(), so your function is serialized and applied to each row on the worker nodes.
- What makes it look driver-side is local mode: the driver and the executors share the same machine and console, so the print() output appears where you launched the job.
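A quick way to see the equivalence: both of the calls below ship the function to the executors, and the second is essentially what the first expands to.

df.foreach(process_row)
df.rdd.foreach(process_row)  # what the DataFrame call is shorthand for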
✅ To Make the Distributed Execution Explicit, Use the RDD API
If you want to be explicit about where the function runs (and to reach partition-level variants such as foreachPartition(), shown later), you can:
🔄 Convert the DataFrame to an RDD and use .foreach() on that:
df.rdd.foreach(lambda row: your_function(row))
This is executed in parallel on the worker nodes, not on the driver.
🧠 Example
def process_row_distributed(row):
    # This runs on the worker nodes
    print(f"Name: {row['name']}, Score: {row['score']}")

df.rdd.foreach(process_row_distributed)
⚠️ Note: print() output goes to the executor logs, not the driver's stdout. If you are testing in local mode it may look like everything happens on the driver, but in a real cluster each worker writes to its own log.
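If you want a driver-visible signal that the rows really were processed on the executors, one option is a Spark accumulator. The sketch below is only an illustration (the names are arbitrary), assuming spark is your SparkSession and df is the DataFrame from your snippet:

processed = spark.sparkContext.accumulator(0)

def process_row_counted(row):
    # Runs on the executors; the counts are merged back on the driver
    processed.add(1)

df.rdd.foreach(process_row_counted)
print(f"Rows processed on executors: {processed.value}")  # readable on the driver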
🔁 Summary

| Method | Where the per-row function runs | Distributed? |
|---|---|---|
| df.foreach() | Workers (shorthand for df.rdd.foreach()) | ✅ Yes |
| df.rdd.foreach() | Workers | ✅ Yes |
| df.rdd.map() | Workers | ✅ Yes |
| df.rdd.collect() (then loop) | Driver | ❌ No |
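As a quick illustration of the map row in the table, here is a minimal sketch (the 10% bonus is an arbitrary example, not anything from your code) that transforms rows on the executors and only brings the small result back to the driver:

boosted = df.rdd.map(lambda row: (row["name"], row["score"] * 1.1)).collect()
print(boosted)  # the resulting list is materialized on the driver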
✅ Best Practice
If your function:
- Is lightweight and needs to see all rows together (e.g. to build a small summary) → collect() the rows and loop over them on the driver.
- Needs to scale, or performs I/O or heavy compute per row → use .rdd.foreach() (or .foreachPartition()) so the work stays on the executors.
A short sketch contrasting the two patterns follows this list.
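Both patterns, as a minimal sketch (handle_row is an illustrative placeholder, not a library function):

# Driver-side: fine only when the result is small, since every row is pulled to the driver
for row in df.collect():
    print(f"{row['name']}: {row['score']}")

# Distributed: the function ships to the executors and runs next to the data
def handle_row(row):
    pass  # e.g. per-row I/O such as calling an external service

df.rdd.foreach(handle_row)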
Would you like a version of df.rdd.foreach() that writes to a file from the executors or performs a real computation?
Great! Let’s walk through an example where:
- You write a custom function that runs on Spark worker nodes (executors).
- Each row is processed and logged to a file, but on distributed workers, not the driver.
✅ Goal
Use df.rdd.foreach() to process each row in parallel and write the results to a distributed location (such as HDFS, S3, or a shared mounted path).
🔧 Step-by-Step Example
✅ Setup: Sample DataFrame
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("DistributedForeachExample").getOrCreate()
data = [("Alice", 85), ("Bob", 90), ("Charlie", 78)]
df = spark.createDataFrame(data, ["name", "score"])
✅ Approach: Use .rdd.foreach() and write to HDFS or local paths
🧠 Worker Function to Write to HDFS/FileSystem
def process_and_save(row):
    import os
    # Output directory on the executor's local filesystem
    output_dir = "/tmp/processed_scores"
    os.makedirs(output_dir, exist_ok=True)
    file_path = os.path.join(output_dir, f"{row['name']}.txt")
    with open(file_path, 'w') as f:
        f.write(f"Name: {row['name']}, Score: {row['score']}")
🟡 Important Notes:
- os.makedirs() and open() run on each executor and write to that worker's local disk (or the container's filesystem).
- On a real cluster you will usually want the output on shared storage instead:
  - HDFS: e.g. /user/yourname/processed_scores/{row['name']}.txt
  - S3: an s3a://bucket/... path via the Hadoop FS client, or boto3.
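For the HDFS case, here is a hedged sketch using pyarrow. It assumes pyarrow with libhdfs is installed on every worker, the Hadoop configuration is visible to the executors, and the target directory is writable; it is one possible approach, not the only one:

def process_and_save_to_hdfs(row):
    # Assumption: pyarrow + libhdfs are available on the executors
    from pyarrow import fs
    hdfs = fs.HadoopFileSystem("default")  # "default" picks up fs.defaultFS from core-site.xml
    path = f"/user/yourname/processed_scores/{row['name']}.txt"
    content = f"Name: {row['name']}, Score: {row['score']}"
    with hdfs.open_output_stream(path) as out:
        out.write(content.encode("utf-8"))

df.rdd.foreach(process_and_save_to_hdfs)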
✅ Run in Distributed Mode
df.rdd.foreach(process_and_save)
📂 Where to Check the Output?
- In local mode: check /tmp/processed_scores/ on your machine.
- On a cluster: the files land on each executor's local filesystem, so you must either inspect the workers or write to HDFS/S3 instead.
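In local mode you can verify the output directly from the same machine, for example:

import os
print(os.listdir("/tmp/processed_scores"))  # e.g. ['Alice.txt', 'Bob.txt', 'Charlie.txt']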
✅ For Cloud Storage (S3, GCS, etc.)
To write to S3:
def process_and_save_to_s3(row):
    import boto3
    # Runs on the executor: boto3 and valid AWS credentials must be available on every worker
    s3 = boto3.client("s3")
    content = f"Name: {row['name']}, Score: {row['score']}"
    s3.put_object(Bucket="my-output-bucket", Key=f"scores/{row['name']}.txt", Body=content)
Requires boto3 and AWS credentials to be accessible on the workers.
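The call itself has the same shape as before:

df.rdd.foreach(process_and_save_to_s3)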
🧹 Bonus: Use .foreachPartition() for Fewer Connections
If writing to a remote service (like S3 or DB), it’s often better to batch per partition:
def process_partition(partition):
    import boto3
    # One S3 client per partition instead of one per row
    s3 = boto3.client("s3")
    for row in partition:
        content = f"Name: {row['name']}, Score: {row['score']}"
        s3.put_object(Bucket="my-output-bucket", Key=f"scores/{row['name']}.txt", Body=content)

df.rdd.foreachPartition(process_partition)
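The same per-partition pattern applies to databases and other remote services: open one connection at the start of the partition, write the rows inside the loop, and close the connection before returning, so each executor task pays the connection cost once rather than once per row.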