PySpark, Spark SQL and Python Pandas: A Collection of Useful Cheat Sheets and Cheat Codes for Revision



Comparative overview of partitions, bucketing, segmentation, and broadcasting in PySpark, Spark SQL, and Hive QL in tabular form, along with examples

Here’s the comparison in tabular form, followed by examples:

| Concept | PySpark | Spark SQL | Hive QL |
| --- | --- | --- | --- |
| Partitions | df.repartition(numPartitions, "column") creates partitions based on the specified column. | CREATE TABLE table_name PARTITIONED BY (col1 STRING) organizes data by partition. | ALTER TABLE table_name ADD PARTITION (col1='value') adds a new partition to an existing table. |
| Bucketing | df.write.bucketBy(numBuckets, "column").saveAsTable("table_name") distributes data into buckets. | CREATE TABLE table_name CLUSTERED BY (col1) INTO numBuckets BUCKETS buckets data in a table. | CREATE TABLE table_name CLUSTERED BY (col1) INTO numBuckets BUCKETS applies the same bucketing strategy. |
| Segmentation | Filter DataFrames on specific criteria, e.g. df.filter(df.col > 1). | SELECT * FROM table WHERE col > value segments data on specific criteria. | SELECT * FROM table WHERE col > value segments data in queries. |
| Broadcasting | spark.conf.set("spark.sql.autoBroadcastJoinThreshold", size) controls broadcast joins. | SELECT /*+ BROADCAST(t2) */ * FROM table1 t1 JOIN table2 t2 ON t1.key = t2.key gives a broadcast hint. | SET hive.auto.convert.join = true enables automatic broadcasting of smaller tables during joins. |

Examples:

  • Partitions: In PySpark, repartition a DataFrame df as follows: df = df.repartition(4, "column_name")
  • Bucketing: Bucketing in PySpark (the equivalent Spark SQL DDL is sketched just below): df.write.bucketBy(5, "column_name").saveAsTable("bucketed_table")
  • Segmentation: Segmenting a DataFrame: df_segment = df.filter(df["column"] > 100)
  • Broadcasting: Setting the broadcast join threshold for optimization:
    from pyspark.sql import SparkSession
    spark = SparkSession.builder.getOrCreate()
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 10485760)  # 10 MB
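A Spark SQL DDL counterpart to the bucketing example, run through spark.sql(); this is a minimal sketch, and the table name sales_bucketed and its columns are illustrative assumptions:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Hypothetical table that is partitioned by country and bucketed by customer_id
spark.sql("""
    CREATE TABLE IF NOT EXISTS sales_bucketed (
        customer_id INT,
        amount DOUBLE,
        country STRING
    )
    USING parquet
    PARTITIONED BY (country)
    CLUSTERED BY (customer_id) INTO 4 BUCKETS
""")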

This table provides a concise yet comprehensive guide to partitioning, bucketing, segmentation, and broadcasting across different frameworks.
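Beyond the autoBroadcastJoinThreshold setting shown above, PySpark also supports an explicit broadcast hint in the DataFrame API. Here is a minimal sketch, assuming a hypothetical large fact DataFrame and a small lookup DataFrame:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

# Hypothetical data: a larger fact DataFrame and a small lookup DataFrame
large_df = spark.createDataFrame([(1, 100), (2, 200), (3, 300)], ["dept_id", "amount"])
small_df = spark.createDataFrame([(1, "Sales"), (2, "HR")], ["dept_id", "dept_name"])

# F.broadcast() asks Spark to ship the small table to every executor,
# avoiding a shuffle of the large table during the join.
joined = large_df.join(F.broadcast(small_df), on="dept_id", how="left")
joined.show()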

We can use the NTILE window function to create buckets within a dataset in both PySpark and Spark SQL. The NTILE(n) function divides the result set into n buckets or groups, assigning a bucket number to each row. Here’s how you can apply it:

PySpark Example

from pyspark.sql import Window
from pyspark.sql import functions as F

windowSpec = Window.orderBy("column_name")
df_with_buckets = df.withColumn("bucket", F.ntile(4).over(windowSpec))

Spark SQL Example

SELECT *, NTILE(4) OVER (ORDER BY column_name) AS bucket
FROM table_name;

This creates four buckets based on the ordering of column_name. Each row is assigned a bucket number from 1 to 4.
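Below is a self-contained sketch of the same idea on a small, made-up DataFrame (the data and the column name column_name are assumptions for illustration):

from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

# Hypothetical sample data: eight rows with a single numeric column
df = spark.createDataFrame([(i,) for i in range(1, 9)], ["column_name"])

# NTILE(4) splits the eight ordered rows into four buckets of two rows each
windowSpec = Window.orderBy("column_name")
df.withColumn("bucket", F.ntile(4).over(windowSpec)).show()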

Important concepts in the PySpark DataFrame API / Spark SQL

Here’s a comprehensive overview of important concepts in the PySpark DataFrame API, including options for showing DataFrame content, schema, and columns:

| Category | Function | Description | Example |
| --- | --- | --- | --- |
| Data Reading | spark.read.csv() | Read a CSV file into a DataFrame. | df = spark.read.csv("path/to/file.csv") |
| | spark.read.json() | Read a JSON file into a DataFrame. | df = spark.read.json("path/to/file.json") |
| | spark.read.parquet() | Read a Parquet file into a DataFrame. | df = spark.read.parquet("path/to/file.parquet") |
| DataFrame Creation | createDataFrame() | Create a DataFrame from an RDD or a list. | df = spark.createDataFrame(data, schema) |
| | createDataFrame() with column names | Create a DataFrame from structured data such as a list of tuples. | df = spark.createDataFrame(data, ["col1", "col2"]) |
| Transformation | select() | Select specific columns from the DataFrame. | df.select("column1", "column2") |
| | filter() | Filter rows based on a condition. | df.filter(df["column"] > 100) |
| | groupBy() | Group rows by a specific column and perform aggregations. | df.groupBy("column").count() |
| | withColumn() | Add a new column or replace an existing column. | df.withColumn("new_col", df["col1"] + 1) |
| | join() | Join two DataFrames together. | df1.join(df2, "common_col") |
| | union() | Combine two DataFrames with the same schema. | df1.union(df2) |
| | drop() | Drop specified columns from the DataFrame. | df.drop("column1") |
| | distinct() | Return a new DataFrame with distinct rows. | df.distinct() |
| | orderBy() | Sort the DataFrame based on one or more columns. | df.orderBy("column1", ascending=False) |
| | pivot() | Pivot a DataFrame to reshape it. | df.groupBy("column1").pivot("column2").agg(F.sum("value")) |
| | transpose | Not directly available on Spark DataFrames; convert to pandas or reshape with pivot/unpivot. | df.toPandas().T |
| Window Functions | Window.partitionBy() | Create a window specification for calculations over specified partitions. | windowSpec = Window.partitionBy("column").orderBy("value") |
| | row_number() | Assign a unique row number to rows within a partition. | df.withColumn("row_num", F.row_number().over(windowSpec)) |
| UDFs (User Defined Functions) | spark.udf.register() | Register a Python function as a UDF for use in SQL. | spark.udf.register("my_udf", my_function) |
| | withColumn() with a UDF | Use a UDF to create a new column. | df.withColumn("new_col", F.udf(my_function)(df["col"])) |
| String Manipulation | F.concat() | Concatenate multiple strings into one. | df.withColumn("new_col", F.concat(df["col1"], df["col2"])) |
| | F.substring() | Extract a substring from a string column. | df.withColumn("sub_col", F.substring("col", 1, 3)) |
| | F.lower() | Convert a string column to lowercase. | df.withColumn("lower_col", F.lower("col")) |
| Date Manipulation | F.current_date() | Get the current date. | df.withColumn("current_date", F.current_date()) |
| | F.date_add() | Add days to a date column. | df.withColumn("new_date", F.date_add("date_col", 5)) |
| | F.year() | Extract the year from a date column. | df.withColumn("year", F.year("date_col")) |
| Schema and Data Types | StructType() | Define the schema of a DataFrame. | schema = StructType([...]) |
| | Data types | Define data types for DataFrame columns. | IntegerType(), StringType(), ... |
| Actions | show(n) | Display the first n rows of the DataFrame (defaults to 20). | df.show(5) |
| | show(truncate=False) | Display full column contents without truncating long strings. | df.show(truncate=False) |
| | printSchema() | Print the schema of the DataFrame, showing column names and types. | df.printSchema() |
| | df.columns | Return a list of column names in the DataFrame. | columns = df.columns |
| | count() | Count the number of rows in the DataFrame. | row_count = df.count() |
| | collect() | Retrieve all rows from the DataFrame as a list of Row objects. | data = df.collect() |
| Optimization | persist() | Store the DataFrame in memory/disk for reuse. | df.persist() |
| | cache() | Cache the DataFrame in memory. | df.cache() |
| | repartition() | Change the number of partitions of the DataFrame. | df.repartition(4) |
| | coalesce() | Reduce the number of partitions without a full shuffle. | df.coalesce(2) |

This table covers options for showing DataFrame content, schema, and columns, along with the other core functionality of the PySpark DataFrame API.
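To show how several of these pieces fit together, here is a small hedged sketch of a typical pipeline; the data, column names, and thresholds are illustrative assumptions:

from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

# Hypothetical employee data
df = spark.createDataFrame(
    [("Sales", "Alice", 3000), ("Sales", "Bob", 4500), ("HR", "Cara", 4000)],
    ["department", "name", "salary"],
)

# select / filter / withColumn / groupBy, as listed in the table above
high_paid = (
    df.select("department", "name", "salary")
      .filter(df["salary"] > 3500)
      .withColumn("bonus", F.col("salary") * 0.1)
)
high_paid.groupBy("department").count().show()

# Window function: number employees by salary within each department
windowSpec = Window.partitionBy("department").orderBy(F.desc("salary"))
df.withColumn("row_num", F.row_number().over(windowSpec)).show()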

Here’s a cheat sheet for Spark SQL with common queries, functionalities, and examples:

| Category | SQL Command | Description | Example |
| --- | --- | --- | --- |
| Data Reading | SELECT * FROM table | Retrieve all columns from a table. | SELECT * FROM employees |
| | FROM | Specify the table from which to select data. | SELECT name, age FROM employees |
| Data Filtering | WHERE | Filter rows based on a condition. | SELECT * FROM employees WHERE age > 30 |
| Data Transformation | SELECT ... AS | Rename selected columns. | SELECT name AS employee_name FROM employees |
| | JOIN | Combine rows from two or more tables based on a related column. | SELECT * FROM employees JOIN departments ON employees.dep_id = departments.id |
| | GROUP BY | Group rows that have the same values in specified columns. | SELECT department, COUNT(*) FROM employees GROUP BY department |
| | HAVING | Filter groups based on a condition after grouping. | SELECT department, COUNT(*) FROM employees GROUP BY department HAVING COUNT(*) > 5 |
| Aggregations | COUNT() | Count the number of rows. | SELECT COUNT(*) FROM employees |
| | SUM(), AVG(), MIN(), MAX() | Perform summation, average, minimum, and maximum calculations. | SELECT AVG(salary) FROM employees |
| Sorting | ORDER BY | Sort the result set by one or more columns. | SELECT * FROM employees ORDER BY salary DESC |
| Window Functions | ROW_NUMBER() OVER | Assign a unique number to rows within a partition. | SELECT name, ROW_NUMBER() OVER (PARTITION BY department ORDER BY salary DESC) AS row_num FROM employees |
| Set Operations | UNION | Combine the results of two queries, removing duplicates. | SELECT name FROM employees UNION SELECT name FROM contractors |
| | UNION ALL | Combine results, including duplicates. | SELECT name FROM employees UNION ALL SELECT name FROM contractors |
| Data Manipulation | INSERT INTO | Insert new records into a table. | INSERT INTO employees VALUES ('John Doe', 28, 'Sales') |
| | UPDATE | Modify existing records in a table (requires an ACID/Delta table format). | UPDATE employees SET age = age + 1 WHERE name = 'John Doe' |
| | DELETE | Remove records from a table (requires an ACID/Delta table format). | DELETE FROM employees WHERE name = 'John Doe' |
| Table Management | CREATE TABLE | Create a new table. | CREATE TABLE new_table (id INT, name STRING) |
| | DROP TABLE | Delete a table and its data. | DROP TABLE new_table |
| | ALTER TABLE | Modify an existing table (e.g., add a column). | ALTER TABLE employees ADD COLUMNS (hire_date DATE) |
| String Functions | CONCAT() | Concatenate two or more strings. | SELECT CONCAT(first_name, ' ', last_name) AS full_name FROM employees |
| | SUBSTRING() | Extract a substring from a string. | SELECT SUBSTRING(name, 1, 3) FROM employees |
| Date Functions | CURRENT_DATE | Retrieve the current date. | SELECT CURRENT_DATE |
| | DATEDIFF() | Calculate the difference in days between two dates. | SELECT DATEDIFF('2024-01-01', '2023-01-01') AS days_difference |
| | YEAR(), MONTH(), DAY() | Extract the year, month, or day from a date. | SELECT YEAR(hire_date) FROM employees |

This cheat sheet covers various aspects of Spark SQL, including data reading, transformation, filtering, and aggregations, along with examples for clarity. You can adapt these queries to your specific dataset and requirements.
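From PySpark, these statements are typically executed with spark.sql() against a table or temporary view. A minimal sketch, where the employees data is an illustrative assumption:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Hypothetical employees data registered as a temporary view
employees = spark.createDataFrame(
    [("John", 28, "Sales", 3000), ("Jane", 35, "HR", 4200)],
    ["name", "age", "department", "salary"],
)
employees.createOrReplaceTempView("employees")

# Any of the queries from the cheat sheet can then be run directly
spark.sql("""
    SELECT department, COUNT(*) AS cnt, AVG(salary) AS avg_salary
    FROM employees
    WHERE age > 25
    GROUP BY department
    ORDER BY avg_salary DESC
""").show()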

A comparison cheat code for date manipulation in PySpark SQL vs Hive QL:

| Function | PySpark SQL Syntax | Hive QL Syntax | Description |
| --- | --- | --- | --- |
| Current Date | current_date() | current_date() | Returns the current date. |
| Add Days | date_add(date, days) | date_add(date, days) | Adds the specified number of days to a date. |
| Subtract Days | date_sub(date, days) | date_sub(date, days) | Subtracts the specified number of days from a date. |
| Add Months | add_months(date, months) | add_months(date, months) | Adds the specified number of months to a date. |
| Truncate Date | date_trunc(format, date) | trunc(date, 'format') | Truncates a date to the specified unit. |
| Date Difference | datediff(end_date, start_date) | datediff(end_date, start_date) | Returns the number of days between two dates. |
| Year Extraction | year(date) | year(date) | Extracts the year from a date. |
| Month Extraction | month(date) | month(date) | Extracts the month from a date. |
| Day Extraction | day(date) or dayofmonth(date) | day(date) or dayofmonth(date) | Extracts the day from a date. |
| Format Date | date_format(date, 'format') | date_format(date, 'format') | Formats a date to the specified pattern. |
| Last Day of Month | last_day(date) | last_day(date) | Returns the last day of the month. |
| Unix Timestamp | unix_timestamp(date, 'pattern') | unix_timestamp(date, 'pattern') | Converts a date to a Unix timestamp. |
| From Unix Timestamp | from_unixtime(unix_time, 'format') | from_unixtime(unix_time, 'format') | Converts a Unix timestamp to a date string. |
| Next Day | next_day(date, 'day_of_week') | next_day(date, 'day_of_week') | Returns the next specified day of the week after the given date. |

Example

To add 5 days to the current date:

  • PySpark SQL: SELECT date_add(current_date(), 5)
  • Hive QL: SELECT date_add(current_date(), 5)

Both PySpark SQL and HiveQL share similar functions, but PySpark SQL has better integration with Python datetime types and additional flexibility.
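The same operation is also available through the PySpark DataFrame API; a minimal sketch, where spark.range(1) is just a one-row placeholder DataFrame:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

# Equivalent of SELECT date_add(current_date(), 5), without writing SQL
spark.range(1).select(F.date_add(F.current_date(), 5).alias("plus_5_days")).show()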

PySpark SQL provides better integration with Python datetime types due to its compatibility with the pyspark.sql.functions library, which allows for seamless handling of Python datetime objects. This flexibility enables direct manipulation of dates and times, conversion between types, and usage of Python’s datetime features within PySpark’s DataFrames.

Additionally, PySpark SQL allows using user-defined functions (UDFs) that interact directly with Python datetime libraries, enhancing its adaptability for complex time-based transformations compared to HiveQL’s fixed SQL-like functions.

Here’s a table showing examples of how PySpark SQL integrates with Python datetime types via the pyspark.sql.functions library:

| Function | Description | PySpark Example | Explanation |
| --- | --- | --- | --- |
| Current Date | Returns today's date. | from pyspark.sql.functions import current_date; df.select(current_date().alias("current_date")) | Fetches the current date with the current_date() function. |
| Convert to DateType | Converts a Python datetime to Spark DateType. | from datetime import datetime; from pyspark.sql.functions import lit, to_date; df.select(to_date(lit(datetime.now()))) | Converts Python datetime objects directly to Spark DateType with to_date(). |
| Add Days | Adds days to a date. | from pyspark.sql.functions import current_date, date_add; df.select(date_add(current_date(), 5)) | Adds 5 days to the current date. |
| Date Formatting | Formats a date to a specified pattern. | from pyspark.sql.functions import current_date, date_format; df.select(date_format(current_date(), 'yyyy-MM-dd')) | Formats the date into the yyyy-MM-dd pattern. |
| Extracting Parts of a Date | Extracts the year, month, day, etc. from a date. | from pyspark.sql.functions import current_date, year, month; df.select(year(current_date()), month(current_date())) | Extracts the year and month directly from the current date. |
| Date Difference | Finds the difference between two dates. | from pyspark.sql.functions import current_date, datediff, lit; df.select(datediff(current_date(), lit('2022-01-01'))) | Calculates the difference in days between today and a specified date. |
| Handling Null Dates | Fills null date values. | df.fillna({'date_column': '1970-01-01'}) | Replaces null values with a default (e.g., the epoch start date); for DateType columns, coalesce() with a date literal is the more reliable option. |
| Convert String to Date | Converts a string to DateType. | from pyspark.sql.functions import lit, to_date; df.select(to_date(lit("2023-10-31"), "yyyy-MM-dd")) | Converts a string to DateType using the specified format. |
| Using Python UDFs with Dates | Applies Python functions directly to date columns. | from pyspark.sql.functions import udf, col; from pyspark.sql.types import IntegerType; df.withColumn('year', udf(lambda x: x.year, IntegerType())(col('date_column'))) | UDFs can use Python's datetime methods, e.g., extracting the year directly from a date column. |

Explanation:

  • PySpark’s integration with Python datetime types simplifies date manipulations by allowing direct conversion and application of Python datetime methods.
  • Functions like to_date, date_format, and UDFs make PySpark SQL highly flexible for working with dates, giving more control over formatting, extraction, and handling nulls or custom date logic in transformations.

This compatibility makes it easier to write complex date manipulations within PySpark, directly utilizing Python’s rich datetime functionality.
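As a concrete illustration of that integration, here is a hedged sketch combining to_date, date_format, datediff, and a Python datetime UDF; the sample data and column names are assumptions:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType

spark = SparkSession.builder.getOrCreate()

# Hypothetical data with a single string date column
df = spark.createDataFrame([("2023-10-31",), ("2024-03-15",)], ["date_str"])

df = (
    df.withColumn("date_col", F.to_date("date_str", "yyyy-MM-dd"))       # string -> DateType
      .withColumn("pretty", F.date_format("date_col", "dd MMM yyyy"))    # format for display
      .withColumn("age_days", F.datediff(F.current_date(), "date_col"))  # days since the date
)

# DateType values arrive in a UDF as Python datetime.date objects,
# so plain Python attribute access works inside the lambda.
extract_year = F.udf(lambda d: d.year if d is not None else None, IntegerType())
df.withColumn("year", extract_year("date_col")).show()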

