What Is the PySpark DataFrame API, and How Does It Relate to PySpark SQL?

by | Jul 1, 2024 | Pyspark

In PySpark, you can perform operations on DataFrames using two main APIs: the DataFrame API and the Spark SQL API. Both run on the same execution engine and go through the same Catalyst optimizer, so they can largely be used interchangeably.

Here’s a breakdown of key concepts and functionalities:

1. Creating DataFrames:

You can create a PySpark DataFrame from a list of rows:

from datetime import datetime, date
import pandas as pd
from pyspark.sql import Row, SparkSession

# Reuse the active session, or start one if none exists
spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame([
    Row(a=1, b=2., c='string1', d=date(2000, 1, 1), e=datetime(2000, 1, 1, 12, 0)),
    Row(a=2, b=3., c='string2', d=date(2000, 2, 1), e=datetime(2000, 1, 2, 12, 0)),
    Row(a=3, b=4., c='string3', d=date(2000, 3, 1), e=datetime(2000, 1, 3, 12, 0))
])
df
DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]

Create a PySpark DataFrame with an explicit schema.

df = spark.createDataFrame([
    (1, 2., 'string1', date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)),
    (2, 3., 'string2', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)),
    (3, 4., 'string3', date(2000, 3, 1), datetime(2000, 1, 3, 12, 0))
], schema='a long, b double, c string, d date, e timestamp')
df
DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]

From existing data sources:

Read data from various sources like CSV, Parquet, JSON, or Hive tables using spark.read.format("format").load("path").

Leverage JDBC to connect to external databases and read tables.

Create a PySpark DataFrame from a pandas DataFrame.

pandas_df = pd.DataFrame({
    'a': [1, 2, 3],
    'b': [2., 3., 4.],
    'c': ['string1', 'string2', 'string3'],
    'd': [date(2000, 1, 1), date(2000, 2, 1), date(2000, 3, 1)],
    'e': [datetime(2000, 1, 1, 12, 0), datetime(2000, 1, 2, 12, 0), datetime(2000, 1, 3, 12, 0)]
})
df = spark.createDataFrame(pandas_df)
df

You can also create DataFrames from lists, dictionaries, or other Python data structures using spark.createDataFrame(data).

Create a PySpark DataFrame from an RDD consisting of a list of tuples.

rdd = spark.sparkContext.parallelize([
    (1, 2., 'string1', date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)),
    (2, 3., 'string2', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)),
    (3, 4., 'string3', date(2000, 3, 1), datetime(2000, 1, 3, 12, 0))
])
df = spark.createDataFrame(rdd, schema=['a', 'b', 'c', 'd', 'e'])
df
DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]
The DataFrames created above all contain the same rows and have the same schema.

# Show the rows and the schema (the same for every DataFrame above)
df.show()
df.printSchema()

2. DataFrame Structure:

  • DataFrames are distributed collections of rows, where each row represents a record.
  • Each column has a name and a specific data type (e.g., integer, string, timestamp).
  • You can access columns by name (e.g., df.column_name).

3. DataFrame Operations:

  • Selection: Select specific columns using df.select("column1", "column2").
  • Filtering: Filter rows based on certain conditions using df.where(condition).
  • Sorting: Sort DataFrames using df.sort(F.col("col1").desc(), F.col("col2").asc()), where F is pyspark.sql.functions (imported with from pyspark.sql import functions as F).
  • Aggregation: Calculate summary statistics per group using df.groupBy("column").agg(F.avg("value")).
  • Joining: Combine DataFrames on shared columns using df1.join(df2, on="column_name", how="inner").
  • Transformations: Add or replace a column with a column expression using df.withColumn("new_column", F.col("existing_column") * 2).
  • Action Operations: These operations return results or trigger computations, like df.show(), df.count(), df.write.parquet("path").

4. Key Features:

  • Laziness: DataFrame operations are not immediately executed but rather built into a logical plan. When an action operation is triggered (e.g., df.show()), the plan is optimized and executed on the cluster.
  • Distributed Processing: Operations are spread across the Spark cluster for parallel processing, enabling efficient handling of large datasets.
  • Data Types: DataFrames support primitive types (integers, doubles, strings, booleans, dates, timestamps) as well as complex types (arrays, maps, structs).

5. Relationship with Spark SQL:

  • PySpark DataFrame API provides a programmatic way to interact with DataFrames.
  • Spark SQL offers a SQL-like syntax for querying DataFrames. You can seamlessly switch between these APIs for data manipulation.

The PySpark SQL API offers a powerful way to interact with structured data in PySpark using SQL-like syntax. Here’s a breakdown of key concepts and functionalities:

1. SparkSession:

  • The entry point for working with PySpark SQL is the SparkSession object.
  • It provides methods for creating DataFrames, interacting with catalogs (like Hive tables), and configuring Spark SQL settings.

2. DataFrames:

  • DataFrames are the fundamental data structures in PySpark SQL. They represent distributed collections of rows with named columns and specific data types.
  • You can create DataFrames from various sources (CSV, Parquet, JSON, Hive tables) or from Python collections (lists, dictionaries).

