Spark DataFrame Intro
Introduction to Spark DataFrames
In this section, we will begin exploring Spark DataFrames. Up to this point, you have learned what Spark is and how it compares to MapReduce. We have also covered common interview questions, such as why Spark distinguishes lazy transformations from actions. Additionally, we have examined the low-level API in detail, specifically RDDs (Resilient Distributed Datasets).
Understanding RDDs is very important as it sets the foundation for when you start learning about DataFrames and Spark SQL. The higher-level APIs, such as DataFrames, depend on and work with the lower-level APIs.
Overview of Spark DataFrames
In this section, we cover Spark DataFrames in great detail, including how to read and write DataFrames. This knowledge is essential for writing your Spark code effectively. Although RDDs are not used as much, understanding them remains very important to build a clear foundation.
By establishing a solid base, you will have a much better learning journey compared to jumping directly from topic to topic without foundational knowledge.
Handling Data Types and Schemas
We demonstrate how to handle different data types, how to read data, and how to enforce schemas within Spark DataFrames. This schema enforcement was something missing in RDDs, making DataFrames more powerful and structured.
If you have any doubts, please feel free to ask them in the Q&A section, and we will be there to help you. Happy learning, and let’s meet in the next video.
Key Takeaways
- Spark DataFrames build upon the foundational knowledge of Spark and RDDs.
- Understanding Spark’s lazy transformations and actions is crucial.
- DataFrames provide higher-level APIs for reading, writing, and managing data with enforced schemas.
- A solid grasp of lower-level APIs enhances the learning experience with Spark SQL and DataFrames.
DataFrames in Spark
Introduction to DataFrames in Spark
In this section, we will continue our discussion and learn more in depth about DataFrames in Spark. Up to now, you have an idea of the higher-level APIs, primarily DataFrames and Spark SQL. We have also covered Spark RDDs in considerable depth. The reason for this detailed focus is that Spark is a very important topic; mastering it will benefit your career for the next three to five years. Therefore, it is essential to understand every topic thoroughly and try out all the examples yourself.
What is a DataFrame?
A DataFrame in Spark is a distributed collection of data organized into named columns. This concept is similar to a table in a relational database or a spreadsheet. For example, in a relational database, data is organized with associated columns. Similarly, a spreadsheet contains data with columns. In simple terms, this is what a DataFrame represents. It is an abstraction built on top of RDDs with added schema information, which is metadata.
Understanding Metadata
Metadata is data about data. For instance, consider an image: the image itself is the data, while its size, location, and creation time are metadata. On any machine, commands like ls -ltr in Linux or viewing file properties in Windows display such metadata. Similarly, for a table stored as CSV, text, or JSON files, metadata includes column names and data types, which together form the schema.
Important Distinction: Spark DataFrame vs Pandas DataFrame
It is crucial to remember that a Spark DataFrame is not the same as a pandas DataFrame. Although they share the name “DataFrame,” they are different. Spark DataFrames are distributed, whereas pandas DataFrames are not. Additionally, many functions supported by pandas DataFrames do not apply to Spark DataFrames. When generating code for Spark DataFrames, ensure you specify that you are using PySpark; otherwise, the code might be generated for pandas, which would be incorrect.
Advantages of Using Spark DataFrames
- Ease of Use: Spark DataFrames simplify data manipulation. Unlike RDDs, you do not need to write map operations explicitly; you can directly access columns.
- Schema Awareness: Spark DataFrames are schema-aware, which facilitates handling different data types such as float, integer, or boolean columns easily.
- Performance: Having metadata about the data enables Spark’s Catalyst optimizer to ensure efficient performance.
- Support for Multiple File Types: Spark DataFrames support various file formats commonly used in big data, including CSV, JSON, Parquet, and Avro.
Creating a DataFrame from a Local Python List
Let’s see a practical example of creating a DataFrame from a local Python list. Suppose we have a Python list representing data and another list representing column names. We can create a DataFrame using the spark.createDataFrame() method by providing the data and the column names. It is important to ensure that the number of columns matches the data structure.
data = [
(1, "James", "Smith", "36636", "M", 3000),
(2, "Michael", "Rose", "40288", "M", 4000),
(3, "Robert", "Williams", "42114", "M", 4000),
(4, "Maria", "Jones", "39192", "F", 4000),
(5, "Jen", "Brown", "34561", "F", 4000)
]
columns = ["id", "firstname", "lastname", "dob", "gender", "salary"]
df = spark.createDataFrame(data, columns)
Displaying DataFrame Content
To view the contents of the DataFrame, you can use the show() method. This displays the data in a tabular format, which was not possible with RDDs due to their lack of structure.
df.show()
Selecting Columns from a DataFrame
You can select specific columns from the DataFrame using the select() method and then display them using show(). For example, to select the firstname column:
df.select("firstname").show()
Reading Data from External Sources
While reading data from a local Python list is straightforward, in real-world scenarios, data is often stored in large files on distributed storage systems like HDFS. These files might be in CSV or other formats. In upcoming videos, we will learn the syntax and methods to read such files into Spark DataFrames, ensuring proper data ingestion.
Key Takeaways
- Spark DataFrames are distributed collections of data organized into named columns, similar to tables in relational databases or spreadsheets.
- Spark DataFrames are schema-aware and include metadata, which improves performance and ease of use.
- Spark DataFrames differ significantly from pandas DataFrames; Spark DataFrames are distributed and support different functions.
- Spark DataFrames support multiple file formats such as CSV, JSON, Parquet, and Avro, facilitating big data processing.
DataFrame – Reading from HDFS
Introduction
In this section, we move forward to reading data from HDFS as a DataFrame. Previously, we learned how to read a Python list as a DataFrame, but in big data scenarios, we typically read from files such as CSV or JSON.
Setting Up the Data Directory in HDFS
We begin by creating a directory in HDFS to store data for future exercises. The command used is:
hadoop fs -mkdir data
This creates a folder named data in the HDFS home directory, making it easily accessible. You can also browse it via the web interface by navigating to the HDFS NameNode UI and browsing the file system.
Uploading Files to HDFS
Direct file uploads via the web interface may not be possible due to security restrictions. Instead, files are typically uploaded using command-line tools or the Jupyter Lab interface. In Jupyter Lab, create a folder named data_for_HDFS on the local disk and upload your data file (e.g., first_100_customers.csv) into it. You can run ls in a terminal to confirm the folder and file are in place.
Moving Data from Local Disk to HDFS
Once the file is uploaded to the local disk on the master machine, use the following command to move it to HDFS:
hadoop fs -put data_for_HDFS/first_100_customers.csv data/
This command transfers the file from the master machine’s local disk to the data directory in HDFS. You can verify the upload and check replication and block size via the HDFS UI.
Renaming Files in HDFS
To rename a file in HDFS for easier reference, use the following command:
hadoop fs -mv data/first_100_customers.csv data/customers_100.csv
This renames the file within the HDFS data directory.
Reading Data from HDFS into a Spark DataFrame
To read a file from HDFS into a Spark DataFrame, use the following approach. Specify the format, header, schema inference, and file path:
df_2 = spark.read.format('csv').option('header', 'true').option('inferSchema', 'true').load('data/customers_100.csv')
Here, format('csv') specifies the file type, option('header', 'true') indicates that the first row contains column names, and option('inferSchema', 'true') lets Spark automatically infer the data type of each column. The load() method specifies the file path.
Displaying Data and Schema
To display the loaded data and the inferred schema, use the following commands:
df_2.show()
df_2.printSchema()
The show() method displays the data in a tabular format, while printSchema() shows the inferred data type of each column. Note that schema inference may not always be accurate, so it is recommended to provide the schema explicitly, especially in production environments.
DataFrame Operations: Filtering and Selecting Columns
You can perform operations such as filtering active customers and selecting specific columns. For example, to filter active customers:
active_customers = df_2.filter(df_2.is_active == True)
To select specific columns such as customer ID, name, and city:
selected_columns = df_2.select('customer_id', 'name', 'city')
Remember, Spark DataFrame transformations are lazy; actions like show() trigger the computation.
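For example, nothing is computed when active_customers and selected_columns are defined above; calling show() on them is what triggers the actual read and filter:
# Actions trigger the deferred filter and select
active_customers.show()
selected_columns.show()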
Conclusion
This video provided a basic introduction to reading data from HDFS into Spark DataFrames, performing simple operations, and understanding the importance of schema management. In future lessons, we will explore DataFrame operations in greater depth.
Key Takeaways
- Demonstrated how to upload and manage files in HDFS using both command line and Jupyter Lab.
- Showed how to read CSV data from HDFS into a Spark DataFrame with options for header and schema inference.
- Explained the importance of explicitly defining schema in production environments for reliability.
- Illustrated lazy evaluation in Spark DataFrames and basic DataFrame operations such as filtering and selecting columns.
Spark Read – Transformation or Action
Introduction
In this video, we continue our discussion by addressing a very important interview question: When reading data from HDFS or any other location in Spark, is the read operation a transformation or an action?
Previously, we saw how to read data using syntax that specifies the format, various options, and the file location. Now, we want to understand what exactly happens when we try to read data. Does Spark immediately access the location and read the data, or does it wait until an action like collect or show is called?
To explore this, I created a new notebook and started a new Spark session. I also opened the Spark UI to observe the live job execution. Initially, the UI is empty since no operations have been performed yet.
I prepared several CSV files in HDFS under the data directory: customers.csv (~300 MB), customers_100.csv (the first 100 rows), and others of varying sizes, including a 500 MB file. For this exercise, I chose the 500 MB file so the behavior is easier to observe; reading a tiny 5 KB file is trivial and less informative.
When reading data, we provide options such as header and inferSchema. These options determine how Spark processes the read operation. This is an important interview question and a fundamental step in any Spark project, so understanding it thoroughly is crucial.
Case 1: header = true, inferSchema = false
First, I read the data with header set to true and inferSchema set to false. Running this triggered a single job with one stage and one task, as observed in the Spark UI. This means Spark read only a single partition, just enough to pick up the header line.
This behavior occurs because Spark runs a lightweight job to read just the first line (the header) using a collect with a limit of one. Hence, only one partition is read, resulting in a single task. This is eager evaluation, but limited to the header only.
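For reference, a minimal sketch of the read used in this case; the exact file name of the 500 MB file is not shown here, so data/customers_500mb.csv is a placeholder:
# Case 1: header only, no schema inference -- triggers one small job
df_case1 = spark.read.format('csv') \
    .option('header', 'true') \
    .option('inferSchema', 'false') \
    .load('data/customers_500mb.csv')  # placeholder path for the 500 MB file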
Case 2: header = true, inferSchema = true
Next, I set both header and inferSchema to true. This time, the Spark UI showed two jobs: one for reading the header and another for scanning the entire dataset to infer the schema. The dataset was divided into five partitions, each processed as a separate task.
This means that when inferring the schema, Spark performs a full scan of the dataset, which can be expensive for large files. Therefore, inferring schema on very large datasets can significantly increase job execution time.
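The corresponding read, again using the placeholder path from above, looks like this; with inferSchema enabled, Spark scans all partitions up front:
# Case 2: schema inference enabled -- triggers a second, full-scan job
df_case2 = spark.read.format('csv') \
    .option('header', 'true') \
    .option('inferSchema', 'true') \
    .load('data/customers_500mb.csv')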
Case 3: Providing Explicit Schema
To avoid the overhead of schema inference, you can provide the schema explicitly. I defined a schema using StructType and StructField from pyspark.sql.types and passed it to the schema() method while reading the CSV file.
When providing the schema explicitly, Spark does not trigger any job during the read operation. The schema validation is deferred until an action is performed on the DataFrame. This approach improves performance by avoiding unnecessary data scans.
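A sketch of the explicit-schema read, assuming the 500 MB file shares the customer layout used elsewhere in this course (adjust the fields to your actual file):
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, BooleanType

# Case 3: explicit schema -- no job is triggered until an action runs
schema = StructType([
    StructField('customer_id', IntegerType(), True),
    StructField('name', StringType(), True),
    StructField('city', StringType(), True),
    StructField('state', StringType(), True),
    StructField('country', StringType(), True),
    StructField('registration_date', StringType(), True),
    StructField('is_active', BooleanType(), True)
])
df_case3 = spark.read.format('csv') \
    .option('header', 'true') \
    .schema(schema) \
    .load('data/customers_500mb.csv')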
Summary
- Without schema inference and with header true, Spark triggers a lightweight job to read only the first line to determine column names.
- With schema inference enabled, Spark triggers two jobs: one for reading the header and another for scanning the entire dataset to infer the schema.
- Providing an explicit schema avoids any upfront job execution during the read operation; schema validation occurs only when an action is performed.
This understanding is essential for optimizing Spark jobs and is a common interview question.
Finally, when an action such as df.count() is called, Spark executes the job, reading all partitions and performing the computation. The Spark UI shows the corresponding stages and tasks for this action.
Understanding the Spark UI and job execution details is crucial for writing efficient Spark code, especially when working in top-tier companies. This knowledge helps in optimizing performance and avoiding unnecessary computations.
Key Takeaways
- Reading data in Spark can behave like either a transformation or an action depending on options like header and inferSchema.
- When inferSchema is false and header is true, Spark triggers a lightweight job that reads only the first partition to determine column names.
- When inferSchema is true, Spark triggers two jobs: one to read the header and another to scan the entire dataset to infer the schema.
- Providing an explicit schema avoids any upfront job execution during data reading, deferring validation until an action is performed.
Schema Enforcement in Spark
In this lecture, we explore how to enforce the schema in a Spark DataFrame, discussing the challenges of schema inference and demonstrating methods to explicitly define schema for reliable data processing.
Challenges with Schema Inference
There are several challenges when inferring schema in Spark:
- Incorrect Inference: Spark may infer incorrect data types and often defaults to string when unsure. For example, a column whose current values run from 1 to 100 will be inferred as an integer, but if it later holds much larger values (say, prices), you would want it to be a long.
- Performance Issues: When inferring schema, Spark scans the entire dataset. This can cause performance issues, especially with large files in the big data world.
- Not Production Ready: Schema inference can lead to inconsistency and runtime errors. Code that works on a small file may fail in production with larger or different data. Therefore, it is recommended to provide and enforce your own schema when reading data as a DataFrame in Spark.
Reading Data with Schema Inference
You can read data with schema inference using the following code:
df = spark.read.format('csv').option('header', True).option('inferSchema', True).load('path/to/file.csv')
However, this approach scans the entire data and can lead to incorrect type inference and inconsistency.
Enforcing Schema with StructType
The first way to enforce a schema is by using StructType. In PySpark, you import the necessary types and define your schema explicitly.
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, BooleanType
schema = StructType([
StructField('customer_id', IntegerType(), False),
StructField('name', StringType(), True),
StructField('city', StringType(), True),
StructField('state', StringType(), True),
StructField('country', StringType(), True),
StructField('registration_date', StringType(), True),
StructField('is_active', BooleanType(), True)
])
You can then use this schema when reading your data:
df = spark.read.format('csv').option('header', True).schema(schema).load('path/to/file.csv')
After reading the data, you can display it and print the schema:
df.show()
df.printSchema()
Note on Nullable Property
Even if you set nullable=False in your schema, Spark may override it based on the actual data source; when reading from CSV or other external sources, columns are generally treated as nullable. The nullable property is determined by the data source, not just the schema you provide, so to ensure non-nullability you must check the schema after reading the data.
Enforcing Schema with DDL String
Another way to enforce schema is by using a DDL (Data Definition Language) string. This method is straightforward and similar to SQL.
ddl_schema = 'customer_id INT, name STRING, city STRING, state STRING, country STRING, registration_date STRING, is_active BOOLEAN'
You can use this DDL schema when reading your data:
df_ddl = spark.read.format('csv').option('header', True).schema(ddl_schema).load('path/to/file.csv')
Display the data and print the schema as before:
df_ddl.show()
df_ddl.printSchema()
Handling Schema Mismatches
If a type in your schema does not match the data, Spark reads those values as null. Similarly, if your schema lists fewer columns than the file contains, Spark may silently ignore the extra columns.
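As a quick sketch of the mismatch case, suppose customer_id is deliberately declared as BooleanType even though the file holds integers; values that cannot be cast come back as null (the path is the same placeholder used in the examples above):
from pyspark.sql.types import StructType, StructField, BooleanType, StringType

# Deliberately wrong type for customer_id: integer values cannot be cast to boolean
bad_schema = StructType([
    StructField('customer_id', BooleanType(), True),
    StructField('name', StringType(), True)
])
df_bad = spark.read.format('csv').option('header', True).schema(bad_schema).load('path/to/file.csv')
df_bad.show()  # customer_id is read as null because the values are not valid booleans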
Practical Advice and Homework
It is important to practice enforcing schema on different datasets. Try writing and applying schemas for other files such as orders or payments. Do not just watch the videos—practice by writing code yourself to build confidence and retain the concepts.
Key Takeaways
- Schema inference in Spark can lead to incorrect data types, performance issues, and production inconsistencies.
- Enforcing schema using StructType or DDL string ensures data consistency and better performance.
- Spark may override the nullable property based on the actual data in external sources.
- Always validate schema enforcement by checking the schema after reading the data.
Read Modes in Spark
Introduction to Read Modes in Apache Spark
In this section, we will explore the different read modes available in Apache Spark and understand their significance when reading data. Handling corrupt or malformed data records is crucial to ensuring data quality. Spark provides three read modes to address this issue.
Previously, in the schema enforcement section, we saw cases where columns were missing or had incorrect types. For example, if a boolean column like is_active was missing, or if a column like name was incorrectly typed as an integer or boolean, Spark read those values as null. This behavior can cause problems in production use cases, where columns might be misinterpreted due to corrupt or malformed data.
In production, it is unacceptable to have columns misinterpreted because of corrupt or malformed data. For instance, if a mistake causes the name field to be null and your logic depends on that field, it can lead to issues. Such problems often cause high null counts in applications due to malformed records.
Understanding how Spark handles malformed records is a good practice and makes it easier to manage data quality. We will use a sample CSV file named customers_with_errors.csv, which contains data with some errors such as invalid dates or incorrect boolean values in the is_active column.
Fail Fast Mode
The first read mode is fail fast. In this mode, Spark stops reading immediately when it encounters a malformed record. This ensures strict data validation and is useful when you want to enforce schema correctness strictly.
Here is an example of reading data in fail fast mode with a predefined schema and header option enabled:
file_path = "customers_with_errors.csv"
schema = ... # predefined schema for the data
DF_fail_fast = spark.read.option("header", "true") \
.schema(schema) \
.option("mode", "failfast") \
.format("csv") \
.load(file_path)
DF_fail_fast.show()
When running this code, Spark throws an error indicating malformed records were detected. For example, it reports that the value “invalid date” cannot be cast to a date type. This strict validation helps catch data issues early.
Permissive Mode
The second mode is permissive, which is the default mode in Spark. In permissive mode, malformed records are allowed and processed. Invalid values are set to null, allowing the data to be inspected without stopping execution.
Here is how you can read data in permissive mode:
DF_permissive = spark.read.option("header", "true") \
.schema(schema) \
.option("mode", "permissive") \
.format("csv") \
.load(file_path)
DF_permissive.show()
In this mode, invalid records are read with incorrect values replaced by nulls. This is useful when you want to inspect and fix bad data later without interrupting your data processing pipeline.
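If you also want to see the raw text of the bad rows rather than just nulls, Spark's CSV reader can route them into a designated corrupt-record column. A minimal sketch, assuming the same file_path and schema defined above plus one extra string field to hold the raw record:
from pyspark.sql.types import StringType

# Add a string column to receive the raw text of malformed rows
schema_with_corrupt = schema.add("_corrupt_record", StringType(), True)

DF_with_corrupt = spark.read.option("header", "true") \
    .schema(schema_with_corrupt) \
    .option("mode", "permissive") \
    .option("columnNameOfCorruptRecord", "_corrupt_record") \
    .format("csv") \
    .load(file_path)

# Rows with a non-null _corrupt_record are the ones Spark could not fully parse
DF_with_corrupt.filter(DF_with_corrupt._corrupt_record.isNotNull()).show(truncate=False)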
Drop Malformed Mode
The third mode is drop malformed. As the name suggests, this mode removes malformed records silently. It does not raise errors or warnings, and no trace of dropped records is provided.
Here is an example of reading data in drop malformed mode:
DF_drop_malformed = spark.read.option("header", "true") \
.schema(schema) \
.option("mode", "dropmalformed") \
.format("csv") \
.load(file_path)
DF_drop_malformed.show()
In this mode, malformed records are removed from the dataset without any notification. This mode is useful when you want to proceed without worrying about incorrect data but should be used with caution since data loss can occur silently.
It is important to note that when a column is defined as a string in the schema, almost any value is a valid string, so Spark accepts it (including nulls) rather than flagging it as malformed. For typed columns such as boolean or date, values that cannot be cast indicate missing or malformed data. This distinction is important when designing data validation logic.
Use Cases for Read Modes
- Fail Fast: Use when strict data integrity is required, typically at the final stages of a project.
- Permissive: Use when you want to inspect and fix bad data later without stopping the data processing.
- Drop Malformed: Use when you want to ignore incorrect data and proceed without errors or warnings.
Understanding these modes is essential for managing data quality in big data applications. Spark’s built-in support for these modes helps handle corrupt or malformed data efficiently.
Conclusion
In this session, we explored the three read modes in Apache Spark: fail fast, permissive, and drop malformed. Each mode offers different strategies for handling malformed records, enabling you to choose the best approach based on your data quality requirements and project stage.
Key Takeaways
- Apache Spark provides three read modes to handle corrupt or malformed data: fail fast, permissive, and drop malformed.
- The fail fast mode stops reading immediately upon encountering malformed records, ensuring strict data validation.
- The permissive mode allows reading malformed records by setting invalid values to null, enabling inspection without stopping execution.
- The drop malformed mode silently removes malformed records without errors or warnings, useful when incorrect data can be ignored.
Write Operation in Spark
Introduction to Write Operation in Spark
In this section, we continue our discussion by learning about the write operation in Spark. Since we have understood transformations, actions, and the read operation in depth, the next question is: What do you think the write operation is? Is it an action or a transformation?
The answer may seem obvious. Writing means saving something, similar to the concept in pandas or databases, and it assumes there is a final result to persist. To write a file, you must have completed all processing and transformations; the output should be ready before the file is written. Therefore, the write operation is considered an action.
Some learners get confused here, but the write operation triggers computation and produces a result on external storage, which is exactly what makes it an action. We will now demonstrate this by reading some data or creating data in code and saving it directly to disk. Remember, if the write is performed after many transformations, it will trigger all of those transformations.
Write operations are mainly used to store your final data after all transformations and efficient parallelization in Spark. This approach is similar to other data processing frameworks like pandas, where you process data and then save the result.
Creating a DataFrame and Writing to Disk
Let’s create a new Spark session and a DataFrame. We import SparkSession from pyspark.sql and initialize a Spark session. Then, we create a DataFrame by providing data and column names.
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("WriteInSpark").getOrCreate()
data = [(1, "John"), (2, "Jane"), (3, "Joe")]
columns = ["CustomerID", "Name"]
DF = spark.createDataFrame(data, columns)
Now, to save this DataFrame to disk, we use the write method. We specify the format as CSV, set the header option to true, and provide the path to save the file.
DF.write.format("csv").option("header", "true").save("data/output.csv")
This triggers a Spark job that executes all transformations and writes the output. The data is saved in a directory named output.csv containing multiple part files corresponding to partitions.
Understanding Partitions and Output Files
By default, Spark writes data in multiple partitions. You can check the number of partitions of the DataFrame’s underlying RDD using getNumPartitions().
print(DF.rdd.getNumPartitions())
Since the DataFrame has two partitions, the output directory contains two part files. To produce a single output file, you can repartition the DataFrame to one partition before writing.
DF.repartition(1).write.format("csv").option("header", "true").save("data/output_one_part.csv")
This creates a single part file in the output directory. Additionally, Spark creates a _SUCCESS file indicating the write operation was successful. This file is zero bytes but important for pipelines to verify successful writes.
Writing Data in Different Formats and Using Options
Spark supports writing data in various formats such as JSON and Parquet. You can also specify options like delimiters for CSV files.
DF.repartition(1).write.format("csv").option("header", "true").option("sep", ";").save("data/output_delimited.csv")
Note that the option for delimiter in CSV is “sep”. Using the correct option name is important to avoid errors.
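The same pattern applies to the other formats mentioned above; a brief sketch, with illustrative output paths:
# Write the same DataFrame as JSON and as Parquet (paths are illustrative)
DF.repartition(1).write.format("json").save("data/output_json")
DF.repartition(1).write.format("parquet").save("data/output_parquet")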
Handling Write Conflicts and Modes
If the output directory already exists, Spark will throw an error. In the next video, we will explore different save modes to handle such cases, similar to how read modes work.
This concludes our overview of the write operation in Spark. We have seen that it is an action that triggers computation and saves the final data to disk, with options to control format, partitioning, and output behavior.
Key Takeaways
- The write operation in Spark is an action that triggers computation and saves the final processed data to disk.
- Writing data in Spark requires all transformations to be completed before the action executes.
- Spark writes output in partitions, which can result in multiple files; repartitioning to one partition can produce a single output file.
- The presence of a _SUCCESS file indicates a successful write operation in Spark.
Spark DataFrame Operations: Practical Usage and Common Tasks
Introduction to Spark DataFrame Operations
In this session, we move forward to discuss Spark DataFrame operations. We have already seen how to read and write DataFrames with different options and modes. We have explored the various options available when reading DataFrames and the modes for both reading and writing, including writing in different formats.
Approach to Learning Spark
The intention is to ensure that you are aware of every way to use Spark. Spark is a vast topic with many backend optimizations, but the focus here is on practical usage first. The idea is to make you comfortable with Spark’s practical aspects before diving deep into theory and optimization.
Project Flow in Spark
A typical project flow involves reading data, processing it (using Spark), and then outputting the results. The data can be complex and come from various sources. Spark allows you to write code as if it is running on a single machine, while it actually runs in a distributed manner. Once results are ready, outputs can be saved anywhere, including cloud environments.
Basic DataFrame Operations
Let us look at some basic DataFrame operations. We start by importing the necessary modules and creating a Spark session, then define the schema and create a DataFrame. You can use df.show() to display the data, and df.show(2) to display the top two rows.
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
spark = SparkSession.builder.appName('dataframe_ops').getOrCreate()
schema = StructType([
StructField('id', IntegerType(), True),
StructField('name', StringType(), True),
StructField('age', IntegerType(), True)
])
data = [
(1, 'Alice', 25),
(2, 'Bob', 30),
(3, 'Charlie', 35)
]
df = spark.createDataFrame(data, schema)
df.show()
df.show(2)
Exploring DataFrame Schema and Columns
You can inspect the schema of your DataFrame using df.printSchema(). To view the column names, use df.columns. For a statistical summary, use df.describe().show().
df.printSchema()
df.columns
df.describe().show()
Selecting and Filtering Data
To select specific columns, use the select method. For example, to select the name and age columns, use df.select('name', 'age'). Remember, transformations like select do not display results until you call an action such as show().
df.select('name', 'age').show()
To filter data, you can use the filter or where methods. For example, to filter rows where age is greater than 25:
df.filter(df.age > 25).show()
Similarly, to filter rows where the name is ‘Alice’:
df.where(df.name == 'Alice').show()
Reference to Official Documentation
All these functions are available in the official Spark documentation. You do not need to memorize them. With modern tools like AI and StackOverflow, you can easily look up any function as needed.
Common DataFrame Operations
Many operations on Spark DataFrames are similar to those in pandas, R, or even Excel. Functions like join, head, tail, and fillna are basic and commonly used across data analysis frameworks.
Distinct and Sorting Operations
To get distinct rows, use the distinct method. For sorting, use orderBy; you can sort in ascending or descending order.
df.distinct().show()
df.orderBy('age').show()
df.orderBy(df.age.desc()).show()
Adding and Dropping Columns
To add a new column, use the withColumn method. For example, to add a new column new_age which is age + 5:
df = df.withColumn('new_age', df.age + 5)
df.show()
To drop a column, use the drop method. Note that changes are not in-place; you must assign the result back to the DataFrame.
df = df.drop('age')
df.show()
Aggregation and Grouping
You can perform aggregation and grouping using groupBy together with aggregation functions like count and avg. For example, to group by new_age and count:
df.groupBy('new_age').count().show()
To aggregate the entire DataFrame, you can use the agg method. For example, to get the average of new_age:
from pyspark.sql import functions as F
df.agg(F.avg('new_age')).show()
Joining DataFrames
Joins are an important operation in Spark. You can join two DataFrames on a common column. For example, create a second DataFrame and perform an inner join on the id column.
data2 = [
(1, 'US'),
(2, 'UK'),
(3, 'India')
]
schema2 = StructType([
StructField('id', IntegerType(), True),
StructField('country', StringType(), True)
])
df2 = spark.createDataFrame(data2, schema2)
df2.show()
final_df = df.join(df2, on='id')
final_df.show()
Remember, calling show() returns None, so do not assign its result to a variable. Always assign the result of transformations, not actions.
Stopping the Spark Session
Once you are done, it is good practice to stop the Spark session.
spark.stop()
Conclusion
The major takeaway from this session is to focus on practical exposure first. Using Spark is not difficult, and you should not be discouraged by its complexity. As we move forward, we will cover handling different data types and explore higher-level APIs like Spark SQL and Spark Table. This will help you become comfortable with writing code in Spark and optimizing it for big data projects.
Key Takeaways
- Spark DataFrame operations are straightforward and similar to other data analysis frameworks like Pandas and Excel.
- Practical exposure to Spark operations is prioritized before delving into theory and optimization.
- Many DataFrame operations such as select, filter, groupBy, and join are intuitive and do not require memorization.
- Saving transformations in Spark is not in-place; assignments must be explicit to persist changes.
Handling Data Types in PySpark
Introduction
In this section, we will move forward and see how to handle different data types in Spark. When working with real data, different data types often require different handling.
We have already seen integers, strings, booleans, and dates, and of course there are floats and other types as well. Each may require its own treatment, and we will see how to do this in Spark. In some cases it can get complex, but you will get the basic idea.
Handling String Data
With strings, we may need to handle inconsistent casing, stray spaces, or invalid encodings. For example, if a name is stored as " Mayank " with extra spaces, we can use the trim function to clean it.
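A minimal sketch of that cleanup, assuming a DataFrame df with a name column like the one built later in this section:
from pyspark.sql.functions import trim

# Strip leading and trailing spaces from the name column
df = df.withColumn('name_clean', trim(df['name']))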
Handling Numeric Data
For numbers, common issues include type conversion, null handling, and precision for floats or doubles. Dates are important as well: parsing formats, handling invalid dates, and dealing with timezone mismatches. For booleans, you may receive values like true/false, 1/0, or yes/no; a small normalization sketch follows below.
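For the boolean case, here is a sketch of normalizing mixed representations such as true/false, 1/0, and yes/no into a proper boolean column; the column name is_active_raw is hypothetical:
from pyspark.sql.functions import col, lower, when

# Map common truthy/falsy strings to a real boolean; anything else becomes null
df = df.withColumn(
    'is_active_bool',
    when(lower(col('is_active_raw')).isin('true', '1', 'yes'), True)
    .when(lower(col('is_active_raw')).isin('false', '0', 'no'), False)
    .otherwise(None)
)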
Data Preparation in PySpark
We will now go to the notebook and work through this in a practical manner. In a new notebook, we import SparkSession and use the builder to create a session.
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
Using Sample Data
The data here is defined in code rather than read from a file, so anyone facing environment issues can copy it and practice locally or on Google Colab. The size of the data does not matter much for these examples. Once you have the data in a DataFrame, you can save it as a CSV. Sample datasets of multiple sizes are available, such as 1 MB, 5 MB, and 10 MB.
Data Size Considerations
With any operation, always try to understand if what you are doing is dependent on the data size. For example, join operations can be affected by data size. Joining 10 rows is fast, but joining 1 million rows can cause out-of-memory errors. Joins are wide operations and can take a lot of memory. In contrast, filter is a narrow operation and can be performed easily regardless of data size.
Practical Advice on Data Size
Do not unnecessarily load large files, such as 500MB or 1TB, for simple operations. Use smaller files for practice and only scale up when necessary. This approach helps you avoid unnecessary complexity and errors, and keeps your learning efficient.
Loading and Inspecting Data
After loading the data, you may find some invalid values such as None. Since no explicit schema is provided, Spark infers the types, and most columns come back as strings. You can check the schema using the following function:
df.printSchema()
Here the first column is inferred as long, and everything else as string. You can provide your own schema as well. For example, declaring a date type for the date column up front can cause parse issues and result in nulls. You can either read everything as string and cast later, or handle it at the start, but be aware of potential data loss.
Handling Integer Columns
To access a column, use df.id or df['id']. If the column name has special characters or spaces, use the bracket form df['column name'] or the col() function rather than attribute access. For example, to filter rows where id > 3:
df.filter(df.id > 3).show()
You can also create new columns, such as doubling the id:
df.withColumn('id_double', df.id * 2).show()
If the id column is read as long, you can cast it back to integer as follows:
from pyspark.sql.types import IntegerType
df = df.withColumn('id', df['id'].cast(IntegerType()))
Understanding PySpark Modules
The col function is used to select a particular column and is present in pyspark.sql.functions; type classes like IntegerType are present in pyspark.sql.types. These distinctions are important to understand.
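For example, the earlier filter and cast can be written with col, while the type class comes from pyspark.sql.types:
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType

# col() lives in pyspark.sql.functions; IntegerType lives in pyspark.sql.types
df.filter(col('id') > 3).show()
df = df.withColumn('id', col('id').cast(IntegerType()))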
Handling String Columns
Suppose you want everything in uppercase. You can use the upper function from pyspark.sql.functions to create a new column:
from pyspark.sql.functions import upper
df = df.withColumn('name_upper', upper(df['name']))
Similarly, you can use the lower, trim, startswith, and endswith functions. For example, to filter cities that start with 'B':
df.filter(df.city.startswith('B')).show()
You can also create a lowercase version of a column:
from pyspark.sql.functions import lower
df = df.withColumn('name_lower', lower(df['name']))
Handling Float Columns
The amount column is currently a string but should be a float. You can cast it as follows:
from pyspark.sql.types import FloatType
df = df.withColumn('amount', df['amount'].cast(FloatType()))
You can check the schema to confirm the change:
df.printSchema()
If there are null or NaN values, you can fill them using the fillna function:
df_filled = df.fillna({'amount': 0})
Handling Date Columns
Handling date columns can be more complex. You may encounter invalid dates. It is important to practice handling integer, float, and string columns first. In the next video, we will cover date columns in detail.
Additional Resources
An additional file has been added with more detailed functions and schema information to help you better navigate and learn about data types.
Key Takeaways
- Different data types in PySpark require specific handling methods, such as casting and using appropriate functions.
- Always consider the size of your data when performing operations, as some operations like joins are affected by data size.
- Use PySpark’s schema inference or provide your own schema, but be aware of potential data loss or parsing issues.
- Practice handling integer, float, and string columns before moving on to more complex types like dates.
Handling Date Type in Spark
Introduction to Handling Date Types in Spark
In this session, we continue working with data, focusing on handling date types in Spark. Previously, we covered integer, string, and float data types. Now, we will discuss the intricacies of managing date columns, which often require special treatment.
Preparing the Data and Environment
First, a Spark session is created. The data to be used is prepared and saved as a CSV file. The file is then transferred to HDFS using Hadoop commands. This ensures the data is accessible for processing in Spark.
Exploring the Data Structure
The dataset contains columns such as id, date_iso, and date, each possibly in a different date format. These formats include ISO (YYYY-MM-DD), slashes, hyphens, and full timestamps. Initially, all columns except id are read as strings.
Spark’s Date Recognition and Approaches
Spark recognizes dates only in the ISO format (yyyy-MM-dd) by default. There are two main ways to handle date columns:
- Convert every date to the ISO format before reading.
- Read all date columns as strings and handle the conversion later.
The second approach is often preferred, as preprocessing all dates before reading may not always be feasible.
Reading Data with a DDL Schema
A DDL schema is defined where all date columns are initially set as strings. The data is read from the CSV file using this schema. At this stage, all date-related columns remain as strings, ensuring no data is lost due to format issues.
Attempting to Read Dates Directly
When attempting to read date columns directly as date types, Spark only successfully parses columns in the ISO format. Other formats result in null values. This highlights the importance of either standardizing date formats or handling conversion after reading as strings.
Handling Timestamps
If a column contains a full timestamp in ISO format, Spark can read it as a timestamp type. However, if the column is set as a date type, the time information is lost. Using the correct type ensures all relevant information is retained.
Converting String Columns to Date Types
To handle various date formats, string columns are converted to date types using the to_date function from pyspark.sql.functions. The correct format string must be specified for each column.
from pyspark.sql.functions import to_date
df = df.withColumn('parsed_date_iso', to_date(df['date_iso'], 'yyyy-MM-dd'))
df = df.withColumn('parsed_date_dmy', to_date(df['date_dmy'], 'dd-MM-yyyy'))
df = df.withColumn('parsed_date_mdy', to_date(df['date_mdy'], 'MM-dd-yyyy'))
For columns containing timestamps, use the to_timestamp function to convert string columns to timestamp types. This allows Spark to understand and process the time component.
from pyspark.sql.functions import to_timestamp
df = df.withColumn('parsed_timestamp', to_timestamp(df['timestamp']))
Extracting Date and Time Components
Once columns are properly converted to date or timestamp types, Spark functions can extract components such as year, month, day, hour, and minute.
from pyspark.sql.functions import year, month, dayofmonth, hour, minute
df = df.withColumn('year', year(df['parsed_timestamp']))
df = df.withColumn('month', month(df['parsed_timestamp']))
df = df.withColumn('day', dayofmonth(df['parsed_timestamp']))
df = df.withColumn('hour', hour(df['parsed_timestamp']))
df = df.withColumn('minute', minute(df['parsed_timestamp']))
Calculating Date Differences
The datediff function computes the number of days between two date columns. This is useful for calculating durations or intervals between events.
from pyspark.sql.functions import datediff
df = df.withColumn('date_difference', datediff(df['parsed_date_iso'], df['parsed_date_mdy']))
Conclusion and Best Practices
Handling date and timestamp columns in Spark requires careful attention to formats and types. Always ensure that date columns are either standardized before reading or properly parsed after reading as strings. Converting to the appropriate type enables powerful date and time operations, which are essential in many data processing tasks.
Key Takeaways
- Spark requires date columns to be in a specific ISO format to recognize them as dates or timestamps.
- There are two main approaches: convert all dates to ISO format before reading, or read as string and parse later.
- PySpark provides functions like to_date, to_timestamp, and datediff to handle various date and timestamp formats.
- Converting date strings to proper date/timestamp types enables extraction of components like year, month, day, hour, and minute.