# Define a function to apply to each row
def process_row(row):
    print(f"Name: {row['name']}, Score: {row['score']}")

# Apply the function using foreach
df.foreach(process_row)

My question is: the process function for each element gets applied on the driver side. Is there a way to make this loop execute on the distributed (executor) side?

You’re absolutely right — and this is a key concept in PySpark.


df.foreach(process_row) runs on the driver, not workers.

  • The function you define with foreach() in the DataFrame API is applied on the driver after collecting the rows (internally).
  • So it does NOT run in parallel on the executors (i.e., distributed cluster nodes).
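
For comparison, explicit driver-side processing of all rows looks like this (a minimal sketch, assuming the df and process_row from the question; collect() pulls every row back to the driver):

# collect() brings every row to the driver, so this loop runs entirely in the driver process
for row in df.collect():
    process_row(row)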

✅ To Run Processing on Workers (Distributed), Use the RDD API

If you want your function to be applied on worker nodes in parallel, you need to:

🔄 Convert the DataFrame to an RDD and use .foreach() on that:

df.rdd.foreach(lambda row: your_function(row))

This is executed in parallel on the worker nodes, not on the driver.

🧠 Example

def process_row_distributed(row):
    # This runs on worker nodes
    print(f"Name: {row['name']}, Score: {row['score']}")

df.rdd.foreach(process_row_distributed)

⚠️ Note: print() still goes to the executor logs, not the driver's stdout. If you're testing this in local mode, it might look like it runs on the driver, but in a real cluster each worker writes to its own log.
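
If you want the per-row results back on the driver instead of scattered across executor logs, one common pattern is to transform on the workers with map() and bring only the results back with collect(). A minimal sketch, reusing the df above:

# map() runs on the workers; collect() returns the resulting strings to the driver
results = df.rdd.map(lambda row: f"Name: {row['name']}, Score: {row['score']}").collect()
for line in results:
    print(line)  # printed on the driver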


🔁 Summary

Method           | Where It Runs | Distributed?
df.foreach()     | Driver        | ❌ No
df.rdd.foreach() | Workers       | ✅ Yes
df.rdd.map()     | Workers       | ✅ Yes
df.rdd.collect() | Driver        | ❌ No

✅ Best Practice

If your function:

  • Is lightweight and needs to access all rows → use collect() or foreach() on the driver.
  • Needs to scale or perform I/O or computation per row → use .rdd.foreach() to distribute the work.

Would you like a version of df.rdd.foreach() that writes to a file from executors or performs a real computation?

Great! Let’s walk through an example where:

  • You write a custom function that runs on Spark worker nodes (executors).
  • Each row is processed and written to a file on the distributed workers, not on the driver.

✅ Goal:

Use df.rdd.foreach() to process each row in parallel and write results to a distributed location (like HDFS, S3, or a shared mounted path).


🔧 Step-by-Step Example

✅ Setup: Sample DataFrame

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("DistributedForeachExample").getOrCreate()

data = [("Alice", 85), ("Bob", 90), ("Charlie", 78)]
df = spark.createDataFrame(data, ["name", "score"])

✅ Approach: Use .rdd.foreach() and write to HDFS or local paths

🧠 Worker Function to Write to HDFS/FileSystem

def process_and_save(row):
    import os

    # Construct the output path (local to each executor here; see notes below for HDFS/S3)
    output_dir = "/tmp/processed_scores"
    os.makedirs(output_dir, exist_ok=True)

    file_path = os.path.join(output_dir, f"{row['name']}.txt")

    with open(file_path, 'w') as f:
        f.write(f"Name: {row['name']}, Score: {row['score']}")

🟡 Important Notes:

  • os.makedirs() runs on each executor and only affects that worker's local disk (or the container's filesystem).
  • On a real cluster, you’ll want this to go to:
    • HDFS: /user/yourname/processed_scores/{row['name']}.txt
    • S3: Use s3a://bucket/... and the boto3 or Hadoop FS client.

✅ Run in Distributed Mode

df.rdd.foreach(process_and_save)

📂 Where to Check Output?

  • If using local mode: check /tmp/processed_scores/
  • If on a cluster: you must check the executor’s local FS or write to HDFS/S3
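
In local mode you can verify the output quickly from the driver (a small sketch, assuming the /tmp/processed_scores path used above):

import os

output_dir = "/tmp/processed_scores"
for name in sorted(os.listdir(output_dir)):
    with open(os.path.join(output_dir, name)) as f:
        print(name, "->", f.read())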

✅ For Cloud Storage (S3, GCS, etc.)

To write to S3:

def process_and_save_to_s3(row):
    import boto3

    s3 = boto3.client("s3")
    content = f"Name: {row['name']}, Score: {row['score']}"
    s3.put_object(Bucket="my-output-bucket", Key=f"scores/{row['name']}.txt", Body=content)

This requires boto3 to be installed and AWS credentials to be available on the worker nodes.
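
As with the local example, you would trigger it with foreach on the RDD (my-output-bucket above is a placeholder bucket name):

# Runs process_and_save_to_s3 on the executors, one call per row
df.rdd.foreach(process_and_save_to_s3)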


🧹 Bonus: Use .foreachPartition() for Fewer Connections

If you are writing to a remote service (like S3 or a database), it's often better to work per partition, so that expensive setup such as creating a client or connection happens once per partition instead of once per row:

def process_partition(partition):
    import boto3
    s3 = boto3.client("s3")

    for row in partition:
        content = f"Name: {row['name']}, Score: {row['score']}"
        s3.put_object(Bucket="my-output-bucket", Key=f"scores/{row['name']}.txt", Body=content)

df.rdd.foreachPartition(process_partition)
