Creating DataFrames in PySpark
Creating DataFrames in PySpark is essential for processing large-scale data efficiently. PySpark allows DataFrames to be created from various sources, ranging from manual data entry to structured storage systems. Below are the main ways to create PySpark DataFrames, along with examples.
1. Creating DataFrames from List of Tuples (Manual Entry)
This is one of the simplest ways to create a PySpark DataFrame manually.
from pyspark.sql import SparkSession
# Initialize Spark Session
spark = SparkSession.builder.appName("CreateDataFrame").getOrCreate()
# List of tuples
data = [(1, "Alice", 25), (2, "Bob", 30), (3, "Charlie", 35)]
# Define column names
columns = ["ID", "Name", "Age"]
# Create DataFrame
df = spark.createDataFrame(data, columns)
df.show()
Use Case: Best for small, manually defined datasets.
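If you want explicit control over column types without building a full StructType, createDataFrame also accepts a DDL-style schema string. A minimal sketch using the same data (df_typed is just an illustrative name):
# Same data, but with a DDL-style schema string instead of bare column names
df_typed = spark.createDataFrame(data, "ID INT, Name STRING, Age INT")
df_typed.printSchema()
df_typed.show()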
2. Creating DataFrames from CSV Files
PySpark can load structured data from CSV files efficiently.
df = spark.read.csv("people.csv", header=True, inferSchema=True)
df.show()
Use Case: Useful when working with structured tabular data stored in CSV format.
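Note that inferSchema triggers an extra pass over the file. For larger files you may prefer to supply a schema and explicit read options, as in the sketch below (the file path and column names are assumptions):
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
people_schema = StructType([
    StructField("ID", IntegerType(), True),
    StructField("Name", StringType(), True),
    StructField("Age", IntegerType(), True)
])
df = spark.read \
    .option("header", True) \
    .option("sep", ",") \
    .schema(people_schema) \
    .csv("people.csv")
df.show()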
3. Creating DataFrames from JSON Files
JSON files are widely used for semi-structured data.
df = spark.read.json("data.json")
df.show()
Use Case: Best for APIs, logs, or nested data.
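By default, spark.read.json expects one JSON object per line (JSON Lines). If the file is a single pretty-printed document or a top-level array, enable the multiLine option. A sketch assuming a file named data_multiline.json:
# Read a pretty-printed / array-style JSON file instead of JSON Lines
df = spark.read.option("multiLine", True).json("data_multiline.json")
df.printSchema()
df.show()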
4. Creating DataFrames from Parquet Files
Parquet is a columnar storage format optimized for big data processing.
df = spark.read.parquet("data.parquet")
df.show()
Use Case: Recommended for fast data processing in Spark.
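Parquet round-trips are also a convenient way to persist intermediate results. A minimal sketch (the output path is an assumption):
# Write the DataFrame back out as Parquet, then read it again
df.write.mode("overwrite").parquet("output/people.parquet")
df_roundtrip = spark.read.parquet("output/people.parquet")
df_roundtrip.show()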
5. Creating DataFrames from Databases (JDBC Connection)
Connecting to external databases is common for real-world ETL tasks.
df = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:mysql://localhost:3306/testdb") \
    .option("dbtable", "users") \
    .option("user", "root") \
    .option("password", "password") \
    .load()
df.show()
Use Case: Best for integrating with external SQL databases.
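For large tables, the JDBC source can read in parallel if you give it a numeric partition column and bounds. A hedged sketch (the id column, bounds, and driver class are assumptions about the table and database setup):
df = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:mysql://localhost:3306/testdb") \
    .option("dbtable", "users") \
    .option("user", "root") \
    .option("password", "password") \
    .option("driver", "com.mysql.cj.jdbc.Driver") \
    .option("partitionColumn", "id") \
    .option("lowerBound", "1") \
    .option("upperBound", "100000") \
    .option("numPartitions", "4") \
    .load()
df.show()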
6. Creating DataFrames from RDDs
Sometimes, raw RDDs need to be converted into DataFrames.
from pyspark.sql import Row
rdd = spark.sparkContext.parallelize([
    Row(ID=1, Name="Alice", Age=25),
    Row(ID=2, Name="Bob", Age=30),
    Row(ID=3, Name="Charlie", Age=35)
])
df = spark.createDataFrame(rdd)
df.show()
Use Case: Useful for transitioning from RDD-based transformations to DataFrames.
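If the RDD holds plain tuples rather than Row objects, you can still name the columns when converting. A minimal sketch:
# RDD of plain tuples converted with toDF and explicit column names
rdd_tuples = spark.sparkContext.parallelize([(1, "Alice", 25), (2, "Bob", 30)])
df = rdd_tuples.toDF(["ID", "Name", "Age"])
df.show()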
7. Creating DataFrames from Pandas DataFrames
Converting a Pandas DataFrame to PySpark is helpful when scaling operations.
import pandas as pd
# Create Pandas DataFrame
pdf = pd.DataFrame({"ID": [1, 2, 3], "Name": ["Alice", "Bob", "Charlie"], "Age": [25, 30, 35]})
# Convert to PySpark DataFrame
df = spark.createDataFrame(pdf)
df.show()
Use Case: Best for transitioning from local Pandas to distributed PySpark.
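For larger Pandas DataFrames, the conversion is usually faster with Apache Arrow enabled (requires the pyarrow package; the config name below applies to Spark 3.x):
# Speed up Pandas <-> Spark conversion with Arrow (needs pyarrow installed)
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
df = spark.createDataFrame(pdf)
df.show()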
8. Creating DataFrames from API Response (Using JSON Parsing)
For web scraping or API data processing, JSON responses can be converted into DataFrames.
import requests
import json
response = requests.get("https://api.example.com/users")
data = json.loads(response.text)
df = spark.createDataFrame(data)
df.show()
Use Case: Useful for processing real-time API data.
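Since the parsed response is just a Python list of dicts, it is often safer to pass an explicit schema instead of letting Spark infer types from the first records. A sketch with a hardcoded payload standing in for the API response (the URL above and these field names are assumptions):
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
# Stand-in for the parsed response; a real call would be data = requests.get(url).json()
data = [
    {"id": 1, "name": "Alice", "age": 25},
    {"id": 2, "name": "Bob", "age": 30},
]
user_schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True)
])
df = spark.createDataFrame(data, schema=user_schema)
df.show()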
9. Creating DataFrames from XML Data
Spark supports XML parsing through third-party libraries.
from pyspark.sql.types import StructType, StructField, StringType
schema = StructType([
    StructField("ID", StringType(), True),
    StructField("Name", StringType(), True),
    StructField("Age", StringType(), True)
])
df = spark.read.format("com.databricks.spark.xml") \
    .option("rowTag", "person") \
    .schema(schema) \
    .load("people.xml")
df.show()
Use Case: Useful for handling structured XML-based datasets.
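Note that spark-xml is not bundled with Spark; it has to be available when the session is first created, for example via spark.jars.packages or the --packages flag of spark-submit. A sketch for a fresh session (the version below is an example and depends on your Spark/Scala build):
from pyspark.sql import SparkSession
# Example package coordinates; pick the artifact matching your Scala version
spark = SparkSession.builder \
    .appName("XMLExample") \
    .config("spark.jars.packages", "com.databricks:spark-xml_2.12:0.15.0") \
    .getOrCreate()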
10. Creating DataFrames Using Range for Auto-Generated Data
If you need a sequence of numbers, spark.range() can quickly create a DataFrame.
df = spark.range(1, 6).toDF("ID")
df.show()
Use Case: Useful for generating test sequences or dummy IDs.
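The generated id column can then be expanded into other synthetic columns. A minimal sketch:
from pyspark.sql.functions import col
# Derive extra columns from the auto-generated id
df = spark.range(1, 6) \
    .withColumn("squared", col("id") * col("id")) \
    .withColumn("label", col("id").cast("string"))
df.show()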
Summary of Methods
Method | Use Case |
---|---|
List of Tuples | Simple and widely used |
CSV Files | Best for tabular structured data |
JSON Files | Ideal for nested and semi-structured data |
Parquet Files | Best for big data performance |
JDBC Databases | Useful for ETL and database integration |
RDD Conversion | Transitioning from RDDs to DataFrames |
Pandas Conversion | Best for scaling Pandas workloads |
API Response (JSON) | Real-time API data processing |
XML Parsing | Handling structured XML data |
Auto-Generated Range | Generating test data quickly |
Creating Dummy DataFrames in PySpark
Creating dummy DataFrames in PySpark is useful for testing, prototyping, and learning. PySpark provides multiple ways to create DataFrames manually, each suited to different scenarios. Below are various methods to create dummy DataFrames with examples.
1. Using List of Tuples (Most Common Method)
This is one of the most common ways to create a PySpark DataFrame.
from pyspark.sql import SparkSession
# Initialize Spark Session
spark = SparkSession.builder.appName("DummyDataFrame").getOrCreate()
# List of tuples
data = [(1, "Alice", 25), (2, "Bob", 30), (3, "Charlie", 35)]
# Define column names
columns = ["ID", "Name", "Age"]
# Create DataFrame
df = spark.createDataFrame(data, columns)
df.show()
Use Case: Best for small, manually defined datasets.
2. Using List of Lists
This method is similar to the list of tuples but uses lists instead.
data = [[1, "Alice", 25], [2, "Bob", 30], [3, "Charlie", 35]]
df = spark.createDataFrame(data, columns)
df.show()
Use Case: When working with mutable lists instead of immutable tuples.
3. Using Dictionary with Row Objects
Using Row objects allows for named attributes, making it easy to access values.
from pyspark.sql import Row
data = [Row(ID=1, Name="Alice", Age=25),
        Row(ID=2, Name="Bob", Age=30),
        Row(ID=3, Name="Charlie", Age=35)]
df = spark.createDataFrame(data)
df.show()
Use Case: When you need named fields and structured data representation.
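Because each record is a Row, fields can be read back by name, which is handy when collecting results. A small sketch:
# Row fields are accessible by attribute name, or as a plain dict
first = df.collect()[0]
print(first.Name)      # e.g. 'Alice' (the first inserted row)
print(first.asDict())  # e.g. {'ID': 1, 'Name': 'Alice', 'Age': 25}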
4. Using Dictionary with Explicit Schema
When you want stricter control over column types, defining a schema is a good approach.
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
# Define schema
schema = StructType([
    StructField("ID", IntegerType(), True),
    StructField("Name", StringType(), True),
    StructField("Age", IntegerType(), True)
])
# Create DataFrame from a list of dictionaries
data = [{"ID": 1, "Name": "Alice", "Age": 25},
        {"ID": 2, "Name": "Bob", "Age": 30},
        {"ID": 3, "Name": "Charlie", "Age": 35}]
df = spark.createDataFrame(data, schema=schema)
df.show()
Use Case: Ensures correct data types and improves performance.
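printSchema() is a quick way to confirm that the declared types were applied:
# Verify the declared column types
df.printSchema()
# root
#  |-- ID: integer (nullable = true)
#  |-- Name: string (nullable = true)
#  |-- Age: integer (nullable = true)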
5. Using RDD with Row Objects
If you are working with distributed data, creating an RDD first can be beneficial.
rdd = spark.sparkContext.parallelize([
    Row(ID=1, Name="Alice", Age=25),
    Row(ID=2, Name="Bob", Age=30),
    Row(ID=3, Name="Charlie", Age=35)
])
df = spark.createDataFrame(rdd)
df.show()
Use Case: Best when working with large distributed datasets.
6. Using Pandas DataFrame Conversion
If you already have a Pandas DataFrame, you can convert it to a PySpark DataFrame.
import pandas as pd
# Create Pandas DataFrame
pdf = pd.DataFrame({"ID": [1, 2, 3], "Name": ["Alice", "Bob", "Charlie"], "Age": [25, 30, 35]})
# Convert to PySpark DataFrame
df = spark.createDataFrame(pdf)
df.show()
Use Case: When transitioning from Pandas to PySpark.
7. Using range() for Auto-Generated Data
If you need a sequence of numbers, spark.range() can quickly create a DataFrame.
df = spark.range(1, 6).toDF("ID")
df.show()
Use Case: When you need an auto-incrementing column.
Summary of Methods
Method | Use Case |
---|---|
List of Tuples | Simple and widely used |
List of Lists | Similar to tuples but mutable |
Dictionary with Row | Allows named attributes |
Dictionary with Schema | Ensures correct data types |
RDD with Row | Works well for distributed data |
Pandas Conversion | Best for small datasets |
range() for Auto-Generated Data | When you need incremental values |
The examples below are a more end-to-end exercise: generating a dummy DataFrame of random serial numbers, dates, IDs, emails, and names.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from datetime import datetime
import random
import string
spark = SparkSession.builder.appName("MySparkLearning1").getOrCreate()
domain_names = ['gmail.com', 'xyz.com', 'hotmail.com', 'wikipedia.org', 'hintstoday.com']
def generate_random_username(length=8):
    # Random mix of letters and digits
    letters_and_digits = string.ascii_letters + string.digits
    return ''.join(random.choice(letters_and_digits) for _ in range(length))
def get_username():
    # Lowercase letters and digits only
    return ''.join(random.choice(string.ascii_lowercase + string.digits) for _ in range(10))
def get_first_name():
    length = random.randint(5, 10)
    return ''.join(random.choice(string.ascii_lowercase) for _ in range(length))
# A few sample email addresses
email_addresses = []
for n in range(5):
    user_name = generate_random_username()
    chosen_domain = random.choice(domain_names)
    email_addresses.append(f"{user_name}@{chosen_domain}")
print(email_addresses)
# 100 rows of (serial number, date string, random id, email, full name)
data = [(i + 1,
         datetime.now().strftime('%y-%m-%d'),
         random.randint(1000, 100000),
         get_username() + "@" + random.choice(domain_names),
         get_first_name() + ' ' + get_first_name()) for i in range(100)]
columns = ['Sl_No', "Date_Val", "Ran_id", "email", "Name"]
df = spark.createDataFrame(data, columns)
df.show()
A couple of smaller variations, plus some quick column operations on the random DataFrame created above:
# Single-column DataFrames from a tuple of tuples and a list of tuples
nums = ((1,), (2,), (3,))
df_nums = spark.createDataFrame(nums, ['No'])
df_nums.show()
df_single = spark.createDataFrame([(1,)], ['r1'])
df_single.show()
# Column names joined into a single string
print(' '.join(df.columns))
# Rename every column: replace '_' with 'X' and append a '_v1' suffix
df_renamed = df.toDF(*[column.replace('_', 'X') + '_v1' for column in df.columns])
df_renamed.show(5)
# Keep only rows whose Ran_id is even, then join back on that column
df_even = df.select('Ran_id').filter(col('Ran_id') % 2 == 0)
df_joined = df.join(df_even, 'Ran_id')
df_joined.show(5)
The same idea can be pushed further: build Row objects with random nicknames and addresses, distribute them as an RDD, and convert the RDD to a DataFrame.
from pyspark.sql import Row
import random
import string
def get_nickname(length=10):
    return ''.join(random.choice(string.ascii_lowercase + string.digits) for _ in range(length))
def get_address():
    # Random address parts; pincode and country are fixed-format dummy values
    name = ''.join(random.choice(string.ascii_lowercase + string.digits) for _ in range(10))
    flat_name = ''.join(random.choice(string.ascii_letters + string.digits) for _ in range(5))
    street_name = ''.join(random.choice(string.ascii_letters + string.digits) for _ in range(5))
    city = ''.join(random.choice(string.ascii_letters + string.digits) for _ in range(7))
    pincode = random.randint(800000, 900000)
    country = 'India'
    return f"{name} {flat_name}, {street_name}, {city}, {pincode}, {country}"
print(get_address())
rdd = spark.sparkContext.parallelize(
    [Row(Sl_No=i + 1, nickname=get_nickname(), Address=get_address()) for i in range(100)]
)
df_data = spark.createDataFrame(rdd)
df_data.show()