• For a better understanding of Spark SQL window functions and their best use cases, refer to our post Window functions in Oracle PL/SQL and Hive explained and compared with examples.

    Window functions in Spark SQL are powerful tools that allow you to perform calculations across a set of table rows that are somehow related to the current row. These functions are particularly useful for tasks that require a global view of the data, such as running totals, ranking, and time-series analysis.

    Overview of Window Functions

    In Spark SQL, window functions are used with the OVER clause and fall into several categories: aggregate functions (e.g., SUM, AVG), ranking functions (e.g., ROW_NUMBER, RANK), and value functions (e.g., LEAD, LAG).

    Window Specification

    To use a window function, you need to define a window specification that includes:

    • Partitioning: Defines the subsets of data to which the function is applied.
    • Ordering: Defines the order of rows within each partition.
    • Frame: Defines the subset of rows relative to the current row.

    Basic Syntax of a Window Function in Spark SQL

    SELECT column1, column2, 
    aggregate_function(column) OVER (PARTITION BY column_name ORDER BY column_name [ROWS/RANGE frame_clause]) AS alias
    FROM table_name;

    Components of a Window Function

    1. Aggregate or Analytical Function:
      • Examples: SUM(), AVG(), COUNT(), ROW_NUMBER(), RANK(), etc.
    2. OVER Clause:
      • Specifies the window over which the function operates.
    3. PARTITION BY Clause (optional):
      • Divides the dataset into partitions.
      • Each partition is processed independently.
    4. ORDER BY Clause (optional):
      • Specifies the ordering of rows within a partition.
    5. Frame Specification (optional):
      • Defines the range of rows within a partition for computation.
      • Options: ROWS, RANGE.


    1. PARTITION BY Clause

    • Divides the dataset into groups (partitions) for which the window function is applied independently.
    • Syntax: PARTITION BY column_name
    • Example: AVG(Salary) OVER (PARTITION BY Department) computes the average salary separately for each department.

    2. ORDER BY Clause

    • Defines the order of rows within each partition.
    • Syntax: ORDER BY column_name [ASC | DESC] [NULLS FIRST | NULLS LAST]
      • ASC: Ascending order (default).
      • DESC: Descending order.
      • NULLS FIRST: Places NULL values at the beginning of the order.
      • NULLS LAST: Places NULL values at the end of the order.
    • Example: RANK() OVER (PARTITION BY Department ORDER BY Salary DESC NULLS LAST)

    3. Frame Specification

    • Defines the range of rows to consider for the window function.
    • Frame types:
      • ROWS: Based on row positions.
      • RANGE: Based on value ranges.

    Frame Syntax

    [ROWS | RANGE] BETWEEN frame_start AND frame_end

    Frame Options

    1. UNBOUNDED PRECEDING:
      • Includes all rows from the beginning of the partition.
      • Example: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW.
    2. CURRENT ROW:
      • Includes the current row only.
      • Example: ROWS BETWEEN CURRENT ROW AND CURRENT ROW.
    3. UNBOUNDED FOLLOWING:
      • Includes all rows until the end of the partition.
      • Example: ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING.
    4. Specific Number of Rows:
      • Includes a fixed number of preceding or following rows.
      • Example: ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING.

    Examples

    • Cumulative sum: SUM(Sales) OVER (PARTITION BY CustomerID ORDER BY OrderDate ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
    • Moving average: AVG(Salary) OVER (PARTITION BY Department ORDER BY Salary ROWS BETWEEN 2 PRECEDING AND CURRENT ROW)

    4. Handling NULL Values in ORDER BY

    • By default:
      • NULLS FIRST for ascending order.
      • NULLS LAST for descending order.

    Example

    RANK() OVER (PARTITION BY Department ORDER BY Salary DESC NULLS FIRST)

    5. Supported Functions with OVER Clause

    Spark SQL supports many window functions, including:

    Aggregate Functions

    • SUM(), AVG(), COUNT(), MIN(), MAX()

    Ranking Functions

    • ROW_NUMBER(), RANK(), DENSE_RANK(), NTILE(n)

    Value Functions

    • FIRST_VALUE(), LAST_VALUE(), LEAD(column, offset, default), LAG(column, offset, default)

    6. Combining PARTITION BY, ORDER BY, and Frames

    You can combine all components for more control:

    Example:

    SELECT EmpID,
           Department,
           Salary,
           AVG(Salary) OVER (PARTITION BY Department ORDER BY Salary ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS running_avg,
           RANK() OVER (PARTITION BY Department ORDER BY Salary DESC NULLS LAST) AS salary_rank
    FROM Employee;
    

    This example:

    1. Computes a running average over the current row and the two preceding rows (ordered by salary) within each department.
    2. Ranks employees by salary in descending order, placing NULL salaries at the end.

    Summary of Options

    | Clause | Description | Syntax Example |
    |---|---|---|
    | PARTITION BY | Divides rows into partitions | PARTITION BY Department |
    | ORDER BY | Orders rows within partitions | ORDER BY Salary DESC NULLS FIRST |
    | ROWS | Row-based range for window frame | ROWS BETWEEN 2 PRECEDING AND CURRENT ROW |
    | RANGE | Value-based range for window frame | RANGE BETWEEN INTERVAL 1 DAY PRECEDING AND CURRENT ROW |
    | NULLS | Specifies how nulls are handled in ordering | ORDER BY Salary DESC NULLS LAST |

    By combining these options, Spark SQL allows flexible and efficient computation over distributed datasets!



    Examples

    1. Rank Employees by Salary in Each Department

    SELECT EmpID,
           Emp_name,
           Department,
           Salary,
           RANK() OVER (PARTITION BY Department ORDER BY Salary DESC) AS rank
    FROM Employee;
    

    Explanation:

    • PARTITION BY Department: Separate ranking by department.
    • ORDER BY Salary DESC: Ranks employees by salary in descending order.

    2. Calculate Cumulative Sum of Sales

    SELECT CustomerID, 
    OrderDate,
    ProductID,
    SUM(Sales) OVER (PARTITION BY CustomerID ORDER BY OrderDate) AS cumulative_sales
    FROM Orders;

    Explanation:

    • PARTITION BY CustomerID: Cumulative sales calculated per customer.
    • ORDER BY OrderDate: Sales are accumulated in chronological order.

    3. Find Running Average Salary

    SELECT EmpID,
           Emp_name,
           Department,
           AVG(Salary) OVER (PARTITION BY Department ORDER BY Salary ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS running_avg
    FROM Employee;
    

    Explanation:

    • ROWS BETWEEN 2 PRECEDING AND CURRENT ROW: Calculates the average salary of the current row and the two preceding rows.

    4. Lead and Lag Example

    SELECT EmpID,
           Emp_name,
           Department,
           Salary,
           LAG(Salary, 1, 0) OVER (PARTITION BY Department ORDER BY Salary) AS previous_salary,
           LEAD(Salary, 1, 0) OVER (PARTITION BY Department ORDER BY Salary) AS next_salary
    FROM Employee;
    

    Explanation:

    • LAG(Salary, 1, 0): Fetches the previous salary within the partition. Default is 0 if there is no previous value.
    • LEAD(Salary, 1, 0): Fetches the next salary within the partition. Default is 0 if there is no next value.

    5. Find Employees Above Average Salary in Their Department

    SELECT EmpID, Emp_name, Department, Salary, avg_salary
    FROM (
        SELECT EmpID, Emp_name, Department, Salary,
               AVG(Salary) OVER (PARTITION BY Department) AS avg_salary
        FROM Employee
    ) dept_avg
    WHERE Salary > avg_salary;


    Explanation:

    • AVG(Salary) OVER (PARTITION BY Department): Calculates the average salary for each department.
    • Window functions cannot appear in a WHERE clause, so the window average is computed in a subquery and the outer query keeps only employees whose salary is above their department average.

    Notes

    1. Spark SQL does not support DISTINCT in window functions.
      • For example, COUNT(DISTINCT column) OVER (...) is not allowed in Spark SQL (a common workaround using collect_set() is sketched after these notes).
    2. Optimized for Distributed Computing:
      • Unlike traditional SQL, Spark SQL handles large-scale datasets by distributing computations across the cluster.
    3. Frame Specification:
      • Use ROWS for row-level computations.
      • Use RANGE for value-based computations.
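
    Where a per-window distinct count is still needed, one commonly used workaround is to collect the distinct values with collect_set() and take the size of the resulting array. A minimal sketch, assuming a hypothetical orders table with customer_id and product_id columns:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    # Hypothetical data standing in for an "orders" table
    orders = spark.createDataFrame(
        [(1, "A"), (1, "B"), (1, "A"), (2, "C")],
        ["customer_id", "product_id"],
    )
    orders.createOrReplaceTempView("orders")

    # COUNT(DISTINCT ...) OVER (...) is rejected, but collecting a set per window works
    spark.sql("""
        SELECT customer_id,
               product_id,
               SIZE(COLLECT_SET(product_id) OVER (PARTITION BY customer_id)) AS distinct_products
        FROM orders
    """).show()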

    Integration in PySpark

    Spark SQL window functions can also be used within PySpark via .selectExpr():

    df.selectExpr("EmpID", 
    "Salary",
    "AVG(Salary) OVER (PARTITION BY Department ORDER BY EmpID) AS avg_salary").show()

    Best Use Cases for Window Functions

    1. Running Totals and Moving Averages

    Calculate running totals or moving averages over a specified window of rows.

    SELECT
        customer_id,
        order_date,
        amount,
        SUM(amount) OVER (PARTITION BY customer_id ORDER BY order_date ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS running_total
    FROM orders;
    

    2. Ranking

    Assign ranks to rows based on the values in a specific column.

    SELECT
        customer_id,
        order_date,
        amount,
        RANK() OVER (PARTITION BY customer_id ORDER BY amount DESC) AS rank
    FROM orders;
    

    3. Time-Series Analysis

    Perform operations like lead and lag for time-series data.

    SELECT
        customer_id,
        order_date,
        amount,
        LAG(amount, 1) OVER (PARTITION BY customer_id ORDER BY order_date) AS previous_amount,
        LEAD(amount, 1) OVER (PARTITION BY customer_id ORDER BY order_date) AS next_amount
    FROM orders;
    

    4. Percentile and Distribution Analysis

    Compute percentiles and cumulative distributions.

    SELECT
        customer_id,
        order_date,
        amount,
        PERCENT_RANK() OVER (PARTITION BY customer_id ORDER BY amount) AS percent_rank,
        CUME_DIST() OVER (PARTITION BY customer_id ORDER BY amount) AS cumulative_distribution
    FROM orders;
    

    5. Identifying First and Last Records in Groups

    Identify the first and last records within each group.

    SELECT
        customer_id,
        order_date,
        amount,
        FIRST_VALUE(amount) OVER (PARTITION BY customer_id ORDER BY order_date) AS first_amount,
        LAST_VALUE(amount) OVER (PARTITION BY customer_id ORDER BY order_date) AS last_amount
    FROM orders;
    

    6. For SAS programmers: how to translate SAS FIRST. and LAST. in Spark SQL

    In Spark SQL, there’s no direct equivalent of SAS FIRST. and LAST. variables. However, you can achieve similar functionality using window functions like row_number() or first_value() and last_value() depending on your specific needs.

    Here’s how you can translate the concept in Spark SQL:

    1. Using row_number():

    This approach assigns a unique row number within each group defined by the BY variable(s). You can then use conditional logic to identify the first and last rows based on the row number.

    SELECT *,
           CASE WHEN row_number() OVER (PARTITION BY variable1, variable2 ORDER BY sort_column) = 1 THEN 1 ELSE 0 END AS is_first,
           CASE WHEN row_number() OVER (PARTITION BY variable1, variable2 ORDER BY sort_column) = COUNT(*) OVER (PARTITION BY variable1, variable2) THEN 1 ELSE 0 END AS is_last
    FROM your_data
    ORDER BY variable1, variable2, sort_column;
    

    Explanation:

    • We use row_number() with a PARTITION BY clause based on your BY variable(s) and an ORDER BY clause to define the order within each group (replace sort_column with the actual column for sorting).
    • The CASE statement assigns 1 to the first row (where row_number is 1) and 0 to others within the group for is_first.
    • Similarly, the second CASE statement identifies the last row by checking if the row_number is equal to the total count of rows within the group (using COUNT(*) OVER (PARTITION BY...)).

    2. Using first_value() and last_value():

    These functions can be used if you only need the values of specific columns for the first and last rows in each group.

    SELECT *,
           first_value(column_to_get) OVER (PARTITION BY variable1, variable2 ORDER BY sort_column) AS first_value,
           last_value(column_to_get) OVER (PARTITION BY variable1, variable2 ORDER BY sort_column) AS last_value
    FROM your_data
    ORDER BY variable1, variable2, sort_column;
    

    Explanation:

    • first_value() and last_value() are used within the SELECT clause with a similar windowing definition as before.
    • They return the value of the specified column_to_get for the first and last rows within each group, respectively.

    Choosing the Right Approach:

    • If you need additional information beyond just identifying the first and last rows (like row numbers), use row_number().
    • If you only need the values of specific columns for the first and last rows, use first_value() and last_value().

    Remember:

    • In Spark SQL you do not need to pre-sort the data; instead, make sure the window's PARTITION BY and ORDER BY reflect the BY-group order you relied on in SAS.
    • Adjust the windowing specifications (partitioning and ordering) based on your actual requirements.

    By leveraging these window functions in Spark SQL, you can effectively achieve functionalities similar to SAS FIRST. and LAST. variables for identifying and processing the first and last observations within groups defined by your BY variable(s).

    Scenario

    Assume we have a dataset with columns group1, group2, order1, and order2, and we want to identify the first and last occurrence of each group based on the combined ordering of order1 and order2.

    SAS Code Example

    data example;
    set example;
    by group1 group2 order1 order2;
    if first.group1 then first_group1 = 1;
    else first_group1 = 0;
    if last.group1 then last_group1 = 1;
    else last_group1 = 0;
    if first.group2 then first_group2 = 1;
    else first_group2 = 0;
    if last.group2 then last_group2 = 1;
    else last_group2 = 0;
    run;

    Equivalent in Spark SQL

    To translate this logic into Spark SQL, you can use the ROW_NUMBER function along with window functions to identify the first and last occurrence of each group.

    Spark SQL Code Example

    1. Create the DataFrame and Register as a Temporary View:
      • This step involves creating a DataFrame and registering it as a temporary view for SQL operations.
    2. Define the Window Specification:
      • The window specification is partitioned by the grouping variables and ordered by the ordering variables.
    3. Identify the First and Last Occurrences:
      • Use ROW_NUMBER with appropriate ordering to mark the first and last occurrences.

    Here’s how you can achieve this:

    from pyspark.sql import SparkSession

    # Initialize Spark session
    spark = SparkSession.builder.appName("SAS First Last Equivalent").getOrCreate()

    # Sample data
    data = [
    ("group1", "subgroup1", "2023-01-01", "10:00", 100),
    ("group1", "subgroup1", "2023-01-01", "10:30", 200),
    ("group1", "subgroup2", "2023-01-01", "11:00", 300),
    ("group2", "subgroup1", "2023-01-01", "09:00", 150),
    ("group2", "subgroup1", "2023-01-01", "09:30", 250),
    ("group2", "subgroup2", "2023-01-02", "08:00", 130),
    ("group2", "subgroup2", "2023-01-02", "08:30", 170)
    ]

    columns = ["group1", "group2", "order1", "order2", "amount"]

    # Create DataFrame
    df = spark.createDataFrame(data, columns)

    # Register the DataFrame as a temporary view
    df.createOrReplaceTempView("example")

    # Use Spark SQL to identify first and last occurrences
    sql_query = """
    WITH RankedData AS (
    SELECT
    group1,
    group2,
    order1,
    order2,
    amount,
    ROW_NUMBER() OVER (PARTITION BY group1 ORDER BY order1, order2) AS rn_group1_asc,
    ROW_NUMBER() OVER (PARTITION BY group1 ORDER BY order1 DESC, order2 DESC) AS rn_group1_desc,
    ROW_NUMBER() OVER (PARTITION BY group1, group2 ORDER BY order1, order2) AS rn_group2_asc,
    ROW_NUMBER() OVER (PARTITION BY group1, group2 ORDER BY order1 DESC, order2 DESC) AS rn_group2_desc
    FROM example
    )
    SELECT
    group1,
    group2,
    order1,
    order2,
    amount,
    CASE WHEN rn_group1_asc = 1 THEN 1 ELSE 0 END AS first_group1,
    CASE WHEN rn_group1_desc = 1 THEN 1 ELSE 0 END AS last_group1,
    CASE WHEN rn_group2_asc = 1 THEN 1 ELSE 0 END AS first_group2,
    CASE WHEN rn_group2_desc = 1 THEN 1 ELSE 0 END AS last_group2
    FROM RankedData
    """

    # Execute the SQL query
    result_df = spark.sql(sql_query)

    # Show the result
    result_df.show()

    Explanation

    1. RankedData Common Table Expression (CTE):
      • We create a CTE called RankedData where we calculate row numbers for each group based on the specified order.
      • ROW_NUMBER() OVER (PARTITION BY group1 ORDER BY order1, order2) assigns a row number to each row within group1 ordered by order1 and order2.
      • ROW_NUMBER() OVER (PARTITION BY group1 ORDER BY order1 DESC, order2 DESC) assigns a reverse row number to identify the last occurrence within group1.
      • The same logic is applied for group2, partitioning by both group1 and group2 so that the flags reset within each group1 group, mirroring SAS BY-group behaviour.
    2. Select with CASE Statements:
      • We select from the RankedData CTE and use CASE statements to mark the first and last occurrences for each group.

    Sample Output

    The resulting DataFrame will include flags indicating the first and last occurrences for each group:

    +------+---------+----------+------+------+------------+-----------+------------+-----------+
    |group1|   group2|    order1|order2|amount|first_group1|last_group1|first_group2|last_group2|
    +------+---------+----------+------+------+------------+-----------+------------+-----------+
    |group1|subgroup1|2023-01-01| 10:00|   100|           1|          0|           1|          0|
    |group1|subgroup1|2023-01-01| 10:30|   200|           0|          0|           0|          1|
    |group1|subgroup2|2023-01-01| 11:00|   300|           0|          1|           1|          1|
    |group2|subgroup1|2023-01-01| 09:00|   150|           1|          0|           1|          0|
    |group2|subgroup1|2023-01-01| 09:30|   250|           0|          0|           0|          1|
    |group2|subgroup2|2023-01-02| 08:00|   130|           0|          0|           1|          0|
    |group2|subgroup2|2023-01-02| 08:30|   170|           0|          1|           0|          1|
    +------+---------+----------+------+------+------------+-----------+------------+-----------+

    Examples from the official Spark SQL documentation:
    CREATE TABLE employees (name STRING, dept STRING, salary INT, age INT);

    INSERT INTO employees VALUES ("Lisa", "Sales", 10000, 35);
    INSERT INTO employees VALUES ("Evan", "Sales", 32000, 38);
    INSERT INTO employees VALUES ("Fred", "Engineering", 21000, 28);
    INSERT INTO employees VALUES ("Alex", "Sales", 30000, 33);
    INSERT INTO employees VALUES ("Tom", "Engineering", 23000, 33);
    INSERT INTO employees VALUES ("Jane", "Marketing", 29000, 28);
    INSERT INTO employees VALUES ("Jeff", "Marketing", 35000, 38);
    INSERT INTO employees VALUES ("Paul", "Engineering", 29000, 23);
    INSERT INTO employees VALUES ("Chloe", "Engineering", 23000, 25);

    SELECT * FROM employees;
    +-----+-----------+------+-----+
    | name| dept|salary| age|
    +-----+-----------+------+-----+
    |Chloe|Engineering| 23000| 25|
    | Fred|Engineering| 21000| 28|
    | Paul|Engineering| 29000| 23|
    |Helen| Marketing| 29000| 40|
    | Tom|Engineering| 23000| 33|
    | Jane| Marketing| 29000| 28|
    | Jeff| Marketing| 35000| 38|
    | Evan| Sales| 32000| 38|
    | Lisa| Sales| 10000| 35|
    | Alex| Sales| 30000| 33|
    +-----+-----------+------+-----+

    SELECT name, dept, salary, RANK() OVER (PARTITION BY dept ORDER BY salary) AS rank FROM employees;
    +-----+-----------+------+----+
    | name| dept|salary|rank|
    +-----+-----------+------+----+
    | Lisa| Sales| 10000| 1|
    | Alex| Sales| 30000| 2|
    | Evan| Sales| 32000| 3|
    | Fred|Engineering| 21000| 1|
    | Tom|Engineering| 23000| 2|
    |Chloe|Engineering| 23000| 2|
    | Paul|Engineering| 29000| 4|
    |Helen| Marketing| 29000| 1|
    | Jane| Marketing| 29000| 1|
    | Jeff| Marketing| 35000| 3|
    +-----+-----------+------+----+

    SELECT name, dept, salary, DENSE_RANK() OVER (PARTITION BY dept ORDER BY salary ROWS BETWEEN
    UNBOUNDED PRECEDING AND CURRENT ROW) AS dense_rank FROM employees;
    +-----+-----------+------+----------+
    | name| dept|salary|dense_rank|
    +-----+-----------+------+----------+
    | Lisa| Sales| 10000| 1|
    | Alex| Sales| 30000| 2|
    | Evan| Sales| 32000| 3|
    | Fred|Engineering| 21000| 1|
    | Tom|Engineering| 23000| 2|
    |Chloe|Engineering| 23000| 2|
    | Paul|Engineering| 29000| 3|
    |Helen| Marketing| 29000| 1|
    | Jane| Marketing| 29000| 1|
    | Jeff| Marketing| 35000| 2|
    +-----+-----------+------+----------+

    SELECT name, dept, age, CUME_DIST() OVER (PARTITION BY dept ORDER BY age
    RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS cume_dist FROM employees;
    +-----+-----------+---+------------------+
    | name|       dept|age|         cume_dist|
    +-----+-----------+---+------------------+
    | Alex|      Sales| 33|0.3333333333333333|
    | Lisa|      Sales| 35|0.6666666666666666|
    | Evan|      Sales| 38|               1.0|
    | Paul|Engineering| 23|              0.25|
    |Chloe|Engineering| 25|               0.5|
    | Fred|Engineering| 28|              0.75|
    |  Tom|Engineering| 33|               1.0|
    | Jane|  Marketing| 28|0.3333333333333333|
    | Jeff|  Marketing| 38|0.6666666666666666|
    |Helen|  Marketing| 40|               1.0|
    +-----+-----------+---+------------------+

    SELECT name, dept, salary, MIN(salary) OVER (PARTITION BY dept ORDER BY salary) AS min
    FROM employees;
    +-----+-----------+------+-----+
    | name| dept|salary| min|
    +-----+-----------+------+-----+
    | Lisa| Sales| 10000|10000|
    | Alex| Sales| 30000|10000|
    | Evan| Sales| 32000|10000|
    |Helen| Marketing| 29000|29000|
    | Jane| Marketing| 29000|29000|
    | Jeff| Marketing| 35000|29000|
    | Fred|Engineering| 21000|21000|
    | Tom|Engineering| 23000|21000|
    |Chloe|Engineering| 23000|21000|
    | Paul|Engineering| 29000|21000|
    +-----+-----------+------+-----+

    7. Another SAS scenario: how to translate PROC SORT with BY (ordering on multiple keys) and NODUPKEY (removing duplicate observations) into Spark SQL

    SAS Scenario:

    Imagine you have a dataset named sales_data containing columns like customer_id, product_id, date, and sales_amount. You want to:

    1. Sort the data by customer_id, product_id, and date (in ascending order).
    2. Remove any duplicate observations based on the combination of customer_id, product_id, and date.

    SAS Code:

    proc sort data=sales_data nodupkey;
      by customer_id product_id date;
    run;
    

    Spark SQL Translation:

    There are two main approaches to achieve this functionality in Spark SQL:

    Approach 1: Using DISTINCT and ORDER BY:

    SELECT DISTINCT customer_id, product_id, date, sales_amount
    FROM sales_data
    ORDER BY customer_id, product_id, date;
    

    Explanation:

    • DISTINCT: This clause eliminates duplicate rows based on all columns in the SELECT clause (in this case, customer_id, product_id, and date).
    • ORDER BY: This clause sorts the resulting unique data based on the specified order (ascending for all three columns).

    Approach 2: Using ROW_NUMBER() and WHERE:

    SELECT customer_id, product_id, date, sales_amount
    FROM (
      SELECT customer_id, product_id, date, sales_amount,
             row_number() OVER (PARTITION BY customer_id, product_id, date ORDER BY date) AS row_num
      FROM sales_data
    ) AS ranked_data
    WHERE row_num = 1;
    

    Explanation:

    1. Window Function (row_number):
      • A subquery assigns a unique row number (row_num) within each group defined by customer_id, product_id, and date.
      • Because the partition key already includes date, the ORDER BY here only decides which duplicate row receives row_num = 1; any tie-breaking column can be used.
    2. Filtering Unique Rows:
      • The outer query filters the ranked data to keep only rows where row_num is 1. This selects the first occurrence for each unique combination of customer_id, product_id, and date.

    Choosing the Right Approach:

    • If your primary goal is sorting and eliminating duplicates efficiently, DISTINCT with ORDER BY might be a simpler solution.
    • If you need the row number information for further processing, or if the data volume is large and DISTINCT performance becomes an issue, the window function approach (ROW_NUMBER()) might be preferable.

    Additional Considerations:

    • Both approaches achieve the desired outcome of removing duplicate observations based on the specified key combination and sorting the remaining data.
    • You can adjust the ORDER BY clause to define a different sorting order if needed.

    By using either of these approaches in Spark SQL, you can effectively translate the functionality of SAS PROC SORT with BY and NODUPKEY for handling multiple sorting keys and deduplication within your Spark DataFrame.
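
    For completeness, the same PROC SORT with NODUPKEY behaviour can also be approximated directly on a DataFrame with dropDuplicates() plus orderBy(). Note that, unlike SAS, Spark gives no guarantee about which of the duplicate rows is kept unless you fall back on the ROW_NUMBER() approach above. A sketch with made-up sales_data:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    sales_data = spark.createDataFrame(
        [(1, 10, "2024-01-01", 100.0),
         (1, 10, "2024-01-01", 120.0),   # duplicate key, different amount
         (2, 20, "2024-01-02", 80.0)],
        ["customer_id", "product_id", "date", "sales_amount"],
    )

    deduped = (sales_data
               .dropDuplicates(["customer_id", "product_id", "date"])   # one row per key, like NODUPKEY
               .orderBy("customer_id", "product_id", "date"))           # sort like BY in PROC SORT
    deduped.show()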

    8. SAS to Spark SQL: group-wise concatenation

    data xyz;
      set abc;
      by xy;
      retain x1 y1;
      length x1 $400 y1 $100;
      if first.xy then do;
        x1=' ';
        y1=' ';
      end;
      x1=catx('',trim(x1),x2);
      y1=catx("",trim(y1),y2);
      if last.xy then output;
    run;

    The provided SAS code performs a group-by operation on xy, retaining values across rows within each group, concatenating those values, and outputting the final concatenated result for each group.

    # Define and execute SQL query
    sql_query = """
    WITH concatenated AS (
        SELECT
            xy,
            COLLECT_LIST(x2) OVER (PARTITION BY xy) AS x2_list,
            COLLECT_LIST(y2) OVER (PARTITION BY xy) AS y2_list,
            ROW_NUMBER() OVER (PARTITION BY xy ORDER BY xy) AS row_num,
            COUNT(*) OVER (PARTITION BY xy) AS count_xy
        FROM abc
    )
    SELECT
        xy,
        CONCAT_WS('_', x2_list) AS x1,
        CONCAT_WS('_', y2_list) AS y1
    FROM concatenated
    WHERE row_num = count_xy
    """
    
    # Execute the SQL query
    result = spark.sql(sql_query)
    
    # Show the results
    result.show()
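
    If the order of concatenation inside each xy group does not matter (collect_list() makes no ordering guarantee anyway), a plain GROUP BY is a simpler formulation than windowing every row and then filtering. A sketch that reuses the spark session and the abc view from above, with a space separator chosen to imitate catx:

    # Simpler alternative: aggregate once per group instead of windowing every row
    result = spark.sql("""
        SELECT xy,
               CONCAT_WS(' ', COLLECT_LIST(x2)) AS x1,
               CONCAT_WS(' ', COLLECT_LIST(y2)) AS y1
        FROM abc
        GROUP BY xy
    """)
    result.show()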
    

  • PySpark Architecture Cheat Sheet

    1. Core Components of PySpark

    | Component | Description | Key Features |
    |---|---|---|
    | Spark Core | The foundational Spark component for scheduling, memory management, and fault tolerance. | Task scheduling, data partitioning, RDD APIs. |
    | Spark SQL | Enables interaction with structured data via SQL, DataFrames, and Datasets. | Supports SQL queries, schema inference, integration with Hive. |
    | Spark Streaming | Allows real-time data processing through micro-batching. | DStreams, integration with Kafka, Flume, etc. |
    | Spark MLlib | Provides scalable machine learning algorithms. | Algorithms for classification, clustering, regression, etc. |
    | Spark GraphX | Supports graph processing and analysis for complex networks. | Graph algorithms, graph-parallel computation. |

    2. PySpark Layered Architecture

    | Layer | Description | Key Functions |
    |---|---|---|
    | Application Layer | Contains user applications and custom PySpark code. | Custom data workflows and business logic. |
    | Spark API Layer | Provides PySpark APIs to interact with Spark Core and other components. | High-level abstractions for data manipulation, SQL, streaming. |
    | Spark Core Layer | Provides core functionalities including task scheduling and fault tolerance. | Data locality, memory management, RDD transformations. |
    | Execution Layer | Manages the execution of Spark tasks across the cluster. | Task scheduling, load balancing, error handling. |
    | Storage Layer | Manages data storage with distributed storage solutions. | Supports HDFS, S3, Cassandra, and other storage integrations. |

    3. Spark Cluster Architecture

    | Component | Description | Key Role |
    |---|---|---|
    | Driver Node | Runs the Spark application, creates the SparkContext, and coordinates task execution. | Manages jobs, scheduling, and resource allocation. |
    | Executor Nodes | Run tasks assigned by the driver node and process data on each partition. | Execute tasks, store intermediate results, and return output. |
    | Cluster Manager | Manages the resources of the Spark cluster. | Allocates resources, manages executor lifecycle. |
    | Distributed File System | Stores data across the cluster, ensuring high availability. | HDFS, S3, or other compatible storage for data sharing. |

    4. Spark Execution Process

    | Step | Description | Key Points |
    |---|---|---|
    | 1. Application Submission | User submits a Spark application via command line or a UI. | Initializes job creation in Spark. |
    | 2. Job Creation | Spark creates jobs and splits them into stages and tasks based on transformations. | Directed Acyclic Graph (DAG) creation based on data flow. |
    | 3. Task Assignment | Driver assigns tasks to executors based on data locality and resource availability. | Utilizes data partitioning for parallelism. |
    | 4. Task Execution | Executors run assigned tasks on data partitions. | Processes data in parallel across the cluster. |
    | 5. Result Collection | Driver collects results from all executors, aggregates them, and returns the final output. | Outputs final results to the user or designated storage. |
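
    A quick way to see steps 2 and 3 in practice is to ask Spark for the plan it has built before any action runs. A small illustrative sketch:

    from pyspark.sql import SparkSession, functions as F

    spark = SparkSession.builder.appName("plan-demo").getOrCreate()

    df = spark.range(1_000_000)                       # transformation: nothing runs yet
    agg = df.withColumn("bucket", F.col("id") % 10) \
            .groupBy("bucket").count()                # still lazy

    agg.explain()   # prints the logical/physical plan Spark will execute
    agg.show()      # action: jobs, stages and tasks are actually scheduled now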

    5. Spark RDD Architecture

    | Component | Description | Key Characteristics |
    |---|---|---|
    | RDD (Resilient Distributed Dataset) | Immutable, distributed collection of objects across the cluster. | Fault-tolerant, lineage-based recovery, in-memory processing. |
    | Partition | Subset of data within an RDD stored on a single node. | Enables parallel processing of data. |
    | Task | Smallest unit of work that operates on a single partition. | Executes transformations or actions on data. |

    6. Spark DataFrame Architecture

    | Component | Description | Key Characteristics |
    |---|---|---|
    | DataFrame | Distributed collection of data organized into named columns. | Schema-based data handling, optimized storage, SQL compatibility. |
    | Dataset | Strongly-typed distributed collection of data (Java/Scala). | Type safety, combines features of RDDs and DataFrames. |
    | Encoder | Converts data between JVM objects and Spark’s internal format for optimized serialization. | Efficient serialization/deserialization for faster processing. |

    7. Spark SQL Architecture

    | Component | Description | Key Functions |
    |---|---|---|
    | Catalyst Optimizer | Optimizes Spark SQL queries for enhanced performance. | Logical plan and physical plan optimization. |
    | Query Planner | Plans the execution of SQL queries by selecting the best execution strategy. | Converts the optimized logical plan into a physical execution plan. |
    | Execution Engine | Executes SQL queries using Spark’s distributed computing framework. | Leverages cluster resources for parallel query execution. |

    8. Spark Streaming Architecture

    | Component | Description | Key Features |
    |---|---|---|
    | DStream (Discretized Stream) | Continuous data stream split into micro-batches for processing. | Batch processing in near real-time. |
    | Receiver | Ingests data from external sources like Kafka, Flume, etc. | Acts as the data source for streaming jobs. |
    | Processor | Processes data within a DStream by applying transformations and actions. | Provides transformations similar to RDDs. |

    9. Spark Master-Slave Architecture

    | Component | Description | Key Role |
    |---|---|---|
    | Master Node | Coordinates resource allocation, task scheduling, and overall job management. | Central controller for the Spark cluster. |
    | Worker Nodes | Execute tasks on assigned data partitions as directed by the Master. | Run computations, store data, and handle intermediate results. |
    | Executor | Process running on each worker node, responsible for executing tasks. | Runs task code, caches data, and sends results to the driver. |
    | Task | Smallest unit of work on data partitions assigned to executors by the driver. | Executes transformations or actions on data partitions. |
    | Driver Program | Initiates the Spark application and coordinates overall job execution. | Submits tasks to the master and receives results. |
    | Cluster Manager | Manages resources for the entire Spark cluster (YARN, Mesos, Kubernetes). | Manages the lifecycle of executors and resource distribution. |

    10. Key Concepts in PySpark

    | Concept | Description | Benefits |
    |---|---|---|
    | Lazy Evaluation | Transformations are not executed until an action is called. | Optimizes query execution by grouping operations. |
    | Fault Tolerance | Spark recovers lost RDDs using lineage information when nodes fail. | Increases reliability in distributed environments. |
    | In-Memory Processing | Stores intermediate data in memory instead of writing to disk. | Enables faster data processing by avoiding I/O overhead. |
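
    These concepts show up directly in code: transformations only describe work, cache() marks a result to be kept in memory, and nothing is computed until an action runs. A small sketch:

    from pyspark.sql import SparkSession, functions as F

    spark = SparkSession.builder.getOrCreate()

    events = spark.range(100000).withColumn("key", F.col("id") % 7)   # lazy transformation

    filtered = events.filter(F.col("key") == 3).cache()   # marked for caching, still nothing computed

    print(filtered.count())   # first action: computes the result and materialises the cache
    print(filtered.count())   # second action: answered from the in-memory cache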

    11. Common Use Cases for PySpark

    • Batch Processing: Large-scale ETL (Extract, Transform, Load) jobs.
    • Stream Processing: Real-time analytics, monitoring systems.
    • Machine Learning: Training models at scale using Spark MLlib.
    • Graph Processing: Social network analysis, recommendation systems.
    • Data Warehousing: Leveraging Spark SQL for querying structured datasets.


  • Yup! Scientists find a ‘Unique’ Black Hole that is hungrier than ever in the Universe! Scientists have observed a fascinating phenomenon involving a supermassive black hole, AT2022dsb, which appears to be devouring a star in a “tidal disruption event” (TDE).

    When a star ventures too close to a black hole, the intense gravitational forces stretch it out in a process known as “spaghettification.”

    As the star’s material spirals toward the black hole, it forms a glowing, donut-shaped torus of hot gas around it, resembling a cosmic feeding frenzy.

    The Discovery and Unique Nature of This Black Hole

    Black holes have long been known as the universe’s “hungry giants,” entities with such immense gravitational pull that not even light can escape them. This recently observed black hole, however, stands out because of the vast amounts of material it’s actively pulling in, including entire stars that stray too close. When a black hole consumes a star, it distorts and stretches it, a process astronomers poetically refer to as “spaghettification.”

    The massive gravitational forces rip apart the star, and its matter spirals toward the event horizon, creating bright, swirling bands of light and energy. This radiant display, known as an accretion disk, reveals the black hole’s “feasting” process and provides scientists with clues about its behavior and growth.

    Observing the Accretion Disk

    The image above shows a vivid portrayal of a black hole in action, illustrating the intense gravitational forces pulling in stellar material. The accretion disk forms as material spirals inward, reaching incredible temperatures and emitting high levels of X-ray radiation that astronomers can observe from Earth.

    Cosmic Impact and Significance

    The discovery of this “voracious” black hole offers astronomers a unique chance to study black hole behavior more closely, including how they grow and influence their surroundings. By examining the material in the accretion disk, scientists hope to understand more about how black holes evolve over time and how they might impact the galaxies that host them. This discovery reinforces the idea that black holes are not just passive entities; they are dynamic and play a crucial role in the cosmic ecosystem by regulating star formation and influencing galactic structures.

    Conclusion

    As scientists continue to observe and learn from this remarkable cosmic phenomenon, we gain more insight into one of the most mysterious and powerful forces in the universe. This voracious black hole serves as a powerful reminder of the awe-inspiring and sometimes destructive beauty of our cosmos.

  • Here’s an enhanced Spark SQL cheat sheet with additional details, covering join types, union types, and set operations like EXCEPT and INTERSECT, along with options for table management (DDL operations like UPDATE, INSERT, DELETE, etc.). This comprehensive sheet is designed to help with quick Spark SQL reference.


    | Category | Concept | Syntax / Example | Description |
    |---|---|---|---|
    | Basic Statements | SELECT | SELECT col1, col2 FROM table WHERE condition; | Retrieves specific columns from a table based on a condition. |
    | | DISTINCT | SELECT DISTINCT col FROM table; | Returns unique values in the specified column. |
    | | LIMIT | SELECT * FROM table LIMIT 10; | Restricts the number of rows returned by a query. |
    | Joins | INNER JOIN | SELECT * FROM t1 JOIN t2 ON t1.id = t2.id; | Returns rows that have matching values in both tables. |
    | | LEFT JOIN | SELECT * FROM t1 LEFT JOIN t2 ON t1.id = t2.id; | Returns all rows from the left table and matched rows from the right table; unmatched rows get NULL in columns from the right. |
    | | RIGHT JOIN | SELECT * FROM t1 RIGHT JOIN t2 ON t1.id = t2.id; | Returns all rows from the right table and matched rows from the left table; unmatched rows get NULL in columns from the left. |
    | | FULL OUTER JOIN | SELECT * FROM t1 FULL JOIN t2 ON t1.id = t2.id; | Returns rows when there is a match in either left or right table, including unmatched rows. |
    | | CROSS JOIN | SELECT * FROM t1 CROSS JOIN t2; | Returns the Cartesian product of the two tables. |
    | Set Operations | UNION | SELECT * FROM t1 UNION SELECT * FROM t2; | Combines result sets from multiple queries, removing duplicates by default. |
    | | UNION ALL | SELECT * FROM t1 UNION ALL SELECT * FROM t2; | Combines result sets from multiple queries without removing duplicates. |
    | | INTERSECT | SELECT * FROM t1 INTERSECT SELECT * FROM t2; | Returns only the rows present in both queries. |
    | | EXCEPT | SELECT * FROM t1 EXCEPT SELECT * FROM t2; | Returns rows present in the first query but not in the second query. |
    | | EXCEPT ALL | SELECT * FROM t1 EXCEPT ALL SELECT * FROM t2; | Returns all rows in the first query that aren’t in the second, including duplicates. |
    | Table Management | CREATE TABLE | CREATE TABLE table_name (id INT, name STRING); | Creates a new table with specified columns and data types. |
    | | DESCRIBE | DESCRIBE TABLE table_name; | Shows the structure and metadata of a table. |
    | | ALTER TABLE | ALTER TABLE table_name ADD COLUMNS (age INT); | Adds columns or modifies a table’s structure. |
    | | DROP TABLE | DROP TABLE IF EXISTS table_name; | Deletes a table if it exists. |
    | | TRUNCATE TABLE | TRUNCATE TABLE table_name; | Removes all rows from a table without deleting the table structure. |
    | | INSERT INTO | INSERT INTO table_name VALUES (1, 'name'); | Adds new rows to a table. |
    | | INSERT OVERWRITE | INSERT OVERWRITE table_name SELECT * FROM other_table; | Replaces existing data in a table with the results of a query. |
    | | UPDATE | UPDATE table_name SET col = 'value' WHERE condition; | Updates specific columns based on a condition (SQL-style syntax may vary by environment). |
    | | DELETE | DELETE FROM table_name WHERE condition; | Deletes specific rows based on a condition (available in Delta tables, SQL-style syntax). |
    | Window Functions | row_number() | ROW_NUMBER() OVER (PARTITION BY col ORDER BY col2 DESC) | Assigns a unique number to each row within a partition. |
    | | rank() | RANK() OVER (PARTITION BY col ORDER BY col2 DESC) | Assigns a rank to rows within a partition based on specified column(s). |
    | | lead(), lag() | LEAD(col) OVER (ORDER BY col2) | Accesses data from the following or preceding row. |
    | Data Manipulation Functions | withColumn() | df.withColumn("newCol", df.oldCol + 1) | Adds or replaces a column with the specified expression. |
    | | withColumnRenamed() | df.withColumnRenamed("oldName", "newName") | Renames a column. |
    | | selectExpr() | df.selectExpr("col AS newCol", "col2 + 1") | Selects columns or expressions using SQL syntax. |
    | String Functions | concat() | SELECT concat(col1, col2) FROM table; | Concatenates strings from multiple columns. |
    | | substring() | SELECT substring(col, 1, 5) FROM table; | Extracts a substring from a string column. |
    | | lower() / upper() | SELECT lower(col) FROM table; | Converts all characters in a string to lowercase or uppercase. |
    | Date and Time Functions | current_date() | SELECT current_date(); | Returns the current date. |
    | | datediff() | SELECT datediff(end_date, start_date) FROM table; | Returns the difference in days between two dates. |
    | | year(), month(), day() | SELECT year(col) FROM table; | Extracts parts of a date. |
    | Aggregate Functions | collect_list() | SELECT collect_list(col) FROM table; | Aggregates values into a list for each group. |
    | | collect_set() | SELECT collect_set(col) FROM table; | Aggregates values into a unique set for each group. |
    | | avg(), sum(), count() | SELECT sum(col), count(col) FROM table GROUP BY group_col; | Performs aggregation functions like averaging, summing, or counting. |
    | Optimization Techniques | cache() | df.cache() | Caches the DataFrame in memory to optimize performance on repeated actions. |
    | | repartition() | df.repartition(4, "col") | Redistributes data across partitions for load balancing. |
    | | broadcast() | broadcast(df) | Optimizes joins by broadcasting smaller DataFrames to all nodes. |
    | | Predicate Pushdown | spark.sql("SELECT * FROM table WHERE col = 'value'") | Pushes filters down to the data source, reducing data scanned. |
    | UDFs | Register UDF | spark.udf.register("addOne", lambda x: x + 1) | Registers a custom Python function as a UDF. |
    | | Using a UDF | SELECT addOne(col) FROM table; | Applies a UDF to a column in Spark SQL. |
    | Schema Management | printSchema() | df.printSchema() | Displays the schema of a DataFrame. |
    | | schema | df.schema | Returns the schema as a StructType object. |
    | | Schema Merge | spark.read.option("mergeSchema", "true") | Merges schemas when reading from multiple files. |
    | Complex Types | Arrays | ARRAY<int> | Defines an array type, e.g., ARRAY<int>. |
    | | Struct | STRUCT<name: STRING, age: INT> | Defines a nested structure. |
    | Miscellaneous | monotonically_increasing_id() | SELECT monotonically_increasing_id() AS id FROM table; | Generates unique IDs for rows. |
    | | input_file_name() | SELECT input_file_name() FROM table; | Retrieves the file name associated with each row. |
    | | coalesce() | SELECT coalesce(col1, col2) FROM table; | Returns the first non-null value from the specified columns. |

    | Concept | Description | Syntax/Example |
    |---|---|---|
    | Basic Select | Retrieves data from a table. | SELECT column1, column2 FROM table; |
    | WHERE Clause | Filters records based on conditions. | SELECT * FROM table WHERE condition; |
    | Aggregations | Summarizes data (e.g., SUM, COUNT, AVG). | SELECT SUM(column) FROM table GROUP BY column2; |
    | Window Functions | Performs calculations across rows, like cumulative sums, rank, or row numbers. | SELECT column, SUM(value) OVER (PARTITION BY column ORDER BY date) AS running_total FROM table; |
    | Joins | Combines rows from two or more tables based on a related column. | SELECT * FROM table1 JOIN table2 ON table1.id = table2.id; |
    | Subqueries | Nested queries for complex operations or transformations. | SELECT * FROM (SELECT column1 FROM table WHERE condition); |
    | CTE (WITH Clause) | Temporary result set for improved readability and reuse in complex queries. | WITH temp AS (SELECT column FROM table) SELECT * FROM temp WHERE condition; |
    | UNION/UNION ALL | Combines results from multiple SELECT statements. | SELECT column FROM table1 UNION SELECT column FROM table2; |
    | Pivot | Converts rows into columns for specified values (aggregate with columns). | SELECT * FROM (SELECT * FROM table) PIVOT (SUM(value) FOR column IN ('value1', 'value2')); |
    | Unpivot | Converts columns into rows, useful for restructuring wide tables. | SELECT * FROM table UNPIVOT (value FOR name IN (col1, col2, col3)); |
    | Views | Virtual table based on a SELECT query, allows simplified access. | CREATE VIEW view_name AS SELECT column FROM table; |
    | Temporary Tables | Temporary storage for session-specific tables, deleted when the session ends. | CREATE TEMPORARY TABLE temp_table AS SELECT * FROM table; |
    | Group By with HAVING | Groups data by specified columns and applies conditions to aggregated data. | SELECT column, COUNT(*) FROM table GROUP BY column HAVING COUNT(*) > value; |
    | Case Statements | Conditional logic within SQL queries for creating calculated columns. | SELECT column, CASE WHEN condition THEN 'result1' ELSE 'result2' END AS new_column FROM table; |
    | Window Frame | Specifies the range for window functions, often cumulative or sliding (ROWS BETWEEN clauses). | SUM(column) OVER (ORDER BY date ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW); |
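
    To make the optimization and UDF rows above concrete, here is a small sketch that combines a broadcast join with a Python UDF registered for use from SQL (table and column names are illustrative):

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import broadcast
    from pyspark.sql.types import IntegerType

    spark = SparkSession.builder.getOrCreate()

    orders = spark.createDataFrame([(1, 100), (2, 200), (1, 50)], ["cust_id", "amount"])
    customers = spark.createDataFrame([(1, "Alice"), (2, "Bob")], ["cust_id", "name"])

    # Broadcast the small dimension table to every executor to avoid shuffling the large side
    joined = orders.join(broadcast(customers), "cust_id")
    joined.createOrReplaceTempView("joined_orders")

    # Register a Python UDF and call it from Spark SQL
    spark.udf.register("addOne", lambda x: x + 1, IntegerType())
    spark.sql("SELECT name, addOne(amount) AS amount_plus_one FROM joined_orders").show()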

    Spark SQL Functions with Complex Use Cases

    | Function | Example Use Case |
    |---|---|
    | collect_list() | Aggregate column values into a list for each group, useful in tracking customer purchase history: SELECT customer_id, collect_list(product) FROM purchases GROUP BY customer_id; |
    | concat() | Concatenate multiple address fields into one formatted address: SELECT concat(street, ', ', city, ', ', zip) AS address FROM addresses; |
    | row_number() | Number rows within each group, useful for ranking: SELECT *, row_number() OVER (PARTITION BY category ORDER BY sales DESC) AS rank FROM sales_data; |
    | date_add() | Calculate future dates, such as a payment due date: SELECT order_id, date_add(order_date, 30) AS due_date FROM orders; |
    | CASE WHEN and coalesce() | Assign risk categories while handling nulls: SELECT customer_id, CASE WHEN age > 60 THEN 'high' WHEN age > 30 THEN 'medium' ELSE 'low' END AS risk, coalesce(income, 0) AS income FROM customers; |
    | array_contains() | Filter for specific tags in an array column: SELECT * FROM posts WHERE array_contains(tags, 'pyspark'); |
    | explode() | Expand array items into individual rows: SELECT order_id, explode(items) AS item FROM orders; |

    Conditional aggregation in Spark SQL:

    Here’s an example of conditional aggregation in Spark SQL:

    SELECT
      SUM(CASE WHEN age > 30 THEN 1 ELSE 0 END) AS count_over_30,
      SUM(CASE WHEN age <= 30 THEN 1 ELSE 0 END) AS count_under_30
    FROM
      customers;

    In this example, we’re using a CASE statement to conditionally sum the values. If the age is greater than 30, we sum 1, otherwise we sum 0.

    Using IF Function

    Alternatively, you can use the IF function:

    SELECT
      SUM(IF(age > 30, 1, 0)) AS count_over_30,
      SUM(IF(age <= 30, 1, 0)) AS count_under_30
    FROM
      customers;

    Spark SQL commands to manage views

    Here are the Spark SQL commands to manage views:

    Creating Views

    1. Creating Virtual Views

    CREATE VIEW my_view AS SELECT * FROM my_table;

    2. Creating Temporary Views

    CREATE TEMPORARY VIEW my_temp_view AS SELECT * FROM my_table;

    3. Creating or Replacing Temporary Views

    CREATE OR REPLACE TEMPORARY VIEW my_temp_view AS SELECT * FROM my_table;

    Deleting Views

    1. Deleting Temporary Views if Exists

    DROP VIEW IF EXISTS my_temp_view;

    Checking Views

    1. Checking Views DDL

    DESCRIBE FORMATTED my_view;

    2. Checking Extended Query

    EXPLAIN EXTENDED SELECT * FROM my_view;

    3. Checking in Spark Catalog

    SHOW TABLES IN my_database;  // lists all tables and views
    SHOW VIEWS IN my_database;    // lists only views

    Note:

    • CREATE VIEW creates a virtual view, which is a stored query that doesn’t store data.
    • CREATE TEMPORARY VIEW creates a temporary view that is only available in the current Spark session.
    • CREATE OR REPLACE TEMPORARY VIEW creates or replaces a temporary view.
    • DROP VIEW IF EXISTS deletes a view if it exists.
    • DESCRIBE FORMATTED shows the DDL of a view.
    • EXPLAIN EXTENDED shows the extended query plan of a view.
    • SHOW TABLES and SHOW VIEWS list tables and views in the Spark catalog.

    Here are the different types of joins in Spark SQL, along with examples:

    1. Inner Join

    Returns only the rows that have a match in both tables.

    SELECT *
    FROM table1
    INNER JOIN table2
    ON table1.id = table2.id;

    2. Left Outer Join (or Left Join)

    Returns all the rows from the left table and the matching rows from the right table. If there’s no match, the result will contain NULL values.

    SELECT *
    FROM table1
    LEFT JOIN table2
    ON table1.id = table2.id;

    3. Right Outer Join (or Right Join)

    Similar to the left outer join, but returns all the rows from the right table and the matching rows from the left table.

    SELECT *
    FROM table1
    RIGHT JOIN table2
    ON table1.id = table2.id;

    4. Full Outer Join (or Full Join)

    Returns all the rows from both tables, with NULL values in the columns where there are no matches.

    SELECT *
    FROM table1
    FULL OUTER JOIN table2
    ON table1.id = table2.id;

    5. Semi Join

    Returns only the rows from the left table that have a match in the right table.

    SELECT *
    FROM table1
    LEFT SEMI JOIN table2
    ON table1.id = table2.id;

    6. Anti Join

    Returns only the rows from the left table that do not have a match in the right table.

    SELECT *
    FROM table1
    LEFT ANTI JOIN table2
    ON table1.id = table2.id;

    7. Cross Join

    Returns the Cartesian product of both tables.

    SELECT *
    FROM table1
    CROSS JOIN table2;

    Note: Spark SQL also supports using the USING clause to specify the join condition, like this:

    SELECT *
    FROM table1
    JOIN table2
    USING (id);
  • Here’s a categorized Spark SQL function reference, which organizes common Spark SQL functions by functionality. This can help with selecting the right function based on the operation you want to perform.

    1. Aggregate Functions

    | Function | Description | Example |
    |---|---|---|
    | avg() | Calculates the average value. | SELECT avg(age) FROM table; |
    | count() | Counts the number of rows. | SELECT count(*) FROM table; |
    | max() | Finds the maximum value. | SELECT max(salary) FROM table; |
    | min() | Finds the minimum value. | SELECT min(age) FROM table; |
    | sum() | Calculates the sum of a column. | SELECT sum(salary) FROM table; |
    | stddev() | Calculates the standard deviation. | SELECT stddev(salary) FROM table; |
    | variance() | Calculates the variance. | SELECT variance(salary) FROM table; |

    2. Analytic Functions

    | Function | Description | Example |
    |---|---|---|
    | row_number() | Assigns a unique number to each row in a window (an ORDER BY is required). | ROW_NUMBER() OVER (PARTITION BY city ORDER BY age) |
    | rank() | Assigns a rank to each row in a partition. | RANK() OVER (ORDER BY salary DESC) |
    | dense_rank() | Similar to rank but without gaps. | DENSE_RANK() OVER (ORDER BY age ASC) |
    | ntile(n) | Divides rows into n buckets. | NTILE(4) OVER (ORDER BY age) |
    | lead() | Accesses a row after the current row. | LEAD(salary, 1) OVER (ORDER BY age) |
    | lag() | Accesses a row before the current row. | LAG(salary, 1) OVER (ORDER BY age) |

    3. String Functions

    | Function | Description | Example |
    |---|---|---|
    | concat() | Concatenates multiple strings. | SELECT concat(first_name, last_name) FROM table; |
    | substring() | Extracts a substring from a string. | SELECT substring(name, 1, 3) FROM table; |
    | length() | Returns the length of a string. | SELECT length(name) FROM table; |
    | lower() | Converts string to lowercase. | SELECT lower(name) FROM table; |
    | upper() | Converts string to uppercase. | SELECT upper(name) FROM table; |
    | trim() | Trims spaces from both ends of a string. | SELECT trim(name) FROM table; |
    | replace() | Replaces a substring within a string. | SELECT replace(name, 'a', 'b') FROM table; |
    | split() | Splits a string into an array. | SELECT split(email, '@') FROM table; |

    4. Date and Time Functions

    | Function | Description | Example |
    |---|---|---|
    | current_date() | Returns the current date. | SELECT current_date(); |
    | current_timestamp() | Returns the current timestamp. | SELECT current_timestamp(); |
    | datediff() | Returns difference in days between two dates. | SELECT datediff(date1, date2) FROM table; |
    | year(), month(), day() | Extracts year, month, day from a date. | SELECT year(birthdate) FROM table; |
    | date_add() | Adds days to a date. | SELECT date_add(date, 10) FROM table; |
    | date_sub() | Subtracts days from a date. | SELECT date_sub(date, 10) FROM table; |
    | to_date() | Converts string to date. | SELECT to_date(string_date) FROM table; |
    | to_timestamp() | Converts string to timestamp. | SELECT to_timestamp(string_timestamp) FROM table; |

    5. Mathematical Functions

    | Function | Description | Example |
    |---|---|---|
    | abs() | Returns absolute value. | SELECT abs(-10) FROM table; |
    | ceil() | Rounds up to the nearest integer. | SELECT ceil(salary) FROM table; |
    | floor() | Rounds down to the nearest integer. | SELECT floor(salary) FROM table; |
    | round() | Rounds to a specified number of decimal places. | SELECT round(salary, 2) FROM table; |
    | sqrt() | Returns the square root. | SELECT sqrt(age) FROM table; |
    | pow() | Returns a number raised to a power. | SELECT pow(salary, 2) FROM table; |
    | exp() | Returns e^x (exponential). | SELECT exp(age) FROM table; |
    | log() | Returns the logarithm of a number. | SELECT log(salary) FROM table; |

    6. Array Functions

    | Function | Description | Example |
    |---|---|---|
    | array() | Creates an array from multiple values. | SELECT array('a', 'b', 'c'); |
    | size() | Returns the number of elements in an array. | SELECT size(array_column) FROM table; |
    | array_contains() | Checks if an array contains a specified value. | SELECT array_contains(array_column, 'value') FROM table; |
    | explode() | Creates a new row for each element in the array. | SELECT explode(array_column) FROM table; |
    | sort_array() | Sorts the elements of an array in ascending order. | SELECT sort_array(array_column) FROM table; |
    | array_distinct() | Removes duplicate values from an array. | SELECT array_distinct(array_column) FROM table; |

    7. Conditional Functions

    | Function | Description | Example |
    |---|---|---|
    | when() / CASE WHEN | Conditional expression that works like an if-else (when() is the DataFrame API; in SQL use CASE WHEN). | SELECT CASE WHEN age > 18 THEN 'adult' ELSE 'minor' END FROM table; |
    | coalesce() | Returns the first non-null value. | SELECT coalesce(col1, col2) FROM table; |
    | ifnull() | Returns the second value if the first is null. | SELECT ifnull(col1, 'unknown') FROM table; |
    | nullif() | Returns NULL if the two values are equal. | SELECT nullif(col1, col2) FROM table; |
    | nvl() | Replaces NULL with a specified value. | SELECT nvl(col, 'default') FROM table; |

    8. Miscellaneous Functions

    | Function | Description | Example |
    |---|---|---|
    | lit() | Converts a literal value to a column (DataFrame API). | df.select(lit(1)) |
    | rand() | Generates a random number between 0 and 1. | SELECT rand() FROM table; |
    | monotonically_increasing_id() | Returns a unique ID for each row. | SELECT monotonically_increasing_id() FROM table; |
    | input_file_name() | Returns the file name of the source data. | SELECT input_file_name() FROM table; |

    This categorized list provides a quick reference for Spark SQL functions based on what kind of operation they perform, making it useful for development and troubleshooting in Spark SQL queries.

    The collect_list() function is categorized under Aggregate Functions in Spark SQL. It aggregates data by collecting values into a list within each group, without removing duplicates. Here’s a quick example and then an exploration of some interesting, complex use cases for various functions, including collect_list().


    1. Advanced Use Cases of Aggregate Functions

    collect_list()

    | Function | Example | Description |
    |---|---|---|
    | collect_list() | SELECT id, collect_list(name) FROM table GROUP BY id; | Collects all values of a column into a list for each group, preserving duplicates. |

    Complex Use Case: If you have a dataset where each customer has multiple orders, and you want to get a list of order dates for each customer:

    SELECT customer_id, collect_list(order_date) AS order_dates
    FROM orders
    GROUP BY customer_id;
    

    This is useful in generating lists of values within groups, such as viewing the product purchase history of customers or tracking all updates to a particular row over time.


    2. String Functions with Complex Use Cases

    concat() and replace()

    | Function | Example | Description |
    |---|---|---|
    | concat() | SELECT concat(city, ', ', state) AS location FROM table; | Joins multiple columns or strings together. |
    | replace() | SELECT replace(phone, '-', '') AS phone_no FROM table; | Replaces parts of a string based on a pattern. |

    Complex Use Case: Concatenating multiple address fields to form a single address and cleaning up data with replace():

    SELECT concat(street, ', ', city, ', ', replace(zip, '-', '')) AS full_address
    FROM addresses;
    

    This would be useful for standardizing or preparing addresses for mailing systems by merging fields and removing unnecessary characters.


    3. Analytic Functions with Interesting Examples

    row_number(), rank(), lead(), and lag()

    | Function | Example | Description |
    |---|---|---|
    | row_number() | ROW_NUMBER() OVER (PARTITION BY city ORDER BY age DESC) | Assigns a unique number to each row in a window. |
    | lead() | LEAD(salary, 1) OVER (ORDER BY age) | Retrieves the next row’s value in the current row’s column. |
    | lag() | LAG(salary, 1) OVER (ORDER BY age) | Retrieves the previous row’s value in the current row’s column. |

    Complex Use Case: Track sales growth over time by calculating month-over-month difference in sales:

    SELECT month, sales,
           sales - LAG(sales, 1) OVER (ORDER BY month) AS sales_diff
    FROM sales_data;
    

    This lets you analyze trends or identify dips and peaks in performance by using the previous row’s data directly.


    4. Date and Time Functions for Advanced Operations

    date_add(), datediff(), year()

    | Function | Example | Description |
    |---|---|---|
    | date_add() | SELECT date_add(order_date, 30) AS due_date FROM table; | Adds a specific number of days to a date. |
    | datediff() | SELECT datediff(due_date, order_date) AS days_to_ship FROM table; | Calculates the difference in days between two dates. |
    | year(), month() | SELECT year(birthdate) AS birth_year FROM table; | Extracts parts of a date. |

    Complex Use Case: Calculate the monthly retention rate by finding customers who ordered in consecutive months:

    WITH monthly AS (
        SELECT customer_id,
               month(order_date) AS month,
               count(*) AS orders_this_month
        FROM orders
        GROUP BY customer_id, month(order_date)
    )
    SELECT customer_id, month, orders_this_month,
           LEAD(month) OVER (PARTITION BY customer_id ORDER BY month) - month = 1 AS retained
    FROM monthly;
    

    This example is powerful for retention analysis, determining if customers return month after month.


    5. Array Functions for Multi-Value Column Manipulation

    array(), array_contains(), explode()

    | Function | Example | Description |
    | --- | --- | --- |
    | array() | SELECT array(name, email) AS contact FROM table; | Combines multiple columns into an array. |
    | array_contains() | SELECT array_contains(tags, 'Spark') AS has_spark FROM table; | Checks if an array contains a value. |
    | explode() | SELECT id, explode(items) AS item FROM orders; | Expands array elements into individual rows. |

    Complex Use Case: Splitting tags into individual rows for better indexing and searching:

    SELECT post_id, explode(tags) AS tag
    FROM blog_posts;
    

    This approach can help with filtering, analytics, or search functionalities in scenarios where each tag or attribute of an item needs to be analyzed individually.
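    For completeness, the DataFrame API equivalent uses F.explode; a small sketch assuming a blog_posts table with a post_id column and a tags array column:

    from pyspark.sql import SparkSession
    from pyspark.sql import functions as F

    spark = SparkSession.builder.getOrCreate()
    blog_posts = spark.table("blog_posts")  # assumed table with post_id and an array column tags

    # explode() emits one row per array element; rows with empty or NULL arrays are dropped
    # (explode_outer() keeps them)
    tags = blog_posts.select("post_id", F.explode("tags").alias("tag"))
    tags.show()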


    6. Conditional Functions for Complex Case Logic

    when(), coalesce(), ifnull()

    | Function | Example | Description |
    | --- | --- | --- |
    | when() / CASE WHEN | SQL: SELECT CASE WHEN age > 18 THEN 'adult' ELSE 'minor' END AS category FROM table; DataFrame API: F.when(df.age > 18, 'adult').otherwise('minor') | Implements conditional logic similar to if-else. |
    | coalesce() | SELECT coalesce(col1, col2) AS result FROM table; | Returns the first non-null value from a list of columns. |
    | ifnull() | SELECT ifnull(name, 'unknown') FROM table; | Replaces NULL values with a specified value. |

    Complex Use Case: Assign customers to risk categories based on age and income, while handling null values with coalesce():

    SELECT customer_id,
           CASE WHEN age >= 60 THEN 'high risk'
                WHEN age >= 30 THEN 'medium risk'
                ELSE 'low risk'
           END AS risk_category,
           coalesce(income, 0) AS income
    FROM customers;
    

    This is especially useful in customer segmentation, where you may need to categorize data based on multiple conditions and handle null values simultaneously.
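    Note that the chained when().otherwise() style belongs to the DataFrame API rather than SQL text. A sketch of the same segmentation written that way, assuming a customers table with customer_id, age, and income columns:

    from pyspark.sql import SparkSession
    from pyspark.sql import functions as F

    spark = SparkSession.builder.getOrCreate()
    customers = spark.table("customers")  # assumed table with customer_id, age, income

    segmented = customers.select(
        "customer_id",
        F.when(F.col("age") >= 60, "high risk")
         .when(F.col("age") >= 30, "medium risk")
         .otherwise("low risk").alias("risk_category"),
        F.coalesce(F.col("income"), F.lit(0)).alias("income"),
    )
    segmented.show()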


    7. Miscellaneous Functions for Unique Use Cases

    monotonically_increasing_id(), input_file_name()

    | Function | Example | Description |
    | --- | --- | --- |
    | monotonically_increasing_id() | SELECT monotonically_increasing_id() AS id FROM table; | Generates unique IDs for rows without requiring a primary key. |
    | input_file_name() | SELECT input_file_name() FROM table; | Returns the file name from which a row originated. |

    Complex Use Case: Track records to their source file in an ETL process, useful in error tracking:

    SELECT *, input_file_name() AS source_file
    FROM data_files;
    

    This helps you trace errors back to their data origin, making it valuable in complex ETL jobs involving multiple data sources.


    These advanced use cases highlight the flexibility and power of Spark SQL functions when applied to complex data manipulation, making it easier to handle various analytical scenarios and transformations.

  • Comparative overview of partitions, bucketing, segmentation, and broadcasting in PySpark, Spark SQL, and Hive QL in tabular form, along with examples

    Here’s a comparative overview of partitions, bucketing, segmentation, and broadcasting in PySpark, Spark SQL, and Hive QL in tabular form, along with examples:

    | Concept | PySpark | Spark SQL | Hive QL |
    | --- | --- | --- | --- |
    | Partitions | df.repartition(numPartitions, "column") creates partitions based on the specified column. | CREATE TABLE table_name PARTITIONED BY (col1 STRING) allows data to be organized by partition. | ALTER TABLE table_name ADD PARTITION (col1='value') adds a new partition to an existing table. |
    | Bucketing | df.write.bucketBy(numBuckets, "column").saveAsTable("table_name") for distributing data. | CREATE TABLE table_name CLUSTERED BY (col1) INTO numBuckets BUCKETS for bucketing data in a table. | CREATE TABLE table_name CLUSTERED BY (col1) INTO numBuckets BUCKETS for the bucketing strategy. |
    | Segmentation | Segmentation is done by filtering DataFrames based on specific criteria, e.g., df.filter(df.col > 1). | SELECT * FROM table WHERE col > value for segmenting data based on specific criteria. | SELECT * FROM table WHERE col > value for segmentation in queries. |
    | Broadcasting | spark.conf.set("spark.sql.autoBroadcastJoinThreshold", size) for broadcast joins. | SELECT /*+ BROADCAST(t2) */ * FROM table1 t1 JOIN table2 t2 ON t1.key = t2.key for a broadcast hint. | SET hive.auto.convert.join = true allows automatic broadcasting of smaller tables during joins. |

    Examples:

    • Partitions: In PySpark, if you have a DataFrame df, you can repartition it as follows: df = df.repartition(4, "column_name")
    • Bucketing: When bucketing in PySpark: df.write.bucketBy(5, "column_name").saveAsTable("bucketed_table")
    • Segmentation: Segmenting DataFrames: df_segment = df.filter(df["column"] > 100)
    • Broadcasting: Configuring the auto-broadcast threshold: from pyspark.sql import SparkSession; spark = SparkSession.builder.getOrCreate(); spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 10485760)  # 10 MB (see the explicit broadcast hint sketch below)
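    Besides the auto-broadcast threshold, you can request a broadcast join explicitly with pyspark.sql.functions.broadcast. A minimal sketch with two assumed tables, a large fact table and a small dimension table, joined on a key column:

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import broadcast

    spark = SparkSession.builder.getOrCreate()
    large_df = spark.table("large_table")  # assumed large fact table
    small_df = spark.table("small_table")  # assumed small dimension table

    # Hint Spark to ship small_df to every executor instead of shuffling both sides
    joined = large_df.join(broadcast(small_df), on="key", how="inner")
    joined.explain()  # the physical plan should show a BroadcastHashJoin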

    This table provides a concise yet comprehensive guide to partitioning, bucketing, segmentation, and broadcasting across different frameworks.

    We can use the NTILE window function to create buckets within a dataset in both PySpark and Spark SQL. The NTILE(n) function divides the result set into n buckets or groups, assigning a bucket number to each row. Here’s how you can apply it:

    PySpark Example

    from pyspark.sql import Window
    from pyspark.sql import functions as F
    
    windowSpec = Window.orderBy("column_name")
    df_with_buckets = df.withColumn("bucket", F.ntile(4).over(windowSpec))
    

    Spark SQL Example

    SELECT *, NTILE(4) OVER (ORDER BY column_name) AS bucket
    FROM table_name;
    

    This creates four buckets based on the ordering of column_name. Each row is assigned a bucket number from 1 to 4.

    Important concepts in the PySpark DataFrame API / Spark SQL

    Here’s the updated comprehensive overview of important concepts in the PySpark DataFrame API, now including options for showing DataFrame content, schema, and columns:

    | Category | Function | Description | Example |
    | --- | --- | --- | --- |
    | Data Reading | spark.read.csv() | Read a CSV file into a DataFrame. | df = spark.read.csv("path/to/file.csv") |
    | | spark.read.json() | Read a JSON file into a DataFrame. | df = spark.read.json("path/to/file.json") |
    | | spark.read.parquet() | Read a Parquet file into a DataFrame. | df = spark.read.parquet("path/to/file.parquet") |
    | DataFrame Creation | createDataFrame() | Create a DataFrame from an RDD or a list. | df = spark.createDataFrame(data, schema) |
    | | createDataFrame() with column names | Create a DataFrame from structured data such as a list of tuples (Spark has no from_records(); that is a pandas method). | df = spark.createDataFrame(data, ["col1", "col2"]) |
    | Transformation | select() | Select specific columns from the DataFrame. | df.select("column1", "column2") |
    | | filter() | Filter rows based on a condition. | df.filter(df["column"] > 100) |
    | | groupBy() | Group rows by a specific column and perform aggregations. | df.groupBy("column").count() |
    | | withColumn() | Add a new column or replace an existing column. | df.withColumn("new_col", df["col1"] + 1) |
    | | join() | Join two DataFrames together. | df1.join(df2, "common_col") |
    | | union() | Combine two DataFrames with the same schema. | df1.union(df2) |
    | | drop() | Drop specified columns from the DataFrame. | df.drop("column1") |
    | | distinct() | Return a new DataFrame with distinct rows. | df.distinct() |
    | | orderBy() | Sort the DataFrame based on one or more columns. | df.orderBy("column1", ascending=False) |
    | | pivot() | Pivot a DataFrame to reshape it. | df.groupBy("column1").pivot("column2").agg(F.sum("value")) |
    | | transpose | Transposing rows and columns is not directly available in Spark. | Convert to pandas (df.toPandas().T) or reshape with pivot/unpivot. |
    | Window Functions | Window.partitionBy() | Create a window specification for calculations over specified partitions. | windowSpec = Window.partitionBy("column").orderBy("value") |
    | | row_number() | Assign a unique row number to rows within a partition. | df.withColumn("row_num", F.row_number().over(windowSpec)) |
    | UDFs (User Defined Functions) | spark.udf.register() | Register a Python function as a UDF for use in SQL. | spark.udf.register("my_udf", my_function) |
    | | F.udf() with withColumn() | Wrap a Python function as a UDF and use it to create a new column. | my_udf = F.udf(my_function); df.withColumn("new_col", my_udf(df["col"])) |
    | String Manipulation | F.concat() | Concatenate multiple strings into one. | df.withColumn("new_col", F.concat(df["col1"], df["col2"])) |
    | | F.substring() | Extract a substring from a string column. | df.withColumn("sub_col", F.substring("col", 1, 3)) |
    | | F.lower() | Convert a string column to lowercase. | df.withColumn("lower_col", F.lower("col")) |
    | Date Manipulation | F.current_date() | Get the current date. | df.withColumn("current_date", F.current_date()) |
    | | F.date_add() | Add days to a date column. | df.withColumn("new_date", F.date_add("date_col", 5)) |
    | | F.year() | Extract the year from a date column. | df.withColumn("year", F.year("date_col")) |
    | Schema and Data Types | StructType() | Define the schema of a DataFrame. | schema = StructType([...]) |
    | | DataType | Define data types for DataFrame columns. | IntegerType(), StringType(), ... |
    | Actions | show(n) | Displays the first n rows of the DataFrame (default 20). | df.show(5) |
    | | show(truncate=False) | Displays full column contents instead of truncating long strings. | df.show(truncate=False) |
    | | printSchema() | Prints the schema of the DataFrame, showing column names and types. | df.printSchema() |
    | | df.columns | Returns a list of column names in the DataFrame. | columns = df.columns |
    | | count() | Count the number of rows in the DataFrame. | row_count = df.count() |
    | | collect() | Retrieve all rows from the DataFrame as a list. | data = df.collect() |
    | Optimization | persist() | Store the DataFrame in memory/disk for re-use. | df.persist() |
    | | cache() | Cache the DataFrame in memory. | df.cache() |
    | | repartition() | Change the number of partitions of the DataFrame. | df.repartition(4) |
    | | coalesce() | Reduce the number of partitions without a full shuffle. | df.coalesce(2) |

    This table now includes options for showing DataFrame content, schema, and columns along with various other functionalities in the PySpark DataFrame API.

    Here’s a cheat sheet for Spark SQL with common queries, functionalities, and examples:

    | Category | SQL Command | Description | Example |
    | --- | --- | --- | --- |
    | Data Reading | SELECT * FROM table | Retrieve all columns from a table. | SELECT * FROM employees |
    | | FROM | Specify the table from which to select data. | SELECT name, age FROM employees |
    | Data Filtering | WHERE | Filter rows based on a condition. | SELECT * FROM employees WHERE age > 30 |
    | Data Transformation | SELECT ... AS | Rename selected columns. | SELECT name AS employee_name FROM employees |
    | | JOIN | Combine rows from two or more tables based on a related column. | SELECT * FROM employees JOIN departments ON employees.dep_id = departments.id |
    | | GROUP BY | Group rows that have the same values in specified columns. | SELECT department, COUNT(*) FROM employees GROUP BY department |
    | | HAVING | Filter groups based on a condition after grouping. | SELECT department, COUNT(*) FROM employees GROUP BY department HAVING COUNT(*) > 5 |
    | Aggregations | COUNT() | Count the number of rows. | SELECT COUNT(*) FROM employees |
    | | SUM(), AVG(), MIN(), MAX() | Perform summation, average, minimum, and maximum calculations. | SELECT AVG(salary) FROM employees |
    | Sorting | ORDER BY | Sort the result set by one or more columns. | SELECT * FROM employees ORDER BY salary DESC |
    | Window Functions | ROW_NUMBER() OVER | Assign a unique number to rows within a partition. | SELECT name, ROW_NUMBER() OVER (PARTITION BY department ORDER BY salary DESC) AS rank FROM employees |
    | Set Operations | UNION | Combine the results of two queries, removing duplicates. | SELECT name FROM employees UNION SELECT name FROM contractors |
    | | UNION ALL | Combine results including duplicates. | SELECT name FROM employees UNION ALL SELECT name FROM contractors |
    | Data Manipulation | INSERT INTO | Insert new records into a table. | INSERT INTO employees VALUES ('John Doe', 28, 'Sales') |
    | | UPDATE | Modify existing records in a table (supported on ACID table formats such as Delta Lake). | UPDATE employees SET age = age + 1 WHERE name = 'John Doe' |
    | | DELETE | Remove records from a table (supported on ACID table formats such as Delta Lake). | DELETE FROM employees WHERE name = 'John Doe' |
    | Table Management | CREATE TABLE | Create a new table. | CREATE TABLE new_table (id INT, name STRING) |
    | | DROP TABLE | Delete a table and its data. | DROP TABLE new_table |
    | | ALTER TABLE | Modify an existing table (e.g., add a column). | ALTER TABLE employees ADD COLUMN hire_date DATE |
    | String Functions | CONCAT() | Concatenate two or more strings. | SELECT CONCAT(first_name, ' ', last_name) AS full_name FROM employees |
    | | SUBSTRING() | Extract a substring from a string. | SELECT SUBSTRING(name, 1, 3) FROM employees |
    | Date Functions | CURRENT_DATE | Retrieve the current date. | SELECT CURRENT_DATE |
    | | DATEDIFF() | Calculate the difference between two dates. | SELECT DATEDIFF('2024-01-01', '2023-01-01') AS days_difference |
    | | YEAR(), MONTH(), DAY() | Extract year, month, or day from a date. | SELECT YEAR(hire_date) FROM employees |

    This cheat sheet covers various aspects of Spark SQL, including data reading, transformation, filtering, and aggregations, along with examples for clarity. You can adapt these queries to your specific dataset and requirements.

    Here’s a comparison cheat sheet for date manipulation in PySpark SQL vs Hive QL:

    | Function | PySpark SQL Syntax | HiveQL Syntax | Description |
    | --- | --- | --- | --- |
    | Current Date | current_date() | current_date() | Returns the current date. |
    | Add Days | date_add(date, days) | date_add(date, days) | Adds the specified number of days to a date. |
    | Subtract Days | date_sub(date, days) | date_sub(date, days) | Subtracts the specified number of days from a date. |
    | Add Months | add_months(date, months) | add_months(date, months) | Adds the specified number of months to a date. |
    | Truncate Date | date_trunc(format, date) | trunc(date, 'format') | Truncates a date to the specified unit. |
    | Date Difference | datediff(end_date, start_date) | datediff(end_date, start_date) | Returns the number of days between two dates. |
    | Year Extraction | year(date) | year(date) | Extracts the year from a date. |
    | Month Extraction | month(date) | month(date) | Extracts the month from a date. |
    | Day Extraction | day(date) or dayofmonth(date) | day(date) or dayofmonth(date) | Extracts the day from a date. |
    | Format Date | date_format(date, 'format') | date_format(date, 'format') | Formats a date to the specified pattern. |
    | Last Day of Month | last_day(date) | last_day(date) | Returns the last day of the month. |
    | Unix Timestamp | unix_timestamp(date, 'pattern') | unix_timestamp(date, 'pattern') | Converts a date to a Unix timestamp. |
    | From Unix Timestamp | from_unixtime(unix_time, 'format') | from_unixtime(unix_time, 'format') | Converts a Unix timestamp to a date. |
    | Next Day | next_day(date, 'day_of_week') | next_day(date, 'day_of_week') | Returns the next specified day of the week. |

    Example

    To add 5 days to the current date:

    • PySpark SQL: SELECT date_add(current_date(), 5)
    • Hive QL: SELECT date_add(current_date(), 5)

    Both PySpark SQL and HiveQL share similar functions, but PySpark SQL has better integration with Python datetime types and additional flexibility.

    PySpark SQL provides better integration with Python datetime types due to its compatibility with the pyspark.sql.functions library, which allows for seamless handling of Python datetime objects. This flexibility enables direct manipulation of dates and times, conversion between types, and usage of Python’s datetime features within PySpark’s DataFrames.

    Additionally, PySpark SQL allows using user-defined functions (UDFs) that interact directly with Python datetime libraries, enhancing its adaptability for complex time-based transformations compared to HiveQL’s fixed SQL-like functions.
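    As a concrete illustration of that point, the sketch below wraps plain Python datetime logic in a UDF; the column name order_ts and the weekend rule are assumptions made for the example:

    from datetime import datetime
    from pyspark.sql import SparkSession
    from pyspark.sql import functions as F
    from pyspark.sql.types import BooleanType

    spark = SparkSession.builder.getOrCreate()
    df = spark.createDataFrame(
        [(1, datetime(2024, 3, 2, 14, 30)), (2, datetime(2024, 3, 4, 9, 0))],
        ["order_id", "order_ts"],
    )

    # Timestamp columns reach Python UDFs as datetime objects, so normal datetime
    # methods work: weekday() >= 5 means Saturday or Sunday
    is_weekend = F.udf(lambda ts: ts.weekday() >= 5, BooleanType())
    df.withColumn("is_weekend", is_weekend("order_ts")).show()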

    Here’s a table showing examples of how PySpark SQL integrates with Python datetime types via the pyspark.sql.functions library:

    | Function | Description | PySpark Example | Explanation |
    | --- | --- | --- | --- |
    | Current Date | Returns today’s date. | from pyspark.sql.functions import current_date; df.select(current_date().alias("current_date")) | Directly fetches the current date using the current_date() function. |
    | Convert to DateType | Converts a Python datetime to Spark DateType. | from datetime import datetime; from pyspark.sql.functions import lit, to_date; df.select(to_date(lit(datetime.now()))) | Converts Python datetime objects directly to Spark DateType using to_date(). |
    | Add Days | Adds days to a date. | from pyspark.sql.functions import date_add; df.select(date_add(current_date(), 5)) | Adds 5 days to the current date. |
    | Date Formatting | Formats a date to a specified pattern. | from pyspark.sql.functions import date_format; df.select(date_format(current_date(), 'yyyy-MM-dd')) | Formats the date into yyyy-MM-dd format. |
    | Extracting Parts of Date | Extracts year, month, day, etc., from a date. | from pyspark.sql.functions import year, month, dayofmonth; df.select(year(current_date()), month(current_date())) | Extracts the year and month directly from the current date. |
    | Date Difference | Finds the difference between two dates. | from pyspark.sql.functions import datediff; df.select(datediff(current_date(), lit('2022-01-01'))) | Calculates the difference in days between today and a specified date. |
    | Handling Null Dates | Checks and fills null date values. | df.fillna({'date_column': '1970-01-01'}) | Replaces null date values with a default value (e.g., the epoch start date). |
    | Convert String to Date | Converts a string to DateType. | df.select(to_date(lit("2023-10-31"), "yyyy-MM-dd")) | Converts a string to DateType using a specified format. |
    | Using Python UDFs with Dates | Applies Python functions directly to datetime columns. | from pyspark.sql.functions import udf, col; from pyspark.sql.types import IntegerType; df.withColumn('year', udf(lambda x: x.year, IntegerType())(col('date_column'))) | Allows creating UDFs with Python’s datetime methods, e.g., extracting the year directly from date columns. |

    Explanation:

    • PySpark’s integration with Python datetime types simplifies date manipulations by allowing direct conversion and application of Python datetime methods.
    • Functions like to_date, date_format, and UDFs make PySpark SQL highly flexible for working with dates, giving more control over formatting, extraction, and handling nulls or custom date logic in transformations.

    This compatibility makes it easier to write complex date manipulations within PySpark, directly utilizing Python’s rich datetime functionality.


  • The pandas Series is a one-dimensional array-like data structure that can store data of any type, including integers, floats, strings, or even Python objects. Each element in a Series is associated with a unique index label, making it easy to perform data retrieval and operations based on labels.

    Here’s a detailed guide on using Series in pandas, with complex examples and a cheat sheet.


    1. Creating a Series

    There are several ways to create a Series:

    1.1. Creating a Series from a List

    import pandas as pd
    # Creating a Series from a list with a custom index
    data = [10, 20, 30, 40]
    s = pd.Series(data, index=['a', 'b', 'c', 'd'])
    print(s)
    

    1.2. Creating a Series from a Dictionary

    # Series from a dictionary (keys become indices)
    data = {'a': 10, 'b': 20, 'c': 30}
    s = pd.Series(data)
    print(s)
    

    1.3. Creating a Series with Scalar Value

    # Series with a scalar value
    s = pd.Series(5, index=['a', 'b', 'c'])
    print(s)
    

    2. Accessing Data in a Series

    You can access elements in a Series by index label or integer position.

    2.1. Accessing by Label

    # Accessing by index label
    print(s['b'])  # Outputs 20
    

    2.2. Accessing by Position

    # Accessing by integer position
    print(s.iloc[1])  # Outputs 20
    

    2.3. Accessing Multiple Elements

    # Accessing multiple elements
    print(s[['a', 'c']])  # Outputs values at 'a' and 'c' indices
    

    3. Operations on Series

    3.1. Mathematical Operations

    You can perform mathematical operations on Series directly.

    s = pd.Series([1, 2, 3, 4])
    # Element-wise addition
    print(s + 2)  # Adds 2 to each element
    

    3.2. Series Arithmetic with Another Series

    When performing arithmetic between two Series, pandas aligns the indices.

    s1 = pd.Series([1, 2, 3], index=['a', 'b', 'c'])
    s2 = pd.Series([1, 2, 3], index=['b', 'c', 'd'])
    
    # Element-wise addition with alignment
    print(s1 + s2)  # Missing values will be NaN
    

    Output:

    a    NaN
    b    3.0
    c    5.0
    d    NaN
    dtype: float64
    

    3.3. Applying Functions to Series

    You can apply functions element-wise using apply() or map().

    # Using apply() to square each element
    s = pd.Series([1, 2, 3, 4])
    s_squared = s.apply(lambda x: x ** 2)
    print(s_squared)
    

    3.4. Handling Missing Values

    Series can contain NaN (null) values, and pandas provides functions to handle them.

    s = pd.Series([1, None, 3, None, 5])
    
    # Drop missing values
    print(s.dropna())
    
    # Fill missing values
    print(s.fillna(0))
    

    4. Advanced Indexing Techniques

    4.1. Boolean Indexing

    You can filter Series elements based on conditions.

    s = pd.Series([10, 20, 30, 40], index=['a', 'b', 'c', 'd'])
    
    # Get elements greater than 20
    print(s[s > 20])
    

    4.2. Index Alignment and Reindexing

    Aligning Series based on indices or creating new indices.

    s = pd.Series([1, 2, 3], index=['a', 'b', 'c'])
    
    # Reindexing the Series
    s_reindexed = s.reindex(['a', 'b', 'c', 'd'], fill_value=0)
    print(s_reindexed)
    

    5. Aggregation and Statistical Functions

    Series provides many aggregation and statistical methods.

    s = pd.Series([10, 20, 30, 40])
    
    # Get the sum, mean, and standard deviation
    print(s.sum())   # Sum of elements
    print(s.mean())  # Mean of elements
    print(s.std())   # Standard deviation
    

    6. String Operations on Series

    String operations can be applied directly using the str accessor.

    s = pd.Series(['apple', 'banana', 'cherry'])
    
    # Convert each element to uppercase
    print(s.str.upper())
    
    # Check if each element contains the letter 'a'
    print(s.str.contains('a'))
    

    7. Combining Series

    You can combine Series using concatenation or appending.

    7.1. Concatenation

    s1 = pd.Series([1, 2], index=['a', 'b'])
    s2 = pd.Series([3, 4], index=['c', 'd'])
    
    # Concatenate Series
    s_combined = pd.concat([s1, s2])
    print(s_combined)
    

    7.2. Appending

    # Series.append() was removed in pandas 2.0; use pd.concat to append s2 to s1
    s_appended = pd.concat([s1, s2])
    print(s_appended)
    

    8. Working with Index in Series

    8.1. Setting a Custom Index

    s = pd.Series([10, 20, 30], index=['x', 'y', 'z'])
    print(s)
    

    8.2. Resetting the Index

    s_reset = s.reset_index(drop=True)
    print(s_reset)
    

    Cheat Sheet for pandas Series

    | Operation | Syntax/Example | Description |
    | --- | --- | --- |
    | Creating a Series | pd.Series(data, index=index) | Create a Series from a list, dict, or scalar. |
    | Access by Label | s['label'] | Access an element by label. |
    | Access by Position | s.iloc[position] | Access an element by position. |
    | Slicing | s[start:end] | Slice a Series by position or label. |
    | Math Operations | s + 2, s1 + s2 | Element-wise math; aligns indices. |
    | Apply Functions | s.apply(func) | Apply a function to each element. |
    | Boolean Indexing | s[s > 20] | Filter a Series based on a condition. |
    | Drop Missing Values | s.dropna() | Removes NaN values. |
    | Fill Missing Values | s.fillna(value) | Fills NaN values with a specified value. |
    | Reindex | s.reindex(new_index) | Change or expand the index. |
    | Aggregation | s.sum(), s.mean(), s.std() | Aggregation functions. |
    | String Operations | s.str.upper(), s.str.contains('a') | Apply string operations. |
    | Concatenation | pd.concat([s1, s2]) | Concatenate two or more Series. |
    | Reset Index | s.reset_index(drop=True) | Reset the Series index. |
    | Unique Values | s.unique() | Returns unique values in the Series. |
    | Value Counts | s.value_counts() | Counts unique values in the Series. |
    | Sorting | s.sort_values(), s.sort_index() | Sorts by values or index. |
    | Combine with map() | s.map(lambda x: x * 2) | Apply a function element-wise (similar to apply). |
    | Aligning with Another Series | s1 + s2 | Aligns indices and performs element-wise operations. |
    | Replacing Values | s.replace({old_val: new_val}) | Replace specific values in the Series. |

    Example Use Cases for Series

    1. Financial Analysis: A Series can store daily stock prices or monthly sales figures, allowing for easy aggregation and visualization.
    2. Data Cleaning: Use Series to handle individual columns in a DataFrame, e.g., applying string functions to clean text data.
    3. Index-based Calculations: If each index is a timestamp, Series enables time-based slicing and statistical calculations on time series data (see the sketch after this list).
    4. One-off Calculations: Series is efficient for quick, one-dimensional analyses (e.g., finding averages, sums, counts) without creating a DataFrame.
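    Building on use case 3, here is a small sketch of a datetime-indexed Series with label-based time slicing; the dates and values are invented for illustration:

    import pandas as pd

    # Daily values indexed by timestamps
    dates = pd.date_range("2024-01-01", periods=6, freq="D")
    s = pd.Series([100, 105, 98, 110, 120, 115], index=dates)

    # Partial-string indexing: select everything from January 2024
    print(s.loc["2024-01"])

    # Resample to another frequency, e.g. a 3-day mean
    print(s.resample("3D").mean())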

    This cheat sheet and guide should help you work more effectively with pandas Series, allowing you to handle one-dimensional data with ease. Let me know if you need further examples or detailed explanations!

  • This tutorial covers a wide range of pandas operations and advanced concepts with examples that are practical and useful in real-world scenarios. The key topics include:

    1. Creating DataFrames, Series from various sources.
    2. Checking and changing data types.
    3. Looping through DataFrames efficiently.
    4. Handling missing data.
    5. Advanced data manipulation techniques like filtering, sorting, grouping, and merging.
    6. Window functions.
    7. Error handling and control mechanisms.

    In pandas, there are several core data structures designed for handling different types of data, enabling efficient and flexible data manipulation. These data structures include:

    1. Series
    2. DataFrame
    3. Index
    4. MultiIndex
    5. Panel (Deprecated)

    Each of these structures serves different purposes, from storing one-dimensional data to complex, multi-dimensional datasets.


    1. Series

    A Series is a one-dimensional array with labels, capable of holding any data type (integers, strings, floats, Python objects, etc.). It can be thought of as a column in a DataFrame or a labeled list. Each item in a Series has a unique index.

    Characteristics of Series:

    • One-dimensional.
    • Stores homogeneous data (though technically it can hold mixed data types).
    • Has an index for each element, which can be customized.

    Creating a Series:

    import pandas as pd
    
    # Creating a Series from a list
    data = [10, 20, 30, 40]
    s = pd.Series(data, index=['a', 'b', 'c', 'd'])
    print(s)
    

    Output:

    a    10
    b    20
    c    30
    d    40
    dtype: int64
    

    2. DataFrame

    A DataFrame is a two-dimensional, tabular data structure with labeled axes (rows and columns). It is the most commonly used pandas structure for handling datasets, where each column is a Series, and columns can have different data types.

    Characteristics of DataFrame:

    • Two-dimensional (rows and columns).
    • Stores heterogeneous data types (each column can be of a different type).
    • Has labeled axes (row index and column labels).
    • Flexible and highly customizable.

    Creating a DataFrame:

    # Creating a DataFrame from a dictionary
    data = {
        'Name': ['Alice', 'Bob', 'Charlie'],
        'Age': [24, 27, 22],
        'Salary': [70000, 80000, 60000]
    }
    
    df = pd.DataFrame(data)
    print(df)
    

    Output:

          Name  Age  Salary
    0    Alice   24   70000
    1      Bob   27   80000
    2  Charlie   22   60000
    

    3. Index

    An Index is an immutable data structure that labels the rows and columns of a pandas Series or DataFrame. It is a key part of pandas’ design, allowing for efficient data alignment and retrieval.

    Characteristics of Index:

    • It is immutable (cannot be changed after creation).
    • Allows for efficient label-based data retrieval.
    • Supports custom and multi-level indexing.

    Creating an Index:

    # Creating a custom Index for a DataFrame
    index = pd.Index(['a', 'b', 'c'])
    data = {'Value': [10, 20, 30]}
    df = pd.DataFrame(data, index=index)
    print(df)
    

    Output:

       Value
    a     10
    b     20
    c     30
    

    4. MultiIndex

    A MultiIndex is an advanced index structure that allows for multiple levels or hierarchical indexing. This enables you to create a DataFrame with more complex indexing, which is useful when working with data that has multiple dimensions (e.g., time series data with both date and time).

    Characteristics of MultiIndex:

    • Supports hierarchical (multi-level) indexing.
    • Facilitates working with higher-dimensional data in a 2D DataFrame.
    • Provides powerful data slicing and subsetting capabilities.

    Creating a MultiIndex DataFrame:

    arrays = [
        ['A', 'A', 'B', 'B'],
        ['one', 'two', 'one', 'two']
    ]
    index = pd.MultiIndex.from_arrays(arrays, names=('Upper', 'Lower'))
    
    data = {'Value': [1, 2, 3, 4]}
    df = pd.DataFrame(data, index=index)
    print(df)
    

    Output:

               Value
    Upper Lower
    A     one       1
          two       2
    B     one       3
          two       4
    

    5. Panel (Deprecated)

    Panel was a three-dimensional data structure in pandas that allowed for storing 3D data, making it possible to have multiple DataFrames within one structure. However, it was deprecated in pandas 0.20 and removed in pandas 0.25, as the community now recommends using MultiIndex DataFrames or xarray (for multidimensional data).

    Alternative:

    • For handling three or more dimensions, use a MultiIndex DataFrame or the xarray library, which is specifically designed for multidimensional arrays (a brief sketch follows below).
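    As a brief sketch of that alternative, the "third dimension" can live in an extra level of a MultiIndex; the names and numbers below are invented for illustration:

    import pandas as pd

    # What Panel modelled as item x major x minor axes becomes an extra
    # level in the row index of an ordinary DataFrame.
    index = pd.MultiIndex.from_product(
        [["2024-01", "2024-02"], ["store_a", "store_b"]],
        names=["month", "store"],
    )
    df = pd.DataFrame({"sales": [10, 12, 9, 14]}, index=index)

    # Select one "slice" of the extra dimension
    print(df.xs("store_a", level="store"))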

    Summary Table

    | Data Structure | Dimensions | Description | Usage |
    | --- | --- | --- | --- |
    | Series | 1D | One-dimensional array with labeled indices. | Storing single columns or lists with labels. |
    | DataFrame | 2D | Two-dimensional, table-like structure with labeled rows and columns. | Primary data structure for handling tabular data. |
    | Index | 1D | Immutable labels for rows and columns in Series and DataFrames. | Used for indexing and aligning data. |
    | MultiIndex | 2D (multi-level) | Hierarchical indexing allowing multiple levels of row/column labels. | Organizing complex data, such as time series with multiple dimensions. |
    | Panel | 3D (removed) | Three-dimensional data structure (removed from pandas). | Use MultiIndex DataFrames or xarray instead for multidimensional data. |

    Use Cases for Each Data Structure

    1. Series:
      • Ideal for handling individual columns in isolation.
      • Useful when working with single-column data or performing calculations on one dimension.
    2. DataFrame:
      • Perfect for most data analysis tasks with two-dimensional data.
      • Used for data cleaning, manipulation, transformation, and visualization.
    3. Index:
      • Helps to uniquely identify each row or column in Series and DataFrames.
      • Useful for efficient data selection and alignment.
    4. MultiIndex:
      • Essential for handling multi-dimensional data within a 2D structure.
      • Allows for advanced slicing and subsetting in complex datasets.

    Creating DataFrames from various sources.

    Pandas provides various ways to create DataFrames from different data sources. In this section, we’ll go over the most common and practical methods for creating DataFrames using data from:

    1. Python data structures (lists, dictionaries, tuples, arrays)
    2. CSV files
    3. Excel files
    4. JSON files
    5. SQL databases
    6. Web data (HTML tables)
    7. Other sources (e.g., clipboard, API)

    1. Creating DataFrames from Python Data Structures

    1.1. From a Dictionary

    A dictionary in Python is a very common way to create a DataFrame, where keys represent column names and values represent data.

    import pandas as pd
    
    # Example 1: Dictionary of lists
    data = {
        'Name': ['Alice', 'Bob', 'Charlie'],
        'Age': [25, 30, 35],
        'Salary': [50000, 60000, 70000]
    }
    
    df = pd.DataFrame(data)
    print(df)
    

    fruits = pd.DataFrame({'Apple': [30], 'Banana': [21]})

    Why doesn't fruits = pd.DataFrame({'Apple': 30, 'Banana': 21}) work?

    The reason fruits = pd.DataFrame({'Apple':30,'Banana':21}) doesn’t work as expected is due to how Pandas’ DataFrame constructor handles dictionary inputs.

    Short answer:

    When passing a dictionary to pd.DataFrame(), Pandas expects the dictionary values to be lists or arrays, not scalars.

    Long answer:

    When you pass a dictionary to pd.DataFrame(), Pandas treats each dictionary key as a column name and each corresponding value as a column of data. Pandas expects the column data to be:

    1. Lists: [value1, value2, ..., valueN]
    2. Arrays: np.array([value1, value2, ..., valueN])
    3. Series: pd.Series([value1, value2, ..., valueN])

    However, in your example, {'Apple': 30, 'Banana': 21}, the dictionary values are scalars (integers), not lists or arrays. Pandas doesn’t know how to handle scalar values as column data.

    What happens when you pass scalars:

    When you pass only scalars, Pandas cannot infer how many rows to build and raises ValueError: If using all scalar values, you must pass an index.

    Why {'Apple':[30],'Banana':[21]} works:

    By wrapping the scalar values in lists ([30] and [21]), you’re providing Pandas with the expected input format. Pandas creates a DataFrame with a single row and two columns, where each column contains a single value.

    Alternative solution:

    If you prefer not to wrap scalars in lists, you can use the pd.DataFrame() constructor with the index parameter to specify the row label:

    fruits = pd.DataFrame({'Apple': 30, 'Banana': 21}, index=[0])

    This creates a DataFrame with a single row labeled 0.

    Best practice:

    For consistency and clarity, it’s recommended to always pass lists or arrays as dictionary values when creating DataFrames:

    fruits = pd.DataFrame({'Apple': [30], 'Banana': [21]})

    1.2. From a List of Lists

    You can create a DataFrame by passing a list of lists and specifying column names.

    # Example 2: List of lists
    data = [
        ['Alice', 25, 50000],
        ['Bob', 30, 60000],
        ['Charlie', 35, 70000]
    ]
    
    df = pd.DataFrame(data, columns=['Name', 'Age', 'Salary'])
    print(df)
    

    1.3. From a List of Dictionaries

    Each dictionary in the list represents a row, where keys are column names.

    # Example 3: List of dictionaries
    data = [
        {'Name': 'Alice', 'Age': 25, 'Salary': 50000},
        {'Name': 'Bob', 'Age': 30, 'Salary': 60000},
        {'Name': 'Charlie', 'Age': 35, 'Salary': 70000}
    ]
    
    df = pd.DataFrame(data)
    print(df)
    

    1.4. From Tuples

    A DataFrame can also be created from a list of tuples.

    # Example 4: List of tuples
    data = [
        ('Alice', 25, 50000),
        ('Bob', 30, 60000),
        ('Charlie', 35, 70000)
    ]
    
    df = pd.DataFrame(data, columns=['Name', 'Age', 'Salary'])
    print(df)
    

    data = [('Alice', 25, 50000), ('Bob', 30, 60000), ('Charlie', 35, 70000)]

    df = pd.DataFrame(data, columns=['Name', 'Age', 'Salary']), so how is a tuple represented in Pandas in the example above?

    data = [
        ('Alice', 25, 50000),
        ('Bob', 30, 60000),
        ('Charlie', 35, 70000)
    ]
    
    df = pd.DataFrame(data, columns=['Name', 'Age', 'Salary'])
    print(df)

    Each tuple (('Alice', 25, 50000), etc.) represents a row in the DataFrame. The tuple elements are not scalars in this context; instead, they are row values.

    Pandas treats each tuple as a single row, where:

    • The first element of the tuple ('Alice') corresponds to the 'Name' column.
    • The second element (25) corresponds to the 'Age' column.
    • The third element (50000) corresponds to the 'Salary' column.

    By passing a list of tuples, you’re providing Pandas with a clear indication of the row structure.

    Under the hood, Pandas simply treats each tuple as a sequence of row values; tuples, lists, and other sequences are handled the same way when the rows of the DataFrame are built.

    Equivalent representation:

    You can achieve the same result using lists instead of tuples:

    data = [
        ['Alice', 25, 50000],
        ['Bob', 30, 60000],
        ['Charlie', 35, 70000]
    ]
    
    df = pd.DataFrame(data, columns=['Name', 'Age', 'Salary'])
    print(df)

    Or, using dictionaries:

    data = [
        {'Name': 'Alice', 'Age': 25, 'Salary': 50000},
        {'Name': 'Bob', 'Age': 30, 'Salary': 60000},
        {'Name': 'Charlie', 'Age': 35, 'Salary': 70000}
    ]
    
    df = pd.DataFrame(data)
    print(df)

    All three representations (tuples, lists, and dictionaries) convey the same information to Pandas: a collection of rows with corresponding column values.

    1.5. From a NumPy Array

    You can create a DataFrame using a NumPy array.

    import numpy as np
    
    # Example 5: NumPy array
    data = np.array([[25, 50000], [30, 60000], [35, 70000]])
    df = pd.DataFrame(data, columns=['Age', 'Salary'])
    print(df)
    

    2. Creating DataFrames from Files

    2.1. From a CSV File

    The most common way to create a DataFrame is by loading data from a CSV file.

    # Example 6: CSV file
    df = pd.read_csv('path/to/file.csv')
    print(df.head())
    
    • Additional options (a short sketch combining them follows below):
      • Use sep=";" for custom delimiters.
      • Use index_col=0 to set a specific column as the index.
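    A quick sketch combining those options; the file path and the semicolon delimiter are assumptions for illustration:

    import pandas as pd

    # Semicolon-delimited file with the first column used as the row index
    df = pd.read_csv("path/to/file.csv", sep=";", index_col=0)
    print(df.head())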

    2.2. From an Excel File

    You can read data from Excel files using pd.read_excel().

    # Example 7: Excel file
    df = pd.read_excel('path/to/file.xlsx', sheet_name='Sheet1')
    print(df.head())
    

    2.3. From a JSON File

    Data can also be read from JSON files.

    # Example 8: JSON file
    df = pd.read_json('path/to/file.json')
    print(df.head())
    

    3. Creating DataFrames from SQL Databases

    3.1. From SQL Databases

    You can fetch data from a SQL database using pd.read_sql() or pd.read_sql_query().

    import sqlite3
    
    # Example 9: SQL database (SQLite)
    conn = sqlite3.connect('my_database.db')
    df = pd.read_sql('SELECT * FROM my_table', conn)
    print(df.head())
    
    • You can also connect to databases like MySQL, PostgreSQL, etc., using appropriate connectors like mysql-connector or psycopg2 (see the sketch below).
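    For databases other than SQLite, pandas is usually paired with a SQLAlchemy engine; a hedged sketch for PostgreSQL, with placeholder connection details and assuming psycopg2 is installed:

    import pandas as pd
    from sqlalchemy import create_engine

    # Placeholder credentials and database name
    engine = create_engine("postgresql+psycopg2://user:password@localhost:5432/mydb")

    df = pd.read_sql("SELECT * FROM my_table", engine)
    print(df.head())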

    4. Creating DataFrames from Web Data

    4.1. From HTML Tables

    You can directly extract tables from a webpage using pd.read_html(). This returns a list of DataFrames for each table found.

    # Example 10: Web data (HTML tables)
    url = 'https://en.wikipedia.org/wiki/List_of_countries_by_population_(United_Nations)'
    dfs = pd.read_html(url)
    print(dfs[0].head())  # Display the first table
    

    5. Creating DataFrames from the Clipboard

    5.1. From Clipboard

    You can copy data (e.g., from a CSV or Excel table) to your clipboard and read it into a DataFrame using pd.read_clipboard().

    # Example 11: Clipboard
    df = pd.read_clipboard()
    print(df)
    

    6. Creating DataFrames Programmatically

    6.1. Empty DataFrame

    An empty DataFrame can be created by simply calling the DataFrame constructor.

    # Example 12: Empty DataFrame
    df = pd.DataFrame()
    print(df)
    

    6.2. With Initial Columns

    You can create an empty DataFrame with predefined columns.

    # Example 13: DataFrame with predefined columns
    df = pd.DataFrame(columns=['Name', 'Age', 'Salary'])
    print(df)
    

    6.3. Appending Rows to DataFrame

    You can programmatically append rows to a DataFrame.

    # Example 14: Appending rows (DataFrame.append() was removed in pandas 2.0, so use pd.concat)
    df = pd.DataFrame(columns=['Name', 'Age', 'Salary'])
    new_rows = pd.DataFrame([{'Name': 'Alice', 'Age': 25, 'Salary': 50000},
                             {'Name': 'Bob', 'Age': 30, 'Salary': 60000}])
    df = pd.concat([df, new_rows], ignore_index=True)
    print(df)
    

    7. Creating DataFrames from APIs

    7.1. From an API Response (JSON)

    You can create a DataFrame from JSON responses from APIs.

    import requests
    
    # Example 15: API Response (JSON)
    url = 'https://api.example.com/data'
    response = requests.get(url)
    data = response.json()
    
    # Convert JSON data to DataFrame
    df = pd.DataFrame(data)
    print(df.head())
    

    8. Creating DataFrames with MultiIndex

    8.1. MultiIndex DataFrame

    You can create a DataFrame with hierarchical indexing (MultiIndex).

    # Example 16: MultiIndex
    arrays = [['North', 'North', 'South', 'South'], ['City1', 'City2', 'City3', 'City4']]
    index = pd.MultiIndex.from_arrays(arrays, names=('Region', 'City'))
    
    data = {'Population': [100000, 200000, 150000, 130000], 'GDP': [300, 400, 500, 600]}
    df = pd.DataFrame(data, index=index)
    print(df)
    

    There are many ways to create pandas DataFrames from various sources and data structures, including:

    • Python data structures: lists, dictionaries, tuples, arrays.
    • Files: CSV, Excel, JSON.
    • SQL databases: SQLite, MySQL, PostgreSQL.
    • Web data: HTML tables.
    • Clipboard: Copy-pasting directly into a DataFrame.
    • APIs: Fetching JSON data from web APIs

    Viewing, accessing and checking DataFrames in Pandas

    In pandas, viewing, accessing, checking data types, and generating descriptive statistics are fundamental steps when working with DataFrames. These operations help in understanding the structure, content, and type of the data before performing any data analysis or manipulation.

    Let’s explore these concepts with detailed explanations, functions, and examples.


    1. Viewing Data in pandas DataFrames

    Before performing any transformations or analysis, it’s important to view the data to understand its structure and content.

    1.1. Viewing the First or Last Rows

    • df.head(n): Returns the first n rows (default: 5 rows).
    • df.tail(n): Returns the last n rows (default: 5 rows).
    import pandas as pd
    
    # Example DataFrame
    data = {'Name': ['Alice', 'Bob', 'Charlie', 'David', 'Eva'],
            'Age': [25, 30, 35, 40, 28],
            'Salary': [50000, 60000, 70000, 80000, 55000]}
    
    df = pd.DataFrame(data)
    
    # View the first 3 rows
    print(df.head(3))
    
    # View the last 2 rows
    print(df.tail(2))
    

    1.2. Viewing Specific Columns

    You can select and view specific columns of the DataFrame.

    • df['column_name']: Access a single column.
    • df[['col1', 'col2']]: Access multiple columns.
    # View the 'Name' column
    print(df['Name'])
    
    # View 'Name' and 'Salary' columns
    print(df[['Name', 'Salary']])
    

    1.3. Viewing DataFrame Shape

    • df.shape: Returns a tuple representing the dimensions (rows, columns) of the DataFrame.
    # Get the shape of the DataFrame
    print(df.shape)  # Output: (5, 3) -> 5 rows, 3 columns
    

    1.4. Viewing DataFrame Columns

    • df.columns: Returns an Index object containing the column names.
    # View column names
    print(df.columns)
    

    1.5. Viewing Data Types of Columns

    • df.dtypes: Returns the data types of each column in the DataFrame.
    # View data types of each column
    print(df.dtypes)
    

    2. Accessing Data in pandas DataFrames

    There are various ways to access specific rows and columns in pandas.

    2.1. Accessing Rows by Index

    You can access rows using iloc[] (integer-location) or loc[] (label-location).

    • df.iloc[]: Access rows by their integer position.
    • df.loc[]: Access rows and columns by label.
    # Access the first row (index 0)
    print(df.iloc[0])
    
    # Access row where index is 2
    print(df.loc[2])
    

    2.2. Accessing Rows and Columns

    • df.loc[row, column]: Access a specific element using labels.
    • df.iloc[row, column]: Access a specific element using integer positions.
    # Access the 'Salary' of the second row
    print(df.loc[1, 'Salary'])
    
    # Access the 'Name' of the third row
    print(df.iloc[2, 0])
    

    2.3. Accessing Rows Based on Conditions

    You can filter rows based on conditions (boolean indexing).

    # Access rows where Age > 30
    filtered_df = df[df['Age'] > 30]
    print(filtered_df)
    

    3. Checking and Changing Data Types of Columns

    Understanding the data types of columns is crucial when performing data analysis, as different operations are valid for different data types.

    3.1. Checking Data Types

    • df.dtypes: Returns the data type of each column.
    # Check the data types of all columns
    print(df.dtypes)
    

    3.2. Changing Data Types

    You can change the data type of columns using astype().

    # Convert 'Age' column to float
    df['Age'] = df['Age'].astype(float)
    print(df.dtypes)
    
    # Convert 'Salary' column to string
    df['Salary'] = df['Salary'].astype(str)
    print(df.dtypes)
    

    3.3. Converting to Datetime

    You can convert columns to datetime using pd.to_datetime().

    # Convert 'Date' column to datetime format
    df['Hire_Date'] = pd.to_datetime(df['Hire_Date'])
    print(df.dtypes)
    

    4. Descriptive Statistics in pandas

    Descriptive statistics allow you to quickly summarize the central tendency, dispersion, and shape of a dataset’s distribution.

    4.1. Generating Descriptive Statistics

    • df.describe(): Provides summary statistics (count, mean, std, min, max, etc.) for numerical columns.
    • df.describe(include='all'): Includes both numerical and categorical columns in the summary.
    # Summary statistics for numerical columns
    print(df.describe())
    
    # Summary statistics including categorical columns
    print(df.describe(include='all'))
    

    4.2. Summary for Specific Columns

    You can generate descriptive statistics for specific columns.

    # Summary statistics for 'Salary' column
    print(df['Salary'].describe())
    

    4.3. Counting Unique Values

    • df['column'].nunique(): Counts the number of unique values in a column.
    • df['column'].unique(): Lists the unique values in a column.
    # Count unique values in the 'Age' column
    print(df['Age'].nunique())
    
    # Get unique values in the 'Name' column
    print(df['Name'].unique())
    

    4.4. Counting Frequency of Values

    • df['column'].value_counts(): Returns a series containing counts of unique values.
    # Get frequency counts of unique values in 'Age' column
    print(df['Age'].value_counts())
    

    4.5. Correlation Between Columns

    • df.corr(): Computes pairwise correlation of numerical columns.
    # Compute correlation matrix (numeric_only avoids errors from non-numeric columns)
    print(df.corr(numeric_only=True))
    

    5. Functions for Displaying Data

    In pandas, there are several functions for displaying data in a structured and readable format.

    5.1. df.head() and df.tail()

    These functions display the first or last n rows of the DataFrame.

    # Display first 5 rows
    print(df.head())
    
    # Display last 3 rows
    print(df.tail(3))
    

    5.2. df.sample()

    Randomly sample rows from the DataFrame.

    # Randomly sample 3 rows
    print(df.sample(3))
    

    5.3. df.to_string()

    Converts the DataFrame to a string, useful for displaying the entire DataFrame in environments that truncate the output.

    # Display entire DataFrame as a string
    print(df.to_string())
    

    5.4. Viewing Large DataFrames

    • pd.set_option(): Configure pandas options to display more rows or columns in large DataFrames.
    # Set pandas option to display more rows
    pd.set_option('display.max_rows', 100)
    
    # Set pandas option to display more columns
    pd.set_option('display.max_columns', 20)
    
    # Display the DataFrame with updated options
    print(df)
    

    5.5. Transposing Data

    • df.T: Transposes the DataFrame (swap rows and columns).
    # Transpose the DataFrame
    print(df.T)
    

    Summary

    Here’s a quick summary of key operations for viewing, accessing, checking data types, and descriptive statistics in pandas DataFrames:

    Viewing and Accessing Data:

    • head(), tail(): Viewing first/last rows.
    • shape: DataFrame dimensions.
    • columns, dtypes: Viewing columns and data types.
    • iloc[], loc[]: Accessing rows and columns.
    • boolean indexing: Filtering data based on conditions.

    Checking and Changing Data Types:

    • dtypes: Check column data types.
    • astype(): Change column data types.
    • to_datetime(): Convert to datetime format.

    Descriptive Statistics:

    • describe(): Summary statistics.
    • nunique(), unique(), value_counts(): Working with unique values and frequency counts.
    • corr(): Correlation between numerical columns.

    These operations help in quickly exploring, inspecting, and understanding data before moving on to deeper analysis or transformations. Let me know if you need further details or examples!

    Access, Filter, and Retrieve specific rows and columns from a pandas DataFrame- Selection in Pandas

    Selection in pandas refers to the various ways in which you can access, filter, and retrieve specific rows and columns from a pandas DataFrame or Series. pandas offers a variety of methods for selecting data, which include basic indexing, label-based indexing, conditional selection, and more advanced techniques.

    Let’s cover all the important selection techniques with explanations and examples.


    1. Selecting Columns in pandas

    1.1. Selecting a Single Column

    You can select a single column from a DataFrame by using df['column_name'] or df.column_name.

    import pandas as pd
    
    # Sample DataFrame
    data = {'Name': ['Alice', 'Bob', 'Charlie', 'David'],
            'Age': [25, 30, 35, 40],
            'Salary': [50000, 60000, 70000, 80000]}
    
    df = pd.DataFrame(data)
    
    # Selecting a single column (two methods)
    print(df['Name'])    # Method 1
    print(df.Name)       # Method 2
    

    1.2. Selecting Multiple Columns

    To select multiple columns, you pass a list of column names to df[].

    # Select 'Name' and 'Salary' columns
    print(df[['Name', 'Salary']])
    

    2. Selecting Rows in pandas

    2.1. Selecting Rows by Index Position with iloc[]

    iloc[] is used to select rows and columns by their integer position.

    • df.iloc[row_index, column_index]: Access specific row and column by position.
    # Select the first row (index 0)
    print(df.iloc[0])
    
    # Select the first two rows
    print(df.iloc[:2])
    
    # Select the value from the second row and 'Salary' column
    print(df.iloc[1, 2])
    

    2.2. Selecting Rows by Label with loc[]

    loc[] is used to select rows and columns by labels (index labels or column names).

    • df.loc[row_label, column_label]: Access specific row and column by label.
    # Select the row with index 1 (Bob)
    print(df.loc[1])
    
    # Select the 'Salary' of Bob (index 1)
    print(df.loc[1, 'Salary'])
    
    # Select the 'Name' and 'Salary' of the row where index is 2
    print(df.loc[2, ['Name', 'Salary']])
    

    3. Conditional Selection (Boolean Indexing)

    You can use boolean conditions to filter rows based on column values.

    3.1. Filtering Rows Based on Conditions

    # Select rows where Age is greater than 30
    print(df[df['Age'] > 30])
    
    # Select rows where Salary is less than or equal to 60000
    print(df[df['Salary'] <= 60000])
    

    3.2. Combining Multiple Conditions

    You can combine multiple conditions using & (AND) or | (OR) operators, and you must enclose each condition in parentheses.

    # Select rows where Age > 30 and Salary > 60000
    print(df[(df['Age'] > 30) & (df['Salary'] > 60000)])
    
    # Select rows where Age < 40 or Salary < 60000
    print(df[(df['Age'] < 40) | (df['Salary'] < 60000)])
    

    3.3. Filtering Rows Based on String Conditions

    You can filter rows based on string values using str.contains().

    # Select rows where 'Name' contains the letter 'a'
    print(df[df['Name'].str.contains('a')])
    

    4. Selecting by Index

    You can set an index and then select data based on the index.

    4.1. Setting a New Index

    # Set 'Name' as the new index
    df.set_index('Name', inplace=True)
    
    # Now select row by 'Name' (which is the new index)
    print(df.loc['Alice'])
    

    4.2. Resetting the Index

    You can reset the index back to the default (integer-based) using reset_index().

    # Reset the index back to default
    df.reset_index(inplace=True)
    print(df)
    

    5. Selecting Data with at[] and iat[]

    5.1. Using at[] for Scalar Access by Label

    at[] is used to access a single value by label (similar to loc[] but optimized for single value access).

    # Get the 'Salary' of the row with index 1 (Bob)
    print(df.at[1, 'Salary'])
    

    5.2. Using iat[] for Scalar Access by Position

    iat[] is used to access a single value by position (similar to iloc[] but optimized for single value access).

    # Get the 'Salary' of the second row (index 1)
    print(df.iat[1, 2])
    

    6. Selecting Rows and Columns by Data Type

    6.1. Selecting Columns by Data Type

    You can select columns based on their data type using select_dtypes().

    # Select only numeric columns
    numeric_df = df.select_dtypes(include='number')
    print(numeric_df)
    
    # Select only object (string) columns
    string_df = df.select_dtypes(include='object')
    print(string_df)
    

    7. Selecting Specific Data Based on a List of Labels

    You can filter rows or columns based on a list of labels.

    7.1. Selecting Rows Based on a List of Indexes

    # Select rows with index 0 and 2
    print(df.loc[[0, 2]])
    

    7.2. Selecting Columns Based on a List of Column Names

    # Select 'Name' and 'Age' columns
    print(df[['Name', 'Age']])
    

    8. Slicing Rows and Columns

    You can slice rows and columns using iloc[] and loc[] to select a range of rows or columns.

    8.1. Slicing Rows with iloc[]

    # Select the first 3 rows
    print(df.iloc[:3])
    
    # Select rows from index 1 to 3 (exclusive)
    print(df.iloc[1:3])
    

    8.2. Slicing Columns with iloc[]

    # Select all rows and the first two columns
    print(df.iloc[:, :2])
    
    # Select rows 0-2 and columns 1-2
    print(df.iloc[0:3, 1:3])
    

    8.3. Slicing Rows and Columns with loc[]

    # Select rows 0 to 2 and columns 'Name' and 'Age'
    print(df.loc[0:2, ['Name', 'Age']])
    

    9. Selecting Data by Index Ranges

    You can select data based on index ranges using loc[].

    # Select rows from index 1 to 3 (inclusive)
    print(df.loc[1:3])
    

    10. Advanced Selections with query()

    The query() method allows you to select rows based on complex conditions using SQL-like syntax.

    # Select rows where Age > 30 and Salary < 80000
    filtered_df = df.query('Age > 30 and Salary < 80000')
    print(filtered_df)
    

    11. Masking Data

    You can use mask() to mask certain data based on conditions, replacing them with a default value (e.g., NaN).

    # Mask Salary values greater than 60000 with NaN
    df_masked = df.mask(df['Salary'] > 60000)
    print(df_masked)
    

    12. Selecting Missing Data (NaN values)

    You can select rows where specific columns have missing data (NaN values).

    # Select rows where 'Salary' has NaN values
    missing_salary = df[df['Salary'].isna()]
    print(missing_salary)
    

    In pandas, selection can be done in various ways depending on what you want to access:

    1. Columns: Select one or more columns using df['col'] or df[['col1', 'col2']].
    2. Rows: Select rows using iloc[] (by position) or loc[] (by label).
    3. Conditional selection: Filter rows based on conditions using boolean indexing (df[df['col'] > value]).
    4. Indexing: Use set_index(), loc[], and reset_index() for selection based on index labels.
    5. Slicing: Slice rows and columns with iloc[] or loc[].
    6. Advanced queries: Use query() for SQL-like row filtering.

    These selection techniques are fundamental in data wrangling and allow you to manipulate and analyze your data efficiently. Let me know if you need more examples or specific details on any of the concepts!

    Merging, joining, concatenating, comparing, Sorting and Advanced Filtering DataFrames

    In pandas, merging, joining, concatenating, and comparing DataFrames are common tasks used to combine or compare datasets. Each operation is tailored for specific use cases, and understanding the difference between them is important for efficient data manipulation.

    Let’s explore each operation, covering both basic and advanced use cases, along with detailed examples.


    1. Merging DataFrames (pd.merge())

    merge() is used for combining two DataFrames based on a common key(s). It’s similar to SQL joins (INNER, LEFT, RIGHT, FULL OUTER).

    1.1. Inner Join (Default)

    An inner join returns only the rows where the values in the joining columns match in both DataFrames.

    import pandas as pd
    
    # Sample DataFrames
    df1 = pd.DataFrame({
        'ID': [1, 2, 3, 4],
        'Name': ['Alice', 'Bob', 'Charlie', 'David']
    })
    
    df2 = pd.DataFrame({
        'ID': [3, 4, 5, 6],
        'Salary': [70000, 80000, 90000, 100000]
    })
    
    # Merge with inner join (default)
    df_inner = pd.merge(df1, df2, on='ID')
    print(df_inner)
    

    Output:

       ID     Name  Salary
    0   3  Charlie   70000
    1   4    David   80000
    

    1.2. Left Join

    A left join returns all rows from the left DataFrame and only matching rows from the right DataFrame. Non-matching rows from the right DataFrame are filled with NaN.

    # Left join
    df_left = pd.merge(df1, df2, on='ID', how='left')
    print(df_left)
    

    Output:

       ID     Name   Salary
    0   1    Alice      NaN
    1   2      Bob      NaN
    2   3  Charlie  70000.0
    3   4    David  80000.0
    

    1.3. Right Join

    A right join returns all rows from the right DataFrame and only matching rows from the left DataFrame.

    # Right join
    df_right = pd.merge(df1, df2, on='ID', how='right')
    print(df_right)
    

    Output:

       ID     Name   Salary
    0   3  Charlie  70000.0
    1   4    David  80000.0
    2   5      NaN  90000.0
    3   6      NaN 100000.0
    

    1.4. Full Outer Join

    A full outer join returns all rows when there is a match in either the left or right DataFrame. Missing values are filled with NaN.

    # Full outer join
    df_outer = pd.merge(df1, df2, on='ID', how='outer')
    print(df_outer)
    

    Output:

       ID     Name   Salary
    0   1    Alice      NaN
    1   2      Bob      NaN
    2   3  Charlie  70000.0
    3   4    David  80000.0
    4   5      NaN  90000.0
    5   6      NaN 100000.0
    

    1.5. Merging on Multiple Columns

    You can merge DataFrames on multiple columns by passing a list to the on parameter.

    # Sample DataFrames
    df1 = pd.DataFrame({
        'ID': [1, 2, 3, 4],
        'Name': ['Alice', 'Bob', 'Charlie', 'David'],
        'Dept': ['HR', 'Finance', 'IT', 'HR']
    })
    
    df2 = pd.DataFrame({
        'ID': [3, 4, 5, 6],
        'Dept': ['IT', 'HR', 'Finance', 'IT'],
        'Salary': [70000, 80000, 90000, 100000]
    })
    
    # Merge on multiple columns
    df_multi = pd.merge(df1, df2, on=['ID', 'Dept'], how='inner')
    print(df_multi)
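
    Output (only the rows where both ID and Dept match in the two DataFrames survive the inner merge):

       ID     Name Dept  Salary
    0   3  Charlie   IT   70000
    1   4    David   HR   80000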
    

    2. Joining DataFrames (df.join())

    The join() method is similar to merge(), but it combines DataFrames based on their index rather than a column key, which makes it convenient when two DataFrames share a meaningful index.

    2.1. Left Join by Default

    By default, join() performs a left join using the index of the DataFrame.

    df1 = pd.DataFrame({
        'Name': ['Alice', 'Bob', 'Charlie'],
        'Salary': [50000, 60000, 70000]
    }, index=[1, 2, 3])
    
    df2 = pd.DataFrame({
        'Dept': ['HR', 'Finance', 'IT']
    }, index=[1, 2, 4])
    
    # Left join using join()
    df_joined = df1.join(df2)
    print(df_joined)
    

    Output:

          Name  Salary     Dept
    1    Alice   50000       HR
    2      Bob   60000  Finance
    3  Charlie   70000      NaN
    

    2.2. Specifying Different Types of Joins in join()

    You can specify the type of join using the how parameter (e.g., ‘left’, ‘right’, ‘inner’, ‘outer’).

    # Outer join using join()
    df_outer_join = df1.join(df2, how='outer')
    print(df_outer_join)
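
    Output (index 4 exists only in df2, so its Name and Salary are NaN, which also makes Salary a float column):

          Name   Salary     Dept
    1    Alice  50000.0       HR
    2      Bob  60000.0  Finance
    3  Charlie  70000.0      NaN
    4      NaN      NaN       IT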
    

    3. Concatenating DataFrames (pd.concat())

    The concat() function is used to concatenate DataFrames along a particular axis, either vertically (by rows) or horizontally (by columns).

    3.1. Concatenating Vertically (Default)

    By default, concat() stacks DataFrames vertically (row-wise).

    df1 = pd.DataFrame({'Name': ['Alice', 'Bob'], 'Salary': [50000, 60000]})
    df2 = pd.DataFrame({'Name': ['Charlie', 'David'], 'Salary': [70000, 80000]})
    
    # Concatenate vertically (row-wise)
    df_concat = pd.concat([df1, df2])
    print(df_concat)
    

    Output:

          Name  Salary
    0    Alice   50000
    1      Bob   60000
    0  Charlie   70000
    1    David   80000
    

    3.2. Concatenating Horizontally (Column-wise)

    You can concatenate DataFrames horizontally (by columns) using axis=1.

    # Concatenate horizontally (column-wise)
    df_concat_cols = pd.concat([df1, df2], axis=1)
    print(df_concat_cols)
    

    Output:

          Name  Salary     Name  Salary
    0    Alice   50000  Charlie   70000
    1      Bob   60000    David   80000
    

    3.3. Resetting Index After Concatenation

    The index is preserved when concatenating, but you can reset the index using ignore_index=True.

    # Concatenate vertically and reset index
    df_concat_reset = pd.concat([df1, df2], ignore_index=True)
    print(df_concat_reset)
    

    Output:

          Name  Salary
    0    Alice   50000
    1      Bob   60000
    2  Charlie   70000
    3    David   80000
    

    4. Comparing DataFrames (pd.DataFrame.compare())

    compare() is used to compare two DataFrames and highlight the differences between them.

    4.1. Comparing Two DataFrames

    # Sample DataFrames
    df1 = pd.DataFrame({
        'Name': ['Alice', 'Bob', 'Charlie'],
        'Salary': [50000, 60000, 70000]
    })
    
    df2 = pd.DataFrame({
        'Name': ['Alice', 'Bob', 'Charlie'],
        'Salary': [50000, 65000, 70000]
    })
    
    # Compare the two DataFrames
    df_compare = df1.compare(df2)
    print(df_compare)
    

    Output:

      Salary       
        self  other
    1  60000  65000
    

    4.2. Including All Columns in the Comparison

    You can keep the full shape of the DataFrames in the comparison, including rows and columns without differences, by passing keep_shape=True. Adding keep_equal=True displays the equal values instead of NaN.

    # Compare, keeping the full shape and showing equal values
    df_compare_all = df1.compare(df2, keep_shape=True, keep_equal=True)
    print(df_compare_all)
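
    With both options set, every row and column is kept and unchanged cells show their actual values; the result should look roughly like this:

          Name           Salary       
          self    other    self  other
    0    Alice    Alice   50000  50000
    1      Bob      Bob   60000  65000
    2  Charlie  Charlie   70000  70000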
    


    5. Sorting in pandas

    You can sort a DataFrame by one or more columns in ascending or descending order using sort_values().

    5.1. Sorting by a Single Column

    • sort_values(by='column_name'): Sort the DataFrame by a single column in ascending order (default).
    # Sample DataFrame
    df = pd.DataFrame({
        'Name': ['Alice', 'Bob', 'Charlie', 'David'],
        'Age': [25, 30, 35, 40],
        'Salary': [50000, 60000, 70000, 80000]
    })
    
    # Sort by 'Age' in ascending order
    df_sorted = df.sort_values(by='Age')
    print(df_sorted)
    

    Output:

          Name  Age  Salary
    0    Alice   25   50000
    1      Bob   30   60000
    2  Charlie   35   70000
    3    David   40   80000
    

    5.2. Sorting by Multiple Columns

    You can sort by multiple columns, specifying whether each column should be sorted in ascending or descending order.

    # Sort by 'Age' in ascending order and 'Salary' in descending order
    df_sorted_multi = df.sort_values(by=['Age', 'Salary'], ascending=[True, False])
    print(df_sorted_multi)
    

    Output:

          Name  Age  Salary
    0    Alice   25   50000
    1      Bob   30   60000
    2  Charlie   35   70000
    3    David   40   80000
    

    5.3. Sorting by Index

    • sort_index(): Sort the DataFrame by its index.
    # Sort by index
    df_sorted_index = df.sort_index()
    print(df_sorted_index)
    

    6. Advanced Filtering in pandas

    Advanced filtering allows you to retrieve specific subsets of data based on complex conditions, such as combining multiple conditions, filtering by partial string matches, and more.

    6.1. Filtering Rows Based on Multiple Conditions

    You can filter DataFrames based on multiple conditions by combining them with & (AND) or | (OR). Each condition must be enclosed in parentheses.

    # Filter rows where Age > 30 and Salary > 60000
    df_filtered = df[(df['Age'] > 30) & (df['Salary'] > 60000)]
    print(df_filtered)
    

    Output:

          Name  Age  Salary
    2  Charlie   35   70000
    3    David   40   80000
    

    6.2. Filtering Rows Based on String Conditions

    You can filter rows based on string conditions, such as checking if a column contains a certain substring using str.contains().

    # Filter rows where 'Name' contains the letter 'a'
    df_filtered_str = df[df['Name'].str.contains('a')]
    print(df_filtered_str)
    

    Output:

          Name  Age  Salary
    0    Alice   25   50000
    2  Charlie   35   70000
    3    David   40   80000
    

    6.3. Filtering Rows Based on Ranges

    You can filter rows where column values fall within a certain range using between().

    # Filter rows where 'Salary' is between 55000 and 75000
    df_filtered_range = df[df['Salary'].between(55000, 75000)]
    print(df_filtered_range)
    

    Output:

          Name  Age  Salary
    1      Bob   30   60000
    2  Charlie   35   70000
    

    6.4. Using query() for Advanced Filtering

    query() allows you to use SQL-like queries to filter your DataFrame based on complex conditions.

    # Use query to filter rows where Age > 30 and Salary < 80000
    df_filtered_query = df.query('Age > 30 and Salary < 80000')
    print(df_filtered_query)
    

    Output:

          Name  Age  Salary
    2  Charlie   35   70000
    

    Combining Sorting, Filtering, and Merging

    Now, let’s see how sorting and filtering can be combined with merge, join, concatenation, and comparison operations.

    6.5. Filtering and Sorting with Merge

    After merging two DataFrames, you can filter and sort the result.

    # Sample DataFrames for merging
    df1 = pd.DataFrame({
        'ID': [1, 2, 3, 4],
        'Name': ['Alice', 'Bob', 'Charlie', 'David']
    })
    
    df2 = pd.DataFrame({
        'ID': [3, 4, 5, 6],
        'Salary': [70000, 80000, 90000, 100000]
    })
    
    # Merge DataFrames (inner join)
    df_merged = pd.merge(df1, df2, on='ID', how='inner')
    
    # Filter rows where Salary > 75000 and sort by 'Salary'
    df_merged_filtered_sorted = df_merged[df_merged['Salary'] > 75000].sort_values(by='Salary', ascending=False)
    print(df_merged_filtered_sorted)
    

    Output:

       ID   Name  Salary
    1   4  David   80000
    
    

    6.6. Filtering After Concatenation

    You can filter rows after concatenating multiple DataFrames.

    # Sample DataFrames for concatenation
    df1 = pd.DataFrame({'Name': ['Alice', 'Bob'], 'Salary': [50000, 60000]})
    df2 = pd.DataFrame({'Name': ['Charlie', 'David'], 'Salary': [70000, 80000]})
    
    # Concatenate DataFrames
    df_concat = pd.concat([df1, df2])
    
    # Filter rows where 'Salary' > 60000
    df_concat_filtered = df_concat[df_concat['Salary'] > 60000]
    print(df_concat_filtered)
    

    Output:

          Name  Salary
    0  Charlie   70000
    1    David   80000
    

    Full Example: Sorting, Filtering, Merging, and Concatenation

    Here’s a complete example that demonstrates how to use sorting, filtering, and merging in a real-world scenario:

    # Sample DataFrames
    df1 = pd.DataFrame({
        'ID': [1, 2, 3, 4],
        'Name': ['Alice', 'Bob', 'Charlie', 'David']
    })
    
    df2 = pd.DataFrame({
        'ID': [3, 4, 5, 6],
        'Dept': ['HR', 'IT', 'Finance', 'Marketing'],
        'Salary': [70000, 80000, 90000, 100000]
    })
    
    # Merge the DataFrames
    df_merged = pd.merge(df1, df2, on='ID', how='outer')
    
    # Filter rows where Salary > 60000 and Dept is 'HR' or 'IT'
    df_filtered = df_merged[(df_merged['Salary'] > 60000) & (df_merged['Dept'].isin(['HR', 'IT']))]
    
    # Sort the result by 'Salary' in descending order
    df_sorted = df_filtered.sort_values(by='Salary', ascending=False)
    
    print(df_sorted)
    

    Output:

       ID     Name Dept   Salary
    3   4    David   IT  80000.0
    2   3  Charlie   HR  70000.0
    

    • Sorting (sort_values()): Sort by one or more columns, or by index using sort_index().
    • Advanced Filtering: Filter rows based on complex conditions using boolean indexing, str.contains(), between(), and query().
    • Combining Sorting and Filtering with Merge/Join: You can merge/join DataFrames and then apply sorting and filtering to the combined dataset.
    • Concatenation: After concatenation, apply filtering to refine the data.

    These operations allow you to efficiently manipulate data using pandas, combining data from multiple sources and selecting, sorting, and filtering relevant information. Let me know if you need more examples or specific clarifications!



    Pivoting, melting, and transposing operations in pandas

    In this section, we will dive deeper into the pivoting, melting, and transposing operations in pandas. These are essential for reshaping data in DataFrames, enabling us to structure data for analysis, reporting, or visualization purposes.

    1. Pivoting in pandas

    Pivoting refers to reshaping data by turning unique values from one column into multiple columns (wide format). It allows transforming long-form data (where a single row represents a data point) into wide-form data (where multiple columns represent different aspects of the same data point).

    1.1. pivot() Function

    The pivot() function reshapes data from long to wide format by specifying an index, columns, and values.

    Syntax:

    df.pivot(index='row_label', columns='column_label', values='value_column')
    
    • index: The column(s) to use as the new DataFrame’s index.
    • columns: The column(s) to spread across the new DataFrame as headers.
    • values: The values to fill the DataFrame with, based on the pivot.

    1.2. Example of Pivoting

    import pandas as pd
    
    # Sample data in long format
    df = pd.DataFrame({
        'Name': ['Alice', 'Alice', 'Bob', 'Bob'],
        'Year': [2020, 2021, 2020, 2021],
        'Sales': [1000, 1100, 2000, 2100]
    })
    
    # Pivot the DataFrame to show Sales for each Year as columns
    pivot_df = df.pivot(index='Name', columns='Year', values='Sales')
    print(pivot_df)
    

    Output:

    Year     2020  2021
    Name
    Alice    1000  1100
    Bob      2000  2100
    

    1.3. Use Cases for Pivoting

    • Summarizing Data: Pivoting is useful when you want to summarize data across multiple categories and present the data in a more readable, wide format.
    • Visualizing Data: It is often used before creating plots or charts, especially when visualizing time-series data for multiple entities.
    • Data Aggregation: Pivoting is helpful when you want to compare multiple measures across different dimensions.

    1.4. Handling Duplicates in Pivoting

    If there are duplicate entries for the same index-column combination, pivot() will raise an error. To handle aggregation of duplicates, use pivot_table().
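
    As a quick illustration of this behavior (the small df_dup frame below is hypothetical, not one of the earlier examples), pivot() raises a ValueError on a duplicate Name/Year pair, while pivot_table() aggregates it:

    import pandas as pd
    
    # Duplicate entry: Bob appears twice for 2020
    df_dup = pd.DataFrame({
        'Name': ['Alice', 'Bob', 'Bob'],
        'Year': [2020, 2020, 2020],
        'Sales': [1000, 2000, 1800]
    })
    
    try:
        df_dup.pivot(index='Name', columns='Year', values='Sales')
    except ValueError as err:
        print("pivot() failed:", err)  # Index contains duplicate entries, cannot reshape
    
    # pivot_table() resolves the duplicates with an aggregation function instead
    print(df_dup.pivot_table(index='Name', columns='Year', values='Sales', aggfunc='sum'))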

    2. Pivot Table in pandas

    pivot_table() works similarly to pivot(), but it allows aggregation when there are duplicate values. It can be seen as a more flexible version of pivot().

    2.1. Syntax for pivot_table()

    df.pivot_table(index='row_label', columns='column_label', values='value_column', aggfunc='mean')
    
    • aggfunc: The aggregation function to apply (e.g., mean, sum, count, max, etc.). Default is mean.

    2.2. Example of pivot_table()

    df = pd.DataFrame({
        'Name': ['Alice', 'Alice', 'Bob', 'Bob', 'Bob'],
        'Year': [2020, 2021, 2020, 2021, 2020],
        'Sales': [1000, 1100, 2000, 2100, 1800]
    })
    
    # Pivot table with sum aggregation for duplicate entries
    pivot_table_df = df.pivot_table(index='Name', columns='Year', values='Sales', aggfunc='sum')
    print(pivot_table_df)
    

    Output:

    Year     2020   2021
    Name
    Alice    1000   1100
    Bob      3800   2100
    

    2.3. Use Cases for pivot_table()

    • Handling Duplicates: It is useful when you want to handle duplicate entries by applying an aggregation function (e.g., summing sales across multiple entries).
    • Data Summarization: When performing group-by like operations and you want to summarize data into a matrix.
    • Flexible Aggregation: You can specify various aggregation functions to summarize data (e.g., sum, average, count).

    3. Melting in pandas

    Melting is the opposite of pivoting. It reshapes wide-form data into long-form data by “unpivoting” columns back into rows. This is useful when you want to bring multiple columns into a single column, often for plotting or further analysis.

    3.1. melt() Function

    The melt() function reshapes a DataFrame by transforming columns into rows, returning long-form data.

    Syntax:

    df.melt(id_vars=['fixed_column'], value_vars=['columns_to_unpivot'], var_name='variable_name', value_name='value_name')
    
    • id_vars: Columns that should remain fixed (not melted).
    • value_vars: Columns to unpivot (convert from wide to long).
    • var_name: The name of the new “variable” column.
    • value_name: The name of the new “value” column.

    3.2. Example of Melting

    df = pd.DataFrame({
        'Name': ['Alice', 'Bob'],
        '2020': [1000, 2000],
        '2021': [1100, 2100]
    })
    
    # Melt the DataFrame from wide to long format
    melted_df = df.melt(id_vars=['Name'], value_vars=['2020', '2021'], var_name='Year', value_name='Sales')
    print(melted_df)
    

    Output:

        Name  Year  Sales
    0  Alice  2020   1000
    1    Bob  2020   2000
    2  Alice  2021   1100
    3    Bob  2021   2100
    

    3.3. Use Cases for Melting

    • Preparing Data for Plotting: Many plotting libraries prefer data in long format, where each row is a single data point.
    • Unpivoting Wide Data: When you have data spread across columns and you want to analyze or process it row-wise.
    • Handling Multiple Time Periods: When dealing with time series data, you often have columns for each year or month. Melting can help consolidate these columns into a single “period” column.

    4. Transposing in pandas

    Transposing is the process of swapping rows and columns in a DataFrame, where the index becomes the column headers and vice versa.

    4.1. transpose() or T

    The transpose() function or T attribute is used to interchange rows and columns.

    Syntax:

    df.transpose()  # or simply df.T
    

    4.2. Example of Transposing

    df = pd.DataFrame({
        'Name': ['Alice', 'Bob'],
        'Age': [25, 30],
        'Salary': [50000, 60000]
    })
    
    # Transpose the DataFrame
    transposed_df = df.transpose()
    print(transposed_df)
    

    Output:

                  0      1
    Name      Alice    Bob
    Age          25     30
    Salary    50000  60000
    

    4.3. Use Cases for Transposing

    • Data Presentation: Sometimes transposing data can make it easier to present or visualize. For example, showing each record’s details in columns rather than rows.
    • Analyzing Wide Data: If your data has more columns than rows, transposing can help make the dataset easier to inspect.
    • Pivot-Like Operations: In scenarios where you want to transform row-wise data into column-wise data quickly.

    Summary of Pivoting, Melting, and Transposing

    Operation | Function | Description | Use Case
    Pivoting | pivot() | Reshapes data from long to wide format by turning unique values from a column into new column headers. | Summarizing data, comparing across categories, preparing for visualization.
    Pivot Table | pivot_table() | Similar to pivot(), but allows aggregation for duplicate values and provides more flexibility. | Summarizing data, aggregating data with functions (sum, mean, etc.).
    Melting | melt() | Unpivots data from wide to long format, turning columns into rows. | Preparing data for plotting, handling time series, unpivoting wide-form data.
    Transposing | transpose() | Swaps rows and columns, turning the DataFrame’s index into columns and vice versa. | Analyzing data from different perspectives, data presentation, pivot-like operations.

    Conclusion

    1. Pivoting reshapes long data into wide data, which is useful for summarization and comparison across categories.
    2. Pivot tables extend pivoting by allowing aggregation when duplicates are present.
    3. Melting converts wide data into long data, preparing it for time-series analysis or plotting.
    4. Transposing swaps rows and columns to change the perspective of your data, helping with analysis or presentation.

    These reshaping operations are fundamental to data wrangling and make pandas a versatile library for handling complex data structures. Let me know if you need more details or examples on any specific topic!


    Doubts / Questions / Usecase

    ====================================================

    ====================================================

    How to find the third highest salaried employee in the Sales department in Python?

    To find the third highest salaried employee in the Sales department in Python using pandas, you can follow these steps:

    Steps:

    1. Filter the employees who belong to the Sales department.
    2. Sort the employees by their salary in descending order.
    3. Extract the third employee based on the salary ranking.

    Example Code:

    import pandas as pd
    
    # Sample data for employees
    data = {
        'Name': ['Alice', 'Bob', 'Charlie', 'David', 'Eva', 'Frank'],
        'Department': ['Sales', 'Sales', 'Marketing', 'Sales', 'HR', 'Sales'],
        'Salary': [50000, 60000, 70000, 80000, 90000, 55000]
    }
    
    # Create a DataFrame
    df = pd.DataFrame(data)
    
    # Step 1: Filter employees in the Sales department
    sales_df = df[df['Department'] == 'Sales']
    
    # Step 2: Sort employees by Salary in descending order
    sorted_sales_df = sales_df.sort_values(by='Salary', ascending=False)
    
    # Step 3: Find the third highest salaried employee
    third_highest_salary = sorted_sales_df.iloc[2]  # Index 2 corresponds to the third entry
    
    # Output the result
    print("Third highest salaried employee in Sales department:")
    print(third_highest_salary)
    

    Explanation:

    1. Filter: The DataFrame is filtered to include only rows where the Department is Sales.
    2. Sort: The filtered DataFrame is sorted by the Salary column in descending order, so the highest salaries appear first.
    3. Extract: The third entry in the sorted DataFrame is selected using iloc[2] (0-based indexing).

    Sample Output:

    Third highest salaried employee in Sales department:
    Name         Frank
    Department    Sales
    Salary       55000
    Name: 5, dtype: object
    

    Alternate Approach (If There Are Duplicate Salaries):

    If salaries are not unique, and you want to find the third distinct highest salary, you can do the following:

    # Step 1: Filter employees in the Sales department
    sales_df = df[df['Department'] == 'Sales']
    
    # Step 2: Get unique salaries, sort them in descending order
    unique_salaries = sales_df['Salary'].drop_duplicates().sort_values(ascending=False)
    
    # Step 3: Get the third highest salary
    third_highest_salary_value = unique_salaries.iloc[2]  # Third highest salary value
    
    # Step 4: Find the employee(s) with this salary
    third_highest_employee = sales_df[sales_df['Salary'] == third_highest_salary_value]
    
    # Output the result
    print("Third highest salaried employee(s) in Sales department:")
    print(third_highest_employee)
    

    This approach ensures you get the third distinct highest salary, in case there are multiple employees with the same salary value.
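
    A more compact variant of the same idea uses Series.nlargest() (a sketch, assuming the same df as above):

    # Third distinct highest salary in Sales, using nlargest() instead of sorting
    sales_df = df[df['Department'] == 'Sales']
    third_highest_salary_value = sales_df['Salary'].drop_duplicates().nlargest(3).iloc[-1]
    print(sales_df[sales_df['Salary'] == third_highest_salary_value])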

    can we use something like limiting the dataframe output?

    Yes, in pandas, you can limit the number of rows returned from a DataFrame by using head(), tail(), or iloc[]. This is useful when you want to fetch only a limited number of results without needing to manually slice the DataFrame.

    How to Limit DataFrame Output in pandas

    1. head(n): Returns the first n rows of the DataFrame.
    2. iloc[:n]: Selects the first n rows by integer-location based indexing.
    3. Using iloc[] with sort_values(): When limiting rows after sorting.

    Example: Limiting DataFrame Output After Sorting

    Here’s how you can find the third highest salaried employee in the Sales department and limit the DataFrame output to only a few rows:

    import pandas as pd
    
    # Sample DataFrame
    data = {
        'Name': ['Alice', 'Bob', 'Charlie', 'David', 'Eva', 'Frank'],
        'Department': ['Sales', 'Sales', 'Marketing', 'Sales', 'HR', 'Sales'],
        'Salary': [50000, 60000, 70000, 80000, 90000, 55000]
    }
    
    df = pd.DataFrame(data)
    
    # Step 1: Filter employees in the Sales department
    sales_df = df[df['Department'] == 'Sales']
    
    # Step 2: Sort employees by Salary in descending order
    sorted_sales_df = sales_df.sort_values(by='Salary', ascending=False)
    
    # Step 3: Limit the result to the first 3 rows using head()
    limited_sales_df = sorted_sales_df.head(3)
    
    # Output the result (third highest employee)
    print("Top 3 salaried employees in Sales department:")
    print(limited_sales_df)
    

    Explanation:

    1. head(3): Limits the DataFrame to the first 3 rows after sorting by salary.
    2. You can use iloc[2] to get the third-highest salary after limiting the result to just the top 3.

    Sample Output:

    Top 3 salaried employees in Sales department:
        Name Department  Salary
    3  David      Sales   80000
    1    Bob      Sales   60000
    5  Frank      Sales   55000
    

    Alternate Limiting Using iloc[] for Exact Indexing:

    You can directly use iloc[] to limit and access the rows you want:

    # Fetch only the third row (third highest salaried employee) after sorting
    third_highest_salary = sorted_sales_df.iloc[2]
    
    print("Third highest salaried employee in Sales department:")
    print(third_highest_salary)
    

    Sample Output:

    Third highest salaried employee in Sales department:
    Name         Frank
    Department    Sales
    Salary       55000
    Name: 5, dtype: object
    

    Use Case of Limiting the DataFrame:

    Limiting is useful when:

    • You want to display the top N results (e.g., top 5 highest salaries).
    • You are dealing with large DataFrames and only need to inspect a small portion of the data.
    • You want to optimize performance when working with large datasets by avoiding fetching more rows than needed.



    1. Saving DataFrame as CSV

    The to_csv() function exports the DataFrame to a CSV (Comma-Separated Values) file.

    Syntax:

    df.to_csv('file_name.csv', index=False)
    
    • index=False: Excludes row indices in the saved file.
    • sep=',': (Optional) Specify the delimiter (default is a comma).

    Example:

    import pandas as pd
    
    data = {'Name': ['Alice', 'Bob', 'Charlie'], 'Age': [25, 30, 35]}
    df = pd.DataFrame(data)
    
    # Save DataFrame as CSV
    df.to_csv('output.csv', index=False)
    

    2. Saving DataFrame as Excel

    The to_excel() function exports the DataFrame to an Excel (.xlsx) file.

    Syntax:

    df.to_excel('file_name.xlsx', sheet_name='Sheet1', index=False)
    
    • sheet_name='Sheet1': (Optional) Specify the sheet name.
    • index=False: Excludes row indices in the saved file.

    Example:

    # Save DataFrame as Excel file
    df.to_excel('output.xlsx', sheet_name='Data', index=False)
    

    Note: The to_excel() function requires openpyxl for .xlsx files, so make sure it’s installed (pip install openpyxl).


    3. Saving DataFrame as JSON

    The to_json() function exports the DataFrame to a JSON (JavaScript Object Notation) file, useful for web applications.

    Syntax:

    df.to_json('file_name.json', orient='records')
    
    • orient='records': Each row is exported as a separate JSON object.

    Example:

    # Save DataFrame as JSON file
    df.to_json('output.json', orient='records')
    

    4. Saving DataFrame to SQL Database

    The to_sql() function exports the DataFrame to a SQL table in a relational database.

    Syntax:

    df.to_sql('table_name', con=connection, if_exists='replace', index=False)
    
    • con: The SQLAlchemy engine or database connection object.
    • if_exists='replace': Replaces the table if it already exists; use 'append' to add data to the existing table.
    • index=False: Excludes row indices in the table.

    Example:

    from sqlalchemy import create_engine
    
    # Create a database connection (SQLite in this example)
    engine = create_engine('sqlite:///my_database.db')
    
    # Save DataFrame to SQL table
    df.to_sql('people', con=engine, if_exists='replace', index=False)
    

    5. Saving DataFrame as Parquet

    The to_parquet() function exports the DataFrame to a Parquet file, which is a popular columnar storage format, especially in big data environments.

    Syntax:

    df.to_parquet('file_name.parquet')
    

    Example:

    # Save DataFrame as Parquet file
    df.to_parquet('output.parquet')
    

    Note: Saving as Parquet requires pyarrow or fastparquet libraries (pip install pyarrow or pip install fastparquet).


    6. Saving DataFrame as HTML

    The to_html() function saves the DataFrame as an HTML table, useful for displaying data on web pages.

    Syntax:

    df.to_html('file_name.html', index=False)
    

    Example:

    # Save DataFrame as HTML file
    df.to_html('output.html', index=False)
    

    Summary

    Format | Method | Example Syntax
    CSV | to_csv() | df.to_csv('file.csv', index=False)
    Excel | to_excel() | df.to_excel('file.xlsx', index=False)
    JSON | to_json() | df.to_json('file.json', orient='records')
    SQL | to_sql() | df.to_sql('table', con=engine, index=False)
    Parquet | to_parquet() | df.to_parquet('file.parquet')
    HTML | to_html() | df.to_html('file.html', index=False)


    1. Basics and Setup

    Operation | Command | Description
    Import pandas | import pandas as pd | Import pandas library.
    Read CSV | pd.read_csv('file.csv') | Read CSV file into DataFrame.
    Read Excel | pd.read_excel('file.xlsx') | Read Excel file into DataFrame.
    Save to CSV | df.to_csv('file.csv') | Save DataFrame to CSV.
    Save to Excel | df.to_excel('file.xlsx') | Save DataFrame to Excel.

    2. Data Inspection

    Operation | Command | Description
    Head and Tail | df.head(n), df.tail(n) | Display first/last n rows.
    Shape of DataFrame | df.shape | Get the dimensions (rows, columns) of DataFrame.
    Info Summary | df.info() | Overview of DataFrame (non-null count, dtypes).
    Column Names | df.columns | List all column names.
    Data Types | df.dtypes | Check data types of all columns.
    Describe Data | df.describe() | Summary statistics for numerical columns.
    Unique Values | df['col'].unique() | Get unique values in a column.
    Value Counts | df['col'].value_counts() | Count unique values in a column.

    3. Selecting and Indexing

    Operation | Command | Description
    Select Column(s) | df['col'], df[['col1', 'col2']] | Select one or multiple columns.
    Select Row by Label | df.loc['label'] | Select row by index label.
    Select Row by Position | df.iloc[position] | Select row by integer position.
    Slice Rows | df[5:10] | Slice rows from index 5 to 10.
    Conditional Selection | df[df['col'] > value] | Filter rows based on condition.
    Set Index | df.set_index('col', inplace=True) | Set column as index.
    Reset Index | df.reset_index(inplace=True) | Reset index to default.
    Multi-Indexing | pd.MultiIndex.from_arrays([list1, list2]) | Create hierarchical indexing.
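
    Since pd.MultiIndex.from_arrays has not appeared in the earlier examples, here is a minimal sketch (the Dept/Name data is illustrative):

    import pandas as pd
    
    # Build a two-level index from parallel arrays
    arrays = [['HR', 'HR', 'IT'], ['Alice', 'Bob', 'Charlie']]
    idx = pd.MultiIndex.from_arrays(arrays, names=['Dept', 'Name'])
    df_multi = pd.DataFrame({'Salary': [50000, 55000, 70000]}, index=idx)
    
    # Select all rows for the 'HR' value of the first index level
    print(df_multi.loc['HR'])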

    4. Data Cleaning

    Operation | Command | Description
    Rename Columns | df.rename(columns={'old': 'new'}) | Rename columns in DataFrame.
    Drop Columns | df.drop(['col1', 'col2'], axis=1) | Drop specified columns.
    Drop Rows | df.drop([0, 1], axis=0) | Drop specified rows.
    Drop Missing Values | df.dropna() | Drop rows with NaN values.
    Fill Missing Values | df.fillna(value) | Fill NaN values with a specified value.
    Replace Values | df.replace({old: new}) | Replace values throughout DataFrame.
    Remove Duplicates | df.drop_duplicates() | Remove duplicate rows.
    Change Data Type | df['col'] = df['col'].astype(type) | Change data type of a column.
    Handle Outliers | df[df['col'] < threshold] | Filter outliers based on threshold.

    5. Data Transformation

    Operation | Command | Description
    Apply Function to Column | df['col'].apply(func) | Apply function to each element in column.
    Lambda Function | df['col'].apply(lambda x: x + 2) | Apply lambda function to a column.
    Map Values | df['col'].map({old: new}) | Map values in column to new values.
    Replace with Conditions | df['col'] = df['col'].where(cond, new_val) | Conditionally replace values.
    Binning Data | pd.cut(df['col'], bins) | Bin data into intervals.
    Standardize/Normalize | (df['col'] - df['col'].mean()) / df['col'].std() | Standardize column.

    6. Aggregation and Grouping

    Operation | Command | Description
    Sum/Mean/Max/Min | df['col'].sum(), df['col'].mean() | Basic aggregation functions on columns.
    GroupBy | df.groupby('col').sum() | Group by column and apply aggregation.
    Multiple Aggregations | df.groupby('col').agg({'col1': 'sum', 'col2': 'mean'}) | Multiple aggregations on grouped data.
    Pivot Table | df.pivot_table(index='col1', columns='col2', values='col3', aggfunc='mean') | Create pivot table.
    Cumulative Sum | df['col'].cumsum() | Calculate cumulative sum.
    Rolling Calculations | df['col'].rolling(window=3).mean() | Apply rolling calculations (moving average, etc.).
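
    A minimal sketch tying a few of these operations together (the Dept/Salary/Bonus frame below is illustrative):

    import pandas as pd
    
    df_agg = pd.DataFrame({
        'Dept': ['HR', 'HR', 'IT', 'IT'],
        'Salary': [50000, 55000, 70000, 80000],
        'Bonus': [5000, 6000, 7000, 8000]
    })
    
    # Group by department with a different aggregation per column
    print(df_agg.groupby('Dept').agg({'Salary': 'mean', 'Bonus': 'sum'}))
    
    # Cumulative sum and a 2-row rolling (moving) average on one column
    print(df_agg['Salary'].cumsum())
    print(df_agg['Salary'].rolling(window=2).mean())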

    7. Merging, Joining, Concatenating

    Operation | Command | Description
    Concatenate DataFrames | pd.concat([df1, df2]) | Concatenate DataFrames along rows or columns.
    Merge DataFrames | pd.merge(df1, df2, on='key') | Merge DataFrames based on key.
    Left/Right/Inner/Outer Join | pd.merge(df1, df2, how='left', on='key') | Perform different types of joins.
    Join on Index | df1.join(df2, on='key') | Join using index as the key.
    Append DataFrame | df1.append(df2) | Append rows of df2 to df1 (deprecated in recent pandas; prefer pd.concat).

    8. Reshaping Data

    Operation | Command | Description
    Pivot | df.pivot(index='col1', columns='col2', values='col3') | Reshape data (long to wide).
    Pivot Table | df.pivot_table(index='col1', columns='col2', values='col3', aggfunc='mean') | Reshape with aggregation.
    Melt | df.melt(id_vars=['col1'], value_vars=['col2']) | Unpivot from wide to long format.
    Transpose | df.T | Transpose rows and columns.
    Stack | df.stack() | Stack columns to row MultiIndex.
    Unstack | df.unstack() | Unstack row index to columns.

    9. Working with Dates

    Operation | Command | Description
    Convert to Datetime | pd.to_datetime(df['col']) | Convert column to datetime format.
    Extract Year/Month/Day | df['col'].dt.year, df['col'].dt.month | Extract parts of date.
    Date Range | pd.date_range(start, end, freq='D') | Generate a range of dates.
    Time Difference | (df['end'] - df['start']).dt.days | Calculate time difference in days.
    Resample | df.resample('M').mean() | Resample time series to a specific frequency.
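
    For example, a small hypothetical daily series demonstrating the date operations above:

    import pandas as pd
    
    ts = pd.DataFrame({
        'date': ['2024-01-01', '2024-01-02', '2024-01-03', '2024-01-04'],
        'value': [10, 12, 9, 15]
    })
    
    ts['date'] = pd.to_datetime(ts['date'])   # Convert strings to datetime
    ts['month'] = ts['date'].dt.month         # Extract the month part
    
    # Resampling needs a datetime index; here we downsample to 2-day means
    print(ts.set_index('date').resample('2D').mean())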

    10. String Operations

    Operation | Command | Description
    Convert to Lowercase | df['col'].str.lower() | Convert strings to lowercase.
    Contains Substring | df['col'].str.contains('substring') | Check if strings contain a substring.
    Replace Substring | df['col'].str.replace('old', 'new') | Replace substring in strings.
    Split Strings | df['col'].str.split(' ') | Split strings on a delimiter.
    Length of Strings | df['col'].str.len() | Get length of each string in a column.

    11. Useful Plotting

    Operation | Command | Description
    Line Plot | df.plot.line() | Plot data as a line chart.
    Bar Plot | df.plot.bar() | Plot data as a bar chart.
    Histogram | df['col'].plot.hist() | Plot data as a histogram.
    Box Plot | df.plot.box() | Plot data as a box plot.
    Scatter Plot | df.plot.scatter(x='col1', y='col2') | Scatter plot between two columns.

    12. Exporting Data

    Operation | Command | Description
    To CSV | df.to_csv('file.csv', index=False) | Export DataFrame to CSV file.
    To Excel | df.to_excel('file.xlsx', index=False) | Export DataFrame to Excel file.
    To JSON | df.to_json('file.json') | Export DataFrame to JSON file.
    To SQL | df.to_sql('table_name', con=connection) | Export DataFrame to SQL database.

    This cheat sheet covers the essentials of pandas for data manipulation, analysis, and exporting results. With this reference, you can quickly review and apply the most common pandas operations.

  • I have divided a big PySpark script into many steps by defining step1 = ''' some code ''' through step7. I want to execute these steps one after another, with the option to skip certain steps. If a step fails, the next step should run only if it is marked to run even when the previous step has failed.

    To execute multiple steps (defined as strings of code) sequentially in PySpark, while controlling which steps should execute next based on success or failure, you can implement a mechanism for error handling and conditional execution.

    Here’s a sample approach that uses Python’s exec() function to run the steps dynamically, with a control structure to handle step execution and failures.

    Step-by-Step Approach:

    1. Define the steps as strings (already done).
    2. Create a control table or list to specify:
      • Whether to run the step.
      • Whether the next step should run even if the current step fails.
    3. Use try and except blocks to handle execution and log errors.
    4. Conditional execution of the next steps based on success/failure.

    Example:

    # Defining your steps as code blocks
    step1 = '''
    print("Executing Step 1")
    df = spark.read.csv("data1.csv")
    '''
    
    step2 = '''
    print("Executing Step 2")
    df = df.filter(df['age'] > 30)
    '''
    
    step3 = '''
    print("Executing Step 3")
    df = df.groupBy("country").count()
    '''
    
    step4 = '''
    print("Executing Step 4")
    # Assume this step fails
    raise ValueError("Simulated failure in Step 4")
    '''
    
    # Control table: run_step=True means execute, run_on_failure=True means continue if previous failed
    control_table = [
        {"step": step1, "run_step": True, "run_on_failure": True},   # Step 1
        {"step": step2, "run_step": True, "run_on_failure": False},  # Step 2
        {"step": step3, "run_step": True, "run_on_failure": True},   # Step 3
        {"step": step4, "run_step": True, "run_on_failure": False},  # Step 4
    ]
    
    # Function to execute the steps
    def execute_steps(control_table):
        for index, step_info in enumerate(control_table):
            step_code = step_info["step"]
            run_step = step_info["run_step"]
            run_on_failure = step_info["run_on_failure"]
    
            # If the step is marked to run
            if run_step:
                try:
                    print(f"nRunning Step {index + 1}...")
                    exec(step_code)
                    print(f"Step {index + 1} executed successfully.")
                except Exception as e:
                    print(f"Step {index + 1} failed with error: {e}")
    
                    # Stop the run if this failed step is not allowed to fail
                    if not run_on_failure:
                        print(f"Stopping remaining steps due to failure in Step {index + 1}.")
                        break
            else:
                print(f"Skipping Step {index + 1}.")
    
    # Execute all steps based on the control table
    execute_steps(control_table)
    

    Key Points:

    1. Control Table: The control_table holds information on whether each step should run and whether the next step should run even if the current step fails (run_on_failure).
      • run_step=True: Indicates the step should be executed.
      • run_on_failure=True: If the previous step fails, continue executing this step.
    2. Error Handling: The try and except block ensures that if a step fails, you can control whether to continue with the next steps.
    3. Execution Logic: Steps are executed with exec(step_code) dynamically. Each step’s execution is logged with success or failure information.

    This allows you to flexibly handle step execution and error management in your PySpark scripts.

    You can also maintain a log of successes and failures, add a retry mechanism for certain steps, and store the execution results in a control table (for example, in Oracle):

    from time import sleep
    import cx_Oracle  # You'll need cx_Oracle library to connect to Oracle
    
    # Example Steps
    step1 = '''
    print("Executing Step 1")
    df = spark.read.csv("data1.csv")
    '''
    
    step2 = '''
    print("Executing Step 2")
    df = df.filter(df['age'] > 30)
    '''
    
    step3 = '''
    print("Executing Step 3")
    df = df.groupBy("country").count()
    '''
    
    step4 = '''
    print("Executing Step 4")
    # Simulated failure
    raise ValueError("Simulated failure in Step 4")
    '''
    
    # Control table: run_step=True means execute, run_on_failure=True means continue if previous failed
    control_table = [
        {"step": step1, "run_step": True, "run_on_failure": True, "retry_count": 2},  # Step 1
        {"step": step2, "run_step": True, "run_on_failure": False, "retry_count": 1}, # Step 2
        {"step": step3, "run_step": True, "run_on_failure": True, "retry_count": 2},  # Step 3
        {"step": step4, "run_step": True, "run_on_failure": False, "retry_count": 1}, # Step 4
    ]
    
    # Function to connect to Oracle and insert logs
    def insert_into_control_table(step_number, status, retry_count):
        connection = cx_Oracle.connect('user/password@hostname:port/service_name')
        cursor = connection.cursor()
    
        # Insert execution details into control table
        insert_query = '''
            INSERT INTO control_table (step_number, status, retry_count, executed_at)
            VALUES (:1, :2, :3, SYSTIMESTAMP)
        '''
        cursor.execute(insert_query, (step_number, status, retry_count))
        connection.commit()
        cursor.close()
        connection.close()
    
    # Function to execute the steps with logging and retry mechanism
    def execute_steps_with_retry(control_table):
        for index, step_info in enumerate(control_table):
            step_code = step_info["step"]
            run_step = step_info["run_step"]
            run_on_failure = step_info["run_on_failure"]
            max_retries = step_info["retry_count"]
            step_number = index + 1
            retries = 0
            success = False
    
            if run_step:
                while retries <= max_retries:
                    try:
                        print(f"nRunning Step {step_number} (Attempt {retries + 1})...")
                        exec(step_code)  # Execute the step
                        success = True
                        print(f"Step {step_number} executed successfully.")
                        insert_into_control_table(step_number, "SUCCESS", retries)  # Log success
                        break  # Exit loop if successful
                    except Exception as e:
                        retries += 1
                        print(f"Step {step_number} failed with error: {e}")
                        if retries > max_retries:
                            insert_into_control_table(step_number, "FAILURE", retries)  # Log failure
                            print(f"Max retries exceeded for Step {step_number}.")
                            if not run_on_failure:
                                print(f"Skipping remaining steps due to failure in Step {step_number}.")
                                return  # Exit if we shouldn't continue after failure
                        else:
                            print(f"Retrying Step {step_number} in 5 seconds...")
                            sleep(5)  # Wait before retrying
            else:
                print(f"Skipping Step {step_number}.")
    
    # Execute steps with logging and retry
    execute_steps_with_retry(control_table)
    

    The control table can also be maintained as a DataFrame and saved to Hive or exported to Excel:

    from time import sleep
    import pandas as pd
    
    # Sample PySpark DataFrame for control table
    from pyspark.sql import Row
    from pyspark.sql import SparkSession
    
    # Initialize Spark session
    spark = SparkSession.builder \
        .appName("Step Execution Logging") \
        .enableHiveSupport() \
        .getOrCreate()
    
    # Control table as a list of dictionaries
    control_data = [
        {"step_number": 1, "step_name": "Step 1", "run_step": True, "run_on_failure": True, "retry_count": 2},
        {"step_number": 2, "step_name": "Step 2", "run_step": True, "run_on_failure": False, "retry_count": 1},
        {"step_number": 3, "step_name": "Step 3", "run_step": True, "run_on_failure": True, "retry_count": 2},
        {"step_number": 4, "step_name": "Step 4", "run_step": True, "run_on_failure": False, "retry_count": 1},
    ]
    
    # Convert control data to a DataFrame
    control_table_df = spark.createDataFrame([Row(**x) for x in control_data])
    
    # Function to update control table DataFrame after each step
    def update_control_table(step_number, status, retries):
        global control_table_df
        control_table_df = control_table_df.withColumn(
            "status", control_table_df["step_number"].cast("string"))  # Simulated update, replace with real status
    
        control_table_df = control_table_df.withColumn(
            "retry_count", control_table_df["step_number"].cast("int"))  # Simulated update, replace retries
    
    # Steps and retry logic remains the same
    def execute_steps_with_retry(control_table_df):
        for row in control_table_df.collect():
            step_number = row['step_number']
            run_step = row['run_step']
            run_on_failure = row['run_on_failure']
            retries = 0
            max_retries = row['retry_count']
            step_code = f"Step_{step_number}"  # Name of a step function (e.g. Step_1) that must be defined before running
    
            if run_step:
                while retries <= max_retries:
                    try:
                        print(f"nRunning Step {step_number} (Attempt {retries + 1})...")
                        exec(step_code)  # Execute the step
                        update_control_table(step_number, "SUCCESS", retries)  # Log success
                        break
                    except Exception as e:
                        retries += 1
                        print(f"Step {step_number} failed with error: {e}")
                        if retries > max_retries:
                            update_control_table(step_number, "FAILURE", retries)  # Log failure
                            print(f"Max retries exceeded for Step {step_number}.")
                            if not run_on_failure:
                                print(f"Skipping remaining steps due to failure in Step {step_number}.")
                                return
                        else:
                            print(f"Retrying Step {step_number} in 5 seconds...")
                            sleep(5)
            else:
                print(f"Skipping Step {step_number}.")
    
    # Execute steps
    execute_steps_with_retry(control_table_df)
    
    # 1. Save to Hive
    control_table_df.write.mode("overwrite").saveAsTable("your_hive_database.control_table")
    
    # 2. Export to Excel (using pandas)
    # Convert to Pandas DataFrame
    control_table_pandas = control_table_df.toPandas()
    
    # Export to Excel
    control_table_pandas.to_excel("control_table_log.xlsx", index=False)
    
    print("Execution results saved to Hive and exported as Excel.")
    
    from time import sleep
    import pandas as pd
    
    # Sample PySpark DataFrame for control table
    from pyspark.sql import Row
    from pyspark.sql import SparkSession
    
    # Initialize Spark session
    spark = SparkSession.builder \
        .appName("Step Execution Logging") \
        .enableHiveSupport() \
        .getOrCreate()
    
    # Control table as a list of dictionaries
    control_data = [
        {"step_number": 1, "step_name": "Step 1", "run_step": True, "run_on_failure": True, "retry_count": 2},
        {"step_number": 2, "step_name": "Step 2", "run_step": True, "run_on_failure": False, "retry_count": 1},
        {"step_number": 3, "step_name": "Step 3", "run_step": True, "run_on_failure": True, "retry_count": 2},
        {"step_number": 4, "step_name": "Step 4", "run_step": True, "run_on_failure": False, "retry_count": 1},
    ]
    
    # Convert control data to a DataFrame
    control_table_df = spark.createDataFrame([Row(**x) for x in control_data])
    
    # Function to update control table DataFrame after each step
    def update_control_table(step_number, status, retries):
        global control_table_df
        control_table_df = control_table_df.withColumn(
            "status", control_table_df["step_number"].cast("string"))  # Simulated update, replace with real status
    
        control_table_df = control_table_df.withColumn(
            "retry_count", control_table_df["step_number"].cast("int"))  # Simulated update, replace retries
    
    # Steps and retry logic remains the same
    def execute_steps_with_retry(control_table_df):
        for row in control_table_df.collect():
            step_number = row['step_number']
            run_step = row['run_step']
            run_on_failure = row['run_on_failure']
            retries = 0
            max_retries = row['retry_count']
            step_code = "Step_{}".format(step_number)  # Replaced f-string with .format()
    
            if run_step:
                while retries <= max_retries:
                    try:
                        print("nRunning Step {} (Attempt {})...".format(step_number, retries + 1))  # Replaced f-string
                        exec(step_code)  # Execute the step
                        update_control_table(step_number, "SUCCESS", retries)  # Log success
                        break
                    except Exception as e:
                        retries += 1
                        print("Step {} failed with error: {}".format(step_number, e))  # Replaced f-string
                        if retries > max_retries:
                            update_control_table(step_number, "FAILURE", retries)  # Log failure
                            print("Max retries exceeded for Step {}.".format(step_number))  # Replaced f-string
                            if not run_on_failure:
                                print("Skipping remaining steps due to failure in Step {}.".format(step_number))  # Replaced f-string
                                return
                        else:
                            print("Retrying Step {} in 5 seconds...".format(step_number))  # Replaced f-string
                            sleep(5)
            else:
                print("Skipping Step {}.".format(step_number))  # Replaced f-string
    
    # Execute steps
    execute_steps_with_retry(control_table_df)
    
    # 1. Save to Hive
    control_table_df.write.mode("overwrite").saveAsTable("your_hive_database.control_table")
    
    # 2. Export to Excel (using pandas)
    # Convert to Pandas DataFrame
    control_table_pandas = control_table_df.toPandas()
    
    # Export to Excel
    control_table_pandas.to_excel("control_table_log.xlsx", index=False)
    
    print("Execution results saved to Hive and exported as Excel.")
    
    # Define your step functions
    def Step_1():
        print("Executing Step 1")
    
    def Step_2():
        print("Executing Step 2")
    
    # Your step_code contains the name of the function to execute
    step_code = "Step_1"
    
    # Fetch the function from globals and execute it
    globals()[step_code]()  # Look up the function object by name and call it
    
    from time import sleep
    import pandas as pd
    from pyspark.sql import Row
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col
    from pyspark.sql.types import StructType, StructField, IntegerType, StringType
    
    # Initialize Spark session
    spark = SparkSession.builder \
        .appName("Step Execution Logging") \
        .enableHiveSupport() \
        .getOrCreate()
    
    # Define schema and control data directly
    schema = ["step_number", "step_name", "run_step", "run_on_failure", "retry_count"]
    control_data = [
        (1, "Step 1", True, True, 2),
        (2, "Step 2", True, False, 1),
        (3, "Step 3", True, True, 2),
        (4, "Step 4", True, False, 1)
    ]
    control_table_df = spark.createDataFrame(control_data, schema=schema)
    
    # Initialize an empty DataFrame to log job statuses (an explicit schema is needed because
    # Spark cannot infer column types from an empty dataset)
    job_status_schema = StructType([
        StructField("step_number", IntegerType(), True),
        StructField("table_name", StringType(), True),
        StructField("action", StringType(), True),
        StructField("count", IntegerType(), True),
    ])
    job_status_df = spark.createDataFrame([], schema=job_status_schema)
    
    def update_control_table(step_number, status, retries):
        global control_table_df
        control_table_df = control_table_df.withColumn(
            "status", col("step_number").cast("string"))  # Simulated update, replace with real status
    
        control_table_df = control_table_df.withColumn(
            "retry_count", col("step_number").cast("int"))  # Simulated update, replace retries
    
    def log_job_status(step_number, table_name, action, count):
        global job_status_df
        new_status = [(step_number, table_name, action, count)]
        new_status_df = spark.createDataFrame(new_status, schema=["step_number", "table_name", "action", "count"])
        job_status_df = job_status_df.union(new_status_df)
    
    def execute_steps_with_retry(control_table_df):
        for row in control_table_df.collect():
            step_number = row['step_number']
            run_step = row['run_step']
            run_on_failure = row['run_on_failure']
            retries = 0
            max_retries = row['retry_count']
            step_code = "Step_{}".format(step_number)  # Using .format()
    
            if run_step:
                while retries <= max_retries:
                    try:
                        print("nRunning Step {} (Attempt {})...".format(step_number, retries + 1))
    
                        # Assume the step_code variable contains the function name
                        # Fetch the function from globals and execute it
                        globals()[step_code]()  # Look up the step function by name and call it
    
                        # Log table actions and counts here (dummy values used)
                        log_job_status(step_number, "some_table", "save", 1000)
    
                        update_control_table(step_number, "SUCCESS", retries)  # Log success
                        break
                    except Exception as e:
                        retries += 1
                        print("Step {} failed with error: {}".format(step_number, e))
                        if retries > max_retries:
                            update_control_table(step_number, "FAILURE", retries)  # Log failure
                            print("Max retries exceeded for Step {}.".format(step_number))
                            if not run_on_failure:
                                print("Skipping remaining steps due to failure in Step {}.".format(step_number))
                                return
                        else:
                            print("Retrying Step {} in 5 seconds...".format(step_number))
                            sleep(5)
            else:
                print("Skipping Step {}.".format(step_number))
    
            # Cache and uncache DataFrames at the end of the step
            df_to_cache = spark.table("some_table")  # Replace with the DataFrame you are working with
            df_to_cache.cache()
            df_to_cache.unpersist()  # Uncache after step completion
    
    # Define step functions for demonstration
    def Step_1():
        # Your Step_1 logic here
        print("Executing Step 1")
    
    def Step_2():
        # Your Step_2 logic here
        print("Executing Step 2")
    
    # Execute steps
    execute_steps_with_retry(control_table_df)
    
    # 1. Save control table and job status to Hive
    control_table_df.write.mode("overwrite").saveAsTable("your_hive_database.control_table")
    job_status_df.write.mode("overwrite").saveAsTable("your_hive_database.job_status")
    
    # 2. Export to Excel (using pandas)
    # Convert to Pandas DataFrames
    control_table_pandas = control_table_df.toPandas()
    job_status_pandas = job_status_df.toPandas()
    
    # Export to Excel
    with pd.ExcelWriter("execution_results.xlsx") as writer:
        control_table_pandas.to_excel(writer, sheet_name='Control Table', index=False)
        job_status_pandas.to_excel(writer, sheet_name='Job Status', index=False)
    
    print("Execution results saved to Hive and exported as Excel.")
    
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import add_months, date_format, lit
    from datetime import datetime
    
    # Initialize Spark session
    spark = SparkSession.builder \
        .appName("Generate YearMonth List") \
        .getOrCreate()
    
    # Get current date
    current_date = datetime.now().strftime("%Y-%m-%d")
    
    # Create a DataFrame with current date
    df = spark.createDataFrame([(current_date,)], ["current_date"])
    
    # Generate a sequence of -12 to +12 months (last and next 12 months)
    months_range = list(range(-12, 13))
    
    # Use add_months to calculate past and future months (exact month arithmetic)
    months_df = df.select([date_format(add_months(lit(current_date), i), 'yyyyMM').alias(f'month_{i}') for i in months_range])
    
    # Collect values into a list (or you can keep them as separate variables)
    months_list = months_df.collect()[0]
    
    # Print the list of last and next 12 months
    for i, month in enumerate(months_list):
        print(f'Month_{i-12}: {month}')
    
    # Example of accessing each month as a separate variable (keys follow the column aliases)
    month_vars = {f"month_{i}": months_list[f"month_{i}"] for i in months_range}
    
    # Now you can access individual months
    next_month = month_vars['month_1']   # Next month
    last_month = month_vars['month_-1']  # Previous month
    
    from datetime import datetime, timedelta
    from pyspark.sql import SparkSession
    from pyspark.sql import Row
    
    # Initialize Spark session
    spark = SparkSession.builder \
        .appName("Generate YearMonth List") \
        .getOrCreate()
    
    # Get the current date
    current_date = datetime.now()
    
    # Function to generate year-month (yyyyMM) format for a given date
    def generate_year_month(date):
        return date.strftime('%Y%m')
    
    # Create a list to store the year-month values
    months_list = []
    
    # Generate the next 12 months (including the current month)
    for i in range(0, 13):
        future_date = current_date + timedelta(days=30 * i)  # Approximate to 30 days per month
        months_list.append(generate_year_month(future_date))
    
    # Generate the last 12 months
    for i in range(1, 13):
        past_date = current_date - timedelta(days=30 * i)  # Approximate to 30 days per month
        months_list.append(generate_year_month(past_date))
    
    # Convert the list into a DataFrame for further operations in PySpark
    months_rdd = spark.sparkContext.parallelize(months_list)
    months_df = spark.createDataFrame(months_rdd.map(lambda x: Row(month=x)))
    
    # Show the DataFrame
    months_df.show()
    
    # Example of accessing specific months in the list
    print("Next month: ", months_list[1])       # first entry of the "next 12 months" block
    print("Previous month: ", months_list[13])  # first entry of the "last 12 months" block
    
  • How do you code a complete ETL job in PySpark using only the Spark SQL API (not the DataFrame-specific API)?

    Here’s an example of a complete ETL (Extract, Transform, Load) job using PySpark SQL API:

    from pyspark.sql import SparkSession
    
    # Create SparkSession
    spark = SparkSession.builder.appName("ETL Job").getOrCreate()
    
    # Extract (Read from source)
    spark.sql("CREATE TEMPORARY VIEW sales USING org.apache.spark.sql.parquet OPTIONS (path 'sales_data.parquet')")
    spark.sql("CREATE TEMPORARY VIEW customers USING org.apache.spark.sql.parquet OPTIONS (path 'customer_data.parquet')")
    
    # Transform (Join, Filter, Aggregate)
    spark.sql("""
        SELECT
            s.order_id,
            c.customer_name,
            SUM(s.order_total) AS total_sales
        FROM
            sales s
        JOIN
            customers c ON s.customer_id = c.customer_id
        WHERE
            s.order_date >= '2022-01-01'
        GROUP BY
            s.order_id,
            c.customer_name
    """).createOrReplaceTempView("transformed_data")
    
    # Load (Write to target)
    spark.sql("CREATE TABLE IF NOT EXISTS transformed_sales USING parquet OPTIONS (path 'transformed_sales_data.parquet')")
    spark.sql("INSERT INTO TABLE transformed_sales SELECT * FROM transformed_data")
    
    # Verify loaded data
    spark.sql("SELECT * FROM transformed_sales").show()

    Explanation

    1. Extract: Read data from source (Parquet files) into temporary views.
    2. Transform: Perform transformations (join, filter, aggregate) using SQL queries.
    3. Load: Write transformed data to target (Parquet file).

    Tips and Variations

    1. Use CREATE TABLE instead of CREATE TEMPORARY VIEW for persistent storage.
    2. Specify storage formats (e.g., USING org.apache.spark.sql.json).
    3. Utilize SQL functions (e.g., DATE_TRUNC, CONCAT).
    4. Leverage Spark SQL’s built-in functions (e.g., IN, LIKE).
    5. Optimize performance with partitioning and caching.
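    
    For example, tips 1, 2, 3, and 5 can be combined in a single statement; the table and column names below are illustrative, and the sales view comes from the extract step above:
    
    spark.sql("""
        CREATE TABLE IF NOT EXISTS monthly_sales
        USING parquet
        PARTITIONED BY (order_month)
        AS
        SELECT
            customer_id,
            SUM(order_total) AS total_sales,
            DATE_TRUNC('MONTH', order_date) AS order_month
        FROM sales
        GROUP BY customer_id, DATE_TRUNC('MONTH', order_date)
    """)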

    PySpark ETL script that incorporates control table management, job status tracking, data pre-checks, retries, dynamic broadcasting, caching, and logging.

    Here’s a complete PySpark ETL script that incorporates all the features we discussed: control table management, job status tracking, data pre-checks, retries, dynamic broadcasting, caching, and logging. This is a robust framework for large-scale ETL jobs running in a production environment.

    Complete PySpark ETL Code:

    from pyspark.sql import SparkSession
    from datetime import datetime, timedelta
    import logging
    
    # Initialize logger
    logger = logging.getLogger('ETL_Logger')
    logger.setLevel(logging.INFO)
    
    # Spark session setup
    spark = SparkSession.builder \
        .appName("ETL_Job") \
        .enableHiveSupport() \
        .getOrCreate()
    
    # Function to update job status in the job status table
    def update_job_status(job_id, status, error_message=None):
        end_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    
        # Update job status in Hive table (note: UPDATE requires an ACID/transactional table or a format such as Delta/Iceberg)
        spark.sql(f"""
            UPDATE job_status_table
            SET status = '{status}',
                end_time = '{end_time}',
                error_message = '{error_message}'
            WHERE job_id = {job_id}
        """)
    
        logger.info(f"Job {job_id} status updated to {status}")
    
    # Function to check data availability for all source tables
    def check_data_availability(sources, month):
        for source in sources:
            logger.info(f"Checking data availability for source: {source}")
    
            # Count rows in source table for the given month
            count_df = spark.sql(f"SELECT COUNT(1) FROM {source} WHERE month = '{month}'")
            count = count_df.collect()[0][0]
    
            # If no data exists for the current month, fail the job
            if count == 0:
                logger.error(f"No data available for {source} for month {month}. Failing the job.")
                raise Exception(f"Data not available for {source}")
    
            # Compare with previous month count
            prev_month = (datetime.now().replace(day=1) - timedelta(days=1)).strftime('%Y-%m')
            prev_count_df = spark.sql(f"SELECT COUNT(1) FROM {source} WHERE month = '{prev_month}'")
            prev_count = prev_count_df.collect()[0][0]
    
            logger.info(f"Source {source}: Current month count = {count}, Previous month count = {prev_count}")
    
            # Insert source count into control table
            spark.sql(f"""
                INSERT INTO control_table
                (step_id, step_name, priority, source_table, source_count, status, run_date)
                VALUES (1, 'read {source}', 1, '{source}', {count}, 'pending', current_date())
            """)
    
    # Main ETL job function
    def etl_job(job_id, sources, month):
        try:
            logger.info(f"Starting ETL job with Job ID: {job_id}")
    
            # Step 1: Check Data Availability
            check_data_availability(sources, month)
    
            # Step 2: Read data from sources (assuming they are in Hive tables)
            dataframes = {}
            for source in sources:
                logger.info(f"Reading data from {source}")
                df = spark.sql(f"SELECT * FROM {source} WHERE month = '{month}'")
                df.createOrReplaceTempView(f"{source}_view")
                dataframes[source] = df
    
            # Dynamic broadcasting of smaller tables
            for table in dataframes:
                df_size = dataframes[table].rdd.map(lambda x: len(str(x))).sum()
                if df_size < 100 * 1024 * 1024:  # Size threshold for broadcasting (100MB)
                    logger.info(f"Broadcasting table: {table}")
                    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", df_size)
    
            # Step 3: Perform joins and transformations
            logger.info("Performing data transformations and joins")
    
            # First join
            join_df1 = spark.sql("""
                SELECT a.*, b.other_column
                FROM source1_view a
                JOIN source2_view b ON a.id = b.id
            """)
            join_df1.cache()  # Caching intermediate result
            join_df1.createOrReplaceTempView("join_df1")  # Register so the next SQL query can reference it
    
            # Second join with another table
            join_df2 = spark.sql("""
                SELECT a.*, c.additional_info
                FROM join_df1 a
                JOIN source3_view c ON a.id = c.id
            """)
    
            # Step 4: Write to target table
            logger.info("Writing results to the target table")
            join_df2.write.mode('overwrite').saveAsTable('bdl_final_table')
    
            # Step 5: Clean up cache
            join_df1.unpersist()
            join_df2.unpersist()
    
            # Step 6: Update job status to completed
            update_job_status(job_id, 'completed')
    
        except Exception as e:
            logger.error(f"Job failed due to error: {str(e)}")
            update_job_status(job_id, 'failed', str(e))
            raise  # Re-raise so run_etl_with_retries() can trigger a retry
    
    # Retry mechanism for running ETL jobs
    def run_etl_with_retries(job_id, sources, month, max_retries=3):
        retry_count = 0
        while retry_count <= max_retries:
            try:
                etl_job(job_id, sources, month)
                logger.info(f"Job {job_id} completed successfully.")
                break
            except Exception as e:
                retry_count += 1
                logger.error(f"Job {job_id} failed on attempt {retry_count}: {e}")
                if retry_count > max_retries:
                    logger.error(f"Max retries reached for job {job_id}. Job failed.")
                    update_job_status(job_id, 'failed', f"Max retries reached: {e}")
                else:
                    logger.info(f"Retrying job {job_id} (attempt {retry_count + 1}/{max_retries})")
    
    # List of sources to read from
    sources = ["bdl_source_table1", "bdl_source_table2", "bdl_source_table3"]
    
    # Running the ETL job for the current month with retries
    run_etl_with_retries(1001, sources, "2024-10", max_retries=3)
    

    Explanation and Enhancements

    1. Data Availability Pre-checks:
      • The function check_data_availability checks for data availability for all source tables for the current month and logs counts into the control table.
      • The job fails early if required data is missing.
    2. Dynamic Broadcasting:
      • We dynamically broadcast small tables (less than 100MB) to optimize joins with larger tables (an explicit broadcast-hint alternative is sketched after this list).
    3. Caching and Memory Management:
      • The results of the first join (join_df1) are cached, and unpersisted after usage to free up memory.
      • Caching is applied where intermediate data needs to be reused across transformations.
    4. Job Status and Control Table:
      • The control_table logs information about the source table counts and progress of the ETL job.
      • The job_status_table logs the job’s overall status, including success, failure, and retry attempts.
    5. Retry Mechanism:
      • The run_etl_with_retries function wraps the main ETL job execution with a retry mechanism.
      • It retries the job up to max_retries times in case of failures, logging each attempt.
    6. Logging and Error Handling:
      • The logger captures key information such as job start/end, data availability, errors, and retry attempts.
      • In case of failure, the job status and error message are updated in the job_status_table.
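    
    As an alternative to tuning spark.sql.autoBroadcastJoinThreshold at runtime, Spark also accepts an explicit broadcast hint. A minimal sketch (table and join-key names are illustrative):
    
    from pyspark.sql.functions import broadcast
    
    small_df = spark.table("bdl_source_table2")   # assumed to be the smaller lookup table
    large_df = spark.table("bdl_source_table1")
    
    # broadcast() marks small_df for a broadcast hash join regardless of the global threshold;
    # in plain SQL the same effect is available through the /*+ BROADCAST(b) */ hint.
    joined = large_df.join(broadcast(small_df), "id")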

    Additional Enhancements

    • Email/Notification Alerts:
      • You can integrate email alerts (for example via AWS SES, an SMTP relay, or a messaging tool such as Slack) to notify the team in case of job failures or retries; a minimal sketch follows this list.
    • Corrupt Data Handling:
      • If any data is found to be corrupt during reading or processing, you can implement a mechanism to move corrupt data into a “quarantine” folder for manual inspection.
    • Audit Logs:
      • Audit logs can be maintained for every ETL execution, logging details such as data transformations, count mismatches, and data quality checks for compliance reporting.
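    
    A minimal sketch of such an email alert using Python's standard library; the SMTP host and addresses are placeholders:
    
    import smtplib
    from email.message import EmailMessage
    
    def send_failure_alert(job_id, error_message):
        msg = EmailMessage()
        msg["Subject"] = f"ETL job {job_id} failed"
        msg["From"] = "etl-alerts@example.com"
        msg["To"] = "data-team@example.com"
        msg.set_content(f"Job {job_id} failed with error:\n{error_message}")
        with smtplib.SMTP("smtp.example.com") as server:  # assumed SMTP relay
            server.send_message(msg)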

    The solution includes:

    1. Schema creation for control and job status tables.
    2. Pre-checks for data availability.
    3. Data read, join, and transformation steps.
    4. Caching, broadcasting, and optimizations.
    5. Tracking ETL job progress and status updates.
    6. Handling retries and error logging.
    7. Enhancements such as auditing, retries, dynamic broadcasting, and memory management.

    1. Table Schemas (Control and Job Status Tables)

    Control Table Schema

    CREATE TABLE IF NOT EXISTS control_table (
        step_id INT,
        step_name STRING,
        priority INT,
        source_table STRING,
        source_count BIGINT,
        target_table STRING,
        target_count BIGINT,
        status STRING,
        run_date DATE,
        error_message STRING
    );
    

    Job Status Table Schema

    CREATE TABLE IF NOT EXISTS job_status_table (
        job_id INT,
        job_name STRING,
        run_date DATE,
        start_time TIMESTAMP,
        end_time TIMESTAMP,
        retry_count INT,
        status STRING,
        error_message STRING
    );
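    
    The update_job_status function shown earlier only issues an UPDATE, so each run first needs a row in job_status_table. A minimal sketch of such a bootstrap insert, assuming the schema above:
    
    from datetime import datetime
    
    def start_job(job_id, job_name):
        """Register a new run so later UPDATE statements have a row to modify."""
        start_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        spark.sql(f"""
            INSERT INTO job_status_table
            VALUES ({job_id}, '{job_name}', current_date(), TIMESTAMP '{start_time}',
                    CAST(NULL AS TIMESTAMP), 0, 'running', NULL)
        """)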
    

    2. Pre-Check Logic for Data Availability

    We will first verify if all the required source data is available for the specified month. If any source table lacks data, the ETL job should fail before any processing starts.

    def check_data_availability(sources, month):
        for source in sources:
            logger.info(f"Checking data availability for source: {source}")
    
            # Count rows in source table for the given month
            count_df = spark.sql(f"SELECT COUNT(1) FROM {source} WHERE month = '{month}'")
            count = count_df.collect()[0][0]
    
            # If no data exists for the current month, fail the job
            if count == 0:
                logger.error(f"No data available for {source} for month {month}. Failing the job.")
                raise Exception(f"Data not available for {source}")
    
            # Compare with previous month count
            prev_month = (datetime.now().replace(day=1) - timedelta(days=1)).strftime('%Y-%m')
            prev_count_df = spark.sql(f"SELECT COUNT(1) FROM {source} WHERE month = '{prev_month}'")
            prev_count = prev_count_df.collect()[0][0]
    
            logger.info(f"Source {source}: Current month count = {count}, Previous month count = {prev_count}")
    
            # Insert source count into control table
            spark.sql(f"""
                INSERT INTO control_table
                (step_id, step_name, priority, source_table, source_count, status, run_date)
                VALUES (1, 'read {source}', 1, '{source}', {count}, 'pending', current_date())
            """)
    

    3. Job Status Tracking

    We’ll maintain the job status (whether it is running, completed, or failed) in the job_status_table. This allows us to track the entire job execution, including retries and errors.

    def update_job_status(job_id, status, error_message=None):
        end_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    
        # Update job status in Hive table
        spark.sql(f"""
            UPDATE job_status_table
            SET status = '{status}',
                end_time = '{end_time}',
                error_message = '{error_message}'
            WHERE job_id = {job_id}
        """)
    
        logger.info(f"Job {job_id} status updated to {status}")
    

    4. ETL Job with Data Reads, Joins, and Optimizations

    This is the main body of the ETL job. We read the source tables, perform joins, apply transformations, and write the result to a target table.

    Enhancements:

    1. Dynamic Broadcasting: If a table size is small, it should be broadcasted in joins.
    2. Caching: We cache intermediate results that will be reused multiple times.
    3. Error Handling: Logs and updates the status of the job and retries in case of failure.
    4. Memory Management: Cleans up after each step (cache, uncache).
    from pyspark.sql import SparkSession
    from datetime import datetime, timedelta
    import logging
    
    # Initialize logger
    logger = logging.getLogger('ETL_Logger')
    
    # Spark session setup
    spark = SparkSession.builder \
        .appName("ETL_Job") \
        .enableHiveSupport() \
        .getOrCreate()
    
    # Main ETL job function
    def etl_job(job_id, sources, month):
        try:
            logger.info(f"Starting ETL job with Job ID: {job_id}")
    
            # Step 1: Check Data Availability
            check_data_availability(sources, month)
    
            # Step 2: Read data from sources (assuming they are in Hive tables)
            dataframes = {}
            for source in sources:
                logger.info(f"Reading data from {source}")
                df = spark.sql(f"SELECT * FROM {source} WHERE month = '{month}'")
                df.createOrReplaceTempView(f"{source}_view")
                dataframes[source] = df
    
            # Dynamic broadcasting of smaller tables
            for table in dataframes:
                df_size = dataframes[table].rdd.map(lambda x: len(str(x))).sum()
                if df_size < 100 * 1024 * 1024:  # Size threshold for broadcasting (100MB)
                    logger.info(f"Broadcasting table: {table}")
                    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", df_size)
    
            # Step 3: Perform joins and transformations
            logger.info("Performing data transformations and joins")
    
            # First join
            join_df1 = spark.sql("""
                SELECT a.*, b.other_column
                FROM source1_view a
                JOIN source2_view b ON a.id = b.id
            """)
            join_df1.cache()  # Caching intermediate result
            join_df1.createOrReplaceTempView("join_df1")  # Register so the next SQL query can reference it
    
            # Second join with another table
            join_df2 = spark.sql("""
                SELECT a.*, c.additional_info
                FROM join_df1 a
                JOIN source3_view c ON a.id = c.id
            """)
    
            # Step 4: Write to target table
            logger.info("Writing results to the target table")
            join_df2.write.mode('overwrite').saveAsTable('bdl_final_table')
    
            # Step 5: Clean up cache
            join_df1.unpersist()
            join_df2.unpersist()
    
            # Step 6: Update job status to completed
            update_job_status(job_id, 'completed')
    
        except Exception as e:
            logger.error(f"Job failed due to error: {str(e)}")
            update_job_status(job_id, 'failed', str(e))
            raise  # Re-raise so a retry wrapper (see section 5) can catch the failure
    
    # List of sources to read from
    sources = ["bdl_source_table1", "bdl_source_table2", "bdl_source_table3"]
    
    # Running the ETL job for the current month
    etl_job(1001, sources, "2024-10")
    

    5. Retry Mechanism

    In case of failure, the job should be retried a set number of times. Here’s how to add a retry mechanism:

    def run_etl_with_retries(job_id, sources, month, max_retries=3):
        retry_count = 0
        while retry_count <= max_retries:
            try:
                etl_job(job_id, sources, month)
                logger.info(f"Job {job_id} completed successfully.")
                break
            except Exception as e:
                retry_count += 1
                logger.error(f"Job {job_id} failed on attempt {retry_count}: {e}")
                if retry_count > max_retries:
                    logger.error(f"Max retries reached for job {job_id}. Job failed.")
                    update_job_status(job_id, 'failed', f"Max retries reached: {e}")
                else:
                    logger.info(f"Retrying job {job_id} (attempt {retry_count + 1}/{max_retries})")
    

    6. Enhancements Added

    1. Dynamic Broadcasting: If a table size is less than 100 MB, it gets broadcasted to optimize joins with larger tables.
    2. Caching and Uncaching: Intermediate results are cached to optimize repeated access, and uncached after use to free up memory.
    3. Pre-check Data Availability: The job will fail at the start if data is not available for the current month.
    4. Job Status and Control Table: Each step and job’s status is logged and recorded, making it easier to monitor, audit, and retry failed jobs.
    5. Retry Mechanism: Added a retry mechanism in case of job failure, with configurable retries.

    Final Thoughts

    • Performance: Dynamic broadcasting and caching have been used to optimize joins and intermediate data processing.
    • Error Handling: The job logs every failure, updates the job status in the database, and supports automatic retries.
    • Monitoring: Both job status and step-level progress are tracked using the control and job status tables, giving visibility into the workflow’s state at any given point.
    • Memory Management: Proper use of cache() and unpersist() ensures that memory is efficiently managed.

    This ETL framework is production-ready and is designed to handle large datasets, ensure robustness, and track job progress while providing opportunities for further customizations.

    To successfully implement the ETL workflow described earlier, you need to have two essential tracking mechanisms in place: a control table and a job status table. These tables help to monitor the progress of the ETL job, track the job’s execution, log errors, and ensure data integrity.

    Here, I’ll define the structure (schema) for both tables, provide sample data, and explain how to implement them in the PySpark SQL-based ETL job.

    1. Control Table Schema:

    This table tracks the details of each step of the ETL job, including its execution status, source data information, and any conditions for retry or failure.

    Control Table Schema

    | Column Name   | Data Type | Description                                                            |
    |---------------|-----------|------------------------------------------------------------------------|
    | step_id       | INT       | Unique ID for each step in the ETL job.                                |
    | step_name     | STRING    | Name of the ETL step (e.g., 'read source1', 'join tables').            |
    | priority      | INT       | Priority of the step (lower numbers mean higher priority).             |
    | source_table  | STRING    | Name of the source table being read or transformed.                    |
    | source_count  | BIGINT    | Row count of the source data in the step.                              |
    | target_table  | STRING    | Name of the target table being written.                                |
    | target_count  | BIGINT    | Row count of the data being written.                                   |
    | status        | STRING    | Status of the step ('pending', 'in-progress', 'completed', 'failed').  |
    | run_date      | DATE      | Date the step was last run.                                            |
    | error_message | STRING    | Error message if the step fails.                                       |

    Sample Data for Control Table

    | step_id | step_name         | priority | source_table      | source_count | target_table     | target_count | status    | run_date   | error_message |
    |---------|-------------------|----------|-------------------|--------------|------------------|--------------|-----------|------------|---------------|
    | 1       | read source1      | 1        | bdl_source_table1 | 1500000      | NULL             | NULL         | completed | 2024-10-01 | NULL          |
    | 2       | join source1_2    | 2        | bdl_source_table2 | 1700000      | NULL             | NULL         | completed | 2024-10-01 | NULL          |
    | 3       | write to target   | 3        | NULL              | NULL         | bdl_target_table | 3000000      | completed | 2024-10-01 | NULL          |
    | 4       | aggregate results | 4        | bdl_target_table  | 3000000      | bdl_final_table  | 1000000      | failed    | 2024-10-01 | Memory error  |
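    
    With rows like these in place, the failed steps of today's run can be pulled straight from the control table; a small sketch:
    
    failed_steps = spark.sql("""
        SELECT step_id, step_name, error_message
        FROM control_table
        WHERE status = 'failed' AND run_date = current_date()
    """)
    failed_steps.show(truncate=False)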

    2. Job Status Table Schema:

    This table logs the overall status of the ETL job. It tracks each execution attempt, including the number of retries, job duration, and whether it succeeded or failed.

    Job Status Table Schema

    | Column Name   | Data Type | Description                                                               |
    |---------------|-----------|---------------------------------------------------------------------------|
    | job_id        | INT       | Unique identifier for the entire ETL job run.                             |
    | job_name      | STRING    | Name of the ETL job.                                                      |
    | run_date      | DATE      | Date when the job started.                                                |
    | start_time    | TIMESTAMP | Timestamp when the job execution started.                                 |
    | end_time      | TIMESTAMP | Timestamp when the job execution ended.                                   |
    | retry_count   | INT       | Number of times the job was retried.                                      |
    | status        | STRING    | Overall status of the job ('pending', 'running', 'completed', 'failed').  |
    | error_message | STRING    | Error message in case of job failure.                                     |

    Sample Data for Job Status Table

    | job_id | job_name    | run_date   | start_time          | end_time            | retry_count | status    | error_message |
    |--------|-------------|------------|---------------------|---------------------|-------------|-----------|---------------|
    | 1001   | BDL_ETL_Job | 2024-10-01 | 2024-10-01 08:00:00 | 2024-10-01 09:30:00 | 0           | completed | NULL          |
    | 1002   | BDL_ETL_Job | 2024-10-02 | 2024-10-02 08:00:00 | 2024-10-02 10:00:00 | 2           | failed    | Memory error  |

    3. Building and Using These Tables in the ETL Process

    Here’s how you can build these tables in your Hive Metastore or your preferred data store and integrate them into the PySpark SQL-based ETL job.

    Step 1: Creating Control Table and Job Status Table

    You can create these tables in Hive using SQL commands. This will allow you to track the ETL progress.

    -- Creating Control Table
    CREATE TABLE IF NOT EXISTS control_table (
        step_id INT,
        step_name STRING,
        priority INT,
        source_table STRING,
        source_count BIGINT,
        target_table STRING,
        target_count BIGINT,
        status STRING,
        run_date DATE,
        error_message STRING
    );
    
    -- Creating Job Status Table
    CREATE TABLE IF NOT EXISTS job_status_table (
        job_id INT,
        job_name STRING,
        run_date DATE,
        start_time TIMESTAMP,
        end_time TIMESTAMP,
        retry_count INT,
        status STRING,
        error_message STRING
    );
    

    Step 2: Pre-Check for Data Availability (Checking Source Counts)

    Before starting the ETL job, you’ll want to check that all source tables contain data for the specified month. If any source is missing data, the job should fail early.

    def check_data_availability(sources, month):
        for source in sources:
            logger.info(f"Checking data availability for source: {source}")
    
            # Count rows in source table for the given month
            count_df = spark.sql(f"SELECT COUNT(1) FROM {source} WHERE month = '{month}'")
            count = count_df.collect()[0][0]
    
            # If no data exists for the current month, fail the job
            if count == 0:
                logger.error(f"No data available for {source} for month {month}. Failing the job.")
                raise Exception(f"Data not available for {source}")
    
            # Compare with previous month count
            prev_month = (datetime.now().replace(day=1) - timedelta(days=1)).strftime('%Y-%m')
            prev_count_df = spark.sql(f"SELECT COUNT(1) FROM {source} WHERE month = '{prev_month}'")
            prev_count = prev_count_df.collect()[0][0]
    
            logger.info(f"Source {source}: Current month count = {count}, Previous month count = {prev_count}")
    
            # Add source count to control table
            spark.sql(f"""
                INSERT INTO control_table
                (step_id, step_name, priority, source_table, source_count, status, run_date)
                VALUES (1, 'read {source}', 1, '{source}', {count}, 'pending', current_date())
            """)
    

    Step 3: Tracking Job Status

    You can track the status of each ETL job run using the job_status_table. Each time the job runs, it updates the job’s start time, end time, retry attempts, and whether the job succeeded or failed.

    def update_job_status(job_id, status, error_message=None):
        end_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    
        # Update job status in Hive table
        spark.sql(f"""
            UPDATE job_status_table
            SET status = '{status}',
                end_time = '{end_time}',
                error_message = '{error_message}'
            WHERE job_id = {job_id}
        """)
    
        logger.info(f"Job {job_id} status updated to {status}")
    

    Step 4: Performing the ETL Job

    Below is an overview of the steps performed in the ETL job, including reading from source tables, checking data availability, performing joins, and updating the status.

    def etl_job(job_id, sources, month):
        try:
            logger.info(f"Starting ETL job with Job ID: {job_id}")
    
            # Step 1: Check Data Availability
            check_data_availability(sources, month)
    
            # Step 2: Read data from sources (assuming they are in Hive tables)
            for source in sources:
                logger.info(f"Reading data from {source}")
                df = spark.sql(f"SELECT * FROM {source} WHERE month = '{month}'")
                df.createOrReplaceTempView(f"{source}_view")
    
            # Step 3: Perform joins and transformations
            logger.info("Performing data transformations and joins")
            transformed_df = spark.sql("""
                SELECT a.*, b.other_column
                FROM source1_view a
                JOIN source2_view b ON a.id = b.id
            """)
    
            # Cache the result and register it as a temp view so the next SQL query can reference it
            transformed_df.cache()
            transformed_df.createOrReplaceTempView("transformed_df")
    
            # Perform additional transformations
            final_df = spark.sql("""
                SELECT a.*, c.additional_info
                FROM transformed_df a
                JOIN source3_view c ON a.id = c.id
            """)
    
            # Write to target
            final_df.write.mode('overwrite').saveAsTable('bdl_final_table')
    
            # Step 4: Update job status
            update_job_status(job_id, 'completed')
    
        except Exception as e:
            logger.error(f"Job failed due to error: {str(e)}")
            update_job_status(job_id, 'failed', str(e))
    

    Final Thoughts:

    • Control and Job Status Tables: Provide a robust mechanism to track and manage the execution of ETL jobs.
    • Pre-check Logic: Ensures data availability before starting the job to prevent unnecessary processing.
    • Job Status Updates: Help to track the job’s execution and easily identify issues.
    • Optimization: The job includes caching to optimize the execution time of chained transformations.

    from pyspark.sql import SparkSession
    from pyspark.sql import functions as F
    from pyspark.sql.window import Window
    import logging
    import os
    
    # Initialize Spark session
    spark = SparkSession.builder \
        .appName("BDL Processing Job") \
        .enableHiveSupport() \
        .getOrCreate()
    
    # Initialize logger
    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger("BDL Processing Job")
    
    # Configurations
    input_table = "bdl.input_table"
    output_table = "bdl.output_table"
    control_table = "bdl.control_table"  # To track the state of file processing
    max_retries = 3
    
    # State management: Fetch unprocessed or failed files from the control table
    def fetch_unprocessed_files():
        return spark.sql(f"SELECT * FROM {control_table} WHERE status = 'unprocessed' OR status = 'failed'")
    
    # Retry mechanism: Retry failed files up to max_retries
    def retry_failed_files(file_name, retries):
        return retries < max_retries
    
    # Main processing function: receives one Row from the control table
    def process_file(file_row):
        file_name = file_row['file_name']
        try:
            # Read the file's data (the path column and Parquet format are assumed for illustration)
            file_data = spark.read.parquet(file_name)
    
            # Processing logic here (for example, transformations)
            processed_data = file_data.withColumn("processed", F.lit(True))
    
            # Write the processed data back to BDL table
            processed_data.write.mode("overwrite").insertInto(output_table)
    
            # Log success and update the control table
            logger.info(f"Processing completed for file: {file_name}")
            spark.sql(f"UPDATE {control_table} SET status = 'processed' WHERE file_name = '{file_name}'")
    
        except Exception as e:
            logger.error(f"Error processing file {file_name}: {str(e)}")
            spark.sql(f"UPDATE {control_table} SET status = 'failed' WHERE file_name = '{file_name}'")
    
            # Retry logic
            retries = spark.sql(f"SELECT retries FROM {control_table} WHERE file_name = '{file_name}'").collect()[0][0]
            if retry_failed_files(file_name, retries):
                logger.info(f"Retrying file {file_name} (Retry count: {retries+1})")
                spark.sql(f"UPDATE {control_table} SET retries = retries + 1 WHERE file_name = '{file_name}'")
                process_file(file_row)  # Retry
            else:
                logger.error(f"File {file_name} failed after {max_retries} retries")
                move_to_corrupt_folder(file_name)
    
    # Function to move corrupt files to a folder for manual review
    def move_to_corrupt_folder(file_name):
        corrupt_folder = "/path/to/corrupt_folder"
        # Assumes a local filesystem path; for HDFS or cloud storage use the appropriate filesystem API
        os.rename(file_name, os.path.join(corrupt_folder, os.path.basename(file_name)))
        logger.info(f"Moved file {file_name} to corrupt folder")
    
    # Function to send notifications in case of significant issues
    def send_notification(message):
        # This can be integrated with email alerts or messaging systems (e.g., Slack)
        logger.info(f"Notification sent: {message}")
    
    # Main Job Logic
    if __name__ == "__main__":
        logger.info("Starting BDL Processing Job")
    
        # Fetch unprocessed or failed files from control table
        unprocessed_files = fetch_unprocessed_files()
    
        for file_row in unprocessed_files.collect():
            logger.info(f"Processing file: {file_row['file_name']}")
            process_file(file_row)
    
        logger.info("BDL Processing Job completed")
    

    # Read data from BDL table
    input_data = spark.sql(f"SELECT * FROM {input_table}")
    
    # Example processing in SQL
    input_data.createOrReplaceTempView("input_data_view")
    processed_data = spark.sql("""
        SELECT *,
               TRUE AS processed
        FROM input_data_view
    """)
    
    # Write processed data to output BDL table
    processed_data.write.mode("overwrite").insertInto(output_table)
    
    # Update the control table to mark the file as processed
    spark.sql(f"UPDATE {control_table} SET status = 'processed' WHERE file_name = 'example_file'")
    

    PySpark script incorporating optimizations for joining large tables, performing groupBy and transpose, and writing the output

    from pyspark.sql import SparkSession
    from pyspark.sql import functions as F
    from pyspark.sql import Window
    
    # Create SparkSession with optimizations
    spark = SparkSession.builder.appName("Optimized Join and Transform")
        .config("spark.sql.shuffle.partitions", 200)
        .config("spark.driver.memory", "8g")
        .config("spark.executor.memory", "16g")
        .config("spark.executor.cores", 4)
        .getOrCreate()
    
    # Load 30GB table (e.g., sales data)
    sales_df = spark.read.parquet("sales_data.parquet")
    
    # Load 200MB look-up tables (e.g., customer, product)
    customer_df = spark.read.parquet("customer_data.parquet").cache()
    product_df = spark.read.parquet("product_data.parquet").cache()
    
    # Broadcast smaller DataFrames for efficient joins (broadcast() is a function, not a DataFrame method)
    customer_bcast = F.broadcast(customer_df)
    product_bcast = F.broadcast(product_df)
    
    # Join sales data with look-up tables
    joined_df = sales_df \
        .join(customer_bcast, "customer_id") \
        .join(product_bcast, "product_id")
    
    # Perform groupBy and aggregation
    grouped_df = joined_df \
        .groupBy("date", "region") \
        .agg(F.sum("sales").alias("total_sales"))
    
    # Transpose data using pivot
    transposed_df = grouped_df \
        .groupBy("date") \
        .pivot("region") \
        .sum("total_sales")
    
    # Write output to Parquet file
    transposed_df.write.parquet("output_data.parquet", compression="snappy")
    
    # Estimate DataFrame sizes (rough estimate: total string length of all rows; note this triggers a full scan)
    def estimate_size(df):
        return df.rdd.map(lambda row: len(str(row))).sum()
    
    print("Sales DataFrame size:", estimate_size(sales_df))
    print("Customer DataFrame size:", estimate_size(customer_df))
    print("Product DataFrame size:", estimate_size(product_df))
    print("Joined DataFrame size:", estimate_size(joined_df))
    print("Transposed DataFrame size:", estimate_size(transposed_df))
    
    # Monitor Spark UI for execution plans and performance metrics
    print(spark.sparkContext.uiWebUrl)

    Optimizations Used

    1. Broadcasting: Smaller DataFrames (customer_df, product_df) are broadcasted for efficient joins.
    2. Caching: Frequently used DataFrames (customer_df, product_df) are cached.
    3. Partitioning: spark.sql.shuffle.partitions is set to 200 for efficient shuffling.
    4. Memory Configuration: Driver and executor memory are configured for optimal performance.
    5. Executor Cores: spark.executor.cores is set to 4 for parallel processing.
    6. Parquet Compression: Output is written with Snappy compression.

    Estimating DataFrame Sizes

    The estimate_size helper gives a rough estimate by summing the string length of every row (a full scan); for exact figures, use the Storage tab of the Spark UI or table-level statistics.

    Monitoring Performance

    1. Spark UI: spark.sparkContext.uiWebUrl provides execution plans and performance metrics.
    2. Spark History Server: Analyze completed application metrics.
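    
    To confirm that the broadcast joins actually happened, inspect the physical plan directly; a short sketch:
    
    # Look for "BroadcastHashJoin" in the physical plan to confirm the broadcast hint took effect
    joined_df.explain()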
