Explain a scenario on schema evolution in data pipelines
Here’s an automated Python script using PySpark that performs schema evolution between two datasets (e.g., two Parquet files or DataFrames):
✅ Features:
- Compares schemas of two DataFrames (e.g., old vs. new).
- Identifies added or missing fields.
- Adds missing fields with default values so that both DataFrames can be aligned and merged.
- Merges the two DataFrames safely.
🔧 Prerequisites:
- Apache Spark (PySpark)
- Input: Two DataFrames (could be from old and new Parquet files)
🧠 Script: Schema Evolution Handler
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit
from pyspark.sql.types import StructType
# Start Spark Session
spark = SparkSession.builder \
.appName("SchemaEvolutionHandler") \
.getOrCreate()
def align_schemas(df1, df2, default_map=None):
"""
Aligns two DataFrames by adding missing fields with default values.
Args:
df1: First DataFrame
df2: Second DataFrame
default_map: Dict of default values to use for new fields
Returns:
Tuple of (aligned_df1, aligned_df2)
"""
if default_map is None:
default_map = {}
# Get schemas
schema1 = set(df1.schema.names)
schema2 = set(df2.schema.names)
# Find differences
fields_only_in_df1 = schema1 - schema2
fields_only_in_df2 = schema2 - schema1
# Add missing columns with default values to df2
for col in fields_only_in_df1:
default_val = default_map.get(col, None)
df2 = df2.withColumn(col, lit(default_val))
# Add missing columns with default values to df1
for col in fields_only_in_df2:
default_val = default_map.get(col, None)
df1 = df1.withColumn(col, lit(default_val))
# Reorder columns for consistency
final_columns = sorted(df1.columns) # Or define your own order
return df1.select(final_columns), df2.select(final_columns)
# Example usage:
# Load two DataFrames (e.g., old and new)
df_old = spark.read.parquet("path/to/old_data.parquet")
df_new = spark.read.parquet("path/to/new_data.parquet")
# Optional: Provide default values for specific columns
defaults = {
"discount": 0.0,
"promo_code": None,
"shipping_cost": 0.0
}
# Align schemas
df_old_aligned, df_new_aligned = align_schemas(df_old, df_new, default_map=defaults)
# Merge or compare
df_merged = df_old_aligned.unionByName(df_new_aligned)
# Save or analyze
df_merged.write.mode("overwrite").parquet("path/to/merged_output.parquet")
🔍 Output:
- A merged dataset with all fields from both versions.
- Missing fields filled with appropriate default values.
💡 Notes:
- You can plug this into your ETL pipeline before merging historical and new data.
- Extend it for nested schemas (with recursive functions) if dealing with complex structures.
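If you're on Spark 3.1+, note that `unionByName` can also handle the flat case natively via `allowMissingColumns=True`, which null-fills columns present in only one DataFrame (it won't apply custom defaults, though). A minimal sketch using the `df_old`/`df_new` DataFrames above:
# Spark 3.1+: union by column name, letting Spark null-fill columns
# that exist in only one of the two DataFrames (no custom defaults)
df_merged = df_old.unionByName(df_new, allowMissingColumns=True)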
Automated script for schema evolution: first check which fields are missing or added, then assign a default value for those fields so that both tables can be merged or compared.
Yes, absolutely — you can use `df1.columns` and `df2.columns` to simplify the comparison when you’re only dealing with flat (non-nested) schemas. Here’s a cleaner, minimal version of the schema evolution logic using column lists:
✅ Flat Schema Evolution Script (Using `df.columns`)
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit
# Start Spark Session
spark = SparkSession.builder.appName("FlatSchemaEvolution").getOrCreate()
# Load DataFrames
df1 = spark.read.parquet("path/to/old_data.parquet")
df2 = spark.read.parquet("path/to/new_data.parquet")
# Optional: Default values for new fields
default_values = {
"discount": 0.0,
"promo_code": None,
"shipping_cost": 0.0
}
# Get column sets
cols1 = set(df1.columns)
cols2 = set(df2.columns)
# Identify missing columns
missing_in_df1 = cols2 - cols1
missing_in_df2 = cols1 - cols2
# Add missing columns to df1
for col in missing_in_df1:
default_val = default_values.get(col, None)
df1 = df1.withColumn(col, lit(default_val))
# Add missing columns to df2
for col in missing_in_df2:
default_val = default_values.get(col, None)
df2 = df2.withColumn(col, lit(default_val))
# Align column order (sorted or any desired order)
final_columns = sorted(list(cols1.union(cols2)))
df1_aligned = df1.select(final_columns)
df2_aligned = df2.select(final_columns)
# Merge or Compare
df_merged = df1_aligned.unionByName(df2_aligned)
# Save result
df_merged.write.mode("overwrite").parquet("path/to/merged_output.parquet")
🧠 Why This Works:
- `df.columns` gives a simple list of column names.
- You use basic set operations to find differences.
- You fill in missing columns using `withColumn()` and `lit()`.
🛑 Limitation:
This version won’t work with nested fields (like JSON inside a column). For that, you’d need recursive schema inspection using `df.schema`.
Here’s a PySpark script that handles nested JSON structures for schema evolution. This version recursively:
- Compares two schemas.
- Identifies missing fields at all levels (even inside nested structs).
- Adds those missing fields with default values so both DataFrames can be aligned and merged.
🧠 Key Concepts:
- Uses `StructType` and `StructField` to parse nested schemas.
- Recursively aligns nested fields by building a merged schema.
- Fills in missing fields with default values (`None`, or you can customize).
✅ Full Script: Schema Evolution for Nested Structures
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, struct
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, BooleanType
# Start Spark session
spark = SparkSession.builder.appName("NestedSchemaEvolution").getOrCreate()
def merge_schemas(schema1: StructType, schema2: StructType, default_map=None):
""" Recursively merges two StructType schemas, returning a merged schema. """
if default_map is None:
default_map = {}
fields = {}
# Add fields from schema1
for field in schema1.fields:
fields[field.name] = field
# Merge/compare with fields from schema2
for field in schema2.fields:
if field.name in fields:
existing_field = fields[field.name]
# If both are Structs, recurse
if isinstance(existing_field.dataType, StructType) and isinstance(field.dataType, StructType):
merged_sub_schema = merge_schemas(existing_field.dataType, field.dataType, default_map)
fields[field.name] = StructField(field.name, merged_sub_schema, True)
else:
fields[field.name] = field
return StructType(list(fields.values()))
def fill_missing_fields(df, merged_schema, default_map=None):
    """ Recursively adds missing fields (at any level) with default values """
    if default_map is None:
        default_map = {}

    def _exprs(merged_type, existing_type, prefix=""):
        existing = {f.name: f for f in existing_type.fields}
        cols = []
        for field in merged_type.fields:
            full_name = f"{prefix}.{field.name}" if prefix else field.name
            if field.name not in existing:
                # Field is missing entirely: fill with the default (or null), cast to the expected type
                default_val = default_map.get(full_name, None)
                cols.append(lit(default_val).cast(field.dataType).alias(field.name))
            elif isinstance(field.dataType, StructType) and isinstance(existing[field.name].dataType, StructType):
                # Struct exists on both sides: rebuild it, recursing to fill missing sub-fields
                cols.append(struct(*_exprs(field.dataType, existing[field.name].dataType, full_name)).alias(field.name))
            else:
                # Field exists with a non-struct type: keep it as-is
                cols.append(col(full_name).alias(field.name))
        return cols

    return df.select(*_exprs(merged_schema, df.schema))
# Example: Load two JSON-based DataFrames
df1 = spark.read.json("path/to/old_data.json")
df2 = spark.read.json("path/to/new_data.json")
# Optional: Set default values for specific nested fields
default_values = {
"discount": 0.0,
"promo_code": None,
"customer.email": "unknown@example.com"
}
# Merge schemas
merged_schema = merge_schemas(df1.schema, df2.schema, default_map=default_values)
# Fill missing fields
df1_filled = fill_missing_fields(df1, merged_schema, default_map=default_values)
df2_filled = fill_missing_fields(df2, merged_schema, default_map=default_values)
# Ensure column order matches
final_columns = df1_filled.columns # should be the same now
df1_aligned = df1_filled.select(final_columns)
df2_aligned = df2_filled.select(final_columns)
# Merge
df_merged = df1_aligned.unionByName(df2_aligned)
# Save output
df_merged.write.mode("overwrite").json("path/to/merged_output.json")
🔍 Example Input:
Old Schema:
{
"order_id": "123",
"customer": {
"name": "Alice"
}
}
New Schema:
{
"order_id": "456",
"customer": {
"name": "Bob",
"email": "bob@example.com"
},
"discount": 5.0
}
🛑 Notes:
- This version handles nested structs at any depth, but not arrays of structs.
- If you have arrays of structs or other complex types, it can be extended — let me know if you want that.
Delta Lake questions — concepts, use cases, and commands.
Sure! Here’s a structured breakdown of Delta Lake covering concepts, use cases, and commands:
🔍 Delta Lake Concepts
Delta Lake is an open-source storage layer that brings ACID transactions to Apache Spark and big data workloads.
Key Concepts:
- ACID Transactions: Guarantees Atomicity, Consistency, Isolation, and Durability.
- Schema Enforcement and Evolution: Ensures data adheres to a defined schema and supports schema changes over time.
- Time Travel: Access previous versions of the data (using versions or timestamps).
- Unified Batch and Streaming: Same Delta table can be used for both batch and streaming jobs.
- Data Lineage and Audit: Built-in support for tracking data changes and version history.
- Scalable Metadata Handling: Uses transaction logs (`_delta_log`) instead of listing files.
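As a quick illustration of schema enforcement vs. evolution (a minimal sketch — the path and `df_new` are placeholders), Delta rejects unexpected columns by default but will evolve the table schema when `mergeSchema` is enabled on the write:
# Append a DataFrame whose schema has extra columns; Delta adds them to the table schema
df_new.write.format("delta") \
    .mode("append") \
    .option("mergeSchema", "true") \
    .save("/delta/events")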
💼 Use Cases
1. Data Lakehouse Architecture
- Combine the reliability of a data warehouse with the scalability of a data lake.
2. Slowly Changing Dimensions (SCD)
- Maintain historical changes in records.
3. Change Data Capture (CDC)
- Efficiently track and apply changes (inserts, updates, deletes).
4. Streaming + Batch Pipeline
- Process real-time data with the same Delta tables used for batch processing.
5. Data Versioning & Rollbacks
- Restore data to previous states in case of bad writes or failures.
6. Data Governance
- Enforce schema, maintain audit trails, and ensure consistency across jobs.
💻 Delta Lake Commands (on Apache Spark or Databricks)
📁 Create / Write Tables
df.write.format("delta").save("/delta/events") # Write to path
df.write.format("delta").saveAsTable("events") # Save as managed table
📥 Read Data
df = spark.read.format("delta").load("/delta/events")
df = spark.read.table("events")
📌 Update Table
UPDATE events SET eventType = 'click' WHERE eventType = 'tap'
❌ Delete Records
DELETE FROM events WHERE eventType = 'spam'
➕ Merge (Upsert)
MERGE INTO target_table USING source_table
ON target_table.id = source_table.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
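The same upsert can also be written with the Delta Python API (sketch; `source_df` and the table name are placeholders):
from delta.tables import DeltaTable

target = DeltaTable.forName(spark, "target_table")
(target.alias("t")
    .merge(source_df.alias("s"), "t.id = s.id")
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute())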
⏪ Time Travel
# By version
df = spark.read.format("delta").option("versionAsOf", 3).load("/delta/events")
# By timestamp
df = spark.read.format("delta").option("timestampAsOf", "2024-05-01").load("/delta/events")
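To undo a bad write, recent Delta releases also support restoring a table to an earlier state (sketch; the path is a placeholder):
from delta.tables import DeltaTable

# Roll the table back to an earlier version (restoreToTimestamp is also available)
DeltaTable.forPath(spark, "/delta/events").restoreToVersion(3)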
🔄 Convert Parquet to Delta
from delta.tables import DeltaTable
DeltaTable.convertToDelta(spark, "parquet.`/path/to/parquet`")
🧾 Vacuum (Clean old files)
VACUUM events RETAIN 168 HOURS -- default is 7 days
📜 Describe History (Versioning)
DESCRIBE HISTORY events
✅ Summary
Feature | Benefit |
---|---|
ACID Transactions | Reliable writes/reads |
Schema Enforcement | Data quality |
Time Travel | Data recovery & debugging |
Streaming Support | Real-time + batch pipelines |
Metadata Logging | Audit and versioning |
Memory Tuning — focused on Spark’s cluster-level memory: driver vs executor configs.
Great — let’s walk through Spark memory tuning at the cluster level, with a focus on driver vs executor memory configs. This is a key area for improving performance, preventing OOM (Out of Memory) errors, and making the most of your cluster resources.
🔧 Core Spark Memory Tuning Parameters
🧠 Basics:
- Driver: Coordinates the job, maintains metadata (RDD lineage, DAG), and collects small results.
- Executors: Do the heavy lifting — they run the tasks and hold data in memory (e.g., for shuffle, cache).
🔩 Key Cluster-Level Memory Settings
Config Option | Applies To | Purpose |
---|---|---|
spark.driver.memory | Driver | Memory allocated to the driver JVM. |
spark.executor.memory | Executors | Memory for each executor JVM. |
spark.executor.instances | Cluster | Number of executors to launch. |
spark.executor.cores | Executors | Number of cores per executor. |
spark.driver.memoryOverhead | Driver | Off-heap memory for native overhead. |
spark.executor.memoryOverhead | Executors | Off-heap memory (shuffle, JNI, etc.). |
🧮 Memory Breakdown Inside an Executor
spark.executor.memory (e.g. 8G)
|
├── Execution Memory (for shuffles, joins, aggregations)
└── Storage Memory (for caching, broadcasts)
spark.memory.fraction = 0.6 (default)
└── ~60% of (heap - 300 MB reserved) is unified Spark memory; the rest is user memory for your own data structures and metadata
Example: If `spark.executor.memory = 8g`, then:
- ~4.6 GB of unified Spark memory (60% of the heap minus the ~300 MB reserved)
- Split between execution & storage
💡 You can tweak `spark.memory.fraction` and `spark.memory.storageFraction` for finer control.
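Rough arithmetic for the 8 GB example above, assuming default settings (exact numbers vary slightly by Spark version):
heap_mb = 8 * 1024           # spark.executor.memory = 8g
reserved_mb = 300            # fixed reserved memory
unified_mb = (heap_mb - reserved_mb) * 0.6   # spark.memory.fraction
storage_mb = unified_mb * 0.5                # spark.memory.storageFraction
print(round(unified_mb), round(storage_mb))  # ~4735 MB unified, ~2368 MB protected for storage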
🎯 Tuning Scenarios
Scenario 1: Driver Out of Memory
Symptoms:
- Errors like `java.lang.OutOfMemoryError: Java heap space` from the driver
- Happens often with large `collect()`, `toPandas()`, or large broadcast joins
Fixes:
- Increase driver memory:
--conf spark.driver.memory=4g --conf spark.driver.memoryOverhead=1g
- Avoid large `collect()` operations
Scenario 2: Executor OOM or Slow Tasks
Symptoms:
- Executors die with memory errors
- GC overhead or spills to disk during shuffles/joins
Fixes:
- Increase executor memory:
--conf spark.executor.memory=8g --conf spark.executor.memoryOverhead=2g
- Balance `executor.memory` and `executor.instances` to use the cluster efficiently
- Repartition data to avoid large shuffles
- Avoid caching very large datasets unless needed
Scenario 3: Poor Cluster Utilization
Symptoms:
- Some nodes idle, some overloaded
- Wasted memory or under-parallelized tasks
Fixes:
- Adjust:
--conf spark.executor.instances --conf spark.executor.cores
- Use dynamic allocation (`spark.dynamicAllocation.enabled=true`) with `spark.shuffle.service.enabled=true`
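For reference, the same knobs expressed in code (illustrative values only, not recommendations):
from pyspark.sql import SparkSession

# These are normally passed via spark-submit or cluster config; when set in code
# they must be applied before the SparkContext starts.
spark = (SparkSession.builder
    .appName("MemoryTunedJob")
    .config("spark.driver.memory", "4g")
    .config("spark.driver.memoryOverhead", "1g")
    .config("spark.executor.memory", "8g")
    .config("spark.executor.memoryOverhead", "2g")
    .config("spark.executor.instances", "8")
    .config("spark.executor.cores", "4")
    .getOrCreate())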
✅ General Guidelines
Cluster Size | Suggested Config |
---|---|
Small (e.g. 16 cores, 64 GB RAM) | 4 executors, 4 cores, 12g each |
Medium (e.g. 32 cores, 128 GB) | 8 executors, 4 cores, 14g each |
Large | Tune based on workload type (ETL, ML, SQL) |
🔍 Tools for Monitoring
- Spark UI → Executors Tab (look for memory usage, GC time)
- Ganglia / Prometheus → Cluster metrics
- Logs → Check stderr for OOM or GC-related errors
🧠 Pro Tips
- Don’t just increase memory — investigate why memory is used (wide transformations? skew? caching?)
- Avoid very large executors (>16g heap) — GC becomes less efficient.
- For large shuffles, increase `spark.shuffle.file.buffer` and `spark.reducer.maxSizeInFlight`
Spark Architecture — detailed explanation of driver, executor, and cluster components
Here’s a detailed breakdown of Apache Spark architecture, focusing on the driver, executors, and cluster components. Understanding these is essential for designing efficient Spark jobs and managing resources properly.
🚂 1. Driver Program
What It Is:
The driver is the master process that runs your Spark application’s main function. It orchestrates the entire execution of the job.
Responsibilities:
- Converts your Spark code (RDDs, DataFrames, etc.) into a logical execution plan.
- Builds a Directed Acyclic Graph (DAG) of stages.
- Submits tasks to the cluster for execution.
- Tracks task progress and retries failed tasks.
- Maintains cluster resource coordination via the cluster manager.
Lives In:
- Your application’s main JVM process.
- Typically runs on the master node (or client machine in client mode).
Important:
- If the driver dies → the whole job fails.
- Heavy actions like `collect()`, `toPandas()`, or large broadcasts can crash the driver.
⚙️ 2. Executors
What They Are:
Executor processes run on worker nodes and do the actual data processing.
Responsibilities:
- Execute the tasks assigned by the driver.
- Store data in memory or disk (RDD/cache/broadcast variables).
- Report task status back to the driver.
Lives In:
- Each executor is a separate JVM on a worker node.
- One Spark application gets its own set of executors (not shared).
Characteristics:
- Have a fixed amount of CPU cores and memory.
- Stay alive for the duration of the application unless explicitly terminated.
🖥️ 3. Cluster Manager
Purpose:
Manages resources (CPU, memory, executors) across all applications running in the cluster.
Types:
- Standalone – Simple Spark-native cluster manager.
- YARN – Hadoop’s resource manager.
- Kubernetes – Container-native orchestration.
- Mesos – Older general-purpose cluster manager.
Role:
- Starts and manages executors on worker nodes.
- Handles scheduling and resource allocation.
- Launches the driver (in cluster mode).
🏗️ Spark Cluster Layout (Overview)
+-----------------------------+
| Spark Driver |
| (runs your Spark application)|
+-------------+---------------+
|
Coordinates job execution
↓
+-------------------------------------------+
| Cluster Manager |
| (YARN, Kubernetes, Standalone, Mesos) |
+-----------------+-------------------------+
↓
Allocates Resources (Executors)
↓
+------------------------+ +------------------------+
| Executor 1 | | Executor 2 |
| - Runs Tasks | | - Runs Tasks |
| - Caches Data | | - Reports to Driver |
+------------------------+ +------------------------+
🧠 Spark Application Lifecycle
1. Submit Job: `spark-submit` sends the application to the cluster manager.
2. Driver Starts:
   - Parses code, builds the logical plan.
   - Breaks it into stages and tasks.
3. Executors Launch:
   - Cluster manager allocates worker nodes.
   - Executors start on those nodes.
4. Tasks Distributed:
   - Driver assigns tasks to executors.
   - Executors process data and report back.
5. Shuffle/Data Exchange (if needed).
6. Result Returned (or written to storage).
7. Cleanup: Executors shut down when the job completes.
🔍 Key Concepts to Remember
Component | Description |
---|---|
Job | Triggered by an action (e.g., collect , save ) |
Stage | A set of tasks that can run in parallel |
Task | A unit of work (e.g., applying a function to a partition) |
DAG | Execution graph showing dependencies |
🧠 Tips for Working with Spark Architecture
- Avoid too many small tasks → overhead increases.
- Don’t overload the driver with large `collect()` calls.
- Use `persist()` or `cache()` wisely to save recomputation.
- Monitor the Spark UI for DAG visualization and executor stats.
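For the caching tip above, a small sketch (the path and column names are placeholders): persist a DataFrame that feeds several actions, then release it.
from pyspark import StorageLevel

events = spark.read.parquet("path/to/events")  # placeholder path
events_clean = events.filter("event_type IS NOT NULL").persist(StorageLevel.MEMORY_AND_DISK)

daily_counts = events_clean.groupBy("event_date").count()
daily_counts.show()           # first action materializes the cache
print(events_clean.count())   # reuses the cached data instead of re-reading the source

events_clean.unpersist()      # release memory when done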
Sure! Here’s a short and effective breakdown of client vs cluster mode in Spark:
🔹 Client Mode
- Driver runs on your local machine (the one running `spark-submit`).
- Executors run on the cluster.
- Best for: development, testing, or small jobs.
- Downside: If your machine disconnects or is weak, job fails or runs slow.
🔹 Cluster Mode
- Driver runs inside the cluster (on a worker node).
- Fully managed by the cluster manager.
- Best for: production jobs — more stable and scalable.
- Works even if your submitting machine goes offline.
Mode | Driver Location | Use Case | Stability |
---|---|---|---|
Client | Local (your machine) | Dev/Test, small jobs | ❌ Depends on client |
Cluster | Inside cluster node | Production, large jobs | ✅ More reliable |
Projection Pruning and Predicate Pushdown — optimization techniques.
Great topic! Here’s a clear and practical explanation of Projection Pruning and Predicate Pushdown, two key query optimization techniques in Spark (and other data processing engines like Presto, Hive, etc.).
🎯 Goal of Both Techniques:
Reduce the amount of data read and processed, which improves performance, lowers memory usage, and speeds up your job.
🧾 1. Projection Pruning (a.k.a. Column Pruning)
📌 What It Does:
Only reads the columns required for your query — skips the rest.
✅ Example:
# Only selecting 2 columns from a wide dataset
df.select("id", "name").show()
🧠 Why It Helps:
- Avoids reading unused columns from disk.
- Especially powerful with columnar formats like Parquet, ORC, Delta.
🔧 Behind the Scenes:
- Spark scans the schema and only loads `id` and `name`, skipping others like `address`, `email`, etc.
- Works automatically if you use `select()` instead of `df.*`.
🔍 2. Predicate Pushdown
📌 What It Does:
Pushes filter conditions down to the data source (e.g., Parquet, JDBC, Hive) so that only matching rows are read.
✅ Example:
df.filter("age > 30").show()
🧠 Why It Helps:
- Reduces I/O: fewer rows scanned and loaded.
- Speeds up performance, especially for large datasets.
🔧 Works Best With:
- Parquet, ORC, Delta Lake
- Some JDBC sources
- File formats and sources that support filter pushdown
🚀 Combined Example:
# Select only relevant columns and filter rows efficiently
df.select("name", "age").filter("age > 30")
This query benefits from:
- Projection Pruning → reads only the `name` and `age` columns.
- Predicate Pushdown → filters `age > 30` at the storage level, not in memory.
🛠️ Tips to Maximize Effectiveness
Technique | Do This | Avoid This |
---|---|---|
Projection Pruning | Use .select("col1", "col2") | Don’t use df.* or select("*") |
Predicate Pushdown | Use .filter() or .where() early | Avoid filtering late in pipeline |
📈 How to Check If It’s Working
- Use the Spark UI or `explain()` to see the physical plan.
- Look for `PushedFilters` and the pruned column list in the scan node.
df.select("name").filter("age > 30").explain(True)
HDFS Commands — hands-on usage and common command scenarios.
Here’s a hands-on guide to HDFS (Hadoop Distributed File System) commands, focused on common real-world scenarios, use cases, and practical command-line examples.
📁 Basic HDFS Commands
1. Check if HDFS is running
hdfs dfsadmin -report
📂 File/Directory Management
🔸 List Files
hdfs dfs -ls / # List root directory
hdfs dfs -ls -R /data # Recursively list /data
🔸 Create Directories
hdfs dfs -mkdir /user/data
hdfs dfs -mkdir -p /user/data/logs # -p makes parent dirs
🔸 Upload Files to HDFS
hdfs dfs -put localfile.txt /user/data/
hdfs dfs -copyFromLocal localfile.txt /user/data/
🔸 Download Files from HDFS
hdfs dfs -get /user/data/file.txt ./localdir/
hdfs dfs -copyToLocal /user/data/file.txt ./localdir/
🔸 Move/Rename Files
hdfs dfs -mv /user/data/file1.txt /user/archive/file1.txt
🔸 Delete Files/Dirs
hdfs dfs -rm /user/data/file.txt # Delete file
hdfs dfs -rm -r /user/data/old_logs # Delete dir
hdfs dfs -rm -skipTrash /user/data/file.txt # Bypass trash
📄 Viewing File Content
🔸 View File Content
hdfs dfs -cat /user/data/sample.txt
🔸 Display Head/Tail
hdfs dfs -tail /user/data/sample.txt # Show last KB
hdfs dfs -head /user/data/sample.txt # First KB (if supported)
🔸 Display File Stats
hdfs dfs -stat /user/data/file.txt
🔍 File Checks and Permissions
🔸 Check Disk Usage
hdfs dfs -du -h /user/data
🔸 Set Permissions
hdfs dfs -chmod 755 /user/data
hdfs dfs -chown user:group /user/data
🔸 Check File Health, Blocks, and Locations
hdfs fsck /user/data -files -blocks -locations
🧪 Useful Scenarios
✅ Verify if File Exists
hdfs dfs -test -e /user/data/file.txt && echo "Exists"
✅ Count Files and Size
hdfs dfs -count -q -h /user/data
✅ Copy Entire Folder
hdfs dfs -cp /user/data/2024 /user/backup/
🔧 Administrative Commands (Advanced)
🔸 Report Cluster Health
hdfs dfsadmin -report
🔸 Safe Mode
hdfs dfsadmin -safemode get
hdfs dfsadmin -safemode enter
hdfs dfsadmin -safemode leave
🔸 Balancer
hdfs balancer # Rebalances data blocks
📌 Summary: HDFS vs Linux Commands
Task | HDFS Command | Linux Equivalent |
---|---|---|
List files | hdfs dfs -ls /data | ls /data |
Upload to HDFS | hdfs dfs -put local.txt /data | cp local.txt /data |
View file contents | hdfs dfs -cat /data/file.txt | cat /data/file.txt |
Create directory | hdfs dfs -mkdir /data/logs | mkdir /data/logs |
Delete file | hdfs dfs -rm /data/file.txt | rm /data/file.txt |
Great! Here’s a breakdown of what I’ll prepare for you:
✅ 1. HDFS Command Cheat Sheet (Markdown format)
# HDFS Command Cheat Sheet
## Basic File Operations
| Task | Command |
|------|---------|
| List files | `hdfs dfs -ls /path/` |
| Recursively list | `hdfs dfs -ls -R /path/` |
| Create directory | `hdfs dfs -mkdir /path/` |
| Create parent dirs | `hdfs dfs -mkdir -p /a/b/c` |
| Upload file | `hdfs dfs -put localfile.txt /path/` |
| Download file | `hdfs dfs -get /path/file.txt ./` |
| Move/Rename | `hdfs dfs -mv /src /dest` |
| Delete file | `hdfs dfs -rm /path/file.txt` |
| Delete dir | `hdfs dfs -rm -r /path/dir/` |
## File Content
| Task | Command |
|------|---------|
| View file | `hdfs dfs -cat /path/file.txt` |
| Tail file | `hdfs dfs -tail /path/file.txt` |
| Stat info | `hdfs dfs -stat /path/file.txt` |
## Disk Usage & Permissions
| Task | Command |
|------|---------|
| Disk usage | `hdfs dfs -du -h /path/` |
| File count | `hdfs dfs -count -q /path/` |
| Change permissions | `hdfs dfs -chmod 755 /path/` |
| Change owner | `hdfs dfs -chown user:group /path/` |
## Admin
| Task | Command |
|------|---------|
| Cluster report | `hdfs dfsadmin -report` |
| Safe mode check | `hdfs dfsadmin -safemode get` |
| Enter safe mode | `hdfs dfsadmin -safemode enter` |
| Leave safe mode | `hdfs dfsadmin -safemode leave` |
| Balance data | `hdfs balancer` |
Would you like this as a PDF as well? I can generate and send it.
🔄 2. Using HDFS with Spark or Hive
Spark Example:
# Read from HDFS
df = spark.read.text("hdfs:///user/data/file.txt")
# Save to HDFS in Parquet
df.write.parquet("hdfs:///user/data/output/")
# Use Delta if configured
df.write.format("delta").save("hdfs:///delta/events")
Hive Example:
-- Create external table on HDFS
CREATE EXTERNAL TABLE sales (
id INT, product STRING, amount DOUBLE
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE
LOCATION 'hdfs:///user/hive/warehouse/sales_data/';
-- Query Hive table
SELECT * FROM sales;
🛠️ 3. Common Troubleshooting Tips
Error | Cause | Fix |
---|---|---|
No such file or directory | Wrong path or permissions | hdfs dfs -ls /path/ to verify |
Permission denied | Lacking HDFS write/read rights | Use hdfs dfs -chmod or check ownership |
Safe mode exception | Cluster in safe mode | Exit with hdfs dfsadmin -safemode leave |
File already exists | Target file exists | Use -f flag or hdfs dfs -rm first |
Connection refused | Namenode down or config issue | Check Namenode status and config |
Deployment — How do you deploy data pipelines in production?
Deploying data pipelines in production involves much more than just running scripts — it’s about building reliable, scalable, and maintainable workflows that deliver data on time and correctly. Here’s a step-by-step guide to production-grade deployment.
✅ 1. Pipeline Design
Before deployment, ensure your pipeline has:
- Modular components (e.g., ingestion, transformation, load)
- Parameterization (dates, paths, configs)
- Error handling & logging
- Support for retries and idempotency (safe to re-run)
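To make the parameterization and idempotency points above concrete, here’s a small sketch (paths, the `run_date` argument, and column names are hypothetical): the job processes one date partition and overwrites only that partition’s output, so re-runs are safe.
import sys
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# The run date is passed in by the scheduler, so the job is parameterized, not hardcoded
run_date = sys.argv[1]  # e.g. "2024-05-01"

spark = SparkSession.builder.appName("daily_sales_etl").getOrCreate()

daily = (spark.read.parquet("s3://my-bucket/raw/sales/")
         .filter(col("sale_date") == run_date))

# Overwrite only this date's output folder: re-running the same date produces the same result
daily.write.mode("overwrite").parquet(f"s3://my-bucket/curated/sales/sale_date={run_date}/")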
🚀 2. Code Packaging & Versioning
- Use a version control system (e.g., Git)
- Package code into deployable units:
  - Python scripts → `.py` modules or `.whl` packages
  - Spark jobs → `.jar` or `.py` submitted via `spark-submit`
- Manage dependencies via `requirements.txt`, `poetry`, or Docker
🛠️ 3. Orchestration with Workflow Tools
Use orchestration tools to manage scheduling, retries, and dependencies:
Tool | Type | Best For |
---|---|---|
Airflow | Code-first DAGs | Flexible batch scheduling |
Dagster | Data-aware DAGs | Strong typing, dev-friendly |
Luigi | Python pipelines | Lightweight workflows |
Prefect | Python-native | Dynamic flows, cloud integration |
dbt Cloud | SQL-based | Analytics/ELT-focused pipelines |
Example with Airflow:
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime

with DAG("daily_sales_etl", start_date=datetime(2024, 1, 1), schedule_interval="@daily", catchup=False) as dag:
    extract = BashOperator(task_id="extract", bash_command="python extract.py")
    transform = BashOperator(task_id="transform", bash_command="python transform.py")
    load = BashOperator(task_id="load", bash_command="python load.py")

    extract >> transform >> load
📦 4. Environment Configuration
- Separate environments: dev, staging, production
- Use config files, env vars, or Secrets Managers (AWS Secrets Manager, Vault) to manage credentials & paths
- Avoid hardcoding anything
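For the config point above, a tiny sketch (variable names are made up) of reading environment-specific values at runtime instead of hardcoding them:
import os

ENV = os.environ.get("PIPELINE_ENV", "dev")       # dev / staging / prod
INPUT_PATH = os.environ["SALES_INPUT_PATH"]       # fails fast if not configured
DB_PASSWORD = os.environ["WAREHOUSE_PASSWORD"]    # injected from a secrets manager, never committed

print(f"Running in {ENV}, reading from {INPUT_PATH}")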
🔄 5. Scheduling & Triggers
- Use cron-like schedules (`@daily`, `0 3 * * *`)
- Support backfilling (for missed runs)
- Trigger pipelines via:
- APIs
- File arrivals (S3, GCS, HDFS events)
- Event queues (Kafka, Pub/Sub)
🧪 6. Testing & Validation
- Unit test functions (e.g., transformation logic)
- Integration test with sample datasets
- Validate output schema & data quality (e.g., with Great Expectations)
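For the unit-testing point above, a minimal example (sketch; `add_total_column` and its module are hypothetical) using a local SparkSession:
# tests/test_transformations.py
import pytest
from pyspark.sql import SparkSession

from spark_jobs.transformations import add_total_column  # hypothetical module and function under test

@pytest.fixture(scope="session")
def spark():
    return SparkSession.builder.master("local[1]").appName("pipeline-tests").getOrCreate()

def test_add_total_column(spark):
    df = spark.createDataFrame([(2, 5.0)], ["quantity", "unit_price"])
    result = add_total_column(df).collect()[0]
    assert result["total"] == 10.0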
📊 7. Monitoring & Alerts
- Use built-in tools (Airflow UI, Spark UI)
- Send alerts via Slack, email, PagerDuty
- Track:
- Runtime
- Failure counts
- Data volumes
- Schema changes
🧰 8. Infrastructure & Deployment Tools
- Containerize your pipeline with Docker
- Deploy via CI/CD:
- GitHub Actions, GitLab CI, Jenkins
- Use infrastructure-as-code:
- Terraform, Pulumi (to manage clusters, buckets, etc.)
🧵 9. Execution Environments
Depending on your stack:
- Spark: Run on YARN, Kubernetes, EMR, Databricks
- Python: Use Kubernetes, Docker, or serverless (AWS Lambda, Cloud Functions)
- SQL: Trigger dbt, BigQuery jobs, or stored procedures
✅ 10. Post-Deployment Checks
- Sanity-check the first few runs
- Check log output and metrics
- Ensure data landed correctly
- Set up SLA checks
Summary Cheat Sheet
Step | What to Do |
---|---|
Code & Package | Git + Docker/Whl/Jar |
Orchestrate | Airflow, Dagster, dbt, Prefect |
Environment Separation | Dev / Stage / Prod with config mgmt |
Schedule & Trigger | Cron, Events, APIs |
Validate & Test | Great Expectations, pytest |
Monitor & Alert | Logs, metrics, Slack alerts |
Automate Deployments | CI/CD + IaC (Terraform) |
Absolutely — here’s a specific example of deploying a PySpark job with Airflow on AWS EMR, which is a common, production-grade setup.
🚀 Goal:
Deploy and schedule a Spark job on Amazon EMR using Apache Airflow.
🧱 Setup Components:
Component | Description |
---|---|
Airflow | Workflow scheduler (can run on EC2, ECS, or MWAA) |
AWS EMR | Managed Hadoop/Spark cluster |
S3 | Stores Spark scripts and input/output data |
Spark Script | Your PySpark job (process_data.py ) |
📁 Example Directory Structure:
airflow/
│
├── dags/
│ └── emr_spark_job_dag.py # Airflow DAG
├── spark_jobs/
│ └── process_data.py # Spark job
├── requirements.txt
🧾 1. Sample PySpark Job (`process_data.py`)
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("ProcessDataJob").getOrCreate()
df = spark.read.json("s3://my-bucket/input-data/")
df_filtered = df.filter("age > 25")
df_filtered.write.parquet("s3://my-bucket/output-data/", mode="overwrite")
spark.stop()
📜 2. Airflow DAG (`emr_spark_job_dag.py`)
from airflow import DAG
from airflow.providers.amazon.aws.operators.emr_create_job_flow import EmrCreateJobFlowOperator
from airflow.providers.amazon.aws.operators.emr_add_steps import EmrAddStepsOperator
from airflow.providers.amazon.aws.operators.emr_terminate_job_flow import EmrTerminateJobFlowOperator
from airflow.providers.amazon.aws.sensors.emr_step import EmrStepSensor
from datetime import datetime
JOB_FLOW_OVERRIDES = {
"Name": "Airflow_EMR_Cluster",
"ReleaseLabel": "emr-6.12.0",
"Applications": [{"Name": "Spark"}],
"Instances": {
"InstanceGroups": [
{
"Name": "Master nodes",
"InstanceRole": "MASTER",
"InstanceType": "m5.xlarge",
"InstanceCount": 1
},
{
"Name": "Core nodes",
"InstanceRole": "CORE",
"InstanceType": "m5.xlarge",
"InstanceCount": 2
}
],
"KeepJobFlowAliveWhenNoSteps": True,
"TerminationProtected": False
},
"JobFlowRole": "EMR_EC2_DefaultRole",
"ServiceRole": "EMR_DefaultRole",
"LogUri": "s3://my-bucket/emr-logs/"
}
SPARK_STEPS = [
{
"Name": "Run PySpark Job",
"ActionOnFailure": "CONTINUE",
"HadoopJarStep": {
"Jar": "command-runner.jar",
"Args": [
"spark-submit",
"--deploy-mode", "cluster",
"s3://my-bucket/spark_jobs/process_data.py"
]
}
}
]
with DAG(
dag_id="emr_spark_job_pipeline",
start_date=datetime(2024, 1, 1),
schedule_interval="@daily",
catchup=False
) as dag:
create_cluster = EmrCreateJobFlowOperator(
task_id="create_emr_cluster",
job_flow_overrides=JOB_FLOW_OVERRIDES,
aws_conn_id="aws_default"
)
add_step = EmrAddStepsOperator(
task_id="add_emr_step",
job_flow_id="{{ task_instance.xcom_pull(task_ids='create_emr_cluster', key='return_value') }}",
steps=SPARK_STEPS,
aws_conn_id="aws_default"
)
monitor_step = EmrStepSensor(
task_id="watch_step",
job_flow_id="{{ task_instance.xcom_pull(task_ids='create_emr_cluster', key='return_value') }}",
step_id="{{ task_instance.xcom_pull(task_ids='add_emr_step', key='return_value')[0] }}",
aws_conn_id="aws_default"
)
terminate_cluster = EmrTerminateJobFlowOperator(
task_id="terminate_emr_cluster",
job_flow_id="{{ task_instance.xcom_pull(task_ids='create_emr_cluster', key='return_value') }}",
aws_conn_id="aws_default",
trigger_rule="all_done" # terminate even if step fails
)
create_cluster >> add_step >> monitor_step >> terminate_cluster
🧰 3. Deployment Steps
- Upload Spark job to S3:
aws s3 cp spark_jobs/process_data.py s3://my-bucket/spark_jobs/
- Ensure IAM roles (`EMR_DefaultRole`, `EMR_EC2_DefaultRole`) have permissions to access S3 and run Spark.
- Deploy the DAG to Airflow:
  - Place the DAG file in `airflow/dags/`
  - Restart the Airflow scheduler/webserver if needed
- Monitor Execution:
- Airflow UI → Logs
- EMR Console → Cluster and Step logs
- S3 → Output directory
✅ Benefits of This Setup
Feature | Benefit |
---|---|
EMR | Scalable, managed Spark cluster |
Airflow | Declarative scheduling, retries, logging |
S3 | Durable storage for jobs and data |
Decoupled | You can scale EMR independently of Airflow |
Perfect — let’s briefly walk through 3 alternative deployment approaches for your Spark job, using:
- ✅ EMR Serverless
- ✅ Databricks Jobs
- ✅ AWS Glue
These are managed Spark execution platforms, each with different trade-offs in terms of cost, control, and complexity.
✅ 1. EMR Serverless + Airflow
🔍 What It Is:
A fully managed, serverless runtime for Spark — you don’t provision clusters. Just submit jobs, and EMR handles scaling.
✅ Ideal For:
- On-demand ETL jobs
- No cluster management
- Usage-based billing
📜 DAG Snippet (using `EmrServerlessStartJobOperator` from the Amazon provider package):
from airflow.providers.amazon.aws.operators.emr import EmrServerlessStartJobOperator
from airflow import DAG
from datetime import datetime
with DAG("emr_serverless_spark", start_date=datetime(2024, 1, 1), schedule_interval="@daily", catchup=False) as dag:
spark_job = EmrServerlessStartJobOperator(
task_id="run_spark_job",
application_id="your-emr-serverless-app-id",
execution_role_arn="arn:aws:iam::123456789012:role/EMRServerlessExecutionRole",
job_driver={
"sparkSubmit": {
"entryPoint": "s3://my-bucket/spark_jobs/process_data.py",
"sparkSubmitParameters": "--conf spark.executor.memory=2G"
}
},
configuration_overrides={
"monitoringConfiguration": {
"s3MonitoringConfiguration": {
"logUri": "s3://my-bucket/emr-serverless-logs/"
}
}
},
aws_conn_id="aws_default"
)
✅ 2. Databricks Jobs + Airflow
🔍 What It Is:
Fully managed Spark platform optimized for big data and ML. Great IDE, collaboration, and performance tuning.
✅ Ideal For:
- Teams needing UI + API
- ML + SQL + Streaming workloads
- Deep Spark integration
📜 DAG Snippet (Using `DatabricksSubmitRunOperator`):
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator
from airflow import DAG
from datetime import datetime
with DAG("databricks_spark_job", start_date=datetime(2024, 1, 1), schedule_interval="@daily", catchup=False) as dag:
run_job = DatabricksSubmitRunOperator(
task_id="run_databricks_spark",
databricks_conn_id="databricks_default",
new_cluster={
"spark_version": "13.3.x-scala2.12",
"node_type_id": "i3.xlarge",
"num_workers": 2
},
notebook_task={
"notebook_path": "/Shared/process_data"
}
)
🔐 Needs a Databricks token and workspace configured in your Airflow connection.
✅ 3. AWS Glue Jobs + Airflow
🔍 What It Is:
A serverless ETL service from AWS that runs Spark under the hood (with PySpark or Scala support).
✅ Ideal For:
- Catalog-based ETL (tied to AWS Glue Data Catalog)
- Serverless, cost-efficient batch processing
- Lightweight job logic
📜 DAG Snippet (Using `AwsGlueJobOperator`):
from airflow.providers.amazon.aws.operators.glue import AwsGlueJobOperator
from airflow import DAG
from datetime import datetime
with DAG("glue_spark_job", start_date=datetime(2024, 1, 1), schedule_interval="@daily", catchup=False) as dag:
glue_job = AwsGlueJobOperator(
task_id="run_glue_job",
job_name="my_glue_spark_job",
script_location="s3://my-bucket/glue-scripts/process_data.py",
iam_role_name="GlueServiceRole",
region_name="us-east-1",
num_of_dpus=10,
)
⚖️ Comparison Summary
Platform | Cluster Management | Best For | Cost Control | Complexity |
---|---|---|---|---|
EMR Serverless | ❌ No | Ad hoc Spark/ETL | Pay-per-second | Medium |
Databricks | ✅ Yes (managed) | Enterprise Spark + ML | Subscription + spot | Low |
AWS Glue | ❌ No | Serverless catalog-driven ETL | Pay-per-DPU-hour | Low |
Here’s a real-world CI/CD deployment template using GitHub Actions to deploy Airflow DAGs and Spark jobs (e.g., to S3 for EMR Serverless, AWS Glue, or even Databricks).
✅ CI/CD Deployment Pipeline with GitHub Actions
🎯 Goal:
- Lint, test, and deploy Airflow DAGs
- Upload PySpark scripts to S3
- Optionally trigger a job run (EMR/Glue/Databricks)
📁 Project Structure
my-data-pipeline/
├── dags/
│ └── my_airflow_dag.py
├── spark_jobs/
│ └── process_data.py
├── tests/
│ └── test_transformations.py
├── requirements.txt
├── .github/
│ └── workflows/
│ └── deploy.yml
📜 .github/workflows/deploy.yml
name: Deploy Data Pipeline
on:
push:
branches: [main]
paths:
- 'dags/**'
- 'spark_jobs/**'
- '.github/workflows/**'
env:
AWS_REGION: us-east-1
S3_BUCKET: my-bucket
S3_SCRIPTS_PREFIX: spark_jobs/
jobs:
deploy:
runs-on: ubuntu-latest
steps:
- name: Checkout Code
uses: actions/checkout@v3
- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: 3.9
- name: Install Dependencies
run: |
python -m pip install --upgrade pip
pip install -r requirements.txt
- name: Run Unit Tests
run: |
pytest tests/
- name: Upload PySpark Jobs to S3
uses: jakejarvis/s3-sync-action@v0.5.1
with:
args: --acl private --follow-symlinks
env:
AWS_S3_BUCKET: ${{ env.S3_BUCKET }}
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
AWS_REGION: ${{ env.AWS_REGION }}
SOURCE_DIR: spark_jobs
DEST_DIR: ${{ env.S3_SCRIPTS_PREFIX }}
- name: Deploy DAGs to Airflow (via SSH)
uses: appleboy/scp-action@v0.1.4
with:
host: ${{ secrets.AIRFLOW_HOST }}
username: ${{ secrets.AIRFLOW_USER }}
key: ${{ secrets.AIRFLOW_SSH_KEY }}
source: "dags/"
target: "/opt/airflow/dags/"
# Optional: Trigger EMR Serverless or Glue Job
- name: Trigger EMR Serverless Job (Optional)
run: |
aws emr-serverless start-job-run \
--application-id YOUR-APP-ID \
--execution-role-arn arn:aws:iam::123456789012:role/EMRServerlessExecutionRole \
--job-driver '{
"sparkSubmit": {
"entryPoint": "s3://${{ env.S3_BUCKET }}/${{ env.S3_SCRIPTS_PREFIX }}process_data.py"
}
}' \
--region ${{ env.AWS_REGION }}
env:
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
🔐 Secrets You’ll Need in GitHub
Set these under GitHub → Repo Settings → Secrets and variables → Actions:
Secret Name | Description |
---|---|
AWS_ACCESS_KEY_ID | IAM access key with S3/EMR/Glue |
AWS_SECRET_ACCESS_KEY | IAM secret key |
AIRFLOW_HOST | Host/IP of your Airflow instance |
AIRFLOW_USER | SSH username |
AIRFLOW_SSH_KEY | SSH private key for Airflow server |
✅ Summary of Workflow Steps
- Checks out code
- Installs Python dependencies
- Runs unit tests
- Syncs Spark scripts to S3
- Deploys DAGs to Airflow via SSH
- (Optional): Triggers EMR Serverless job
⚙️ Need Variants?
- Want to deploy to Databricks? Use the `databricks` CLI in a GitHub Actions step, e.g.:
databricks jobs run-now --job-id 12345 --profile DEFAULT
- Deploy to AWS Glue? Use:
aws glue start-job-run --job-name my_glue_job