Memory Management in Hadoop Traditional MapReduce vs. PySpark, Explained with an Example of a Complex Data Pipeline Used for Both

Jun 25, 2024 | Pyspark

Yes. In this post we discuss memory management in Hadoop's traditional MapReduce versus PySpark, explained with an example of a complex data pipeline implemented in both.

Let’s delve into a detailed comparison of memory management between Hadoop Traditional MapReduce and PySpark, using a real-world example of a complex data pipeline for both frameworks.

Hadoop Traditional MapReduce

Real-World Example: Complex ETL Pipeline

Scenario: A data pipeline that processes web server logs to compute user session statistics, filter erroneous data, and generate aggregated reports.

  1. Data Ingestion:
    • Read raw logs from HDFS.
  2. Data Cleaning:
    • Filter out records with missing or malformed fields.
  3. Sessionization:
    • Group records by user and time interval to form sessions.
  4. Aggregation:
    • Compute session statistics such as total time spent and number of pages visited.

Memory Management in Hadoop MapReduce

  • Fixed Memory Allocation:
    • Each task (map or reduce) is allocated a fixed amount of memory, configured via parameters like mapreduce.map.memory.mb and mapreduce.reduce.memory.mb.
  • Intermediate Data Spilling:
    • Intermediate results are spilled to disk when in-memory buffers reach a certain threshold (typically 80%).
  • Disk I/O:
    • Heavy reliance on disk for intermediate data during the shuffle and sort phase.

Configuration:

<configuration>
  <property>
    <name>mapreduce.map.memory.mb</name>
    <value>2048</value>
  </property>
  <property>
    <name>mapreduce.reduce.memory.mb</name>
    <value>4096</value>
  </property>
  <property>
    <name>mapreduce.task.io.sort.mb</name>
    <value>1024</value>
  </property>
  <property>
    <name>mapreduce.task.io.sort.factor</name>
    <value>100</value>
  </property>
</configuration>

Execution and Memory Usage:

  1. Map Phase:
    • Input splits processed by mappers, each allocated 2 GB memory.
    • Intermediate key-value pairs stored in a 1 GB in-memory buffer.
    • Buffer spills to disk when 80% full, resulting in frequent disk I/O.
  2. Shuffle and Sort Phase:
    • Intermediate data merged and sorted on disk.
    • Significant disk I/O overhead due to lack of in-memory processing.
  3. Reduce Phase:
    • Each reducer allocated 4 GB memory.
    • Fetches and processes intermediate data from mappers.
    • Final results written back to HDFS.
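For reference, the map side of this job is traditionally written in Java, but with Hadoop Streaming it can be expressed in Python. Below is a minimal, illustrative mapper for the cleaning step that keys records by user so sessionization can happen in the reducer. The tab-separated field layout (`user`, `timestamp`, `page`) is an assumption about the log format, not something the pipeline above prescribes:

```python
#!/usr/bin/env python3
"""Hadoop Streaming mapper sketch: drop malformed log lines, key by user.

Illustrative only -- run via something like:
  hadoop jar hadoop-streaming.jar -mapper mapper.py -reducer reducer.py ...
"""
import sys

def map_line(line: str):
    """Return a (user, 'timestamp<TAB>page') pair, or None for bad input."""
    fields = line.rstrip("\n").split("\t")
    if len(fields) != 3 or not all(fields):
        return None  # Data Cleaning: missing or malformed fields
    user, timestamp, page = fields
    return user, f"{timestamp}\t{page}"

if __name__ == "__main__" and not sys.stdin.isatty():
    for raw in sys.stdin:
        result = map_line(raw)
        if result is not None:
            # Emitting user as the key routes all of a user's hits to one
            # reducer, where 30-minute sessionization can be applied.
            print(f"{result[0]}\t{result[1]}")
```

Note that every emitted pair still passes through the disk-backed shuffle described above; the buffer and spill behavior is identical regardless of the mapper language.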

PySpark

Real-World Example: Complex ETL Pipeline

Scenario: The same data pipeline as above.

Memory Management in PySpark

  • In-Memory Computation:
    • Data stored in-memory using Resilient Distributed Datasets (RDDs) or DataFrames.
    • Intermediate results cached in memory, reducing disk I/O.
  • Configurable Memory Management:
    • Executor memory and cache persistence levels configurable.
    • Dynamic memory management to balance between storage and execution memory.

Configuration:

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Complex ETL Pipeline") \
    .config("spark.executor.memory", "4g") \
    .config("spark.driver.memory", "2g") \
    .config("spark.memory.fraction", "0.8") \
    .getOrCreate()
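`spark.memory.fraction` governs how much of the executor heap (minus a roughly 300 MB reserved slice Spark keeps for itself) the unified memory manager shares between execution and storage; `spark.memory.storageFraction` (default 0.5) sets the portion of that pool protected for cached data. A back-of-the-envelope sketch for the 4 GB executors configured above:

```python
# Rough sketch of Spark's unified memory split for one executor.
# Follows the UnifiedMemoryManager's published formula; the ~300 MB
# reserved heap is a Spark-internal constant.
EXECUTOR_HEAP_MB = 4096      # spark.executor.memory = 4g
RESERVED_MB = 300            # reserved for Spark internals
MEMORY_FRACTION = 0.8        # spark.memory.fraction (as configured above)
STORAGE_FRACTION = 0.5       # spark.memory.storageFraction (default)

unified_pool_mb = (EXECUTOR_HEAP_MB - RESERVED_MB) * MEMORY_FRACTION
storage_floor_mb = unified_pool_mb * STORAGE_FRACTION

print(round(unified_pool_mb, 1))   # pool shared by execution and storage
print(round(storage_floor_mb, 1))  # cached blocks protected up to here
```

Because the boundary is soft, execution can borrow from the storage side when the cache is underused, and vice versa; this flexibility is what the fixed per-task limits in MapReduce lack.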

Execution and Memory Usage:

  1. Data Ingestion:
    • Read raw logs from HDFS into a DataFrame.
    df = spark.read.csv("hdfs:///path/to/input/*.csv", header=True, inferSchema=True)
  2. Data Cleaning:
    • Filter out erroneous records in-memory.
    df_cleaned = df.filter(df["column"].isNotNull())
  3. Sessionization:
    • Group records by user and time interval, leveraging in-memory processing.
    from pyspark.sql.functions import window, count
    df_sessions = df_cleaned.groupBy("user", window("timestamp", "30 minutes")).agg(count("page").alias("page_count"))
  4. Aggregation:
    • Compute session statistics such as total pages visited, caching the sessionized data so later actions reuse it instead of recomputing it (a session_duration column could be derived from the window bounds in the same way).
    df_sessions.cache()
    df_aggregated = df_sessions.groupBy("user").agg({"page_count": "sum"})
  5. Write Results:
    • Output the results back to HDFS.
    df_aggregated.write.parquet("hdfs:///path/to/output/")
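To make the sessionization step concrete: `window("timestamp", "30 minutes")` assigns each record to a fixed (tumbling) 30-minute bucket aligned to the epoch. The bucketing can be sketched in plain Python (the function name `assign_window` is ours, not a Spark API):

```python
from datetime import datetime, timedelta, timezone

WINDOW = timedelta(minutes=30)

def assign_window(ts: datetime):
    """Return the (start, end) of the tumbling 30-minute window containing
    ts, mirroring what window("timestamp", "30 minutes") computes."""
    epoch = datetime(1970, 1, 1, tzinfo=ts.tzinfo)
    elapsed = ts - epoch
    start = epoch + (elapsed // WINDOW) * WINDOW  # floor to window boundary
    return start, start + WINDOW

# Hits in the same (user, window) pair land in the same session group.
hit = datetime(2024, 6, 25, 10, 47, tzinfo=timezone.utc)
start, end = assign_window(hit)
print(start.isoformat(), end.isoformat())  # the 10:30-11:00 window
```

Because the grouping key is computed per row, Spark can evaluate it entirely in memory during the shuffle, with no extra pass over the data.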

Memory Usage Details:

  • Executor Memory:
    • Each executor allocated 4 GB memory.
    • Spark dynamically manages memory between storage (cached data) and execution (task processing).
  • In-Memory Processing:
    • Intermediate results (e.g., cleaned data, sessionized data) stored in-memory.
    • Caching reduces recomputation and minimizes disk I/O.
  • Memory Efficiency:
    • Spark’s memory management allows efficient handling of large datasets with minimal spilling to disk.
    • Executors can be dynamically allocated based on workload, improving resource utilization.
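Dynamic executor allocation is opt-in; it also requires the external shuffle service so executors can be released without losing their shuffle files. Illustrative spark-defaults.conf settings (the min/max values here are placeholders, not recommendations):

```
spark.dynamicAllocation.enabled        true
spark.dynamicAllocation.minExecutors   2
spark.dynamicAllocation.maxExecutors   20
spark.shuffle.service.enabled          true
```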

Comparison Summary:

| Feature | Hadoop MapReduce | PySpark |
|---|---|---|
| Memory Allocation | Fixed per task (e.g., 2 GB for mappers) | Configurable executor memory (e.g., 4 GB) |
| Intermediate Data Handling | Spilled to disk when buffers are full | Cached in memory, reducing disk I/O |
| Shuffle and Sort | Disk-based, I/O intensive | In-memory, with optimized memory management |
| Data Caching | Not supported | Supported, reducing recomputation |
| Dynamic Resource Allocation | Not supported | Supported, improving resource utilization |
| Execution Speed | Slower due to disk I/O | Faster due to in-memory computation |

Hadoop Traditional MapReduce relies heavily on disk I/O for intermediate data management, leading to potential performance bottlenecks. Memory management is fixed and can result in frequent spills to disk. In contrast, PySpark utilizes in-memory computation, configurable memory management, and dynamic resource allocation, enabling faster data processing and more efficient memory usage. This makes PySpark more suitable for complex data pipelines, especially those requiring iterative operations and real-time data analysis.

Written by HintsToday Team
