Here’s a full explanation of Functional Programming concepts in Python — Lambda functions and Decorators — with examples, data engineering use cases, and pro tips to make your pipelines smarter, cleaner, and reusable.
🔹 1. Lambda Functions in Data Engineering
✅ What it is:
A lambda is an anonymous, one-line function — useful for quick transformations or filtering, especially in map(), filter(), apply(), etc.
lambda x: x + 1 # equivalent to def f(x): return x + 1
🧪 Use Case 1: Transforming columns in a DataFrame
import pandas as pd
df = pd.DataFrame({"price": [100, 150, 200]})
df["price_with_tax"] = df["price"].apply(lambda x: x * 1.18)
print(df)
🔄 Great for column-wise transformations, like converting units, adding tax, or cleaning strings.
🧪 Use Case 2: Filtering data using filter()
data = [10, 25, 30, 5, 0, -2]
filtered = list(filter(lambda x: x > 10, data))
print(filtered) # → [25, 30]
✅ Used in quick filtering during ETL when applying conditions on numeric or string columns.
🧪 Use Case 3: Cleaning data in a pipeline step
raw_strings = [" John", "Sara ", " Mike "]
cleaned = list(map(lambda x: x.strip().lower(), raw_strings))
print(cleaned)
🧹 Useful in data cleansing (trimming, case normalization, conversions).
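The "conversions" part deserves a tiny sketch of its own. Here is a hedged example of converting currency strings to floats with map() and a lambda; the ₹ formatting is just an assumed input shape:
# Convert currency strings to floats during a cleansing step
raw_prices = ["₹1,200", "₹950", "₹2,000"]
prices = list(map(lambda x: float(x.replace("₹", "").replace(",", "")), raw_prices))
print(prices)  # → [1200.0, 950.0, 2000.0]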
🔹 2. Decorators in Data Engineering
✅ What is a Decorator?
A decorator is a higher-order function that wraps another function, adding behavior without changing the function code itself.
🧱 Syntax Basics
def decorator(func):
    def wrapper(*args, **kwargs):
        print("Before function")
        result = func(*args, **kwargs)
        print("After function")
        return result
    return wrapper

@decorator
def my_function():
    print("Hello!")

my_function()
🛠️ Decorator Use Cases in Data Engineering
🧪 1. Retry on API Failure
import time
import random
def retry(n=3, delay=2):
    def decorator(fn):
        def wrapper(*args, **kwargs):
            for attempt in range(1, n + 1):
                try:
                    return fn(*args, **kwargs)
                except Exception as e:
                    print(f"[Retry {attempt}] Failed: {e}")
                    time.sleep(delay)
            raise Exception("Max retries exceeded")
        return wrapper
    return decorator

@retry(n=2)
def fetch_data():
    if random.random() < 0.7:
        raise Exception("API down")
    return "Data fetched"

print(fetch_data())
🧩 Use case: When fetching data from unreliable APIs (public datasets, RESTful microservices).
🧪 2. Log Every ETL Step
from datetime import datetime
def log_step(func):
    def wrapper(*args, **kwargs):
        print(f"[{datetime.now()}] Starting {func.__name__}")
        result = func(*args, **kwargs)
        print(f"[{datetime.now()}] Finished {func.__name__}")
        return result
    return wrapper

@log_step
def load_to_db():
    print("Loading data to warehouse...")

load_to_db()
📦 Good for logging step-by-step ETL stages — ingestion, transformation, loading.
🧪 3. Time Each Step (Performance Profiling)
import time
def timeit(fn):
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        result = fn(*args, **kwargs)
        end = time.perf_counter()
        print(f"{fn.__name__} took {end - start:.2f}s")
        return result
    return wrapper

@timeit
def transform_big_data():
    time.sleep(2)  # simulate heavy transformation
    return "Transformed"

transform_big_data()
⏱️ Helps profile performance of slow stages (e.g., large joins, aggregations, writes to disk).
✅ Summary Table
Feature | What it Does | Data Engineering Use Case |
---|---|---|
lambda | One-liner anonymous function | Quick transforms, apply on columns, filter, clean strings |
@retry | Auto-retry flaky functions | Retry API/db/network failures |
@log_step | Logs the beginning and end of a step | Track each ETL component’s execution |
@timeit | Measures execution time | Find bottlenecks in long-running data pipeline steps |
Custom decorator | Add audit logging, validation, schema checks | Wrap extract/load functions |
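The "Custom decorator" row in the table above is easy to sketch. Here is a minimal, hedged example of a schema-check decorator wrapping a load step; require_columns, SchemaError, and the column names are all illustrative, not from any library:
import pandas as pd

class SchemaError(Exception):
    """Illustrative pipeline-specific exception."""

def require_columns(*cols):
    def decorator(fn):
        def wrapper(df, *args, **kwargs):
            # Fail fast if the incoming DataFrame is missing an expected column
            missing = [c for c in cols if c not in df.columns]
            if missing:
                raise SchemaError(f"{fn.__name__}: missing columns {missing}")
            return fn(df, *args, **kwargs)
        return wrapper
    return decorator

@require_columns("id", "price")
def load_prices(df):
    print(f"Loading {len(df)} rows...")

load_prices(pd.DataFrame({"id": [1, 2], "price": [100, 150]}))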
🧪 Bonus: Stack Multiple Decorators on an ETL Step
@retry(n=2)
@log_step
@timeit
def extract():
    if random.random() < 0.5:
        raise Exception("Connection error")
    time.sleep(1)
    return {"records": 1000}

extract()
✅ Decorators are stackable — this function is timed, logged, and auto‑retried.
🧠 Pro Tips:
- Make your decorators reusable across all steps (fetch, transform, load).
- Decorators + logging + retry form the “backbone of resilient pipelines”.
- Use functools.wraps() to preserve the metadata of the original function.
- Combine decorators with custom exception classes (like IngestError, TransformError) to improve alerting and debugging; a sketch of both ideas follows below.
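Here is a minimal sketch combining those last two tips: functools.wraps() plus a custom exception class. The IngestError name comes from the tip above; resilient_step and fetch_orders are hypothetical:
import functools

class IngestError(Exception):
    """Raised when an ingestion step ultimately fails."""

def resilient_step(fn):
    @functools.wraps(fn)  # keeps fn.__name__, __doc__, etc. intact on the wrapper
    def wrapper(*args, **kwargs):
        try:
            return fn(*args, **kwargs)
        except Exception as e:
            # Re-raise as a pipeline-specific error for clearer alerting
            raise IngestError(f"{fn.__name__} failed: {e}") from e
    return wrapper

@resilient_step
def fetch_orders():
    """Pull today's orders from the source system."""
    return {"records": 42}

print(fetch_orders.__name__)  # → 'fetch_orders' (not 'wrapper'), thanks to functools.wraps
print(fetch_orders())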
“Apply‑style” helpers in Pandas (and how they translate to PySpark)
Function | Operates On | Expects | Returns | Best For | Gotchas |
---|---|---|---|---|---|
Series.map() | One Series | Python dict / Series / function | Series (same size) | Simple value substitution or 1‑to‑1 transform | Values missing from a dict/Series mapping become NaN |
Series.apply() | One Series | Python function | Scalar or Series | Row‑wise UDFs that can’t be vectorised | Runs Python once per element → slow for large columns |
DataFrame.apply() | Whole DataFrame (axis 0/1) | Function that gets Series/row | Series, scalar, or DataFrame | Column‑ or row‑level ops that need more context | Default axis=0 (per‑column) surprises many |
DataFrame.applymap() | Each cell | Function of a single value | DataFrame | Quick cell‑wise cleaning (e.g., .strip()) | Slowest of all; avoid for big frames (renamed DataFrame.map in pandas 2.1+) |
Series/DF.transform() | Same shape in / out | Function or list | Same shape | Broadcasting column‑wise maths (e.g., z‑score) | Must return same length/shape |
Series/DF.agg() | Reduces along axis | Function or list | Scalar(s) | Aggregations (sum, mean, custom) | Shape shrinks; not for transforms |
1. Quick cheatsheet
import math
import pandas as pd

# (df below is assumed to be an existing DataFrame with the referenced columns)

# Series.map
cities = pd.Series(['delhi', 'mumbai', 'delhi'])
lookup = {'delhi': 'DL', 'mumbai': 'MH'}
cities_code = cities.map(lookup)  # → Series ['DL', 'MH', 'DL']

# Series.apply
df['log_price'] = df['price'].apply(lambda x: math.log1p(x))

# DataFrame.apply (per-row)
df['full_name'] = df.apply(
    lambda r: f"{r['first']} {r['last']}", axis=1
)

# DataFrame.applymap
clean = df.applymap(lambda x: x.strip() if isinstance(x, str) else x)

# transform keeps row count
df[['norm']] = df[['value']].transform(
    lambda s: (s - s.mean()) / s.std()
)

# agg shrinks
totals = df.agg({'sales': 'sum', 'qty': 'mean'})
2. Performance rule of thumb
vectorised NumPy >> pandas built‑ins >> .map/.apply with Python
On tens‑of‑millions of rows, pure‑Python lambdas can be 10‑100× slower.
Whenever possible:
- Use column arithmetic (df['a'] * 2) or string methods (.str.lower()).
- Fall back to transform or agg with NumPy/pandas functions.
- Reserve apply/applymap for last-mile oddities (a quick benchmark sketch follows below).
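To make the rule of thumb concrete, here is a small, hedged benchmark sketch; the exact speed-up depends on your data, pandas version, and hardware:
import time
import numpy as np
import pandas as pd

df = pd.DataFrame({"price": np.random.rand(1_000_000) * 100})

# Element-wise Python lambda: one Python call per row
start = time.perf_counter()
slow = df["price"].apply(lambda x: x * 1.18)
print(f"apply:      {time.perf_counter() - start:.3f}s")

# Vectorised column arithmetic: one NumPy operation for the whole column
start = time.perf_counter()
fast = df["price"] * 1.18
print(f"vectorised: {time.perf_counter() - start:.3f}s")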
3. Why data engineers still reach for apply
Scenario | Example |
---|---|
Complex regex parse of semi‑structured fields | df['ts'] = df['raw'].apply(parse_logline) |
Per‑row REST call for enrichment (small dimension) | df['population'] = df['city'].apply(api_lookup) |
Conditional logic too ugly in np.where or SQL | Multi‑column tax bracket calc |
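For the "conditional logic" row, here is a hedged sketch of a tax-bracket calculation written both ways; the brackets and column names are made up for illustration:
import numpy as np
import pandas as pd

df = pd.DataFrame({"income": [50_000, 300_000, 1_200_000]})

# apply: readable row-wise Python, but slow on large frames
def tax_rate(row):
    if row["income"] > 1_000_000:
        return 0.30
    if row["income"] > 250_000:
        return 0.20
    return 0.10

df["rate_apply"] = df.apply(tax_rate, axis=1)

# np.select: the vectorised equivalent, preferred once rows run into the millions
conditions = [df["income"] > 1_000_000, df["income"] > 250_000]
df["rate_vectorised"] = np.select(conditions, [0.30, 0.20], default=0.10)
print(df)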
4. PySpark counterparts
Purpose | Pandas | PySpark idiom |
---|---|---|
Column vectorised math | df['a']*2 | df.withColumn('a2', F.col('a')*2) |
Row‑wise custom logic | df.apply(row_fn, axis=1) | df.mapPartitions(lambda it: ...) or Pandas UDF (🐼 → Spark) |
Cell‑wise cleanup | applymap | F.expr with regexp_replace , F.when , etc. |
Aggregation | df.agg({'v':'mean'}) | df.agg(F.mean('v')) |
Standardising entire columns | transform | F.transform() for arrays or just column ops |
For heavy Python UDF logic in Spark, use vectorised Pandas UDFs (Arrow) instead of per‑row UDFs:
import numpy as np
import pandas as pd
from pyspark.sql import functions as F

@F.pandas_udf("double")
def log1p_udf(s: pd.Series) -> pd.Series:
    return np.log1p(s)
spark_df = spark_df.withColumn("log_val", log1p_udf("value"))
This processes a batch of rows at once, similar to vectorisation in Pandas, and bypasses a lot of UDF overhead.
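And for the "cell-wise cleanup" row of the table above, a hedged sketch using built-in column functions rather than any UDF; it assumes an existing spark_df with city and population columns:
from pyspark.sql import functions as F

# Trim, lower-case, and strip stray characters without leaving the JVM,
# then derive a simple category with F.when
cleaned_df = (
    spark_df
    .withColumn("city", F.lower(F.trim(F.col("city"))))
    .withColumn("city", F.regexp_replace("city", r"[^a-z0-9 ]", ""))
    .withColumn("tier", F.when(F.col("population") > 1_000_000, "metro").otherwise("non-metro"))
)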
TL;DR
- map: dictionary-style replace for a single column.
- apply: Swiss-army knife; flexible but slower — use sparingly.
- applymap: cell-wise; usually avoid for big data.
- transform: same shape in/out, great for scaling/normalising.
- agg: reduces data (sum, mean, etc.).
import time
import random
def retry(n=3, delay=2):
    def decorator(fn):
        def wrapper(*args, **kwargs):
            for attempt in range(1, n + 1):
                try:
                    return fn(*args, **kwargs)
                except Exception as e:
                    print(f"[Retry {attempt}] Failed: {e}")
                    time.sleep(delay)
            raise Exception("Max retries exceeded")
        return wrapper
    return decorator

@retry(n=2)
def fetch_data():
    if random.random() < 0.7:
        raise Exception("API down")
    return "Data fetched"

print(fetch_data())
What exactly has happened here? How is it being called?
Here’s the story of what happens, step by step, from the moment Python reaches the @retry line to the moment "Data fetched" (or an error) is printed.
1️⃣ Decorating time – happens once, when the module is imported
@retry(n=2)        # ← decorator *executes right now*
def fetch_data():  # ← original function object is created,
    ...            #    then handed to the decorator
Stage | What Python does |
---|---|
a. Evaluate retry(n=2) | Calls the outer function retry with n=2 (default delay=2). retry returns the inner function decorator. |
b. Call decorator(fetch_data) | decorator receives the original fetch_data function object (fn). Inside decorator, the wrapper function is created. |
c. Replace fetch_data | decorator returns wrapper. The name fetch_data in the module now points to wrapper, not to the original body. |
After decoration you effectively have:
def wrapper(*args, **kwargs):
    ...  # retry loop lives here

fetch_data = wrapper  # original body is captured inside `wrapper`
2️⃣ Call time – each time you run fetch_data()
print(fetch_data())
Because fetch_data is now wrapper, the call sequence is:
- Enter wrapper with attempt = 1.
- Call the original body (fn) inside the try block:
  if random.random() < 0.7:
      raise Exception("API down")
  return "Data fetched"
- If it raises, the except block prints [Retry 1] Failed: API down, sleeps for delay = 2 seconds, and the loop continues with attempt = 2.
- Second try (and last, because n=2):
  - If it succeeds → "Data fetched" is returned all the way out of wrapper (and printed).
  - If it fails again → the loop ends and wrapper raises Exception("Max retries exceeded"), which propagates to the print() call and crashes the script.
3️⃣ Why the random success / failure?
- random.random() returns a float in [0, 1).
- 70% of the time it is < 0.7, triggering the raise Exception("API down").
- 30% of the time it is ≥ 0.7, so the function returns "Data fetched" immediately.
With two attempts (n=2) the combined probability of ultimate success is:
Success on 1st try : 30%
Fail then success : 0.70 * 0.30 = 21%
---------------------------------------
Total success : 51%
Total failure : 49%
So roughly half the time you’ll see "Data fetched", the other half you’ll get “Max retries exceeded.”
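As a quick sanity check on that arithmetic, here is a hedged simulation sketch; it models the two attempts directly instead of calling the decorated function, so there is no sleep delay:
import random

def attempt_pipeline(n=2, p_fail=0.7):
    """Return True if any of n independent attempts succeeds."""
    return any(random.random() >= p_fail for _ in range(n))

trials = 100_000
successes = sum(attempt_pipeline() for _ in range(trials))
print(f"Observed success rate: {successes / trials:.1%}")  # ≈ 51%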
4️⃣ Visual call flow
main
└─ wrapper (fetch_data alias)
├─ try #1 → original body → raises? ↴
│ yes → print + sleep
├─ try #2 → original body
│ ├ success → return "Data fetched"
│ └ failure → raise "Max retries exceeded"
└─ return / raise to caller
Key take‑aways
- Decorators run at definition time, replacing the function with a wrapper.
- The wrapper holds the retry loop, so every future call is automatically retried.
- The original function body is still executed, just inside the wrapper’s try block.
- You can tweak behavior by changing n, delay, or the printing/logging inside the decorator (see the sketch below).
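For that last point, here is a hedged sketch of the same retry decorator with the standard logging module swapped in for print; the logger name "pipeline.retry" is arbitrary:
import logging
import time

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("pipeline.retry")

def retry(n=3, delay=2):
    def decorator(fn):
        def wrapper(*args, **kwargs):
            for attempt in range(1, n + 1):
                try:
                    return fn(*args, **kwargs)
                except Exception as e:
                    # Structured log line instead of a bare print
                    logger.warning("[Retry %d/%d] %s failed: %s", attempt, n, fn.__name__, e)
                    time.sleep(delay)
            raise Exception("Max retries exceeded")
        return wrapper
    return decorator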