3. SQL-like Operations:

  • PySpark SQL allows you to write SQL queries against DataFrames once they are registered as temporary views, for example with df.createOrReplaceTempView("df").
  • You can perform actions like:
    • Selection: SELECT column1, column2 FROM df
    • Filtering: SELECT * FROM df WHERE condition
    • Aggregation: SELECT column, SUM(value) AS total FROM df GROUP BY column
    • Joining: SELECT * FROM df1 JOIN df2 ON df1.column = df2.column
    • Sorting: SELECT * FROM df ORDER BY column ASC

4. Key Features:

  • Integration with DataFrame API: You can seamlessly switch between PySpark SQL and DataFrame API for data manipulation.
  • SQL Functions: PySpark SQL provides a rich set of built-in functions for various operations (e.g., mathematical, string manipulation, date/time).
  • User-Defined Functions (UDFs): You can create custom functions in Python and use them within your SQL queries.
  • Catalog Integration: PySpark SQL interacts with catalogs like Hive, allowing you to manage and query external tables.

5. Benefits of PySpark SQL:

  • Readability: SQL-like syntax often feels more natural for data analysts and SQL-familiar users.
  • Conciseness: Complex data manipulations can be expressed concisely in SQL queries.
  • Familiarity: Existing SQL knowledge can be leveraged for working with PySpark.

6. Things to Keep in Mind:

  • Spark SQL's dialect is not identical to that of traditional SQL engines, so some syntax and functions may behave differently from what you are used to.
  • PySpark SQL queries run on the same distributed engine as the DataFrame API, so they scale to large datasets in the same way.

Below is an explanation of each, along with examples.

DataFrame API

The DataFrame API provides a higher-level abstraction than RDDs for structured data. It lets you manipulate data with Python method calls instead of SQL strings.

Example of DataFrame API

from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder.appName("DataFrame API Example").getOrCreate()

# Create a DataFrame
data = [("James", "Smith", "M", 3000),
        ("Anna", "Rose", "F", 4100),
        ("Robert", "Williams", "M", 6200)]

columns = ["FirstName", "LastName", "Gender", "Salary"]
df = spark.createDataFrame(data, schema=columns)

# Show DataFrame
df.show()

# Select specific columns
df.select("FirstName", "Salary").show()

# Filter rows
df.filter(df["Salary"] > 4000).show()

# Group by and aggregation
df.groupBy("Gender").avg("Salary").show()

# Add new column
df.withColumn("Bonus", df["Salary"] * 0.10).show()

# Stop Spark session
spark.stop()

Spark SQL API

The Spark SQL API allows you to run SQL queries on DataFrames, providing a SQL-like interface for querying data. This can be particularly useful for users who are familiar with SQL.

Example of Spark SQL API

from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder.appName("Spark SQL API Example").getOrCreate()

# Create a DataFrame
data = [("James", "Smith", "M", 3000),
        ("Anna", "Rose", "F", 4100),
        ("Robert", "Williams", "M", 6200)]

columns = ["FirstName", "LastName", "Gender", "Salary"]
df = spark.createDataFrame(data, schema=columns)

# Register the DataFrame as a temporary view so SQL can refer to it by name
df.createOrReplaceTempView("employees")

# Show DataFrame using SQL
spark.sql("SELECT * FROM employees").show()

# Select specific columns using SQL
spark.sql("SELECT FirstName, Salary FROM employees").show()

# Filter rows using SQL
spark.sql("SELECT * FROM employees WHERE Salary > 4000").show()

# Group by and aggregation using SQL
spark.sql("SELECT Gender, AVG(Salary) AS AvgSalary FROM employees GROUP BY Gender").show()

# Add new column using SQL
spark.sql("SELECT *, Salary * 0.10 AS Bonus FROM employees").show()

# Stop Spark session
spark.stop()

When to Use Each API

  • DataFrame API: Use this for more programmatic operations and when you need to chain multiple transformations together. It is more Pythonic and integrates seamlessly with other PySpark operations.
  • Spark SQL API: Use this when you have complex queries that are easier to express in SQL or when working with people who are more familiar with SQL.

Combined Example

You can combine both APIs in a single application. For instance, you might load data and perform initial transformations with the DataFrame API, then register the DataFrame as a table and perform complex queries with Spark SQL.

from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder.appName("Combined Example").getOrCreate()

# Create a DataFrame
data = [("James", "Smith", "M", 3000),
        ("Anna", "Rose", "F", 4100),
        ("Robert", "Williams", "M", 6200)]

columns = ["FirstName", "LastName", "Gender", "Salary"]
df = spark.createDataFrame(data, schema=columns)

# DataFrame API
df = df.withColumn("Bonus", df["Salary"] * 0.10)

# Register the DataFrame as a temporary view so SQL can refer to it by name
df.createOrReplaceTempView("employees")

# Spark SQL API
result_df = spark.sql("SELECT FirstName, LastName, Salary + Bonus AS TotalCompensation FROM employees")

# Show result
result_df.show()

# Stop Spark session
spark.stop()

This flexibility allows you to leverage the strengths of both APIs and use the most appropriate tool for each task in your ETL pipeline.

Written by HintsToday Team
