Checking Schema, Data Types & Displaying Records with Conditions in PySpark 🚀
1. Checking Schema & Data Types
Method 1: Using .printSchema()
Prints the schema of the DataFrame in a tree format.
df.printSchema()
Example Output:
root
|-- ID: integer (nullable = true)
|-- Name: string (nullable = true)
|-- Age: integer (nullable = true)
|-- Department: string (nullable = true)
|-- Salary: double (nullable = true)
You can also iterate over the schema fields programmatically. The same loop works on df.schema.fields for an existing DataFrame or on a schema you define yourself:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True)
])
for field in schema.fields:
    print(f"Column: {field.name}, DataType: {field.dataType.simpleString()}")
# df.schema.fields / schema.fields: a list of StructField objects, each representing a column.
# field.dataType: the data type object (e.g., StringType(), IntegerType()) for that column.
# simpleString(): converts the data type object to a simple, readable string (e.g., "string", "int").
Method 2: Using .dtypes
Returns a list of tuples containing column names and their data types.
df.dtypes
df.dtypes is an attribute of a PySpark DataFrame that returns a list of tuples, where each tuple contains the column name and its data type as a string.
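As a quick sketch (column names assume the sample DataFrame above), this makes it easy to pull out all columns of a given type:
# Hypothetical example: list all string-typed columns of the sample df
string_cols = [name for name, dtype in df.dtypes if dtype == "string"]
print(string_cols)  # e.g., ['Name', 'Department'] for the sample schema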
Alternative Using df.schema.fields
You can also get a similar output to df.dtypes using:
[(field.name, field.dataType.simpleString()) for field in df.schema.fields]
Example Output:
[('ID', 'int'), ('Name', 'string'), ('Age', 'int'), ('Department', 'string'), ('Salary', 'double')]
Key Differences Between df.dtypes and df.schema.fields
Feature | df.dtypes | df.schema.fields |
---|---|---|
Type of Output | List of tuples ([(col_name, dtype)] ) | List of StructField objects |
Data Type Format | String representation (e.g., "int" , "string" ) | Full DataType object (e.g., IntegerType() , StringType() ) |
Use Case | Quick lookup of column names & types | More detailed schema operations |
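For example, because df.schema.fields exposes full DataType objects, you can branch on the type itself rather than on a string. A minimal sketch, assuming the sample df above:
from pyspark.sql.types import NumericType
# Hypothetical example: collect the names of numeric columns using the DataType objects
numeric_cols = [f.name for f in df.schema.fields if isinstance(f.dataType, NumericType)]
print(numeric_cols)  # e.g., ['ID', 'Age', 'Salary'] for the sample schema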
Method 3: Using .schema
Returns a StructType object describing the schema.
df.schema
Example Output:
StructType([StructField('ID', IntegerType(), True),
StructField('Name', StringType(), True),
StructField('Age', IntegerType(), True),
StructField('Department', StringType(), True),
StructField('Salary', DoubleType(), True)])
2. Displaying Records
Method 1: .show()
Displays the first 20 rows by default.
df.show()
Method 2: Show Specific Number of Rows
df.show(5)
Method 3: Show Full Column Content (No Truncation)
df.show(truncate=False)
3. Filtering & Displaying Records with Multiple Conditions
Using .filter()
with Multiple Conditions
df_filtered = df.filter((df["Age"] > 30) & (df["Salary"] > 50000))
df_filtered.show()
Using .where() (Same as .filter())
df_filtered = df.where((df["Age"] > 30) & (df["Department"] == "IT"))
df_filtered.show()
Using SQL Expressions (expr())
from pyspark.sql.functions import expr
df_filtered = df.filter(expr("Age > 30 AND Salary > 50000"))
df_filtered.show()
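The same pattern extends to OR and NOT conditions; a brief sketch (column names follow the sample df used throughout):
# OR condition: Age over 30 or working in IT
df.filter((df["Age"] > 30) | (df["Department"] == "IT")).show()
# Negation: everyone not in Finance
df.filter(~(df["Department"] == "Finance")).show()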
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, array, sum, avg, count, explode, row_number
from pyspark.sql.window import Window
# Initialize Spark Session
spark = SparkSession.builder.appName("PySpark_Useful_Functions").getOrCreate()
# Sample Data
data = [(1, "Alice", 25, "HR", 50000),
(2, "Bob", 30, "IT", 60000),
(3, "Charlie", 35, "IT", 70000),
(4, "David", 40, "Finance", 80000),
(5, "Eve", 45, "Finance", 90000)]
columns = ["ID", "Name", "Age", "Department", "Salary"]
df = spark.createDataFrame(data, columns)
# 1. Alias (Renaming Columns Temporarily)
df_alias = df.select(col("Name").alias("Full_Name"), col("Age"))
df_alias.show()
# 2. Distinct (Remove Duplicates)
df_distinct = df.select("Department").distinct()
df_distinct.show()
# 3. Filtering Data
df_filtered = df.filter((df["Age"] > 30) & (df["Department"] == "IT"))
df_filtered.show()
# 4. Adding & Modifying Columns
df_new = df.withColumn("New_Column", lit("DefaultValue"))
df_casted = df.withColumn("Salary", df["Salary"].cast("double"))
df_new.show()
df_casted.printSchema()
# 5. Aggregations (Sum, Average, Count)
df_grouped = df.groupBy("Department").agg(
    sum("Salary").alias("Total_Salary"),
    avg("Age").alias("Average_Age")
)
df_grouped.show()
# 6. Sorting
df_sorted = df.orderBy("Age", ascending=False)
df_sorted.show()
# 7. Joining DataFrames
extra_data = [(1, "US"), (2, "Canada"), (3, "UK"), (4, "Germany"), (5, "India")]
columns_extra = ["ID", "Country"]
df_extra = spark.createDataFrame(extra_data, columns_extra)
df_joined = df.join(df_extra, "ID", "inner")
df_joined.show()
# 8. Exploding Nested Data
df_nested = df.withColumn("Hobbies", array(lit("Reading"), lit("Sports")))  # Array column of hobbies
df_exploded = df_nested.withColumn("Hobby", explode(col("Hobbies")))        # One row per hobby
df_exploded.show()
# 9. Collecting Rows
rows = df.collect()
for row in rows:
    print(row)
# 10. Row Numbering & Ranking
windowSpec = Window.partitionBy("Department").orderBy("Salary")
df_ranked = df.withColumn("Rank", row_number().over(windowSpec))
df_ranked.show()
# 11. Checking Schema & Data Types
df.printSchema()
print(df.dtypes)
print(df.schema)
# 12. Displaying Data with .show() Options
# Default show (20 rows, truncated at 20 characters)
df.show()
# Show a specific number of rows
df.show(5)
# Prevent truncation (full column display)
df.show(truncate=False)
# Adjust truncation length
df.show(truncate=50)
# Show data in vertical format
df.show(n=5, vertical=True)
# Combine options
df.show(n=10, truncate=40, vertical=True)
# Stop Spark Session
spark.stop()
The .show() method in PySpark has multiple options to customize how the DataFrame is displayed. Here’s a breakdown:
1. Basic Usage
df.show()
- Displays the first 20 rows by default.
- Truncates long strings to 20 characters.
2. Show a Specific Number of Rows
df.show(5) # Show first 5 rows
- Limits the output to 5 rows.
3. Prevent Truncation (Full Column Display)
df.show(truncate=False)
- By default, columns are truncated to 20 characters.
- Setting truncate=False displays the full content of each column.
✅ Example:
data = [(1, "A very long text that should not be truncated"),
(2, "Short text")]
df = spark.createDataFrame(data, ["ID", "Description"])
df.show(truncate=False)
Output:
+---+----------------------------------------------+
| ID | Description |
+---+----------------------------------------------+
| 1 | A very long text that should not be truncated |
| 2 | Short text |
+---+----------------------------------------------+
4. Adjust Column Truncation Length
df.show(truncate=50)
- Instead of truncating at 20 characters, this truncates at 50 characters.
5. Display Rows Vertically (vertical=True)
df.show(n=5, vertical=True)
- Displays each row vertically, which is useful when there are too many columns.
✅ Example Output:
-RECORD 0------------------------
ID | 1
Name | Alice
Age | 25
Department | HR
Salary | 50000.0
-RECORD 1------------------------
ID | 2
Name | Bob
Age | 30
Department | IT
Salary | 60000.0
6. Combine Options
df.show(n=10, truncate=40, vertical=True)
- Shows 10 rows
- Truncates column data at 40 characters
- Displays data in vertical format
Summary of .show() Options
Option | Description | Example |
---|---|---|
df.show() | Default (20 rows, truncates long values at 20 chars) | df.show() |
df.show(n) | Displays first n rows | df.show(5) |
df.show(truncate=False) | Displays full column values without truncation | df.show(truncate=False) |
df.show(truncate=n) | Truncates column values at n characters | df.show(truncate=50) |
df.show(vertical=True) | Displays rows vertically instead of in tabular format | df.show(vertical=True) |
# Apply Dynamic Logic to All Columns: Trim All String Columns
from pyspark.sql.functions import trim, col
df = df.select([trim(col(c)).alias(c) if dtype == "string" else col(c) for c, dtype in df.dtypes])
Doesn’t Python have a built-in trim function? Then why do we need to import one from pyspark.sql?
You’re right to ask about this! In Python, we do have a built-in strip() function for trimming strings. However, when working with PySpark DataFrames, we need to use PySpark functions instead of native Python functions. Here’s why:
1. PySpark Works on Distributed Data
PySpark operates on distributed data across multiple machines using the Spark engine. Standard Python functions like strip() only work on local Python objects, not on distributed DataFrame columns.
2. Need for PySpark SQL Functions
PySpark provides its own set of SQL functions (pyspark.sql.functions), which are optimized for distributed processing. The function trim() from pyspark.sql.functions works on entire DataFrame columns efficiently.
3. Why Not Use strip() Instead?
If you tried something like:
df = df.withColumn("col1", df["col1"].strip()) # ❌ This won't work!
It would throw an error, because strip() is a method of Python strings, not a PySpark column.
Instead, you must use:
from pyspark.sql.functions import trim
df = df.withColumn("col1", trim(df["col1"])) # ✅ This works in PySpark
4. Applying to All String Columns
The logic I suggested:
from pyspark.sql.functions import trim, col
df = df.select([trim(col(c)).alias(c) if dtype == "string" else col(c) for c, dtype in df.dtypes])
- Uses trim(col(c)) to apply trimming only to string-type columns.
- Keeps other column types unchanged.
Final Thought
If you’re working outside PySpark, use Python’s strip(). But inside PySpark, always use pyspark.sql.functions.trim() for better performance and distributed execution.
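Putting it together, here is a minimal end-to-end sketch of trimming every string column (the sample data and column names below are made up for illustration):
from pyspark.sql import SparkSession
from pyspark.sql.functions import trim, col

spark = SparkSession.builder.appName("TrimStrings").getOrCreate()
df = spark.createDataFrame([(1, "  Alice  ", " HR "), (2, " Bob", "IT ")],
                           ["ID", "Name", "Department"])
# Trim only the string-typed columns; leave other types untouched
df_trimmed = df.select([trim(col(c)).alias(c) if dtype == "string" else col(c)
                        for c, dtype in df.dtypes])
df_trimmed.show(truncate=False)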
Yes! PySpark provides a rich set of string functions in the pyspark.sql.functions module. Here’s a list of commonly used string functions along with their descriptions:
🔹 1. String Manipulation Functions
Function | Description | Example |
---|---|---|
trim(col) | Removes leading and trailing spaces from a string. | trim(col("name")) |
ltrim(col) | Removes leading spaces from a string. | ltrim(col("name")) |
rtrim(col) | Removes trailing spaces from a string. | rtrim(col("name")) |
lower(col) | Converts a string to lowercase. | lower(col("name")) → "hello" |
upper(col) | Converts a string to uppercase. | upper(col("name")) → "HELLO" |
initcap(col) | Capitalizes the first letter of each word. | initcap(col("name")) → "Hello World" |
reverse(col) | Reverses the string. | reverse(col("name")) → "olleH" |
concat(col1, col2, …) | Concatenates multiple string columns. | concat(col("first"), col("last")) |
concat_ws(sep, col1, col2, …) | Concatenates strings with a separator. | concat_ws("-", col("first"), col("last")) → "John-Doe" |
format_string(fmt, col1, col2, …) | Formats strings using C-style format. | format_string("Name: %s", col("name")) |
repeat(col, n) | Repeats a string n times. | repeat(col("name"), 3) → "HelloHelloHello" |
🔹 2. Substring & Splitting Functions
Function | Description | Example |
---|---|---|
substring(col, pos, len) | Extracts a substring starting at pos (1-based index) with length len. | substring(col("name"), 2, 3) → "ell" |
split(col, pattern) | Splits a string using a regex pattern and returns an array. | split(col("email"), "@") |
regexp_extract(col, pattern, group) | Extracts regex pattern from a string. | regexp_extract(col("email"), "(\\w+)@(\\w+)", 1) |
regexp_replace(col, pattern, replacement) | Replaces occurrences of a regex pattern in a string. | regexp_replace(col("name"), "a", "o") |
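A quick sketch of the substring and regex helpers (the sample name/email values here are hypothetical):
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, substring, split, regexp_extract, regexp_replace

spark = SparkSession.builder.appName("SubstringRegexDemo").getOrCreate()
df_str = spark.createDataFrame([("John", "john.doe@example.com")], ["name", "email"])
df_str.select(
    substring(col("name"), 2, 3).alias("sub"),                        # "ohn"
    split(col("email"), "@").getItem(1).alias("domain"),              # "example.com"
    regexp_extract(col("email"), r"(\w+)\.(\w+)@", 1).alias("user"),  # "john"
    regexp_replace(col("name"), "o", "0").alias("replaced")           # "J0hn"
).show(truncate=False)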
🔹 3. Padding Functions
Function | Description | Example |
---|---|---|
lpad(col, len, pad) | Left-pads a string with pad to a total length of len. | lpad(col("name"), 10, "*") → "*****Hello" |
rpad(col, len, pad) | Right-pads a string with pad to a total length of len. | rpad(col("name"), 10, "*") → "Hello*****" |
🔹 4. Finding & Replacing Functions
Function | Description | Example |
---|---|---|
locate(substr, col, pos=1) | Finds position of substring substr (1-based index). Returns 0 if not found. | locate("l", col("name")) → 3 |
instr(col, substr) | Finds position of substring (same as locate ). | instr(col("name"), "l") |
translate(col, from_str, to_str) | Replaces characters in a string (like tr in Unix). | translate(col("name"), "aeiou", "12345") |
overlay(col, replace, pos, len) | Replaces part of a string with another string. | overlay(col("name"), "XYZ", 2, 3) |
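A brief sketch combining the padding and find/replace helpers (the sample value is hypothetical):
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lpad, rpad, locate, translate

spark = SparkSession.builder.appName("PadFindDemo").getOrCreate()
df_pad = spark.createDataFrame([("Hello",)], ["name"])
df_pad.select(
    lpad(col("name"), 10, "*").alias("left_padded"),        # "*****Hello"
    rpad(col("name"), 10, "*").alias("right_padded"),       # "Hello*****"
    locate("l", col("name")).alias("first_l_position"),     # 3 (1-based)
    translate(col("name"), "lo", "10").alias("translated")  # "He110"
).show(truncate=False)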
🔹 5. Length & Case Checking Functions
Function | Description | Example |
---|---|---|
length(col) | Returns the length of a string. | length(col("name")) |
char_length(col) | Alias for length() . | char_length(col("name")) |
octet_length(col) | Returns the number of bytes in a string. | octet_length(col("name")) |
bit_length(col) | Returns the bit length of a string. | bit_length(col("name")) |
🔹 6. Checking String Patterns
These are methods on Column objects (recent Spark versions also expose several of them as functions in pyspark.sql.functions):
Function | Description | Example |
---|---|---|
like(col, pattern) | Checks if the column matches a SQL LIKE pattern (% = any characters, _ = single character). | col("name").like("J%") |
rlike(col, pattern) | Checks if the column matches a regex pattern. | col("name").rlike("^J.*") |
contains(col, substr) | Checks if the column contains a substring. | col("name").contains("John") |
startswith(col, substr) | Checks if the column starts with a substring. | col("name").startswith("J") |
endswith(col, substr) | Checks if the column ends with a substring. | col("name").endswith("n") |
Example Usage
Here’s how you can use these functions in a PySpark DataFrame:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, trim, upper, length, split, concat_ws
# Initialize Spark
spark = SparkSession.builder.appName("StringFunctions").getOrCreate()
# Sample DataFrame
data = [(" John ", "john.doe@example.com"),
("Alice", "alice@example.com"),
("Bob ", "bob@example.com")]
df = spark.createDataFrame(data, ["name", "email"])
# Apply string functions
df = df.select(
    trim(col("name")).alias("trimmed_name"),         # Remove spaces
    upper(col("name")).alias("uppercase_name"),      # Convert to uppercase
    length(col("name")).alias("name_length"),        # Get string length
    split(col("email"), "@").alias("email_parts"),   # Split email
    concat_ws("-", col("name"), col("email")).alias("full_info")  # Concatenate columns
)
df.show(truncate=False)
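Continuing from the example above (spark and col are already in scope), here is a small sketch of the pattern-checking methods from section 6 used inside filter(); the df_people DataFrame is made up for illustration:
df_people = spark.createDataFrame([("John",), ("Alice",), ("Bob",)], ["name"])
df_people.filter(col("name").like("J%")).show()      # names starting with J (SQL LIKE)
df_people.filter(col("name").rlike("^A.*")).show()   # regex match
df_people.filter(col("name").contains("ob")).show()  # substring check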
Conclusion
PySpark has many built-in string functions that help with cleaning, formatting, and processing text data efficiently. Using native PySpark functions ensures performance optimization in a distributed environment.
So for string functions we use the built-in PySpark functions. But what about list and dictionary objects, which appear frequently in PySpark scripts? Which functions should we use for those, and where do they get executed, given Spark’s distributed nature?
Great question! In PySpark, handling lists and dictionaries depends on whether they are used inside:
- PySpark DataFrame operations (distributed execution)
- Python driver code (local execution)
1. List & Dictionary in PySpark (Distributed Execution)
When working with DataFrame columns, PySpark has built-in functions to manipulate lists and dictionaries in a distributed manner. These functions run on Spark workers across the cluster.
Handling Lists (ArrayType in PySpark)
PySpark provides SQL functions for handling list (array) columns inside DataFrames (a consolidated, runnable sketch follows after this list).
- Create Lists in a Column:
from pyspark.sql.functions import array, col
df = df.withColumn("new_col", array(col("col1"), col("col2")))  # Creates an array column [col1, col2]
- Extract Elements from Lists:
df = df.withColumn("first_element", col("list_col")[0])  # Get the first element of the array column
- Check If a Value Exists in a List Column:
from pyspark.sql.functions import array_contains
df = df.withColumn("has_value", array_contains(col("list_col"), "target_value"))
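As referenced above, a consolidated sketch of these array helpers on a toy DataFrame (df_arr and its columns are made up for illustration):
from pyspark.sql import SparkSession
from pyspark.sql.functions import array, array_contains, col

spark = SparkSession.builder.appName("ArrayDemo").getOrCreate()
df_arr = spark.createDataFrame([("a", "b"), ("c", "d")], ["col1", "col2"])
df_arr = df_arr.withColumn("list_col", array(col("col1"), col("col2")))
df_arr = df_arr.withColumn("first_element", col("list_col")[0])
df_arr = df_arr.withColumn("has_a", array_contains(col("list_col"), "a"))
df_arr.show(truncate=False)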
Handling Dictionaries (MapType in PySpark)
PySpark has a MapType for handling dictionary-like data (a consolidated sketch follows after this list).
- Create a Dictionary Column:
from pyspark.sql.types import MapType, StringType
from pyspark.sql.functions import create_map, lit
df = df.withColumn("dict_col", create_map(lit("key1"), col("value1"), lit("key2"), col("value2")))
- Extract Value from Dictionary:
df = df.withColumn("value1", col("dict_col")["key1"])  # Accessing a map key in PySpark
- Check If Key Exists in a Dictionary Column:
from pyspark.sql.functions import map_keys, array_contains
df = df.withColumn("has_key", array_contains(map_keys(col("dict_col")), "key1"))  # map_keys returns an array of keys
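And the consolidated sketch for the map helpers (again, df_map and its columns are hypothetical):
from pyspark.sql import SparkSession
from pyspark.sql.functions import create_map, lit, col, map_keys, array_contains

spark = SparkSession.builder.appName("MapDemo").getOrCreate()
df_map = spark.createDataFrame([("v1", "v2")], ["value1", "value2"])
df_map = df_map.withColumn("dict_col", create_map(lit("key1"), col("value1"), lit("key2"), col("value2")))
df_map = df_map.withColumn("value1_from_map", col("dict_col")["key1"])
df_map = df_map.withColumn("has_key1", array_contains(map_keys(col("dict_col")), "key1"))
df_map.show(truncate=False)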
2. List & Dictionary in Python Driver Code (Local Execution)
If you’re working outside the DataFrame (in normal Python code), you use Python’s built-in list and dictionary functions. These operations run on the driver node, not on Spark workers.
Example:
# Local execution (not distributed)
my_list = [1, 2, 3]
my_dict = {"key1": "value1", "key2": "value2"}
print(my_list[0]) # Output: 1
print(my_dict["key1"]) # Output: "value1"
3. Where Do These Executions Happen?
Operation | Execution Location | Example |
---|---|---|
Python lists/dicts | Local (Driver Node) | my_list[0] , my_dict["key"] |
PySpark array (list) | Distributed (Workers) | col("array_col")[0] |
PySpark map (dict) | Distributed (Workers) | col("map_col")["key"] |
4. Converting Between PySpark & Python Objects
If you need to process data using Python’s list/dict functions, you must collect the data to the driver (use with caution for large data!).
list_from_spark = df.select("array_col").rdd.map(lambda row: row[0]).collect() # Collect list column
dict_from_spark = df.select("map_col").rdd.map(lambda row: row[0]).collect() # Collect dict column
Final Takeaway
- Use PySpark functions (array, create_map, etc.) for distributed list/dict processing.
- Use Python’s built-in list/dict methods only in driver code.
- Avoid collect() on large DataFrames: it pulls data into the driver, defeating Spark’s distributed nature.
Different ways to collect a PySpark DataFrame as rows and use them in different scenarios:
1. Collecting DataFrame as a List of Rows
df_list = df.collect()
for row in df_list:
    print(row)  # Each row is a Row object
🔹 Use Case: Useful when working with small DataFrames and when processing needs to be done in Python.
2. Collecting DataFrame as a List of Lists
df_list_of_lists = [list(row) for row in df.collect()]
print(df_list_of_lists)
🔹 Use Case: If you need a list of values for each row instead of Row objects.
3. Collecting DataFrame as a Dictionary (Column-wise)
df_dict = {c: df.select(c).rdd.flatMap(lambda x: x).collect() for c in df.columns}
print(df_dict)
🔹 Use Case: When we want to access values based on column names.
4. Counting Number of Nulls in Each Column
from pyspark.sql.functions import col, sum
null_counts = df.select([sum(col(c).isNull().cast("int")).alias(c) for c in df.columns])
null_counts.show()
🔹 Use Case: Checking missing values for data quality analysis.
5. Collecting Data Types of Columns
df_schema = {col_name: dtype for col_name, dtype in df.dtypes}
print(df_schema) # Output: {'col1': 'int', 'col2': 'string', ...}
🔹 Use Case: Understanding the schema of a DataFrame dynamically.
6. Collecting Only Column Names in a List
columns_list = df.columns
print(columns_list)
🔹 Use Case: If you want to loop through columns or dynamically select columns.
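For instance, a tiny sketch of selecting columns dynamically (assuming the sample df and its Salary column from earlier):
# Hypothetical: select every column except Salary
df.select([c for c in df.columns if c != "Salary"]).show()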
from pyspark.sql.functions import col, isnan, count, when
# Create a dictionary to hold the null counts
# Note: isnan() targets float/double columns; for non-numeric columns the isNull() check alone is enough.
null_counts = {}
for column in df.columns:
    count_val = df.select(
        count(when(isnan(col(column)) | col(column).isNull(), 1)).alias("null_count")
    ).collect()[0]["null_count"]
    null_counts[column] = count_val
# Now you can print out the results
for column, count_val in null_counts.items():
    print(f"{column} has {count_val} null values")
print(null_counts)
from pyspark.sql.functions import isnan, when, count, col
# Variant using exec/eval to create one variable per column (shown for illustration; the dictionary
# approach above is generally safer than dynamically created variable names, and this version
# assumes column names are valid Python identifiers).
for column in df.columns:
    exec(f"{column}_nc = df.select(count(when(isnan(col(column)) | col(column).isNull(), 1))).collect()[0][0]")
for column in df.columns:
    print(f"{column} has {eval(f'{column}_nc')} null values")
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, isnan, current_timestamp, lit
from pyspark.sql.types import StringType, StructType, StructField, IntegerType, DoubleType, TimestampType
# Create your SparkSession if not already created
spark = SparkSession.builder.getOrCreate()
def get_table_create_time(table_name):
    """
    Extracts the table creation time from the formatted description.
    Note: The exact extraction might depend on your Spark/Metastore.
    """
    desc_df = spark.sql(f"DESCRIBE FORMATTED {table_name}")
    # Filter rows that might contain 'CreateTime' info (case may vary).
    create_time_row = desc_df.filter(desc_df.col_name.contains("CreateTime")).head()
    if create_time_row:
        # Assume the formatted output has the create time in the 'data_type' field.
        # You might need to adjust the parsing depending on your metastore's format.
        return create_time_row.data_type.strip()
    return None
def compute_dq_for_df(df, source_name, primary_key, created_time):
    """
    For each column in df, compute:
    - Data type (from schema)
    - Null count
    - Non-null count
    - Non-null ratio (non-null count / total rows)
    Returns a DataFrame with the following columns:
    Source, CreatedTime, ColumnName, DataType, NullCount, NonNullCount, TotalCount, NonNullRatio
    Note: primary_key is accepted but not used in the metrics below.
    """
    total_count = df.count()
    results = []
    for field in df.schema.fields:
        col_name = field.name
        data_type = field.dataType.simpleString()
        # Compute null count: covers both None and NaN (for numeric columns)
        null_count = df.filter(col(col_name).isNull() | isnan(col(col_name))).count()
        non_null_count = total_count - null_count
        ratio = non_null_count / total_count if total_count > 0 else None
        results.append((source_name, created_time, col_name, data_type, null_count, non_null_count, total_count, ratio))
    schema = StructType([
        StructField("Source", StringType(), True),
        StructField("CreatedTime", StringType(), True),  # or TimestampType() if parsed
        StructField("ColumnName", StringType(), True),
        StructField("DataType", StringType(), True),
        StructField("NullCount", IntegerType(), True),
        StructField("NonNullCount", IntegerType(), True),
        StructField("TotalCount", IntegerType(), True),
        StructField("NonNullRatio", DoubleType(), True)
    ])
    return spark.createDataFrame(results, schema=schema)
# List to collect DQ report DataFrames
dq_report_dfs = []
### Case 1: Table sources ###
table_names = ["table1", "table2"] # list your table names here
for table in table_names:
    # Get creation time from metadata (could be None if not found)
    create_time = get_table_create_time(table)
    # Load the table as DataFrame
    df_table = spark.table(table)
    dq_df = compute_dq_for_df(df_table, source_name=table, primary_key="your_primary_key", created_time=create_time)
    dq_report_dfs.append(dq_df)
### Case 2: CSV-based DataFrames ###
csv_files = {
    "csv_data1": "/path/to/data1.csv",
    "csv_data2": "/path/to/data2.csv"
}
for source_name, csv_path in csv_files.items():
    # Read CSV into DataFrame (add any specific options as needed)
    df_csv = spark.read.option("header", True).csv(csv_path)
    # For CSV files, use current timestamp (or another logic) as the creation time
    created_time = str(spark.sql("SELECT current_timestamp()").head()[0])
    dq_df = compute_dq_for_df(df_csv, source_name=source_name, primary_key="your_primary_key", created_time=created_time)
    dq_report_dfs.append(dq_df)
# Combine all DQ report DataFrames into one final report
if dq_report_dfs:
    final_dq_report = dq_report_dfs[0]
    for df_temp in dq_report_dfs[1:]:
        final_dq_report = final_dq_report.union(df_temp)
    # Show the DQ report
    final_dq_report.show(truncate=False)
    # Optionally, write the final report to a table for future use
    final_dq_report.write.mode("overwrite").saveAsTable("dq_report_table")
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import col, isnan
from datetime import datetime
# Initialize SparkSession
spark = SparkSession.builder.getOrCreate()
def compute_dq_summary(df, source_name, created_time):
    """
    Computes a DQ summary for the entire DataFrame and returns a single-row dictionary.
    The returned dictionary has:
    - source_name: identifier of the data source (table or CSV)
    - created_time: creation time as string
    - data_types: dictionary {column: data type}
    - null_counts: dictionary {column: null count}
    - non_null_counts: dictionary {column: non-null count}
    - total_count: total number of rows in the DataFrame
    - null_ratios: dictionary {column: non-null ratio}
    """
    total_count = df.count()
    # Create a dictionary for data types from the schema
    data_types = {field.name: field.dataType.simpleString() for field in df.schema.fields}
    # Initialize dictionaries to hold column-wise metrics
    null_counts = {}
    non_null_counts = {}
    null_ratios = {}
    for col_name in df.columns:
        # Count nulls (including NaN for numeric columns)
        n_null = df.filter(col(col_name).isNull() | isnan(col(col_name))).count()
        n_non_null = total_count - n_null
        ratio = n_non_null / total_count if total_count > 0 else None
        null_counts[col_name] = n_null
        non_null_counts[col_name] = n_non_null
        null_ratios[col_name] = ratio
    # Build the summary dictionary
    summary = {
        "source_name": source_name,
        "created_time": created_time,
        "data_types": data_types,
        "null_counts": null_counts,
        "non_null_counts": non_null_counts,
        "total_count": total_count,
        "null_ratios": null_ratios
    }
    return summary
# Example usage for a DataFrame from a table:
table_name = "your_table_name"
df_table = spark.table(table_name)
# For a table, assume you have a method to retrieve the creation time from metadata
# Here, we just use the current timestamp for illustration
created_time_table = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
summary_table = compute_dq_summary(df_table, source_name=table_name, created_time=created_time_table)
# Example usage for a CSV-based DataFrame:
csv_source = "csv_data1"
df_csv = spark.read.option("header", True).csv("/path/to/data1.csv")
created_time_csv = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
summary_csv = compute_dq_summary(df_csv, source_name=csv_source, created_time=created_time_csv)
# Combine the summaries into a list
summaries = [summary_table, summary_csv]
# Create a DataFrame from the list of dictionaries
dq_report_df = spark.createDataFrame([Row(**s) for s in summaries])
# Show the report
dq_report_df.show(truncate=False)
# Optionally, save this report as a table
dq_report_df.write.mode("overwrite").saveAsTable("dq_report_summary")