Read Data | PySpark | Spark SQL | Pandas |
Read CSV | spark.read.csv("file.csv") | SELECT * FROM csv.`file.csv` | pd.read_csv("file.csv") |
Read JSON | spark.read.json("file.json") | SELECT * FROM json.`file.json` | pd.read_json("file.json") |
Read Parquet | spark.read.parquet("file.parquet") | SELECT * FROM parquet.`file.parquet` | pd.read_parquet("file.parquet") |
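
A minimal, runnable sketch of the three read paths above, side by side. The file paths under `data/` and the `header`/`inferSchema` options are placeholders and assumptions about a typical CSV layout, not part of any real dataset.

```python
from pyspark.sql import SparkSession
import pandas as pd

spark = SparkSession.builder.appName("read-demo").getOrCreate()

# PySpark readers return distributed DataFrames; options are assumptions about the files.
sdf_csv = spark.read.csv("data/file.csv", header=True, inferSchema=True)
sdf_json = spark.read.json("data/file.json")
sdf_parquet = spark.read.parquet("data/file.parquet")

# Spark SQL can query a file directly; the path is quoted with backticks.
sdf_sql = spark.sql("SELECT * FROM parquet.`data/file.parquet`")

# Pandas equivalents load the whole file into local memory.
pdf_csv = pd.read_csv("data/file.csv")
pdf_json = pd.read_json("data/file.json")
pdf_parquet = pd.read_parquet("data/file.parquet")
```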
Data Creation | PySpark | Spark SQL | Pandas |
From List | spark.createDataFrame([(1, 'A'), (2, 'B')], ["col1", "col2"]) | INSERT INTO table VALUES (1, 'A'), (2, 'B') | pd.DataFrame({'col1': [1, 2], 'col2': ['A', 'B']}) |
From Dictionary | spark.createDataFrame([Row(**dict1)]) | N/A | pd.DataFrame.from_dict(dict1) |
UDF | PySpark | Spark SQL | Pandas |
Define UDF | from pyspark.sql.functions import udf; def func(x): return x+1; func_udf = udf(func) | CREATE FUNCTION custom_func AS 'com.example.CustomFunction' | def func(x): return x+1 |
Use UDF | df.withColumn("new_col", func_udf(df["col"])) | SELECT custom_func(col) AS new_col FROM table | df["new_col"] = df["col"].apply(func) |
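
A small end-to-end sketch of the UDF rows above, assuming a local SparkSession. The function name `add_one` and the sample data are illustrative only.

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
import pandas as pd

spark = SparkSession.builder.appName("udf-demo").getOrCreate()
df = spark.createDataFrame([(1,), (2,), (3,)], ["col"])

def add_one(x):
    return x + 1

# Declaring the return type avoids the default StringType result.
add_one_udf = udf(add_one, IntegerType())
df.withColumn("new_col", add_one_udf(df["col"])).show()

# Pandas applies the same plain Python function element-wise.
pdf = pd.DataFrame({"col": [1, 2, 3]})
pdf["new_col"] = pdf["col"].apply(add_one)
```

For larger data, a vectorized pandas_udf is usually faster than a row-at-a-time UDF like the one above.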
Table Creation | PySpark | Spark SQL | Pandas |
Create DataFrame | spark.createDataFrame(data) | CREATE TABLE table AS SELECT * FROM data | pd.DataFrame(data) |
Create Temporary View | df.createOrReplaceTempView("table") | CREATE TEMPORARY VIEW table AS SELECT * FROM data | N/A |
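
A short sketch of how a DataFrame, a temporary view, and a Spark SQL query tie together. The view name `demo_table` and the sample rows are illustrative.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("view-demo").getOrCreate()
df = spark.createDataFrame([(1, "A"), (2, "B")], ["id", "label"])

# Register the DataFrame as a temporary view so Spark SQL can see it.
df.createOrReplaceTempView("demo_table")

# Any of the SQL snippets in these tables can now run against the view.
spark.sql("SELECT id, label FROM demo_table WHERE id > 1").show()
```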
Display | PySpark | Spark SQL | Pandas |
Display First N Rows | df.show(n) | SELECT * FROM table LIMIT n | df.head(n) |
Show All Rows | df.show(truncate=False) | SELECT * FROM table | df |
Show Specific Columns | df.select("col1", "col2").show() | SELECT col1, col2 FROM table LIMIT n | df[['col1', 'col2']].head(n) |
Show with Truncation | df.show(n, truncate=True) | SELECT * FROM table LIMIT n | N/A |
Show with Custom Width | df.show(n, truncate=<width>) | N/A | N/A |
Vertical Show | df.show(n, vertical=True) | N/A | df.head(n).T |
Display Summary Stats | df.describe().show() | SELECT COUNT(col), AVG(col), STDDEV(col), MIN(col), MAX(col) FROM table | df.describe() |
Show Distinct Rows | df.distinct().show() | SELECT DISTINCT * FROM table | df.drop_duplicates() |
Display Schema | df.printSchema() | DESCRIBE TABLE table | df.info() |
Show Unique Values | df.select("col").distinct().show() | SELECT DISTINCT col FROM table | df['col'].unique() |
Count Rows | df.count() | SELECT COUNT(*) FROM table | len(df) or df.shape[0] |
Check for Nulls | df.filter(df.col.isNull()).show() | SELECT * FROM table WHERE col IS NULL | df[df['col'].isna()] |
Column Operation | PySpark | Spark SQL | Pandas |
Add New Column | df.withColumn("new_col", df.col * 2) | SELECT *, col * 2 AS new_col FROM table | df['new_col'] = df['col'] * 2 |
Drop Column | df.drop("col") | SELECT * EXCEPT (col) FROM table | df.drop("col", axis=1) |
Column Manipulations | df.withColumn("col", expr), df.drop("col") | ALTER TABLE DROP COLUMN or create new tables with selected columns | df['new_col'] = expr, df.drop(columns=["col"]) |
Table Alteration | df.withColumnRenamed, df.drop and then save as new | ALTER TABLE table ADD/DROP COLUMN … | df.rename(columns={"old": "new"}) |
Rename Column | df.withColumnRenamed("old", "new"); df_new = df.toDF(*[col + "_v1" for col in df.columns]) | SELECT col AS new FROM table | df.rename(columns={"old": "new"}) |
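
A runnable sketch of the column operations above, assuming a local SparkSession; the column names (`id`, `col`, `value`) are illustrative.

```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
import pandas as pd

spark = SparkSession.builder.appName("columns-demo").getOrCreate()
df = spark.createDataFrame([(1, 10), (2, 20)], ["id", "col"])

# Add, rename, and drop columns on the Spark side.
df2 = (
    df.withColumn("new_col", F.col("col") * 2)   # add a derived column
      .withColumnRenamed("col", "value")          # rename in place
      .drop("id")                                 # drop a column
)
df2.show()

# Bulk-rename by rebuilding the DataFrame with suffixed names.
df_v1 = df.toDF(*[c + "_v1" for c in df.columns])
print(df_v1.columns)

# Pandas equivalents.
pdf = pd.DataFrame({"id": [1, 2], "col": [10, 20]})
pdf["new_col"] = pdf["col"] * 2
pdf = pdf.rename(columns={"col": "value"}).drop(columns=["id"])
```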
Transformation | PySpark | Spark SQL | Pandas |
Select Columns | df.select("col1", "col2") | SELECT col1, col2 FROM table | df[["col1", "col2"]] |
Filter Rows | df.filter(df["col"] > 5) | SELECT * FROM table WHERE col > 5 | df[df["col"] > 5] |
Group By | df.groupBy("col").count() | SELECT col, COUNT(*) FROM table GROUP BY col | df.groupby("col").count() |
Group By Multiple Columns | df.groupBy(["col1", "col2"]) | SELECT col1, col2, COUNT(*) FROM table GROUP BY col1, col2 | df.groupby(["col1", "col2"]) |
Count | df.groupBy("col").count() | SELECT col, COUNT(*) FROM table GROUP BY col | df.groupby("col").count() |
Sum | df.groupBy("col").sum() | SELECT col, SUM(value) FROM table GROUP BY col | df.groupby("col")["value"].sum() |
Average | df.groupBy("col").avg() | SELECT col, AVG(value) FROM table GROUP BY col | df.groupby("col")["value"].mean() |
Max | df.groupBy("col").max() | SELECT col, MAX(value) FROM table GROUP BY col | df.groupby("col")["value"].max() |
Min | df.groupBy("col").min() | SELECT col, MIN(value) FROM table GROUP BY col | df.groupby("col")["value"].min() |
Group By with Filter | df.filter(df["value"] > 10).groupBy("col").count() | SELECT col, COUNT(*) FROM table WHERE value > 10 GROUP BY col | df[df["value"] > 10].groupby("col").count() |
Group By with Having | df.groupBy("col").count().filter(col("count") > 10) | SELECT col, COUNT(*) FROM table GROUP BY col HAVING COUNT(*) > 10 | df.groupby("col").filter(lambda g: len(g) > 10) |
Group By with Rollup | df.rollup("col").count() | SELECT col, COUNT(*) FROM table GROUP BY ROLLUP(col) | N/A |
Sort Ascending | df.sort("col") | SELECT * FROM table ORDER BY col ASC | df.sort_values("col", ascending=True) |
Sort Ascending | df.orderBy("col") | ||
Sort Ascending | df.orderBy("col", ascending=True) | SELECT * FROM table ORDER BY col ASC | df.sort_values(by="col", ascending=True) |
Sort Ascending | df.sort(col("col").asc()) | ||
Sort Descending | df.sort(col("col").desc()) | SELECT * FROM table ORDER BY col DESC | df.sort_values("col", ascending=False) |
Sort Descending | df.orderBy("col", ascending=False) | SELECT * FROM table ORDER BY col DESC | df.sort_values(by="col", ascending=False) |
Sort Descending | df.orderBy(col("col").desc()) | ||
Sort Multiple Columns | df.sort("col1", "col2") | SELECT * FROM table ORDER BY col1, col2 | df.sort_values(["col1", "col2"]) |
Sort Multiple Columns (Asc/Desc) | df.sort(col("col1").asc(), col("col2").desc()) | ||
Sort Multiple Columns (Asc/Desc) | df.orderBy(["col1", "col2"], ascending=[True, False]) | SELECT * FROM table ORDER BY col1 ASC, col2 DESC | df.sort_values(by=["col1", "col2"], ascending=[True, False]) |
Sort Multiple Columns | df.orderBy("col1", "col2") | ||
Sort with Expressions | df.sort((col("col1") + col("col2")).desc()); df.orderBy((df["col1"] * 2).asc()) | SELECT * FROM table ORDER BY (col1 + col2) DESC | df.loc[(df["col1"] + df["col2"]).sort_values(ascending=False).index] |
Sort with Nulls First | df.sort(col("col").asc_nulls_first()); df.orderBy(col("col").desc_nulls_first()) | SELECT * FROM table ORDER BY col ASC NULLS FIRST | df.sort_values("col", ascending=True, na_position='first') |
Sort with Nulls Last | df.sort(col("col").asc_nulls_last()); df.orderBy(col("col").desc_nulls_last()) | SELECT * FROM table ORDER BY col ASC NULLS LAST | df.sort_values("col", ascending=True, na_position='last') |
Sort In Place | N/A | N/A | df.sort_values("col", inplace=True) |
Sort by Index | N/A | N/A | df.sort_index(ascending=True); df.sort_index(ascending=False) |
Sort with Multiple Sort Keys | df.sort("col1", col("col2").desc()); df.orderBy("col1", col("col2").desc()) | SELECT * FROM table ORDER BY col1 ASC, col2 DESC | df.sort_values(["col1", "col2"], ascending=[True, False]) |
Sort Using SQL Query | N/A | SELECT * FROM table ORDER BY col | N/A |
Sort with Custom Order | N/A | N/A | df["col"] = pd.Categorical(df["col"], categories=["A", "B", "C"], ordered=True); df.sort_values("col") |
Sort with Multiple Data Types | df.sort("col1", "col2") where col1 is integer and col2 is string; df.orderBy("col1", "col2") | SELECT * FROM table ORDER BY col1, col2 | df.sort_values(["col1", "col2"]) |
Sort with Functions | df.sort(lower(col("col1"))); df.orderBy(abs(col("col2")).desc()) | SELECT * FROM table ORDER BY LOWER(col1) ASC | df.sort_values("col1", key=lambda s: s.str.lower()) |
Union | df1.union(df2) | SELECT * FROM table1 UNION SELECT * FROM table2 | pd.concat([df1, df2]) |
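
A runnable sketch of the grouping and sorting rows above, focused on the two patterns that trip people up: filtering before the group (WHERE) versus after the aggregate (HAVING), plus null-aware ordering. The sample data, group names, and thresholds are illustrative.

```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
import pandas as pd

spark = SparkSession.builder.appName("groupby-demo").getOrCreate()
df = spark.createDataFrame(
    [("a", 5), ("a", 20), ("b", 30), ("b", None)], ["col", "value"]
)

# WHERE-style filter: restrict rows *before* grouping.
df.filter(F.col("value") > 10).groupBy("col").count().show()

# HAVING-style filter: aggregate first, then filter on the aggregate.
df.groupBy("col").agg(F.sum("value").alias("total")).filter(F.col("total") > 25).show()

# Null-aware ordering, matching ORDER BY value ASC NULLS FIRST.
df.orderBy(F.col("value").asc_nulls_first()).show()

# Pandas counterparts.
pdf = pd.DataFrame({"col": ["a", "a", "b", "b"], "value": [5, 20, 30, None]})
pdf[pdf["value"] > 10].groupby("col").size()
pdf.groupby("col")["value"].sum().loc[lambda s: s > 25]
pdf.sort_values("value", na_position="first")
```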
Join | PySpark | Spark SQL | Pandas |
Inner Join | df1.join(df2, "col") | SELECT * FROM table1 INNER JOIN table2 ON table1.col = table2.col | pd.merge(df1, df2, on="col") |
Left Join | df1.join(df2, "col", "left") | SELECT * FROM table1 LEFT JOIN table2 ON table1.col = table2.col | pd.merge(df1, df2, on="col", how="left") |
Right Join | df1.join(df2, "col", "right") | SELECT * FROM table1 RIGHT JOIN table2 ON table1.col = table2.col | pd.merge(df1, df2, on="col", how="right") |
Full Outer Join | df1.join(df2, "col", "outer") | SELECT * FROM table1 FULL OUTER JOIN table2 ON table1.col = table2.col | pd.merge(df1, df2, on="col", how="outer") |
Left Semi Join | df1.join(df2, "col", "left_semi") | SELECT * FROM table1 LEFT SEMI JOIN table2 ON table1.col = table2.col | df1[df1["col"].isin(df2["col"])] |
Left Anti Join | df1.join(df2, "col", "left_anti") | SELECT * FROM table1 LEFT ANTI JOIN table2 ON table1.col = table2.col | df1[~df1["col"].isin(df2["col"])] |
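
A runnable sketch of the join table above. Pandas has no semi/anti join keyword, so the `isin()` idiom shown here is a common workaround rather than a dedicated API; the key values and column names are illustrative.

```python
from pyspark.sql import SparkSession
import pandas as pd

spark = SparkSession.builder.appName("join-demo").getOrCreate()
df1 = spark.createDataFrame([(1, "x"), (2, "y"), (3, "z")], ["key", "left_val"])
df2 = spark.createDataFrame([(1, "p"), (3, "q")], ["key", "right_val"])

df1.join(df2, "key", "inner").show()
df1.join(df2, "key", "left").show()
df1.join(df2, "key", "left_semi").show()   # rows of df1 with a match, df1 columns only
df1.join(df2, "key", "left_anti").show()   # rows of df1 without a match

# Pandas: merge covers inner/left/right/outer; semi/anti use isin().
p1 = df1.toPandas()
p2 = df2.toPandas()
pd.merge(p1, p2, on="key", how="outer")
p1[p1["key"].isin(p2["key"])]    # semi join
p1[~p1["key"].isin(p2["key"])]   # anti join
```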
Transpose / Pivot | PySpark | Spark SQL | Pandas |
Transpose | df.toPandas().T (no built-in transpose in most Spark versions) | N/A | df.T |
Pivot | df.groupBy("col").pivot("col2").sum("col3") | SELECT * FROM table PIVOT (SUM(col3) FOR col2 IN (values)) | df.pivot_table(values="col3", index="col", columns="col2", aggfunc="sum") |
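
A runnable sketch of the pivot row above; the `year`/`quarter`/`value` columns are illustrative.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("pivot-demo").getOrCreate()
df = spark.createDataFrame(
    [("2024", "Q1", 10), ("2024", "Q2", 20), ("2025", "Q1", 30)],
    ["year", "quarter", "value"],
)

# One row per year, one column per quarter, summed values.
df.groupBy("year").pivot("quarter").sum("value").show()

# Pandas: pivot_table does the grouping and aggregation in one call.
pdf = df.toPandas()
print(pdf.pivot_table(values="value", index="year", columns="quarter", aggfunc="sum"))
```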
Window Function | PySpark | Spark SQL | Pandas |
Row Number | df.withColumn("row_num", row_number().over(Window.orderBy("col"))) | SELECT *, ROW_NUMBER() OVER (ORDER BY col) AS row_num FROM table | df['row_num'] = df['col'].rank(method='first').astype(int) |
Rank | df.withColumn("rank", rank().over(Window.orderBy("col"))) | SELECT *, RANK() OVER (ORDER BY col) AS rank FROM table | df['rank'] = df['col'].rank(method='min') |
Dense Rank | df.withColumn("dense_rank", dense_rank().over(Window.orderBy("col"))) | SELECT *, DENSE_RANK() OVER (ORDER BY col) AS dense_rank FROM table | df['dense_rank'] = df['col'].rank(method='dense') |
Percent Rank | df.withColumn("percent_rank", percent_rank().over(Window.orderBy("col"))) | SELECT *, PERCENT_RANK() OVER (ORDER BY col) AS percent_rank FROM table | df['col'].rank(pct=True) (close to, but not identical to, SQL PERCENT_RANK) |
NTile | df.withColumn("ntile", ntile(n).over(Window.orderBy("col"))) | SELECT *, NTILE(n) OVER (ORDER BY col) AS ntile FROM table | Not built in; pd.qcut(df['col'], n) gives similar quantile buckets |
Lag | df.withColumn("lag_val", lag("col", n).over(Window.orderBy("col"))) | SELECT *, LAG(col, n) OVER (ORDER BY col) AS lag_val FROM table | df['lag_val'] = df['col'].shift(n) |
Lead | df.withColumn("lead_val", lead("col", n).over(Window.orderBy("col"))) | SELECT *, LEAD(col, n) OVER (ORDER BY col) AS lead_val FROM table | df['lead_val'] = df['col'].shift(-n) |
Cumulative Sum | df.withColumn("cum_sum", sum("col").over(Window.orderBy("col").rowsBetween(Window.unboundedPreceding, Window.currentRow))) | SELECT *, SUM(col) OVER (ORDER BY col ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS cum_sum FROM table | df['cum_sum'] = df['col'].cumsum() |
Moving Average | df.withColumn("moving_avg", avg("col").over(Window.orderBy("col").rowsBetween(-n, 0))) | SELECT *, AVG(col) OVER (ORDER BY col ROWS BETWEEN n PRECEDING AND CURRENT ROW) AS moving_avg FROM table | df['moving_avg'] = df['col'].rolling(window=n).mean() |
First Value | df.withColumn("first_val", first("col").over(Window.orderBy("col"))) | SELECT *, FIRST_VALUE(col) OVER (ORDER BY col) AS first_val FROM table | df['first_val'] = df['col'].iloc[0] |
Last Value | df.withColumn("last_val", last("col").over(Window.orderBy("col").rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing))) | SELECT *, LAST_VALUE(col) OVER (ORDER BY col ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS last_val FROM table | df['last_val'] = df['col'].iloc[-1] |
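
A runnable sketch of the window-function rows above, partitioned by a group column so the pandas `groupby` equivalents line up; the `grp`/`col` names and sample values are illustrative.

```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
import pandas as pd

spark = SparkSession.builder.appName("window-demo").getOrCreate()
df = spark.createDataFrame([("a", 1), ("a", 3), ("b", 2), ("b", 5)], ["grp", "col"])

w = Window.partitionBy("grp").orderBy("col")
cum = w.rowsBetween(Window.unboundedPreceding, Window.currentRow)

out = (
    df.withColumn("row_num", F.row_number().over(w))
      .withColumn("lag_val", F.lag("col", 1).over(w))
      .withColumn("cum_sum", F.sum("col").over(cum))
)
out.show()

# Rough Pandas counterparts, grouped to mirror partitionBy.
pdf = df.toPandas().sort_values(["grp", "col"])
pdf["row_num"] = pdf.groupby("grp").cumcount() + 1
pdf["lag_val"] = pdf.groupby("grp")["col"].shift(1)
pdf["cum_sum"] = pdf.groupby("grp")["col"].cumsum()
```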
String Manipulation | PySpark | Spark SQL | Pandas |
String Manipulation | F.concat, F.upper, F.lower, F.substring, F.regexp_replace | CONCAT(col1, 'text'), UPPER(col2), LOWER(col3), SUBSTRING(col4, 1, 3) | df['col'].str.upper(), str.lower(), str.contains('text') |
Concatenate | df.withColumn("new_col", concat(col("col1"), col("col2"))) | SELECT CONCAT(col1, col2) AS new_col FROM table | df['new_col'] = df['col1'] + df['col2'] |
Length | df.withColumn("length", length(col("col"))) | SELECT LENGTH(col) AS length FROM table | df['length'] = df['col'].str.len() |
Lowercase | df.withColumn("lower", lower(col("col"))) | SELECT LOWER(col) AS lower FROM table | df['lower'] = df['col'].str.lower() |
Uppercase | df.withColumn("upper", upper(col("col"))) | SELECT UPPER(col) AS upper FROM table | df['upper'] = df['col'].str.upper() |
Trim | df.withColumn("trim", trim(col("col"))) | SELECT TRIM(col) AS trim FROM table | df['trim'] = df['col'].str.strip() |
Split | df.withColumn("split", split(col("col"), ",")) | SELECT SPLIT(col, ',') AS split FROM table | df['split'] = df['col'].str.split(',') |
Replace | df.withColumn("replace", regexp_replace(col("col"), "old", "new")) | SELECT REPLACE(col, 'old', 'new') AS replace FROM table | df['replace'] = df['col'].str.replace('old', 'new') |
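
A runnable sketch of the string rows above; the sample strings and output column names are illustrative.

```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
import pandas as pd

spark = SparkSession.builder.appName("string-demo").getOrCreate()
df = spark.createDataFrame([(" Ada ", "old_value")], ["name", "col"])

df.select(
    F.upper(F.trim(F.col("name"))).alias("name_clean"),        # trim then uppercase
    F.concat(F.col("name"), F.lit("_"), F.col("col")).alias("joined"),
    F.regexp_replace(F.col("col"), "old", "new").alias("replaced"),
).show()

# Pandas string accessor equivalents.
pdf = pd.DataFrame({"name": [" Ada "], "col": ["old_value"]})
pdf["name_clean"] = pdf["name"].str.strip().str.upper()
pdf["joined"] = pdf["name"] + "_" + pdf["col"]
pdf["replaced"] = pdf["col"].str.replace("old", "new", regex=False)
```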
Date Functions | PySpark | Spark SQL | Pandas |
Date Functions | F.date_format, F.to_date, F.datediff, F.add_months | DATE_FORMAT(col, 'format'), DATE_ADD(col, days) | pd.to_datetime(df['col']) |
Current Date | df.withColumn("current_date", current_date()) | SELECT CURRENT_DATE AS current_date FROM table | df['current_date'] = pd.Timestamp.today() |
Current Timestamp | df.withColumn("current_timestamp", current_timestamp()) | SELECT CURRENT_TIMESTAMP AS current_timestamp FROM table | df['current_timestamp'] = pd.Timestamp.now() |
Date Format | df.withColumn("date_format", date_format(col("date"), "yyyy-MM-dd")) | SELECT DATE_FORMAT(date, 'yyyy-MM-dd') AS date_format FROM table | df['date_format'] = df['date'].dt.strftime('%Y-%m-%d') |
Day of Month | df.withColumn("day_of_month", dayofmonth(col("date"))) | SELECT DAYOFMONTH(date) AS day_of_month FROM table | df['day_of_month'] = df['date'].dt.day |
Day of Week | df.withColumn("day_of_week", dayofweek(col("date"))) | SELECT DAYOFWEEK(date) AS day_of_week FROM table | df['day_of_week'] = df['date'].dt.dayofweek |
Month | df.withColumn("month", month(col("date"))) | SELECT MONTH(date) AS month FROM table | df['month'] = df['date'].dt.month |
Year | df.withColumn("year", year(col("date"))) | SELECT YEAR(date) AS year FROM table | df['year'] = df['date'].dt.year |
Add Days | df.withColumn("add_days", date_add(col("date"), 5)) | SELECT DATE_ADD(date, 5) AS add_days FROM table | df['add_days'] = df['date'] + pd.Timedelta(days=5) |
Add Months | df.withColumn("add_months", add_months(col("date"), 5)) | SELECT ADD_MONTHS(date, 5) AS add_months FROM table | df['add_months'] = df['date'] + pd.DateOffset(months=5) |
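
A runnable sketch of the date rows above, starting from a string column; the date value and output column names are illustrative.

```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
import pandas as pd

spark = SparkSession.builder.appName("date-demo").getOrCreate()
df = spark.createDataFrame([("2024-01-15",)], ["date_str"])

df.select(
    F.to_date("date_str", "yyyy-MM-dd").alias("date")   # parse string to DateType
).select(
    "date",
    F.year("date").alias("year"),
    F.date_add("date", 5).alias("plus_5_days"),
    F.add_months("date", 2).alias("plus_2_months"),
    F.date_format("date", "yyyy-MM").alias("year_month"),
).show()

# Pandas works on datetime64 columns via the .dt accessor and offsets.
pdf = pd.DataFrame({"date": pd.to_datetime(["2024-01-15"])})
pdf["year"] = pdf["date"].dt.year
pdf["plus_5_days"] = pdf["date"] + pd.Timedelta(days=5)
pdf["plus_2_months"] = pdf["date"] + pd.DateOffset(months=2)
pdf["year_month"] = pdf["date"].dt.strftime("%Y-%m")
```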
Control Statements | PySpark | Spark SQL | Pandas |
Control Statements | Use F.when(...).otherwise(...) for conditional columns | Use CASE WHEN ... THEN for conditional logic in SQL | np.where(condition, value_if_true, value_if_false) |
Conditional (IF/CASE) | df.withColumn("col", F.when(df.col > 1, "A").otherwise("B")) | CASE WHEN col > 1 THEN 'A' ELSE 'B' END | np.where(df['col'] > 1, "A", "B") |
For Loop | for row in df.collect(): print(row) | N/A | for index, row in df.iterrows(): print(row) |
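
A runnable sketch of the conditional rows above, including a multi-branch case; the labels "A"/"B"/"C" and thresholds are illustrative.

```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
import pandas as pd
import numpy as np

spark = SparkSession.builder.appName("cond-demo").getOrCreate()
df = spark.createDataFrame([(0,), (1,), (5,)], ["col"])

# CASE WHEN ... THEN ... ELSE ... END expressed with when/otherwise.
df.withColumn(
    "label",
    F.when(F.col("col") > 1, "A").when(F.col("col") == 1, "B").otherwise("C"),
).show()

# Pandas/NumPy: np.where for two branches, np.select for several.
pdf = pd.DataFrame({"col": [0, 1, 5]})
pdf["label"] = np.select([pdf["col"] > 1, pdf["col"] == 1], ["A", "B"], default="C")
```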
Regex | PySpark | Spark SQL | Pandas |
Contains | df.filter(df["col"].rlike("pattern")) | SELECT * FROM table WHERE col RLIKE 'pattern' | df[df["col"].str.contains("pattern")] |
Replace | from pyspark.sql.functions import regexp_replace; df.withColumn("col", regexp_replace("col", "pattern", "replace")) | REGEXP_REPLACE(col, 'pattern', 'replace') | df["col"].str.replace("pattern", "replace", regex=True) |
Handling Missing Data | PySpark | Spark SQL | Pandas |
Fill NaN | df.fillna({'col': 0}) | SELECT COALESCE(col, 0) FROM table | df.fillna({'col': 0}) |
Drop NaN | df.na.drop() | SELECT * FROM table WHERE col IS NOT NULL | df.dropna() |
Optimization | PySpark | Spark SQL | Pandas |
Optimization – Cache | df.cache() caches data in memory for faster reuse | CACHE TABLE table_name | Not directly supported (data already lives in local memory) |
Optimization – Repartition | df.repartition(numPartitions, "col") to balance data across partitions | N/A in SQL; use table partitioning instead | N/A (Pandas is single-machine; no partitions) |
Optimization – Serialization | df.write.format("parquet").save("path"); columnar formats like Parquet give better I/O efficiency | N/A; serialization handled outside SQL | df.to_parquet("path") for efficient columnar storage |
Partitions | df.repartition(numPartitions, "column") creates partitions based on the specified column. | CREATE TABLE table_name PARTITIONED BY (col1 STRING) organizes data by partition. | N/A |
Bucketing | df.write.bucketBy(numBuckets, "column").saveAsTable("table_name") for distributing data. | CREATE TABLE table_name CLUSTERED BY (col1) INTO numBuckets BUCKETS for bucketing data in a table. | N/A |
Segmentation | Filter DataFrames on specific criteria, e.g., df.filter(df.col > 1). | SELECT * FROM table WHERE col > value for segmenting data on specific criteria. | df[df['col'] > 1] |
Broadcasting | df1.join(broadcast(df2), "key") or spark.conf.set("spark.sql.autoBroadcastJoinThreshold", size) for broadcast joins. | SELECT /*+ BROADCAST(t2) */ * FROM table1 t1 JOIN table2 t2 ON t1.key = t2.key for a broadcast hint. | N/A |
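
A runnable sketch of the optimization rows above: caching, repartitioning by a key, and a broadcast join. The sizes, partition count, and table name `bucketed_demo` are illustrative; the bucketing line is commented out because it writes a managed table to the warehouse.

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast

spark = SparkSession.builder.appName("opt-demo").getOrCreate()
big = spark.range(1_000_000).withColumnRenamed("id", "key")
small = spark.createDataFrame([(1, "a"), (2, "b")], ["key", "label"])

# Repartition by the join key to balance work across tasks, then cache for reuse.
big = big.repartition(8, "key")
big.cache()

# Broadcast hint: ship the small table to every executor instead of shuffling the big one.
joined = big.join(broadcast(small), "key")
joined.explain()  # the physical plan should show a BroadcastHashJoin

# Bucketing requires writing to a managed table:
# small.write.mode("overwrite").bucketBy(4, "key").sortBy("key").saveAsTable("bucketed_demo")
```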
select | Example Code Snippet | Description / Use Case | |
Basic Select | df.select("col1", "col2").show() | Select specific columns |
Rename Columns | df.select(col("col1").alias("new_col1"), col("col2").alias("new_col2")).show() | Rename columns directly in select |
Arithmetic Operation | df.select((col("col1") + 10).alias("col1_plus_10"), (col("col2") * 2).alias("col2_times_2")).show() | Perform arithmetic operations and alias them |
Conditional Logic | df.select(when(col("col1") > 50, "High").otherwise("Low").alias("level")).show() | Apply conditional logic using when and otherwise |
Combining Columns | df.select(concat(col("col1"), lit("_"), col("col2")).alias("combined_col")).show() | Concatenate multiple columns with literals |
String Manipulation | df.select(upper(col("name")).alias("name_upper"), lower(col("city")).alias("city_lower")).show() | Use string functions like upper and lower |
Date Manipulation | df.select(date_add(col("date_col"), 7).alias("next_week")).show() | Manipulate date columns, such as adding days |
Complex Expressions | df.select((round((col("col1") * col("col2")) / 100, 2)).alias("calculated_col")).show() | Chain multiple operations within a single expression |
Aggregations | df.groupBy("category").agg(sum("value").alias("total_value")).select("category", "total_value").show() | Combine select with agg to create summary statistics |
Filtered Selection | df.select("col1", "col2").filter(col("col1") > 50).show() | Select specific columns with an additional filter |
Mathematical Functions | df.select(sin(col("col1")).alias("sin_col1"), log(col("col2")).alias("log_col2")).show() | Apply mathematical functions such as sin and log |
Array Creation | df.select(array("col1", "col2", "col3").alias("array_col")).show() | Create an array column by combining multiple columns |
JSON Parsing | df.select(get_json_object(col("json_col"), "$.name").alias("name")).show() | Parse JSON fields from a column using get_json_object |
Casting Data Types | df.select(col("col1").cast("int").alias("col1_int"), col("col2").cast("string").alias("col2_str")).show() | Cast columns to different data types |
Complex Aggregation | df.groupBy("category").agg(count(when(col("value") > 50, 1)).alias("count_above_50")).select("category", "count_above_50").show() | Perform conditional aggregation with count |
Using SQL Functions | df.select(coalesce(col("col1"), col("col2")).alias("first_non_null")).show() | Use SQL functions such as coalesce for null handling |
Window Functions | df.withColumn("rank", dense_rank().over(Window.partitionBy("category").orderBy(col("col1").desc()))).select("category", "col1", "rank").show() | Apply window functions for rank and row number calculations |
Array Conditionals | df.select(when(col("col1") > 50, array("col2", "col3")).otherwise(array("col4")).alias("conditional_array")).show() | Use conditional logic to create arrays based on column values |
Complex Type Creation | df.select(struct(col("col1").alias("field1"), col("col2").alias("field2")).alias("struct_col")).show() | Create complex data types like structs within select |
Pivot Operation | df.groupBy("category").pivot("pivot_column").agg(sum("value")).select("*").show() | Perform a pivot operation to reshape data based on column values |
Null Handling | df.select(col("col1").isNull().alias("col1_is_null"), col("col2").isNotNull().alias("col2_is_not_null")).show() | Check for null and non-null values in columns |
Column Substring | df.select(col("col1").substr(1, 3).alias("col1_substr")).show() | Extract substring from column values |
Conditional Replacement | df.select(col("col1").alias("original"), when(col("col1") == "unknown", "NA").otherwise(col("col1")).alias("updated")).show() | Replace values in a column based on conditions |
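
The snippets in the select table assume the column functions have already been imported. A minimal runnable sketch with those imports and a few of the patterns above; the sample data and column names are illustrative, and `round` is imported under an alias to avoid shadowing Python's builtin.

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, concat, lit, coalesce, round as spark_round

spark = SparkSession.builder.appName("select-demo").getOrCreate()
df = spark.createDataFrame([(80, 2, None), (30, 5, 7)], ["col1", "col2", "col3"])

df.select(
    col("col1").alias("score"),                                  # rename via alias
    (col("col1") * col("col2")).alias("product"),                # arithmetic
    when(col("col1") > 50, "High").otherwise("Low").alias("level"),  # conditional logic
    concat(lit("id_"), col("col2").cast("string")).alias("tag"),     # combine columns
    coalesce(col("col3"), col("col2")).alias("first_non_null"),      # null handling
    spark_round(col("col1") / 3, 2).alias("rounded"),                # complex expression
).show()
```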
withColumn | Example Code Snippet | Description / Use Case | |
Add New Column | df.withColumn("new_col", lit(100)).show() | Adds a new column with a constant value 100. |
Modify Existing Column | df.withColumn("col1", col("col1") * 10).show() | Modifies an existing column by multiplying its values by 10. |
Conditional Column | df.withColumn("status", when(col("age") > 18, "Adult").otherwise("Minor")).show() | Adds a new column status based on a conditional expression. |
Column Based on Multiple Conditions | df.withColumn("grade", when(col("score") >= 90, "A").when(col("score") >= 75, "B").otherwise("C")).show() | Creates a new column grade with multiple conditions to assign values based on score. |
Date and Time Operations | df.withColumn("year", year(col("date"))).withColumn("month", month(col("date"))).show() | Extracts year and month from a date column, creating new columns. |
Concatenate Columns | df.withColumn("full_name", concat(col("first_name"), lit(" "), col("last_name"))).show() | Concatenates two or more string columns with a separator. |
Cast Column Type | df.withColumn("age_int", col("age").cast("int")).show() | Converts the data type of an existing column to another type. |
Array Operations | df.withColumn("first_element", col("array_col")[0]).show() | Extracts the first element from an array column. |
Aggregate within Column | df.withColumn("sum_sales", expr("aggregate(sales_array, 0D, (acc, x) -> acc + x)")).show() | Performs aggregation (like sum) within an array column using the aggregate higher-order function. |
Using SQL Expressions | df.withColumn("discounted_price", expr("price * 0.9")).show() | Uses a SQL expression to perform operations directly within withColumn. |
Generate Column Based on Other Columns | df.withColumn("profit", col("revenue") - col("cost")).show() | Creates a new column by performing arithmetic operations between existing columns. |
withColumnRenamed Basic | df.withColumnRenamed("old_col", "new_col").show() | Renames a column from old_col to new_col. |
Multiple withColumnRenamed | df.withColumnRenamed("old_col1", "new_col1").withColumnRenamed("old_col2", "new_col2").show() | Renames multiple columns sequentially. |
Chained withColumn Operations | df.withColumn("double_age", col("age") * 2).withColumn("half_age", col("age") / 2).show() | Adds multiple new columns by chaining multiple withColumn operations. |
Apply UDF in withColumn | df.withColumn("squared_age", square_udf(col("age"))).show() | Uses a User Defined Function (UDF) to create a new column based on existing column values. |
Create Column with Random Values | df.withColumn("random_val", rand()).show() | Adds a new column with random float values between 0 and 1. |
withColumn with Window Function | window_spec = Window.partitionBy("group_col").orderBy("date_col"); df.withColumn("rank", rank().over(window_spec)).show() | Adds a new column with ranking within partitions using a window function. |
Conditional Replacement | df.withColumn("score", when(col("score") < 0, 0).otherwise(col("score"))).show() | Replaces negative values in score column with 0. |
Chain withColumn for Transformation Pipeline | df.withColumn("new1", col("col1") + 1).withColumn("new2", col("new1") * 2).drop("new1").show() | Chains multiple column transformations in sequence and drops the intermediate column. |
filter | Example Code Snippet | Description / Use Case | |
Basic Filter with Condition | df.filter(col("age") > 18).show() | Filters rows where age is greater than 18. |
Filter with SQL Expression | df.filter("age > 18").show() | Same as above but using a SQL expression string for the condition. |
Multiple Conditions (AND) | df.filter((col("age") > 18) & (col("city") == "New York")).show() | Filters rows where age is greater than 18 and city is "New York". |
Multiple Conditions (OR) | df.filter((col("age") < 18) \| (col("city") == "Los Angeles")).show() | Filters rows where age is less than 18 or city is "Los Angeles". |
IN Condition | df.filter(col("city").isin(["New York", "Los Angeles"])).show() | Filters rows where city is either "New York" or "Los Angeles". |
NOT IN Condition | df.filter(~col("city").isin(["Chicago", "Houston"])).show() | Filters rows where city is not in the specified list of values. |
NULL Check | df.filter(col("email").isNull()).show() | Filters rows where email column has null values. |
NOT NULL Check | df.filter(col("email").isNotNull()).show() | Filters rows where email column is not null. |
Filter with Like (Pattern Matching) | df.filter(col("name").like("A%")).show() | Filters rows where name starts with "A". |
Filter with RLIKE (Regex Matching) | df.filter(col("name").rlike("^[A-Z].*")).show() | Filters rows where name starts with an uppercase letter. |
Using BETWEEN | df.filter(col("age").between(18, 30)).show() | Filters rows where age is between 18 and 30 (inclusive). |
Filter with Array Contains | df.filter(array_contains(col("hobbies"), "reading")).show() | Filters rows where hobbies array column contains the value "reading". |
Filter by Length of String Column | df.filter(length(col("name")) > 5).show() | Filters rows where the length of name column is greater than 5. |
Filter with Date Condition | df.filter(col("date") > lit("2023-01-01")).show() | Filters rows with date column after January 1, 2023. |
Filter Based on Calculated Column | df.filter((col("salary") - col("expenses")) > 5000).show() | Filters rows based on a calculated difference between salary and expenses. |
Substring Filter | df.filter(substring(col("name"), 1, 3) == "Joh").show() | Filters rows where the first 3 characters of name are "Joh". |
Using StartsWith and EndsWith | df.filter(col("email").startswith("john")).show(); df.filter(col("email").endswith(".com")).show() | Filters rows where email starts with "john" or ends with ".com". |
Using exists in Array Column | df.filter(expr("exists(array_col, x -> x > 10)")).show() | Filters rows where at least one element in array_col is greater than 10. |
Filter with Row Number (Window Function) | window_spec = Window.partitionBy("city").orderBy(col("age").desc()); df.withColumn("rank", row_number().over(window_spec)).filter(col("rank") == 1).show() | Filters to get the oldest person in each city by using a window function. |
Filter on Nested Fields (Struct) | df.filter(col("address.city") == "San Francisco").show() | Filters rows based on a nested field in a struct-type column, like address.city. |
Filter Using UDF | df.filter(custom_udf(col("age"))).show() | Filters rows using a custom UDF that returns a Boolean value. |
Filter with CASE WHEN Logic | df.filter(when(col("score") > 90, "High").otherwise("Low") == "High").show() | Filters rows with a conditional transformation using when. |
Random Sampling with Filter | df.filter(rand() < 0.1).show() | Filters a random sample of approximately 10% of the data. |
Filter Rows with Aggregated Condition | df.groupBy("city").count().filter(col("count") > 100).show() | Filters groups with aggregate conditions, keeping only cities with more than 100 records. |
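
A runnable sketch of the window-based filter pattern from the table above (keep the oldest person per city), with a rough pandas equivalent; the city and name values are illustrative.

```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
import pandas as pd

spark = SparkSession.builder.appName("filter-demo").getOrCreate()
df = spark.createDataFrame(
    [("NY", "Ann", 40), ("NY", "Bob", 35), ("LA", "Cid", 50)],
    ["city", "name", "age"],
)

# Rank rows within each city by descending age, then keep rank 1.
w = Window.partitionBy("city").orderBy(F.col("age").desc())
oldest = df.withColumn("rn", F.row_number().over(w)).filter(F.col("rn") == 1).drop("rn")
oldest.show()

# Pandas: sort then drop duplicate cities, keeping the first (oldest) row per city.
pdf = df.toPandas()
print(pdf.sort_values("age", ascending=False).drop_duplicates("city"))
```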
selectExpr | Example Code Snippet | Description / Use Case | |
Basic SelectExpr | df.selectExpr("col1", "col2").show() | Select specific columns using selectExpr syntax |
Arithmetic Operation | df.selectExpr("col1 + 10 as col1_plus_10", "col2 * 2 as col2_times_2").show() | Perform arithmetic operations directly within selectExpr |
Conditional Logic | df.selectExpr("CASE WHEN col1 > 50 THEN 'High' ELSE 'Low' END AS level").show() | Apply conditional logic with SQL-style CASE WHEN |
Combining Columns | df.selectExpr("concat(col1, '_', col2) as combined_col").show() | Concatenate columns using SQL syntax |
String Manipulation | df.selectExpr("upper(name) as name_upper", "lower(city) as city_lower").show() | Use SQL functions to manipulate strings, e.g., upper, lower |
Date Manipulation | df.selectExpr("date_add(date_col, 7) as next_week").show() | Perform date arithmetic within expressions, here adding 7 days |
Complex Expressions | df.selectExpr("round(col1 * col2 / 100, 2) as calculated_col").show() | Use complex expressions to perform multiple operations in one column |
Aggregations | df.groupBy("category").agg(expr("sum(value) as total_value")).show() | Aggregate values within a SQL expression, here summing value |
Filtering Columns | df.selectExpr("col1", "col2").where("col1 > 50").show() | Filter data in combination with selectExpr |
Conditional Aggregations | df.groupBy("category").agg(expr("sum(case when value > 50 then 1 else 0 end) as count_above_50")).show() | Count rows based on conditions within the expression |
Mathematical Functions | df.selectExpr("sin(col1) as sin_col1", "log(col2) as log_col2").show() | Use mathematical functions such as sin, log in expressions |
Array Creation | df.selectExpr("array(col1, col2, col3) as array_col").show() | Create an array column by combining multiple columns |
JSON Parsing | df.selectExpr("get_json_object(json_col, '$.name') as name").show() | Parse JSON data stored within a column |
Casting Types | df.selectExpr("cast(col1 as int) as col1_int", "cast(col2 as string) as col2_str").show() | Change data types of columns within selectExpr |
Aliasing Expressions | df.selectExpr("col1 + col2 as total_sum", "col2 - col3 as diff").show() | Alias complex expressions directly in selectExpr |
Using SQL Functions | df.selectExpr("coalesce(col1, col2, col3) as first_non_null").show() | Use SQL functions such as coalesce for data cleaning |
Window Functions | df.withColumn("rank", expr("dense_rank() over (partition by category order by col1 desc)")).show() | Apply window functions for rank-based calculations |
Conditional Array | df.selectExpr("CASE WHEN col1 > 50 THEN array(col2, col3) ELSE array(col4) END as conditional_array").show() | Create conditional arrays using selectExpr |
Complex Type Handling | df.selectExpr("named_struct('field1', col1, 'field2', col2) as struct_col").show() | Create complex types like structs within selectExpr |