Data Engineer Interview Questions: Set 1

Explain a scenario involving schema evolution in data pipelines

Here’s an automated Python script using PySpark that performs schema evolution between two datasets (e.g., two Parquet files or DataFrames):

✅ Features:

  1. Compares schemas of two DataFrames (e.g., old vs. new).
  2. Identifies added or missing fields.
  3. Adds missing fields with default values so that both DataFrames can be aligned and merged.
  4. Merges the two DataFrames safely.

🔧 Prerequisites:

  • Apache Spark (PySpark)
  • Input: Two DataFrames (could be from old and new Parquet files)

🧠 Script: Schema Evolution Handler

from pyspark.sql import SparkSession
from pyspark.sql.functions import lit
from pyspark.sql.types import StructType

# Start Spark Session
spark = SparkSession.builder \
    .appName("SchemaEvolutionHandler") \
    .getOrCreate()

def align_schemas(df1, df2, default_map=None):
    """
    Aligns two DataFrames by adding missing fields with default values.
    
    Args:
        df1: First DataFrame
        df2: Second DataFrame
        default_map: Dict of default values to use for new fields
    
    Returns:
        Tuple of (aligned_df1, aligned_df2)
    """
    if default_map is None:
        default_map = {}

    # Get schemas
    schema1 = set(df1.schema.names)
    schema2 = set(df2.schema.names)

    # Find differences
    fields_only_in_df1 = schema1 - schema2
    fields_only_in_df2 = schema2 - schema1

    # Add missing columns with default values to df2
    for col in fields_only_in_df1:
        default_val = default_map.get(col, None)
        df2 = df2.withColumn(col, lit(default_val))

    # Add missing columns with default values to df1
    for col in fields_only_in_df2:
        default_val = default_map.get(col, None)
        df1 = df1.withColumn(col, lit(default_val))

    # Reorder columns for consistency
    final_columns = sorted(df1.columns)  # Or define your own order
    return df1.select(final_columns), df2.select(final_columns)


# Example usage:
# Load two DataFrames (e.g., old and new)
df_old = spark.read.parquet("path/to/old_data.parquet")
df_new = spark.read.parquet("path/to/new_data.parquet")

# Optional: Provide default values for specific columns
defaults = {
    "discount": 0.0,
    "promo_code": None,
    "shipping_cost": 0.0
}

# Align schemas
df_old_aligned, df_new_aligned = align_schemas(df_old, df_new, default_map=defaults)

# Merge or compare
df_merged = df_old_aligned.unionByName(df_new_aligned)

# Save or analyze
df_merged.write.mode("overwrite").parquet("path/to/merged_output.parquet")

🔍 Output:

  • A merged dataset with all fields from both versions.
  • Missing fields filled with appropriate default values.

💡 Notes:

  • You can plug this into your ETL pipeline before merging historical and new data.
  • Extend it for nested schemas (with recursive functions) if dealing with complex structures.

To summarize the automated schema-evolution script: it first detects which fields are missing or newly added, then assigns default values for those fields so that both tables can be merged or compared. Can the comparison be simplified by working directly with the DataFrames' column lists?

Yes, absolutely — you can use df1.columns and df2.columns to simplify the comparison when you’re only dealing with flat (non-nested) schemas. Here’s a cleaner, minimal version of the schema evolution logic using column lists:


✅ Flat Schema Evolution Script (Using df.columns)

from pyspark.sql import SparkSession
from pyspark.sql.functions import lit

# Start Spark Session
spark = SparkSession.builder.appName("FlatSchemaEvolution").getOrCreate()

# Load DataFrames
df1 = spark.read.parquet("path/to/old_data.parquet")
df2 = spark.read.parquet("path/to/new_data.parquet")

# Optional: Default values for new fields
default_values = {
    "discount": 0.0,
    "promo_code": None,
    "shipping_cost": 0.0
}

# Get column sets
cols1 = set(df1.columns)
cols2 = set(df2.columns)

# Identify missing columns
missing_in_df1 = cols2 - cols1
missing_in_df2 = cols1 - cols2

# Add missing columns to df1
for col in missing_in_df1:
    default_val = default_values.get(col, None)
    df1 = df1.withColumn(col, lit(default_val))

# Add missing columns to df2
for col in missing_in_df2:
    default_val = default_values.get(col, None)
    df2 = df2.withColumn(col, lit(default_val))

# Align column order (sorted or any desired order)
final_columns = sorted(list(cols1.union(cols2)))
df1_aligned = df1.select(final_columns)
df2_aligned = df2.select(final_columns)

# Merge or Compare
df_merged = df1_aligned.unionByName(df2_aligned)

# Save result
df_merged.write.mode("overwrite").parquet("path/to/merged_output.parquet")

🧠 Why This Works:

  • df.columns gives a simple list of column names.
  • You use basic set operations to find differences.
  • You fill in missing columns using withColumn() and lit().

🛑 Limitation:

This version won’t work with nested fields (like JSON inside a column). For that, you’d need recursive schema inspection using df.schema.

Here’s a PySpark script that handles nested JSON structures for schema evolution. This version recursively:

  1. Compares two schemas.
  2. Identifies missing fields at all levels (even inside nested structs).
  3. Adds those missing fields with default values so both DataFrames can be aligned and merged.

🧠 Key Concepts:

  • Uses StructType and StructField to parse nested schemas.
  • Recursively aligns nested fields by building a merged schema.
  • Fills in missing fields with default values (None, or you can customize).

✅ Full Script: Schema Evolution for Nested Structures

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, struct
from pyspark.sql.types import StructType, StructField

# Start Spark session
spark = SparkSession.builder.appName("NestedSchemaEvolution").getOrCreate()

def merge_schemas(schema1: StructType, schema2: StructType, default_map=None):
    """ Recursively merges two StructType schemas, returning a merged schema. """
    if default_map is None:
        default_map = {}

    fields = {}
    
    # Add fields from schema1
    for field in schema1.fields:
        fields[field.name] = field

    # Merge/compare with fields from schema2
    for field in schema2.fields:
        if field.name in fields:
            existing_field = fields[field.name]
            # If both are Structs, recurse
            if isinstance(existing_field.dataType, StructType) and isinstance(field.dataType, StructType):
                merged_sub_schema = merge_schemas(existing_field.dataType, field.dataType, default_map)
                fields[field.name] = StructField(field.name, merged_sub_schema, True)
        else:
            fields[field.name] = field

    return StructType(list(fields.values()))

def fill_missing_fields(df, merged_schema, default_map=None):
    """ Adds missing fields (top level and one level inside structs) with default values """
    if default_map is None:
        default_map = {}

    for field in merged_schema.fields:
        if isinstance(field.dataType, StructType):
            if field.name not in df.columns:
                # Entire struct is missing: add it as a typed null column
                df = df.withColumn(field.name, lit(None).cast(field.dataType))
            else:
                # Struct exists: rebuild it, filling in any missing subfields
                existing_subfields = df.select(f"{field.name}.*").columns
                sub_cols = []
                for sub in field.dataType.fields:
                    full_name = f"{field.name}.{sub.name}"
                    if sub.name in existing_subfields:
                        sub_cols.append(col(full_name).alias(sub.name))
                    else:
                        default_val = default_map.get(full_name, None)
                        sub_cols.append(lit(default_val).cast(sub.dataType).alias(sub.name))
                df = df.withColumn(field.name, struct(*sub_cols))
        else:
            if field.name not in df.columns:
                default_val = default_map.get(field.name, None)
                df = df.withColumn(field.name, lit(default_val).cast(field.dataType))

    return df

# Example: Load two JSON-based DataFrames
df1 = spark.read.json("path/to/old_data.json")
df2 = spark.read.json("path/to/new_data.json")

# Optional: Set default values for specific nested fields
default_values = {
    "discount": 0.0,
    "promo_code": None,
    "customer.email": "unknown@example.com"
}

# Merge schemas
merged_schema = merge_schemas(df1.schema, df2.schema, default_map=default_values)

# Fill missing fields
df1_filled = fill_missing_fields(df1, merged_schema, default_map=default_values)
df2_filled = fill_missing_fields(df2, merged_schema, default_map=default_values)

# Ensure column order matches
final_columns = df1_filled.columns  # should be the same now
df1_aligned = df1_filled.select(final_columns)
df2_aligned = df2_filled.select(final_columns)

# Merge
df_merged = df1_aligned.unionByName(df2_aligned)

# Save output
df_merged.write.mode("overwrite").json("path/to/merged_output.json")

🔍 Example Input:

Old Schema:

{
  "order_id": "123",
  "customer": {
    "name": "Alice"
  }
}

New Schema:

{
  "order_id": "456",
  "customer": {
    "name": "Bob",
    "email": "bob@example.com"
  },
  "discount": 5.0
}

🛑 Notes:

  • This version handles one level of nested structs.
  • If you have deeper levels of nesting or arrays of structs, it can be extended — let me know if you want that.

Delta Lake questions — concepts, use cases, and commands.

Sure! Here’s a structured breakdown of Delta Lake covering concepts, use cases, and commands:


🔍 Delta Lake Concepts

Delta Lake is an open-source storage layer that brings ACID transactions to Apache Spark and big data workloads.

Key Concepts:

  • ACID Transactions: Guarantees Atomicity, Consistency, Isolation, and Durability.
  • Schema Enforcement and Evolution: Ensures data adheres to a defined schema and supports schema changes over time (a short code sketch follows this list).
  • Time Travel: Access previous versions of the data (using versions or timestamps).
  • Unified Batch and Streaming: Same Delta table can be used for both batch and streaming jobs.
  • Data Lineage and Audit: Built-in support for tracking data changes and version history.
  • Scalable Metadata Handling: Uses transaction logs (_delta_log) instead of listing files.
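
To connect this with the schema-evolution theme above, here is a minimal PySpark sketch of schema enforcement and evolution on a Delta write. The table path and column names are illustrative assumptions, and it presumes a Spark session configured with the delta-spark package.

from pyspark.sql import SparkSession

# Assumes the delta-spark package is available (configuration shown for illustration)
spark = (SparkSession.builder
         .appName("DeltaSchemaEvolutionDemo")
         .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
         .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
         .getOrCreate())

# First write defines the table schema (hypothetical path and columns)
df_v1 = spark.createDataFrame([(1, "click")], ["event_id", "event_type"])
df_v1.write.format("delta").mode("overwrite").save("/delta/events_demo")

# A later batch carries a new column; schema enforcement would reject this append
# unless mergeSchema is enabled to evolve the table schema
df_v2 = spark.createDataFrame([(2, "tap", 0.5)], ["event_id", "event_type", "discount"])
(df_v2.write.format("delta")
      .mode("append")
      .option("mergeSchema", "true")
      .save("/delta/events_demo"))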

💼 Use Cases

1. Data Lakehouse Architecture

  • Combine the reliability of a data warehouse with the scalability of a data lake.

2. Slowly Changing Dimensions (SCD)

  • Maintain historical changes in records.

3. Change Data Capture (CDC)

  • Efficiently track and apply changes (inserts, updates, deletes).

4. Streaming + Batch Pipeline

  • Process real-time data with the same Delta tables used for batch processing.

5. Data Versioning & Rollbacks

  • Restore data to previous states in case of bad writes or failures.

6. Data Governance

  • Enforce schema, maintain audit trails, and ensure consistency across jobs.

💻 Delta Lake Commands (on Apache Spark or Databricks)

📁 Create / Write Tables

df.write.format("delta").save("/delta/events")  # Write to path
df.write.format("delta").saveAsTable("events")  # Save as managed table

📥 Read Data

df = spark.read.format("delta").load("/delta/events")
df = spark.read.table("events")

📌 Update Table

UPDATE events SET eventType = 'click' WHERE eventType = 'tap'

❌ Delete Records

DELETE FROM events WHERE eventType = 'spam'

➕ Merge (Upsert)

MERGE INTO target_table USING source_table
ON target_table.id = source_table.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
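
The same upsert can be expressed through the DeltaTable Python API. A minimal sketch, assuming the target is a Delta table at an illustrative path and source_df is an already-loaded DataFrame with a matching id column:

from delta.tables import DeltaTable

# Illustrative path and join key; assumes an existing SparkSession `spark` and DataFrame `source_df`
target = DeltaTable.forPath(spark, "/delta/target_table")

(target.alias("t")
       .merge(source_df.alias("s"), "t.id = s.id")
       .whenMatchedUpdateAll()      # equivalent to UPDATE SET *
       .whenNotMatchedInsertAll()   # equivalent to INSERT *
       .execute())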

⏪ Time Travel

# By version
df = spark.read.format("delta").option("versionAsOf", 3).load("/delta/events")

# By timestamp
df = spark.read.format("delta").option("timestampAsOf", "2024-05-01").load("/delta/events")

🔄 Convert Parquet to Delta

from delta.tables import DeltaTable
DeltaTable.convertToDelta(spark, "parquet.`/path/to/parquet`")

🧾 Vacuum (Clean old files)

VACUUM events RETAIN 168 HOURS  -- default is 7 days

📜 Describe History (Versioning)

DESCRIBE HISTORY events
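
The same history is available from Python through the DeltaTable API; a small sketch, using an illustrative table path:

from delta.tables import DeltaTable

# Each row describes one table version: operation, timestamp, user, operation metrics, etc.
DeltaTable.forPath(spark, "/delta/events").history().show(truncate=False)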

✅ Summary

| Feature | Benefit |
|---------|---------|
| ACID Transactions | Reliable writes/reads |
| Schema Enforcement | Data quality |
| Time Travel | Data recovery & debugging |
| Streaming Support | Real-time + batch pipelines |
| Metadata Logging | Audit and versioning |

Memory Tuning — focused on Spark’s cluster-level memory: driver vs executor configs.

Great — let’s walk through Spark memory tuning at the cluster level, with a focus on driver vs executor memory configs. This is a key area for improving performance, preventing OOM (Out of Memory) errors, and making the most of your cluster resources.


🔧 Core Spark Memory Tuning Parameters

🧠 Basics:

  • Driver: Coordinates the job, maintains metadata (RDD lineage, DAG), and collects small results.
  • Executors: Do the heavy lifting — they run the tasks and hold data in memory (e.g., for shuffle, cache).

🔩 Key Cluster-Level Memory Settings

| Config Option | Applies To | Purpose |
|---------------|------------|---------|
| spark.driver.memory | Driver | Memory allocated to the driver JVM. |
| spark.executor.memory | Executors | Memory for each executor JVM. |
| spark.executor.instances | Cluster | Number of executors to launch. |
| spark.executor.cores | Executors | Number of cores per executor. |
| spark.driver.memoryOverhead | Driver | Off-heap memory for native overhead. |
| spark.executor.memoryOverhead | Executors | Off-heap memory (shuffle, JNI, etc.). |

🧮 Memory Breakdown Inside an Executor

spark.executor.memory (e.g. 8G)
  |
  ├── Execution Memory (for shuffles, joins, aggregations)
  └── Storage Memory (for caching, broadcasts)

spark.memory.fraction = 0.6 (default)
  └── 60% of the executor heap (after a ~300 MB reservation) is unified Spark memory
      (execution + storage); the rest is left for user data structures and internal metadata

Example: If spark.executor.memory = 8g, then:

  • ~4.6 GB of unified Spark memory (0.6 × (8 GB − ~300 MB))
  • Split between execution & storage (controlled by spark.memory.storageFraction)

💡 You can tweak spark.memory.fraction and spark.memory.storageFraction for finer control.
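
For example, a minimal sketch of setting these knobs when building the session; the values are illustrative, not recommendations, and executor/driver memory is usually fixed at submit time or in cluster config rather than in application code:

from pyspark.sql import SparkSession

# Illustrative values only; tune against your workload and cluster size
spark = (SparkSession.builder
         .appName("MemoryTuningExample")
         .config("spark.executor.memory", "8g")
         .config("spark.executor.memoryOverhead", "2g")
         .config("spark.driver.memory", "4g")
         .config("spark.memory.fraction", "0.6")          # execution + storage share of usable heap
         .config("spark.memory.storageFraction", "0.5")   # portion of that share protected for caching
         .getOrCreate())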


🎯 Tuning Scenarios

Scenario 1: Driver Out of Memory

Symptoms:

  • Error like java.lang.OutOfMemoryError: Java heap space from driver
  • Happens often with large collect, toPandas(), or large broadcast joins

Fixes:

  • Increase driver memory: --conf spark.driver.memory=4g --conf spark.driver.memoryOverhead=1g
  • Avoid large collect() operations

Scenario 2: Executor OOM or Slow Tasks

Symptoms:

  • Executors die with memory errors
  • GC overhead or spills to disk during shuffles/joins

Fixes:

  • Increase executor memory: --conf spark.executor.memory=8g --conf spark.executor.memoryOverhead=2g
  • Balance executor.memory and executor.instances to use the cluster efficiently
  • Repartition data to avoid large shuffles
  • Avoid caching very large datasets unless needed

Scenario 3: Poor Cluster Utilization

Symptoms:

  • Some nodes idle, some overloaded
  • Wasted memory or under-parallelized tasks

Fixes:

  • Adjust: --conf spark.executor.instances --conf spark.executor.cores
  • Use dynamic allocation (spark.dynamicAllocation.enabled=true) together with spark.shuffle.service.enabled=true

✅ General Guidelines

| Cluster Size | Suggested Config |
|--------------|------------------|
| Small (e.g. 16 cores, 64 GB RAM) | 4 executors, 4 cores, 12g each |
| Medium (e.g. 32 cores, 128 GB) | 8 executors, 4 cores, 14g each |
| Large | Tune based on workload type (ETL, ML, SQL) |

🔍 Tools for Monitoring

  • Spark UI → Executors Tab (look for memory usage, GC time)
  • Ganglia / Prometheus → Cluster metrics
  • Logs → Check stderr for OOM or GC-related errors

🧠 Pro Tips

  • Don’t just increase memory — investigate why memory is used (wide transformations? skew? caching?)
  • Avoid very large executors (>16g heap) — GC becomes less efficient.
  • For large shuffles, increase spark.shuffle.file.buffer, spark.reducer.maxSizeInFlight

Spark Architecture — detailed explanation of driver, executor, and cluster components

Here’s a detailed breakdown of Apache Spark architecture, focusing on the driver, executors, and cluster components. Understanding these is essential for designing efficient Spark jobs and managing resources properly.


🚂 1. Driver Program

What It Is:

The driver is the master process that runs your Spark application’s main function. It orchestrates the entire execution of the job.

Responsibilities:

  • Converts your Spark code (RDDs, DataFrames, etc.) into a logical execution plan.
  • Builds a Directed Acyclic Graph (DAG) of stages.
  • Submits tasks to the cluster for execution.
  • Tracks task progress and retries failed tasks.
  • Maintains cluster resource coordination via the cluster manager.

Lives In:

  • Your application’s main JVM process.
  • Runs on the client machine in client mode, or on a node inside the cluster (e.g., in the YARN ApplicationMaster container) in cluster mode.

Important:

  • If the driver dies → the whole job fails.
  • Heavy actions like collect(), toPandas(), or large broadcasts can crash the driver.

⚙️ 2. Executors

What They Are:

Executor processes run on worker nodes and do the actual data processing.

Responsibilities:

  • Execute the tasks assigned by the driver.
  • Store data in memory or disk (RDD/cache/broadcast variables).
  • Report task status back to the driver.

Lives In:

  • Each executor is a separate JVM on a worker node.
  • One Spark application gets its own set of executors (not shared).

Characteristics:

  • Have a fixed amount of CPU cores and memory.
  • Stay alive for the duration of the application unless explicitly terminated.

🖥️ 3. Cluster Manager

Purpose:

Manages resources (CPU, memory, executors) across all applications running in the cluster.

Types:

  • Standalone – Simple Spark-native cluster manager.
  • YARN – Hadoop’s resource manager.
  • Kubernetes – Container-native orchestration.
  • Mesos – Older general-purpose cluster manager.

Role:

  • Starts and manages executors on worker nodes.
  • Handles scheduling and resource allocation.
  • Launches the driver (in cluster mode).

🏗️ Spark Cluster Layout (Overview)

                    +-----------------------------+
                    |         Spark Driver         |
                    | (runs your Spark application)|
                    +-------------+---------------+
                                  |
                       Coordinates job execution
                                  ↓
           +-------------------------------------------+
           |                Cluster Manager             |
           |  (YARN, Kubernetes, Standalone, Mesos)     |
           +-----------------+-------------------------+
                             ↓
                  Allocates Resources (Executors)
                             ↓
         +------------------------+   +------------------------+
         |      Executor 1        |   |      Executor 2        |
         | - Runs Tasks           |   | - Runs Tasks           |
         | - Caches Data          |   | - Reports to Driver    |
         +------------------------+   +------------------------+


🧠 Spark Application Lifecycle

  1. Submit Job: spark-submit sends the application to the cluster manager.
  2. Driver Starts:
    • Parses code, builds logical plan.
    • Breaks it into stages and tasks.
  3. Executors Launch:
    • Cluster manager allocates worker nodes.
    • Executors start on those nodes.
  4. Tasks Distributed:
    • Driver assigns tasks to executors.
    • Executors process data and report back.
  5. Shuffle/Data Exchange (if needed).
  6. Result Returned (or written to storage).
  7. Cleanup: Executors shut down when job completes.

🔍 Key Concepts to Remember

| Component | Description |
|-----------|-------------|
| Job | Triggered by an action (e.g., collect, save) |
| Stage | A set of tasks that can run in parallel |
| Task | A unit of work (e.g., applying a function to a partition) |
| DAG | Execution graph showing dependencies |
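
A small sketch of how these map to code (the data is made up): each action triggers a job, and the shuffle introduced by groupBy creates a stage boundary, with one task per partition inside each stage.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("JobStageTaskDemo").getOrCreate()

df = spark.range(1_000_000)                                   # narrow: no shuffle
grouped = df.groupBy((df.id % 10).alias("bucket")).count()    # wide: shuffle -> new stage

grouped.collect()        # action -> triggers a job (typically two stages here)
grouped.explain(True)    # inspect the physical plan and the Exchange (shuffle) boundary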

🧠 Tips for Working with Spark Architecture

  • Avoid too many small tasks → overhead increases.
  • Don’t overload driver with large collect() calls.
  • Use persist() or cache() wisely to save recomputation.
  • Monitor Spark UI for DAG visualization and executor stats.

Sure! Here’s a short and effective breakdown of client vs cluster mode in Spark:


🔹 Client Mode

  • Driver runs on your local machine (the one running spark-submit).
  • Executors run on the cluster.
  • Best for: development, testing, or small jobs.
  • Downside: If your machine disconnects or is weak, job fails or runs slow.

🔹 Cluster Mode

  • Driver runs inside the cluster (on a worker node).
  • Fully managed by the cluster manager.
  • Best for: production jobs — more stable and scalable.
  • Works even if your submitting machine goes offline.

| Mode | Driver Location | Use Case | Stability |
|------|-----------------|----------|-----------|
| Client | Local (your machine) | Dev/Test, small jobs | ❌ Depends on client |
| Cluster | Inside cluster node | Production, large jobs | ✅ More reliable |

Projection Pruning and Predicate Pushdown — optimization techniques.

Great topic! Here’s a clear and practical explanation of Projection Pruning and Predicate Pushdown, two key query optimization techniques in Spark (and other data processing engines like Presto, Hive, etc.).


🎯 Goal of Both Techniques:

Reduce the amount of data read and processed, which improves performance, lowers memory usage, and speeds up your job.


🧾 1. Projection Pruning (a.k.a. Column Pruning)

📌 What It Does:

Only reads the columns required for your query — skips the rest.

✅ Example:

# Only selecting 2 columns from a wide dataset
df.select("id", "name").show()

🧠 Why It Helps:

  • Avoids reading unused columns from disk.
  • Especially powerful with columnar formats like Parquet, ORC, Delta.

🔧 Behind the Scenes:

  • Spark scans the schema and only loads id and name, skipping others like address, email, etc.
  • Works automatically if you use select() instead of df.*.

🔍 2. Predicate Pushdown

📌 What It Does:

Pushes filter conditions down to the data source (e.g., Parquet, JDBC, Hive) so that only matching rows are read.

✅ Example:

df.filter("age > 30").show()

🧠 Why It Helps:

  • Reduces I/O: fewer rows scanned and loaded.
  • Speeds up performance, especially for large datasets.

🔧 Works Best With:

  • Parquet, ORC, Delta Lake
  • Some JDBC sources
  • File formats and sources that support filter pushdown

🚀 Combined Example:

# Select only relevant columns and filter rows efficiently
df.select("name", "age").filter("age > 30")

This query benefits from:

  • Projection Pruning → reads only name and age columns.
  • Predicate Pushdown → filters age > 30 at the storage level, not in memory.

🛠️ Tips to Maximize Effectiveness

| Technique | Do This | Avoid This |
|-----------|---------|------------|
| Projection Pruning | Use .select("col1", "col2") | Don't use df.* or select("*") |
| Predicate Pushdown | Use .filter() or .where() early | Avoid filtering late in the pipeline |

📈 How to Check If It’s Working

  • Use the Spark UI or explain() to see the physical plan.
  • Look for terms like PushedFilters, PushedDownFilters, or column pruning in the scan node.

df.select("name").filter("age > 30").explain(True)

HDFS Commands — hands-on usage and common command scenarios.

Here’s a hands-on guide to HDFS (Hadoop Distributed File System) commands, focused on common real-world scenarios, use cases, and practical command-line examples.


📁 Basic HDFS Commands

1. Check if HDFS is running

hdfs dfsadmin -report

📂 File/Directory Management

🔸 List Files

hdfs dfs -ls /           # List root directory
hdfs dfs -ls -R /data    # Recursively list /data

🔸 Create Directories

hdfs dfs -mkdir /user/data
hdfs dfs -mkdir -p /user/data/logs  # -p makes parent dirs

🔸 Upload Files to HDFS

hdfs dfs -put localfile.txt /user/data/
hdfs dfs -copyFromLocal localfile.txt /user/data/

🔸 Download Files from HDFS

hdfs dfs -get /user/data/file.txt ./localdir/
hdfs dfs -copyToLocal /user/data/file.txt ./localdir/

🔸 Move/Rename Files

hdfs dfs -mv /user/data/file1.txt /user/archive/file1.txt

🔸 Delete Files/Dirs

hdfs dfs -rm /user/data/file.txt              # Delete file
hdfs dfs -rm -r /user/data/old_logs           # Delete dir
hdfs dfs -rm -skipTrash /user/data/file.txt   # Bypass trash

📄 Viewing File Content

🔸 View File Content

hdfs dfs -cat /user/data/sample.txt

🔸 Display Head/Tail

hdfs dfs -tail /user/data/sample.txt   # Show last KB
hdfs dfs -head /user/data/sample.txt   # First KB (if supported)

🔸 Display File Stats

hdfs dfs -stat /user/data/file.txt

🔍 File Checks and Permissions

🔸 Check Disk Usage

hdfs dfs -du -h /user/data

🔸 Set Permissions

hdfs dfs -chmod 755 /user/data
hdfs dfs -chown user:group /user/data

🔸 Check File System Health (files, blocks, locations)

hdfs fsck /user/data -files -blocks -locations

🧪 Useful Scenarios

Verify if File Exists

hdfs dfs -test -e /user/data/file.txt && echo "Exists"

Count Files and Size

hdfs dfs -count -q -h /user/data

Copy Entire Folder

hdfs dfs -cp /user/data/2024 /user/backup/

🔧 Administrative Commands (Advanced)

🔸 Report Cluster Health

hdfs dfsadmin -report

🔸 Safe Mode

hdfs dfsadmin -safemode get
hdfs dfsadmin -safemode enter
hdfs dfsadmin -safemode leave

🔸 Balancer

hdfs balancer       # Rebalances data blocks

📌 Summary: HDFS vs Linux Commands

| Task | HDFS Command | Linux Equivalent |
|------|--------------|------------------|
| List files | hdfs dfs -ls /data | ls /data |
| Upload to HDFS | hdfs dfs -put local.txt /data | cp local.txt /data |
| View file contents | hdfs dfs -cat /data/file.txt | cat /data/file.txt |
| Create directory | hdfs dfs -mkdir /data/logs | mkdir /data/logs |
| Delete file | hdfs dfs -rm /data/file.txt | rm /data/file.txt |

To consolidate the above, here are three quick references: an HDFS command cheat sheet, examples of using HDFS with Spark and Hive, and common troubleshooting tips.


1. HDFS Command Cheat Sheet (Markdown format)

# HDFS Command Cheat Sheet

## Basic File Operations
| Task | Command |
|------|---------|
| List files | `hdfs dfs -ls /path/` |
| Recursively list | `hdfs dfs -ls -R /path/` |
| Create directory | `hdfs dfs -mkdir /path/` |
| Create parent dirs | `hdfs dfs -mkdir -p /a/b/c` |
| Upload file | `hdfs dfs -put localfile.txt /path/` |
| Download file | `hdfs dfs -get /path/file.txt ./` |
| Move/Rename | `hdfs dfs -mv /src /dest` |
| Delete file | `hdfs dfs -rm /path/file.txt` |
| Delete dir | `hdfs dfs -rm -r /path/dir/` |

## File Content
| Task | Command |
|------|---------|
| View file | `hdfs dfs -cat /path/file.txt` |
| Tail file | `hdfs dfs -tail /path/file.txt` |
| Stat info | `hdfs dfs -stat /path/file.txt` |

## Disk Usage & Permissions
| Task | Command |
|------|---------|
| Disk usage | `hdfs dfs -du -h /path/` |
| File count | `hdfs dfs -count -q /path/` |
| Change permissions | `hdfs dfs -chmod 755 /path/` |
| Change owner | `hdfs dfs -chown user:group /path/` |

## Admin
| Task | Command |
|------|---------|
| Cluster report | `hdfs dfsadmin -report` |
| Safe mode check | `hdfs dfsadmin -safemode get` |
| Enter safe mode | `hdfs dfsadmin -safemode enter` |
| Leave safe mode | `hdfs dfsadmin -safemode leave` |
| Balance data | `hdfs balancer` |



🔄 2. Using HDFS with Spark or Hive

Spark Example:

# Read from HDFS
df = spark.read.text("hdfs:///user/data/file.txt")

# Save to HDFS in Parquet
df.write.parquet("hdfs:///user/data/output/")

# Use Delta if configured
df.write.format("delta").save("hdfs:///delta/events")

Hive Example:

-- Create external table on HDFS
CREATE EXTERNAL TABLE sales (
  id INT, product STRING, amount DOUBLE
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE
LOCATION 'hdfs:///user/hive/warehouse/sales_data/';

-- Query Hive table
SELECT * FROM sales;

🛠️ 3. Common Troubleshooting Tips

| Error | Cause | Fix |
|-------|-------|-----|
| No such file or directory | Wrong path or permissions | hdfs dfs -ls /path/ to verify |
| Permission denied | Lacking HDFS read/write rights | Use hdfs dfs -chmod or check ownership |
| Safe mode exception | Cluster in safe mode | Exit with hdfs dfsadmin -safemode leave |
| File already exists | Target file exists | Use -f flag or hdfs dfs -rm first |
| Connection refused | Namenode down or config issue | Check Namenode status and config |

Deployment — How do you deploy data pipelines in production?

Deploying data pipelines in production involves much more than just running scripts — it’s about building reliable, scalable, and maintainable workflows that deliver data on time and correctly. Here’s a step-by-step guide to production-grade deployment.


✅ 1. Pipeline Design

Before deployment, ensure your pipeline has:

  • Modular components (e.g., ingestion, transformation, load)
  • Parameterization (dates, paths, configs)
  • Error handling & logging
  • Support for retries and idempotency (safe to re-run)
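
As a concrete illustration of parameterization and idempotency, here is a minimal PySpark sketch that re-processes one date partition and overwrites only that partition, so re-running it for the same date is safe. The paths and column names are assumptions.

import sys
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Run date passed in as a parameter, e.g. "2024-05-01"
run_date = sys.argv[1] if len(sys.argv) > 1 else "2024-05-01"

spark = SparkSession.builder.appName("DailySalesLoad").getOrCreate()

# Overwrite only the partitions present in this write, not the whole table
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

df = (spark.read.parquet("s3://my-bucket/raw/sales/")          # illustrative input path
           .filter(col("sale_date") == run_date))

(df.write.mode("overwrite")
    .partitionBy("sale_date")
    .parquet("s3://my-bucket/curated/sales/"))                 # idempotent per-date re-run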

🚀 2. Code Packaging & Versioning

  • Use a version control system (e.g., Git)
  • Package code into deployable units:
    • Python scripts → .py modules or .whl
    • Spark jobs → .jar or .py via spark-submit
  • Manage dependencies via requirements.txt, poetry, or Docker

🛠️ 3. Orchestration with Workflow Tools

Use orchestration tools to manage scheduling, retries, and dependencies:

| Tool | Type | Best For |
|------|------|----------|
| Airflow | Code-first DAGs | Flexible batch scheduling |
| Dagster | Data-aware DAGs | Strong typing, dev-friendly |
| Luigi | Python pipelines | Lightweight workflows |
| Prefect | Python-native | Dynamic flows, cloud integration |
| dbt Cloud | SQL-based | Analytics/ELT-focused pipelines |

Example with Airflow:

from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime

with DAG(
    "daily_sales_etl",
    start_date=datetime(2024, 1, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    extract = BashOperator(task_id="extract", bash_command="python extract.py")
    transform = BashOperator(task_id="transform", bash_command="python transform.py")
    load = BashOperator(task_id="load", bash_command="python load.py")

    extract >> transform >> load

📦 4. Environment Configuration

  • Separate environments: dev, staging, production
  • Use config files, env vars, or Secrets Managers (AWS Secrets Manager, Vault) to manage credentials & paths
  • Avoid hardcoding anything
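
A minimal sketch of keeping environment-specific values out of code by reading them from environment variables; the variable names are illustrative:

import os

# Resolved per environment (dev/staging/prod) by the deployment, never hardcoded
ENV = os.environ.get("PIPELINE_ENV", "dev")
INPUT_PATH = os.environ["SALES_INPUT_PATH"]        # fail fast if not configured
OUTPUT_PATH = os.environ["SALES_OUTPUT_PATH"]
DB_PASSWORD = os.environ.get("DB_PASSWORD")        # injected from a secrets manager

print(f"Running in {ENV}: {INPUT_PATH} -> {OUTPUT_PATH}")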

🔄 5. Scheduling & Triggers

  • Use cron-like schedules (@daily, 0 3 * * *)
  • Support backfilling (for missed runs)
  • Trigger pipelines via:
    • APIs
    • File arrivals (S3, GCS, HDFS events)
    • Event queues (Kafka, Pub/Sub)

🧪 6. Testing & Validation

  • Unit test functions (e.g., transformation logic); a test sketch follows this list
  • Integration test with sample datasets
  • Validate output schema & data quality (e.g., with Great Expectations)
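
For example, a unit test for transformation logic might look like this sketch; the clean_sales function and its rule are hypothetical, and it runs with pytest plus a local Spark session:

import pytest
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

def clean_sales(df):
    """Hypothetical transformation under test: drop non-positive amounts."""
    return df.filter(col("amount") > 0)

@pytest.fixture(scope="session")
def spark():
    return SparkSession.builder.master("local[1]").appName("pipeline-tests").getOrCreate()

def test_clean_sales_drops_non_positive_amounts(spark):
    df = spark.createDataFrame([(1, 10.0), (2, -5.0), (3, 0.0)], ["id", "amount"])
    result = clean_sales(df)
    assert [row.id for row in result.collect()] == [1]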

📊 7. Monitoring & Alerts

  • Use built-in tools (Airflow UI, Spark UI)
  • Send alerts via Slack, email, PagerDuty
  • Track:
    • Runtime
    • Failure counts
    • Data volumes
    • Schema changes

🧰 8. Infrastructure & Deployment Tools

  • Containerize your pipeline with Docker
  • Deploy via CI/CD:
    • GitHub Actions, GitLab CI, Jenkins
  • Use infrastructure-as-code:
    • Terraform, Pulumi (to manage clusters, buckets, etc.)

🧵 9. Execution Environments

Depending on your stack:

  • Spark: Run on YARN, Kubernetes, EMR, Databricks
  • Python: Use Kubernetes, Docker, or serverless (AWS Lambda, Cloud Functions)
  • SQL: Trigger dbt, BigQuery jobs, or stored procedures

✅ 10. Post-Deployment Checks

  • Sanity-check the first few runs
  • Check log output and metrics
  • Ensure data landed correctly
  • Set up SLA checks

Summary Cheat Sheet

| Step | What to Do |
|------|-----------|
| Code & Package | Git + Docker/Whl/Jar |
| Orchestrate | Airflow, Dagster, dbt, Prefect |
| Environment Separation | Dev / Stage / Prod with config mgmt |
| Schedule & Trigger | Cron, events, APIs |
| Validate & Test | Great Expectations, pytest |
| Monitor & Alert | Logs, metrics, Slack alerts |
| Automate Deployments | CI/CD + IaC (Terraform) |

Absolutely — here’s a specific example of deploying a PySpark job with Airflow on AWS EMR, which is a common, production-grade setup.


🚀 Goal:

Deploy and schedule a Spark job on Amazon EMR using Apache Airflow.


🧱 Setup Components:

| Component | Description |
|-----------|-------------|
| Airflow | Workflow scheduler (can run on EC2, ECS, or MWAA) |
| AWS EMR | Managed Hadoop/Spark cluster |
| S3 | Stores Spark scripts and input/output data |
| Spark Script | Your PySpark job (process_data.py) |

📁 Example Directory Structure:

airflow/
│
├── dags/
│   └── emr_spark_job_dag.py      # Airflow DAG
├── spark_jobs/
│   └── process_data.py           # Spark job
├── requirements.txt

🧾 1. Sample PySpark Job (process_data.py)

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("ProcessDataJob").getOrCreate()

df = spark.read.json("s3://my-bucket/input-data/")
df_filtered = df.filter("age > 25")
df_filtered.write.parquet("s3://my-bucket/output-data/", mode="overwrite")

spark.stop()

📜 2. Airflow DAG (emr_spark_job_dag.py)

from airflow import DAG
from airflow.providers.amazon.aws.operators.emr import (
    EmrCreateJobFlowOperator,
    EmrAddStepsOperator,
    EmrTerminateJobFlowOperator,
)
from airflow.providers.amazon.aws.sensors.emr import EmrStepSensor
from datetime import datetime

JOB_FLOW_OVERRIDES = {
    "Name": "Airflow_EMR_Cluster",
    "ReleaseLabel": "emr-6.12.0",
    "Applications": [{"Name": "Spark"}],
    "Instances": {
        "InstanceGroups": [
            {
                "Name": "Master nodes",
                "InstanceRole": "MASTER",
                "InstanceType": "m5.xlarge",
                "InstanceCount": 1
            },
            {
                "Name": "Core nodes",
                "InstanceRole": "CORE",
                "InstanceType": "m5.xlarge",
                "InstanceCount": 2
            }
        ],
        "KeepJobFlowAliveWhenNoSteps": True,
        "TerminationProtected": False
    },
    "JobFlowRole": "EMR_EC2_DefaultRole",
    "ServiceRole": "EMR_DefaultRole",
    "LogUri": "s3://my-bucket/emr-logs/"
}

SPARK_STEPS = [
    {
        "Name": "Run PySpark Job",
        "ActionOnFailure": "CONTINUE",
        "HadoopJarStep": {
            "Jar": "command-runner.jar",
            "Args": [
                "spark-submit",
                "--deploy-mode", "cluster",
                "s3://my-bucket/spark_jobs/process_data.py"
            ]
        }
    }
]

with DAG(
    dag_id="emr_spark_job_pipeline",
    start_date=datetime(2024, 1, 1),
    schedule_interval="@daily",
    catchup=False
) as dag:

    create_cluster = EmrCreateJobFlowOperator(
        task_id="create_emr_cluster",
        job_flow_overrides=JOB_FLOW_OVERRIDES,
        aws_conn_id="aws_default"
    )

    add_step = EmrAddStepsOperator(
        task_id="add_emr_step",
        job_flow_id="{{ task_instance.xcom_pull(task_ids='create_emr_cluster', key='return_value') }}",
        steps=SPARK_STEPS,
        aws_conn_id="aws_default"
    )

    monitor_step = EmrStepSensor(
        task_id="watch_step",
        job_flow_id="{{ task_instance.xcom_pull(task_ids='create_emr_cluster', key='return_value') }}",
        step_id="{{ task_instance.xcom_pull(task_ids='add_emr_step', key='return_value')[0] }}",
        aws_conn_id="aws_default"
    )

    terminate_cluster = EmrTerminateJobFlowOperator(
        task_id="terminate_emr_cluster",
        job_flow_id="{{ task_instance.xcom_pull(task_ids='create_emr_cluster', key='return_value') }}",
        aws_conn_id="aws_default",
        trigger_rule="all_done"  # terminate even if step fails
    )

    create_cluster >> add_step >> monitor_step >> terminate_cluster

🧰 3. Deployment Steps

  1. Upload Spark job to S3: aws s3 cp spark_jobs/process_data.py s3://my-bucket/spark_jobs/
  2. Ensure IAM roles (EMR_DefaultRole, EMR_EC2_DefaultRole) have permissions to access S3 and run Spark.
  3. Deploy DAG to Airflow:
    • Place DAG file in airflow/dags/
    • Restart Airflow scheduler/webserver if needed
  4. Monitor Execution:
    • Airflow UI → Logs
    • EMR Console → Cluster and Step logs
    • S3 → Output directory

✅ Benefits of This Setup

| Feature | Benefit |
|---------|---------|
| EMR | Scalable, managed Spark cluster |
| Airflow | Declarative scheduling, retries, logging |
| S3 | Durable storage for jobs and data |
| Decoupled | You can scale EMR independently of Airflow |

Perfect — let’s briefly walk through 3 alternative deployment approaches for your Spark job, using:

  1. EMR Serverless
  2. Databricks Jobs
  3. AWS Glue

These are managed Spark execution platforms, each with different trade-offs in terms of cost, control, and complexity.


✅ 1. EMR Serverless + Airflow

🔍 What It Is:

A fully managed, serverless runtime for Spark — you don’t provision clusters. Just submit jobs, and EMR handles scaling.

✅ Ideal For:

  • On-demand ETL jobs
  • No cluster management
  • Usage-based billing

📜 DAG Snippet (Using EmrServerlessStartJobOperator from the Amazon provider package):

from airflow.providers.amazon.aws.operators.emr import EmrServerlessStartJobOperator
from airflow import DAG
from datetime import datetime

with DAG("emr_serverless_spark", start_date=datetime(2024, 1, 1), schedule_interval="@daily", catchup=False) as dag:

    spark_job = EmrServerlessStartJobOperator(
        task_id="run_spark_job",
        application_id="your-emr-serverless-app-id",
        execution_role_arn="arn:aws:iam::123456789012:role/EMRServerlessExecutionRole",
        job_driver={
            "sparkSubmit": {
                "entryPoint": "s3://my-bucket/spark_jobs/process_data.py",
                "sparkSubmitParameters": "--conf spark.executor.memory=2G"
            }
        },
        configuration_overrides={
            "monitoringConfiguration": {
                "s3MonitoringConfiguration": {
                    "logUri": "s3://my-bucket/emr-serverless-logs/"
                }
            }
        },
        aws_conn_id="aws_default"
    )

✅ 2. Databricks Jobs + Airflow

🔍 What It Is:

Fully managed Spark platform optimized for big data and ML. Great IDE, collaboration, and performance tuning.

✅ Ideal For:

  • Teams needing UI + API
  • ML + SQL + Streaming workloads
  • Deep Spark integration

📜 DAG Snippet (Using DatabricksSubmitRunOperator):

from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator
from airflow import DAG
from datetime import datetime

with DAG("databricks_spark_job", start_date=datetime(2024, 1, 1), schedule_interval="@daily", catchup=False) as dag:

    run_job = DatabricksSubmitRunOperator(
        task_id="run_databricks_spark",
        databricks_conn_id="databricks_default",
        new_cluster={
            "spark_version": "13.3.x-scala2.12",
            "node_type_id": "i3.xlarge",
            "num_workers": 2
        },
        notebook_task={
            "notebook_path": "/Shared/process_data"
        }
    )

🔐 Needs a Databricks token and workspace configured in your Airflow connection.


✅ 3. AWS Glue Jobs + Airflow

🔍 What It Is:

A serverless ETL service from AWS that runs Spark under the hood (with PySpark or Scala support).

✅ Ideal For:

  • Catalog-based ETL (tied to AWS Glue Data Catalog)
  • Serverless, cost-efficient batch processing
  • Lightweight job logic

📜 DAG Snippet (Using GlueJobOperator):

from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
from airflow import DAG
from datetime import datetime

with DAG("glue_spark_job", start_date=datetime(2024, 1, 1), schedule_interval="@daily", catchup=False) as dag:

    glue_job = GlueJobOperator(
        task_id="run_glue_job",
        job_name="my_glue_spark_job",
        script_location="s3://my-bucket/glue-scripts/process_data.py",
        iam_role_name="GlueServiceRole",
        region_name="us-east-1",
        num_of_dpus=10,
    )

⚖️ Comparison Summary

| Platform | Cluster Management | Best For | Cost Control | Complexity |
|----------|--------------------|----------|--------------|------------|
| EMR Serverless | ❌ No | Ad hoc Spark/ETL | Pay-per-second | Medium |
| Databricks | ✅ Yes (managed) | Enterprise Spark + ML | Subscription + spot | Low |
| AWS Glue | ❌ No | Serverless catalog-driven ETL | Pay-per-DPU-hour | Low |

Here’s a real-world CI/CD deployment template using GitHub Actions to deploy Airflow DAGs and Spark jobs (e.g., to S3 for EMR Serverless, AWS Glue, or even Databricks).


✅ CI/CD Deployment Pipeline with GitHub Actions

🎯 Goal:

  • Lint, test, and deploy Airflow DAGs
  • Upload PySpark scripts to S3
  • Optionally trigger a job run (EMR/Glue/Databricks)

📁 Project Structure

my-data-pipeline/
├── dags/
│   └── my_airflow_dag.py
├── spark_jobs/
│   └── process_data.py
├── tests/
│   └── test_transformations.py
├── requirements.txt
├── .github/
│   └── workflows/
│       └── deploy.yml

📜 .github/workflows/deploy.yml

name: Deploy Data Pipeline

on:
  push:
    branches: [main]
    paths:
      - 'dags/**'
      - 'spark_jobs/**'
      - '.github/workflows/**'

env:
  AWS_REGION: us-east-1
  S3_BUCKET: my-bucket
  S3_SCRIPTS_PREFIX: spark_jobs/

jobs:
  deploy:
    runs-on: ubuntu-latest

    steps:
    - name: Checkout Code
      uses: actions/checkout@v3

    - name: Set up Python
      uses: actions/setup-python@v4
      with:
        python-version: 3.9

    - name: Install Dependencies
      run: |
        python -m pip install --upgrade pip
        pip install -r requirements.txt

    - name: Run Unit Tests
      run: |
        pytest tests/

    - name: Upload PySpark Jobs to S3
      uses: jakejarvis/s3-sync-action@v0.5.1
      with:
        args: --acl private --follow-symlinks
      env:
        AWS_S3_BUCKET: ${{ env.S3_BUCKET }}
        AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
        AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
        AWS_REGION: ${{ env.AWS_REGION }}
        SOURCE_DIR: spark_jobs
        DEST_DIR: ${{ env.S3_SCRIPTS_PREFIX }}

    - name: Deploy DAGs to Airflow (via SSH)
      uses: appleboy/scp-action@v0.1.4
      with:
        host: ${{ secrets.AIRFLOW_HOST }}
        username: ${{ secrets.AIRFLOW_USER }}
        key: ${{ secrets.AIRFLOW_SSH_KEY }}
        source: "dags/"
        target: "/opt/airflow/dags/"

    # Optional: Trigger EMR Serverless or Glue Job
    - name: Trigger EMR Serverless Job (Optional)
      run: |
        aws emr-serverless start-job-run \
          --application-id YOUR-APP-ID \
          --execution-role-arn arn:aws:iam::123456789012:role/EMRServerlessExecutionRole \
          --job-driver '{
            "sparkSubmit": {
              "entryPoint": "s3://${{ env.S3_BUCKET }}/${{ env.S3_SCRIPTS_PREFIX }}process_data.py"
            }
          }' \
          --region ${{ env.AWS_REGION }}
      env:
        AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
        AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}

🔐 Secrets You’ll Need in GitHub

Set these under GitHub → Repo Settings → Secrets and variables → Actions:

| Secret Name | Description |
|-------------|-------------|
| AWS_ACCESS_KEY_ID | IAM access key with S3/EMR/Glue permissions |
| AWS_SECRET_ACCESS_KEY | IAM secret key |
| AIRFLOW_HOST | Host/IP of your Airflow instance |
| AIRFLOW_USER | SSH username |
| AIRFLOW_SSH_KEY | SSH private key for the Airflow server |

✅ Summary of Workflow Steps

  1. Checks out code
  2. Installs Python dependencies
  3. Runs unit tests
  4. Syncs Spark scripts to S3
  5. Deploys DAGs to Airflow via SSH
  6. (Optional): Triggers EMR Serverless job

⚙️ Need Variants?

  • Want to deploy to Databricks? Use databricks-cli with a GitHub Action like: databricks jobs run-now --job-id 12345 --profile DEFAULT
  • Deploy to AWS Glue? Use: aws glue start-job-run --job-name my_glue_job

Decorators in Python — use cases and real-world application.