Pyspark RDDs & Dataframes -Transformations, actions and execution operations- please explain and list them

by | Jun 16, 2024 | Pyspark | 0 comments

PySpark offers two fundamental data structures for distributed data processing: Resilient Distributed Datasets (RDDs) and DataFrames. Both provide mechanisms for manipulating and analyzing data, but they differ in structure and operations. Here’s a breakdown of transformations, actions, and execution:

1. RDDs (Resilient Distributed Datasets):

  • Represent an immutable collection of data objects partitioned across a cluster of machines.
  • Offer fault tolerance: if a worker node fails, the data can be recomputed from other nodes.

Transformations:

  • Operations that create a new RDD from an existing one. They are lazy (don’t trigger computation until an action is called). Common transformations include:
    • map(func): Applies a function to each element in the RDD.
    • filter(func): Creates a new RDD containing only elements that satisfy a condition defined by the function.
    • flatMap(func): Similar to map but allows a function to return multiple elements for each input element.
    • join(otherRDD, func): Joins two RDDs based on a specified function.
    • groupBy(func): Groups elements together based on the output of the function.

Actions:

  • Operations that return a result or perform a side effect on the RDD. They trigger the actual computation on the cluster. Common actions include:
    • collect(): Gathers all elements of the RDD to the driver program (potentially large for big datasets).
    • count(): Returns the total number of elements in the RDD.
    • first(): Returns the first element in the RDD.
    • foreach(func): Applies a function to each element in the RDD on the worker nodes.
    • saveAsTextFile(path): Saves the RDD as a text file.

Execution:

  • Transformations are applied lazily, building a lineage of operations.
  • When an action is called, the lineage is traversed, and the actual computation happens on the cluster in stages.

2. DataFrames:

  • Built on top of RDDs, providing a more structured and SQL-like interface for data manipulation.
  • Offer columnar data storage for efficient querying.

Transformations:

  • Similar to RDD transformations but operate on DataFrames with column-level operations. Common transformations include:
    • select(col1, col2): Selects specific columns.
    • filter(condition): Filters rows based on a condition.
    • join(otherDF, condition): Joins two DataFrames based on a specified condition.
    • groupBy(col): Groups rows by a specific column.
    • orderBy(col, ascending=True): Orders rows by a column in ascending or descending order.

Actions:

  • Similar to RDD actions but work on DataFrames. Common actions include:
    • collect(): Gathers all rows of the DataFrame to the driver program.
    • count(): Returns the total number of rows in the DataFrame.
    • show(): Displays a limited number of rows in the DataFrame.
    • write.csv(path): Saves the DataFrame as a CSV file.

Execution:

  • Transformations are generally optimized for DataFrames, leading to potentially faster execution compared to RDDs.
  • Similar to RDDs, actions trigger the computation on the cluster.

In summary:

  • Use RDDs for low-level data manipulation and when you need maximum control over data processing.
  • Use DataFrames for more structured data, leveraging SQL-like operations and potentially better performance for common data analysis tasks.

RDD Transformations and Actions

Transformations

Transformations create a new RDD from an existing one. They are lazy and are not executed until an action is called.

TransformationDescriptionExample
mapApplies a function to each elementrdd.map(lambda x: x * 2)
filterFilters elements based on a predicaterdd.filter(lambda x: x > 10)
flatMapSimilar to map but each input item can be mapped to 0 or more output itemsrdd.flatMap(lambda x: x.split(" "))
unionReturns a new RDD containing the union of elementsrdd1.union(rdd2)
distinctReturns a new RDD with distinct elementsrdd.distinct()
groupByKeyGroups the values for each key in the RDDrdd.groupByKey()
reduceByKeyMerges values for each key using an associative functionrdd.reduceByKey(lambda x, y: x + y)
sortBySorts the RDD by the given key functionrdd.sortBy(lambda x: x)
joinJoins two RDDs by their keysrdd1.join(rdd2)

Actions

Actions trigger the execution of transformations and return a result.

ActionDescriptionExample
collectReturns all elements as a listrdd.collect()
countReturns the number of elements in the RDDrdd.count()
firstReturns the first elementrdd.first()
takeReturns the first n elementsrdd.take(5)
reduceAggregates elements using an associative functionrdd.reduce(lambda x, y: x + y)
saveAsTextFileSaves the RDD as a text filerdd.saveAsTextFile("path/to/file")
countByKeyReturns the count of each keyrdd.countByKey()

DataFrame Transformations and Actions

Transformations

DataFrame transformations are lazy and return a new DataFrame.

TransformationDescriptionExample
selectSelects a set of columnsdf.select("name", "age")
filter/whereFilters rows based on a conditiondf.filter(df['age'] > 21)
groupByGroups rows by a columndf.groupBy("age").count()
aggPerforms aggregationdf.groupBy("age").agg({"salary": "avg"})
joinJoins two DataFramesdf1.join(df2, "id")
withColumnAdds or replaces a columndf.withColumn("new_col", df['existing_col'] * 2)
dropDrops a columndf.drop("column_name")
distinctReturns distinct rowsdf.distinct()
orderBySorts rows by a columndf.orderBy("age", ascending=False)

Actions

DataFrame actions trigger the execution of transformations and return a result.

ActionDescriptionExample
showDisplays the top rows of the DataFramedf.show(5)
collectReturns all rows as a list of Row objectsdf.collect()
countReturns the number of rowsdf.count()
firstReturns the first rowdf.first()
takeReturns the first n rowsdf.take(5)
writeWrites the DataFrame to storagedf.write.csv("path/to/file")
describeComputes basic statisticsdf.describe().show()
toPandasConverts the DataFrame to a Pandas DataFramedf.toPandas()

Execution Operations

Execution operations involve how Spark processes and schedules the tasks.

OperationDescription
Lazy EvaluationTransformations are recorded and optimized but not executed until an action is called.
DAG SchedulerConverts the logical plan into a physical execution plan and breaks it into stages.
Task SchedulerSchedules tasks within each stage to run on different nodes in the cluster.
Caching/PersistenceStores intermediate results in memory or disk to speed up future computations.
ShuffleRedistributes data across the cluster, often needed by operations like groupByKey and join.

Written by HintsToday Team

Related Posts

Project Alert: Automation in Pyspark

Here is a detailed approach for dividing a monthly PySpark script into multiple code steps. Each step will be saved in the code column of a control DataFrame and executed sequentially. The script will include error handling and pre-checks to ensure source tables are...

read more

Get the latest news

Subscribe to our Newsletter

0 Comments

Submit a Comment

Your email address will not be published. Required fields are marked *