Here’s a full explanation of two functional programming concepts in Python, lambda functions and decorators, with examples, data engineering use cases, and pro tips to make your pipelines smarter, cleaner, and more reusable.


🔹 1. Lambda Functions in Data Engineering

✅ What it is:

A lambda is an anonymous, one-line function — useful for quick transformations or filtering, especially in map(), filter(), apply(), etc.

lambda x: x + 1  # equivalent to def f(x): return x + 1

🧪 Use Case 1: Transforming columns in a DataFrame

import pandas as pd

df = pd.DataFrame({"price": [100, 150, 200]})
df["price_with_tax"] = df["price"].apply(lambda x: x * 1.18)
print(df)

🔄 Great for column-wise transformations, like converting units, adding tax, or cleaning strings.


🧪 Use Case 2: Filtering data using filter()

data = [10, 25, 30, 5, 0, -2]
filtered = list(filter(lambda x: x > 10, data))
print(filtered)  # → [25, 30]

✅ Used in quick filtering during ETL when applying conditions on numeric or string columns.


🧪 Use Case 3: Cleaning data in a pipeline step

raw_strings = ["  John", "Sara  ", "  Mike "]
cleaned = list(map(lambda x: x.strip().lower(), raw_strings))
print(cleaned)

🧹 Useful in data cleansing (trimming, case normalization, conversions).


🔹 2. Decorators in Data Engineering

✅ What is a Decorator?

A decorator is a higher-order function that wraps another function, adding behavior without changing the function code itself.


🧱 Syntax Basics

def decorator(func):
    def wrapper(*args, **kwargs):
        print("Before function")
        result = func(*args, **kwargs)
        print("After function")
        return result
    return wrapper

@decorator
def my_function():
    print("Hello!")

my_function()

🛠️ Decorator Use Cases in Data Engineering


🧪 1. Retry on API Failure

import time
import random

def retry(n=3, delay=2):
    def decorator(fn):
        def wrapper(*args, **kwargs):
            for attempt in range(1, n + 1):
                try:
                    return fn(*args, **kwargs)
                except Exception as e:
                    print(f"[Retry {attempt}] Failed: {e}")
                    time.sleep(delay)
            raise Exception("Max retries exceeded")
        return wrapper
    return decorator

@retry(n=2)
def fetch_data():
    if random.random() < 0.7:
        raise Exception("API down")
    return "Data fetched"

print(fetch_data())

🧩 Use case: When fetching data from unreliable APIs (public datasets, RESTful microservices).


🧪 2. Log Every ETL Step

from datetime import datetime

def log_step(func):
    def wrapper(*args, **kwargs):
        print(f"[{datetime.now()}] Starting {func.__name__}")
        result = func(*args, **kwargs)
        print(f"[{datetime.now()}] Finished {func.__name__}")
        return result
    return wrapper

@log_step
def load_to_db():
    print("Loading data to warehouse...")

load_to_db()

📦 Good for logging step-by-step ETL stages — ingestion, transformation, loading.


🧪 3. Time Each Step (Performance Profiling)

import time

def timeit(fn):
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        result = fn(*args, **kwargs)
        end = time.perf_counter()
        print(f"{fn.__name__} took {end - start:.2f}s")
        return result
    return wrapper

@timeit
def transform_big_data():
    time.sleep(2)  # simulate heavy transformation
    return "Transformed"

transform_big_data()

⏱️ Helps profile performance of slow stages (e.g., large joins, aggregations, writes to disk).


✅ Summary Table

Feature          | What it Does                              | Data Engineering Use Case
lambda           | One-liner anonymous function              | Quick transforms, apply on columns, filter, clean strings
@retry           | Auto-retries flaky functions              | Retry API/DB/network failures
@log_step        | Logs the beginning and end of a step      | Track each ETL component’s execution
@timeit          | Measures execution time                   | Find bottlenecks in long-running pipeline steps
Custom decorator | Audit logging, validation, schema checks  | Wrap extract/load functions (see the sketch below)
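
For that last row, here is a minimal sketch of what a custom schema-check decorator could look like (validate_columns, load_orders, and the column names are hypothetical):

import functools

def validate_columns(required):
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(df, *args, **kwargs):
            # fail fast if the incoming DataFrame is missing expected columns
            missing = set(required) - set(df.columns)
            if missing:
                raise ValueError(f"{fn.__name__}: missing columns {missing}")
            return fn(df, *args, **kwargs)
        return wrapper
    return decorator

@validate_columns(["order_id", "amount"])
def load_orders(df):
    print(f"Loading {len(df)} orders...")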

🧪 Bonus: Stack Multiple Decorators on an ETL Step

@retry(n=2)
@log_step
@timeit
def extract():
    if random.random() < 0.5:
        raise Exception("Connection error")
    time.sleep(1)
    return {"records": 1000}

extract()

✅ Decorators are stackable, and they apply bottom-up: timeit wraps the function body first, then log_step, then retry, so every retry attempt is individually timed and logged.


🧠 Pro Tips:

  • Make your decorators reusable across all steps (fetch, transform, load).
  • Decorators + logging + retry form the backbone of resilient pipelines.
  • Use functools.wraps() to preserve the original function’s metadata (see the sketch after this list).
  • Combine decorators with custom exception classes (like IngestError, TransformError) to improve alerting and debugging.
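
Here is the functools.wraps() tip applied to the log_step decorator from earlier; without it, load_to_db.__name__ would report 'wrapper':

import functools

def log_step(func):
    @functools.wraps(func)   # copies __name__, __doc__, etc. onto the wrapper
    def wrapper(*args, **kwargs):
        print(f"Starting {func.__name__}")
        return func(*args, **kwargs)
    return wrapper

@log_step
def load_to_db():
    """Load transformed data into the warehouse."""
    print("Loading data to warehouse...")

print(load_to_db.__name__)   # 'load_to_db', not 'wrapper'
print(load_to_db.__doc__)    # the original docstring survives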

“Apply‑style” helpers in Pandas (and how they translate to PySpark)

Function              | Operates On                 | Expects                          | Returns                      | Best For                                        | Gotchas
Series.map()          | One Series                  | dict / Series / function         | Series (same size)           | Simple value substitution or 1-to-1 transforms  | Values missing from a dict mapping become NaN
Series.apply()        | One Series                  | Python function                  | Scalar or Series             | Row-wise UDFs that can’t be vectorised          | Runs Python once per element → slow for large columns
DataFrame.apply()     | Whole DataFrame (axis 0/1)  | Function that gets a Series/row  | Series, scalar, or DataFrame | Column- or row-level ops needing more context   | Default axis=0 (per-column) surprises many
DataFrame.applymap()  | Each cell                   | Function of a single value       | DataFrame                    | Quick cell-wise cleaning (e.g., .strip())       | Slowest of all; avoid for big frames
Series/DF.transform() | Same shape in / out         | Function or list                 | Same shape                   | Broadcasting column-wise maths (e.g., z-score)  | Must return the same length/shape
Series/DF.agg()       | Reduces along an axis       | Function or list                 | Scalar(s)                    | Aggregations (sum, mean, custom)                | Shape shrinks; not for transforms

1. Quick cheatsheet

import math
import pandas as pd

# Series.map
cities = pd.Series(['delhi', 'mumbai', 'delhi'])
lookup = {'delhi': 'DL', 'mumbai': 'MH'}
cities_code = cities.map(lookup)          # → Series ['DL', 'MH', 'DL']

# Series.apply
df['log_price'] = df['price'].apply(lambda x: math.log1p(x))

# DataFrame.apply (per‑row)
df['full_name'] = df.apply(
    lambda r: f"{r['first']} {r['last']}", axis=1
)

# DataFrame.applymap (renamed to DataFrame.map in pandas 2.1+)
clean = df.applymap(lambda x: x.strip() if isinstance(x, str) else x)

# transform keeps row count
df[['norm']] = df[['value']].transform(
    lambda s: (s - s.mean())/s.std()
)

# agg shrinks
totals = df.agg({'sales':'sum', 'qty':'mean'})

2. Performance rule of thumb

vectorised NumPy  >>  pandas built‑ins  >>  .map/.apply with Python

On tens‑of‑millions of rows, pure‑Python lambdas can be 10‑100× slower.
Whenever possible:

  1. Use column arithmetic (df['a'] * 2) or string methods (.str.lower()).
  2. Fall back to transform or agg with NumPy/pandas functions.
  3. Reserve apply / applymap for last‑mile oddities (see the sketch below).
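
A small sketch of that ordering in practice (the frame and column names here are made up; on a million rows, step 3 is dramatically slower than steps 1 and 2):

import numpy as np
import pandas as pd

df = pd.DataFrame({"a": np.random.rand(1_000_000)})

# 1. Vectorised column arithmetic: one C-level pass over the whole column
df["doubled"] = df["a"] * 2

# 2. transform with a NumPy function: still vectorised under the hood
df["log_a"] = df["a"].transform(np.log1p)

# 3. Last resort: a Python lambda runs once per element
df["doubled_slow"] = df["a"].apply(lambda x: x * 2)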

3. Why data engineers still reach for apply

Scenario                                            | Example
Complex regex parse of semi-structured fields       | df['ts'] = df['raw'].apply(parse_logline)
Per-row REST call for enrichment (small dimension)  | df['population'] = df['city'].apply(api_lookup)
Conditional logic too ugly in np.where or SQL       | Multi-column tax bracket calc (see the sketch below)
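
To illustrate the last row, a sketch of a multi-column calculation where apply stays readable (the brackets and rates are invented for the example):

import pandas as pd

df = pd.DataFrame({
    "income":     [200_000, 400_000, 900_000],
    "deductions": [50_000, 50_000, 100_000],
})

def tax(row):
    # branching over two columns: awkward in np.where, clear as a function
    taxable = row["income"] - row["deductions"]
    if taxable <= 250_000:
        return 0.0
    if taxable <= 500_000:
        return (taxable - 250_000) * 0.05
    return 12_500 + (taxable - 500_000) * 0.20

df["tax"] = df.apply(tax, axis=1)   # row-wise, so the function sees both columns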

4. PySpark counterparts

Purpose                       | Pandas                    | PySpark idiom
Column vectorised math        | df['a'] * 2               | df.withColumn('a2', F.col('a') * 2)
Row-wise custom logic         | df.apply(row_fn, axis=1)  | df.mapPartitions(lambda it: ...) or a Pandas UDF (🐼 → Spark)
Cell-wise cleanup             | applymap                  | F.expr with regexp_replace, F.when, etc. (see the sketch below)
Aggregation                   | df.agg({'v': 'mean'})     | df.agg(F.mean('v'))
Standardising entire columns  | transform                 | F.transform() for arrays, or plain column ops
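
To make the cell-wise cleanup row concrete, a short sketch using built-in column expressions instead of a Python UDF (the DataFrame and column names are illustrative):

from pyspark.sql import functions as F

df = (df
      .withColumn("city", F.lower(F.trim(F.col("city"))))          # trim + lowercase
      .withColumn("city", F.regexp_replace("city", r"\s+", " "))   # collapse whitespace
      .withColumn("stock",
                  F.when(F.col("qty") > 0, "in_stock")
                   .otherwise("out_of_stock")))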

For heavy Python UDF logic in Spark, use vectorised Pandas UDFs (Arrow) instead of per‑row UDFs:

import numpy as np
import pandas as pd
from pyspark.sql import functions as F

@F.pandas_udf("double")
def log1p_udf(s: pd.Series) -> pd.Series:
    return np.log1p(s)

spark_df = spark_df.withColumn("log_val", log1p_udf("value"))

This processes a batch of rows at once, similar to vectorisation in Pandas, and bypasses a lot of UDF overhead.


TL;DR

  • map: dictionary‑style replace for a single column.
  • apply: Swiss‑army knife; flexible but slower — use sparingly.
  • applymap: cell‑wise; usually avoid for big data.
  • transform: same shape in/out, great for scaling/normalising.
  • agg: reduces data (sum, mean, etc.).
🔁 Deep Dive: How Does @retry Actually Work?

Let’s revisit the retry example from earlier:

import time
import random

def retry(n=3, delay=2):
    def decorator(fn):
        def wrapper(*args, **kwargs):
            for attempt in range(1, n + 1):
                try:
                    return fn(*args, **kwargs)
                except Exception as e:
                    print(f"[Retry {attempt}] Failed: {e}")
                    time.sleep(delay)
            raise Exception("Max retries exceeded")
        return wrapper
    return decorator

@retry(n=2)
def fetch_data():
    if random.random() < 0.7:
        raise Exception("API down")
    return "Data fetched"

print(fetch_data())
What exactly has happened here? How is fetch_data being called?

Here’s the story of what happens, step by step, from the moment Python reaches the @retry line to the moment "Data fetched" (or an error) is printed.


1️⃣ Decorating time – happens once, when the module is imported

@retry(n=2)          # ← the decorator expression executes right now
def fetch_data():    # ← the original function object is created,
    ...              #   then handed to the decorator
Stage                          | What Python does
a. Evaluate retry(n=2)         | Calls the outer function retry with n=2 (default delay=2); retry returns the inner function decorator.
b. Call decorator(fetch_data)  | decorator receives the original fetch_data function object (fn); inside it, the wrapper function is created.
c. Replace fetch_data          | decorator returns wrapper; the name fetch_data in the module now points to wrapper, not to the original body.

After decoration you effectively have:

def wrapper(*args, **kwargs):
    ...  # retry loop lives here

fetch_data = wrapper    # original body is captured inside `wrapper`
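
In other words, the @ syntax is just sugar for calling the decorator chain by hand:

def fetch_data():
    ...

fetch_data = retry(n=2)(fetch_data)   # exactly what @retry(n=2) does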

2️⃣ Call time – each time you run fetch_data()

print(fetch_data())

Because fetch_data is now wrapper, the call sequence is:

  1. Enter wrapper with attempt = 1.
  2. Call the original body (fn) inside the try:

     if random.random() < 0.7:
         raise Exception("API down")
     return "Data fetched"

  3. If it raises, the except block prints

     [Retry 1] Failed: API down

     then sleeps delay = 2 seconds, and the loop continues with attempt = 2.
  4. Second try (and last, because n=2):
     • If it succeeds → "Data fetched" is returned all the way out of wrapper (and printed).
     • If it fails again → the loop ends, wrapper raises Exception("Max retries exceeded"), which propagates to the print() call and crashes the script.

3️⃣ Why the random success / failure?

  • random.random() returns a float in [0,1).
  • 70 % of the time it is < 0.7, triggering the raise Exception("API down").
  • 30 % of the time it’s ≥ 0.7, so the function returns "Data fetched" immediately.

With two attempts (n=2) the combined probability of ultimate success is:

Success on 1st try  : 30%
Fail then success   : 0.70 * 0.30 = 21%
---------------------------------------
Total success       : 51%
Total failure       : 49%

So roughly half the time you’ll see "Data fetched", the other half you’ll get “Max retries exceeded.”
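
If you want to sanity-check that figure, here is a quick simulation (the result will hover around 0.51):

import random

trials = 100_000
successes = sum(
    1 for _ in range(trials)
    if random.random() >= 0.7 or random.random() >= 0.7   # success on try 1 or try 2
)
print(successes / trials)   # ≈ 0.51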


4️⃣ Visual call flow

main
 └─ wrapper   (fetch_data alias)
     ├─ try #1 → original body → raises? ↴
     │         yes → print + sleep
     ├─ try #2 → original body
     │         ├ success → return "Data fetched"
     │         └ failure → raise "Max retries exceeded"
     └─ return / raise to caller

Key take‑aways

  1. Decorators run at definition time, replacing the function with a wrapper (see the tiny demo below).
  2. The wrapper holds the retry loop, so every future call is automatically retried.
  3. The original function body is still executed, just inside the wrapper’s try.
  4. You can tweak behavior by changing n, delay, or printing/logging inside the decorator.
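
A tiny demo of take-away 1, showing the decorator body running before the function is ever called:

def announce(fn):
    print(f"decorating {fn.__name__}")   # executes at definition time
    return fn

@announce            # prints "decorating extract" immediately
def extract():
    print("running extract")

extract()            # only now does "running extract" print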
