Learnixo

Data Engineering Interview Prep · Lesson 3 of 3

Python Interview: 30 Pipeline & Engineering Questions

How to Use This Guide

These questions target mid-to-senior data engineering roles. The code answers are production-quality — not toy examples. Study the patterns, not just the syntax.


Part 1: Python Fundamentals in Data Engineering Context (10 Questions)

Q1: How do you process a 50GB CSV file without loading it into memory?

Question: You have a 50GB CSV file that must be processed row-by-row and loaded to a database. How do you handle this in Python without an OOM error?

Python
import csv
import psycopg2
from typing import Generator, Iterator


def csv_row_generator(filepath: str, batch_size: int = 10_000) -> Generator[list[dict], None, None]:
    """
    Memory-efficient CSV reader using a generator.
    Yields batches of rows rather than individual rows — reduces DB round trips.
    Memory usage: O(batch_size), not O(file_size).
    """
    with open(filepath, "r", encoding="utf-8", newline="") as f:
        reader = csv.DictReader(f)
        batch  = []
        for row in reader:
            batch.append(row)
            if len(batch) >= batch_size:
                yield batch
                batch = []
        if batch:
            yield batch  # yield the final partial batch


def load_csv_to_db(filepath: str, connection_string: str) -> int:
    """Returns total rows loaded."""
    total = 0
    conn  = psycopg2.connect(connection_string)
    try:
        with conn.cursor() as cur:
            for batch in csv_row_generator(filepath, batch_size=10_000):
                psycopg2.extras.execute_values(
                    cur,
                    "INSERT INTO staging_table (col1, col2, col3) VALUES %s",
                    [(r["col1"], r["col2"], r["col3"]) for r in batch],
                    page_size=1000,
                )
                total += len(batch)
                conn.commit()  # commit per batch  reduces transaction size
    finally:
        conn.close()
    return total

Why this matters: Generators are lazy — they compute one value at a time and don't hold the full dataset in memory. Processing in batches (not row by row) amortises the network cost of DB inserts. execute_values is ~50x faster than individual INSERT statements for PostgreSQL.


Q2: Write a context manager for database connection management

Python
from contextlib import contextmanager
from typing import Generator
import psycopg2
import logging

logger = logging.getLogger(__name__)


@contextmanager
def db_connection(dsn: str, autocommit: bool = False) -> Generator:
    """
    Context manager for PostgreSQL connections.
    - Handles commit/rollback automatically
    - Closes connection on exit (even on exception)
    - Logs connection lifecycle for observability
    """
    conn = None
    try:
        conn = psycopg2.connect(dsn)
        conn.autocommit = autocommit
        logger.debug("DB connection opened: %s", dsn.split("@")[-1])  # redact credentials
        yield conn
        if not autocommit:
            conn.commit()
            logger.debug("Transaction committed")
    except Exception as e:
        if conn and not autocommit:
            conn.rollback()
            logger.warning("Transaction rolled back: %s", e)
        raise
    finally:
        if conn:
            conn.close()
            logger.debug("DB connection closed")


# Usage
with db_connection(DSN) as conn:
    with conn.cursor() as cur:
        cur.execute("INSERT INTO events VALUES (%s, %s)", (event_id, payload))
# commit + close happen automatically

Class-based alternative (when you need __enter__ and __exit__):

Python
class DatabaseConnection:
    def __init__(self, dsn: str):
        self.dsn  = dsn
        self.conn = None

    def __enter__(self):
        self.conn = psycopg2.connect(self.dsn)
        return self.conn

    def __exit__(self, exc_type, exc_val, exc_tb):
        if exc_type:
            self.conn.rollback()
        else:
            self.conn.commit()
        self.conn.close()
        return False  # don't suppress exceptions

Q3: When and how do you use functools.lru_cache in a pipeline?

Python
from functools import lru_cache
import psycopg2

# Perfect use case: slow lookup that's called many times per pipeline run
# but the lookup data doesn't change during a single run

@lru_cache(maxsize=1024)
def get_product_category(product_id: str) -> str:
    """
    Cache product category lookups — called millions of times per pipeline run
    but the catalogue changes rarely.
    Without cache: 1M DB calls. With cache: ~10k unique products → 10k DB calls.
    """
    with db_connection(DSN) as conn:
        with conn.cursor() as cur:
            cur.execute("SELECT category FROM products WHERE product_id = %s", (product_id,))
            row = cur.fetchone()
            return row[0] if row else "unknown"


# Clear the cache between pipeline runs (stale data risk)
def run_pipeline():
    get_product_category.cache_clear()
    for row in csv_row_generator("orders.csv"):
        category = get_product_category(row["product_id"])  # hits cache after first call
        ...


# Cache info for monitoring
info = get_product_category.cache_info()
print(f"Cache hits: {info.hits}, misses: {info.misses}, hit rate: {info.hits/(info.hits+info.misses):.1%}")

When NOT to use lru_cache: When the function has side effects, when arguments aren't hashable (use functools.cache for simpler cases), or when the cache should be shared across processes (use Redis instead).


Q4: Write a type-annotated pipeline function with proper type hints

Python
from typing import Iterator, TypeVar
from collections.abc import Callable
import pandas as pd
from datetime import date

T = TypeVar("T")


def extract_orders(
    start_date: date,
    end_date: date,
    batch_size: int = 50_000,
) -> Iterator[pd.DataFrame]:
    """Extract orders from source in batches. Yields DataFrames."""
    offset = 0
    while True:
        df = pd.read_sql(
            """
            SELECT order_id, customer_id, product_id, revenue, order_date
            FROM orders
            WHERE order_date BETWEEN %(start)s AND %(end)s
            ORDER BY order_id
            LIMIT %(limit)s OFFSET %(offset)s
            """,
            con=get_engine(),
            params={"start": start_date, "end": end_date,
                    "limit": batch_size, "offset": offset},
        )
        if df.empty:
            break
        yield df
        offset += batch_size


def transform_orders(df: pd.DataFrame) -> pd.DataFrame:
    """Apply business transformations. Pure function — no side effects."""
    return (
        df
        .assign(
            revenue       = lambda x: x["revenue"].clip(lower=0),
            order_date    = lambda x: pd.to_datetime(x["order_date"]),
            revenue_tier  = lambda x: pd.cut(
                x["revenue"],
                bins=[0, 50, 200, 1000, float("inf")],
                labels=["micro", "small", "medium", "large"],
            ),
        )
        .dropna(subset=["customer_id", "product_id"])
    )


def load_orders(df: pd.DataFrame, target_table: str, engine) -> int:
    """Load to warehouse. Returns row count loaded."""
    df.to_sql(target_table, con=engine, if_exists="append", index=False, method="multi")
    return len(df)

Q5: Implement a dataclass for pipeline configuration

Python
from dataclasses import dataclass, field
from typing import Optional
import os
import yaml


@dataclass
class PipelineConfig:
    """
    Strongly-typed pipeline configuration.
    Loaded from YAML; secrets injected from environment variables.
    """
    pipeline_name: str
    source_dsn:    str
    target_dsn:    str
    batch_size:    int             = 50_000
    max_retries:   int             = 3
    backoff_base:  float           = 2.0
    tags:          list[str]       = field(default_factory=list)
    dry_run:       bool            = False
    notify_slack:  bool            = True
    slack_channel: Optional[str]   = None

    @classmethod
    def from_yaml(cls, path: str) -> "PipelineConfig":
        with open(path) as f:
            raw = yaml.safe_load(f)
        # Inject secrets from environment
        raw["source_dsn"] = os.environ["SOURCE_DSN"]
        raw["target_dsn"] = os.environ["TARGET_DSN"]
        return cls(**raw)

    def __post_init__(self):
        if self.batch_size <= 0:
            raise ValueError(f"batch_size must be positive, got {self.batch_size}")
        if self.max_retries < 0:
            raise ValueError(f"max_retries must be >= 0, got {self.max_retries}")
        if self.notify_slack and not self.slack_channel:
            raise ValueError("slack_channel required when notify_slack=True")


# config.yaml
# pipeline_name: orders_daily
# batch_size: 50000
# max_retries: 3
# notify_slack: true
# slack_channel: "#data-alerts"

Q6-10: More Python Fundamentals

Python
# Q6: Exception hierarchy for pipeline errors
class PipelineError(Exception):
    """Base exception for all pipeline errors."""
    pass

class ExtractionError(PipelineError):
    """Raised when source data cannot be read."""
    pass

class TransformationError(PipelineError):
    """Raised when transformation logic fails — usually a data quality issue."""
    def __init__(self, column: str, message: str):
        self.column = column
        super().__init__(f"Transformation failed on column '{column}': {message}")

class LoadError(PipelineError):
    """Raised when writing to the target fails."""
    pass


# Q7: Using __slots__ for memory efficiency in high-volume event processing
class Event:
    __slots__ = ("event_id", "user_id", "event_type", "timestamp", "payload")

    def __init__(self, event_id, user_id, event_type, timestamp, payload):
        self.event_id   = event_id
        self.user_id    = user_id
        self.event_type = event_type
        self.timestamp  = timestamp
        self.payload    = payload
# With __slots__, each Event uses ~40% less memory than a dict-based equivalent
# Critical when you're holding 1M+ events in memory during batch processing


# Q8: Protocol for duck-typed pipeline stages
from typing import Protocol

class Extractor(Protocol):
    def extract(self, run_date: date) -> Iterator[pd.DataFrame]: ...

class Transformer(Protocol):
    def transform(self, df: pd.DataFrame) -> pd.DataFrame: ...

class Loader(Protocol):
    def load(self, df: pd.DataFrame) -> int: ...

def run_etl(extractor: Extractor, transformer: Transformer, loader: Loader) -> int:
    """Runs any ETL combination without inheritance."""
    total = 0
    for batch in extractor.extract(date.today()):
        transformed = transformer.transform(batch)
        total += loader.load(transformed)
    return total


# Q9: Using collections.defaultdict for grouping pipeline events
from collections import defaultdict

def group_events_by_user(events: list[dict]) -> dict[str, list[dict]]:
    grouped = defaultdict(list)
    for event in events:
        grouped[event["user_id"]].append(event)
    return dict(grouped)


# Q10: Thread-safe singleton for a shared DB connection pool
import threading
from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool

class ConnectionPool:
    _instance = None
    _lock      = threading.Lock()

    def __new__(cls, dsn: str):
        with cls._lock:
            if cls._instance is None:
                cls._instance = super().__new__(cls)
                cls._instance._engine = create_engine(
                    dsn,
                    poolclass=QueuePool,
                    pool_size=10,
                    max_overflow=5,
                    pool_pre_ping=True,
                )
        return cls._instance

    @property
    def engine(self):
        return self._engine

Part 2: Pandas and Data Processing (10 Questions)

Q11: Efficient groupby — what's the difference between transform and agg?

Python
import pandas as pd
import numpy as np

df = pd.DataFrame({
    "customer_id": [1, 1, 2, 2, 3],
    "revenue":     [100, 200, 50, 150, 300],
    "region":      ["US", "US", "EU", "EU", "US"],
})

# agg: returns one row per group (reduces cardinality)
customer_totals = df.groupby("customer_id")["revenue"].agg(["sum", "mean", "count"])

# transform: returns a value for every row in the original DataFrame
# Use when you want to add a group-level stat as a new column
df["customer_total"]     = df.groupby("customer_id")["revenue"].transform("sum")
df["region_avg_revenue"] = df.groupby("region")["revenue"].transform("mean")
df["pct_of_customer"]    = df["revenue"] / df["customer_total"]

print(df)

When transform is critical: Feature engineering for ML (add group statistics as features without losing the original row granularity).


Q12: Avoiding apply() bottlenecks

Python
# Bad: apply() with a Python function  bypasses vectorisation
# 100x slower than vectorised equivalents for large DataFrames
df["category"] = df["revenue"].apply(
    lambda x: "high" if x > 1000 else ("medium" if x > 100 else "low")
)

# Good: use pd.cut() for binning
df["category"] = pd.cut(
    df["revenue"],
    bins=[0, 100, 1000, float("inf")],
    labels=["low", "medium", "high"],
)

# Good: use np.where / np.select for conditional logic
df["discount_pct"] = np.select(
    condlist=[
        df["revenue"] > 10_000,
        df["revenue"] > 1_000,
        df["revenue"] > 100,
    ],
    choicelist=[0.20, 0.10, 0.05],
    default=0.0,
)

# Good: use vectorised string operations (not apply + lambda)
df["email_domain"] = df["email"].str.split("@").str[-1]

Rule: If you're calling .apply() on a large DataFrame, ask yourself if there's a vectorised alternative. 90% of the time, there is.


Q13: Memory-efficient chunked reads from a database

Python
def load_large_table_chunked(
    query: str,
    engine,
    chunksize: int = 100_000,
    dtypes: dict | None = None,
) -> pd.DataFrame:
    """
    Read a large table in chunks, process each chunk, return one concatenated DataFrame.
    For very large data, don't concatenate — process and write each chunk separately.
    """
    chunks = []

    for chunk in pd.read_sql(query, con=engine, chunksize=chunksize, dtype=dtypes):
        # Apply lightweight transformations per chunk
        chunk = chunk.assign(
            order_date = pd.to_datetime(chunk["order_date"]),
            revenue    = chunk["revenue"].astype("float32"),  # float32 uses half the memory of float64
        )
        chunk = chunk.dropna(subset=["order_id"])
        chunks.append(chunk)

    return pd.concat(chunks, ignore_index=True)


# Memory tip: use categorical dtype for low-cardinality string columns
def optimise_dtypes(df: pd.DataFrame) -> pd.DataFrame:
    """Reduce DataFrame memory footprint by ~60% for typical pipeline data."""
    for col in df.select_dtypes(include="object").columns:
        cardinality = df[col].nunique()
        if cardinality / len(df) < 0.5:  # < 50% unique  use categorical
            df[col] = df[col].astype("category")
    for col in df.select_dtypes(include="float64").columns:
        df[col] = pd.to_numeric(df[col], downcast="float")
    for col in df.select_dtypes(include="int64").columns:
        df[col] = pd.to_numeric(df[col], downcast="integer")
    return df

Q14: Merge pitfalls and how to avoid them

Python
# Common pitfall 1: unexpected row multiplication (many-to-many join)
orders      = pd.DataFrame({"order_id": [1,2,3], "customer_id": [10,10,20]})
customers   = pd.DataFrame({"customer_id": [10,10,20], "segment": ["A","B","C"]})

# Bad: this creates 4 rows from 3  silent data multiplication
bad_result = orders.merge(customers, on="customer_id")
print(f"Expected 3 rows, got {len(bad_result)}")  # Prints: 4

# Fix: deduplicate the lookup table before joining
customers_deduped = customers.drop_duplicates(subset=["customer_id"], keep="last")
good_result = orders.merge(customers_deduped, on="customer_id")
print(f"Expected 3 rows, got {len(good_result)}")  # Prints: 3


# Common pitfall 2: silent type mismatch on join key
df1 = pd.DataFrame({"id": [1, 2, 3]})            # int64
df2 = pd.DataFrame({"id": ["1", "2", "3"]})       # object
silent_empty = df1.merge(df2, on="id")
print(f"Rows: {len(silent_empty)}")  # Prints: 0  silent failure!

# Fix: always assert dtypes before merging
assert df1["id"].dtype == df2["id"].dtype, \
    f"Join key type mismatch: {df1['id'].dtype} vs {df2['id'].dtype}"

Q15: Handling duplicates in pipeline data

Python
def deduplicate_pipeline_data(
    df: pd.DataFrame,
    key_cols: list[str],
    timestamp_col: str = "updated_at",
    strategy: str = "latest",  # "latest" | "first" | "count_check"
) -> pd.DataFrame:
    """
    Production-grade deduplication with multiple strategies.
    """
    n_before = len(df)

    if strategy == "latest":
        # Keep the most recently updated version of each key
        df = (
            df.sort_values(timestamp_col, ascending=False)
              .drop_duplicates(subset=key_cols, keep="first")
        )
    elif strategy == "first":
        df = df.sort_values(timestamp_col).drop_duplicates(subset=key_cols, keep="first")
    elif strategy == "count_check":
        dup_counts = df.groupby(key_cols).size()
        duplicated_keys = dup_counts[dup_counts > 1]
        if len(duplicated_keys) > 0:
            raise ValueError(
                f"Unexpected duplicates on {key_cols}: "
                f"{len(duplicated_keys)} keys have multiple rows"
            )
        return df

    n_after = len(df)
    dup_rate = (n_before - n_after) / n_before if n_before > 0 else 0
    if dup_rate > 0.05:  # alert if >5% of rows were duplicates
        import logging
        logging.warning("High duplicate rate: %.1f%% (%d rows removed)",
                        dup_rate * 100, n_before - n_after)
    return df

Q16-20: More Pandas Patterns

Python
# Q16: Efficient pivot without pivot_table overhead
# pivot_table is flexible but slow on large DataFrames
# Prefer groupby + unstack for large data

daily_revenue = (
    df.groupby(["product_id", pd.Grouper(key="order_date", freq="M")])["revenue"]
      .sum()
      .unstack(level="order_date")
      .fillna(0)
)


# Q17: Rolling join / asof merge (e.g., apply exchange rates to transactions)
rates = pd.DataFrame({
    "date":     pd.to_datetime(["2025-01-01", "2025-02-01", "2025-03-01"]),
    "eur_usd":  [1.08, 1.09, 1.07],
}).sort_values("date")

transactions = pd.DataFrame({
    "tx_date":  pd.to_datetime(["2025-01-15", "2025-02-10", "2025-03-20"]),
    "amount":   [1000, 2000, 1500],
}).sort_values("tx_date")

# pd.merge_asof: join each transaction to the most recent exchange rate
merged = pd.merge_asof(transactions, rates,
                        left_on="tx_date", right_on="date",
                        direction="backward")


# Q18: Efficient string operations  use str accessor, not apply
df["domain"] = df["email"].str.extract(r"@(.+)$")[0]
df["is_free_email"] = df["domain"].isin(["gmail.com", "yahoo.com", "hotmail.com"])


# Q19: Memory-mapped files for reading large Parquet without loading fully
import pyarrow.parquet as pq

table = pq.read_table(
    "large_file.parquet",
    columns=["order_id", "revenue"],       # column pruning
    filters=[("order_date", ">=", "2025-01-01")],  # row group pruning
)
df = table.to_pandas()


# Q20: Using eval() and query() for performance on large DataFrames
# eval() uses numexpr under the hood  2-5x faster for large arrays
df["profit"] = df.eval("revenue - (unit_cost * quantity)")
high_value   = df.query("revenue > 1000 and region == 'US'")

Part 3: Pipeline Patterns (10 Questions)

Q21: Retry with exponential backoff

Python
import time
import functools
import logging
from typing import TypeVar, Callable

F = TypeVar("F", bound=Callable)
logger = logging.getLogger(__name__)


def retry_with_backoff(
    max_attempts: int = 3,
    backoff_base: float = 2.0,
    exceptions: tuple = (Exception,),
    reraise: bool = True,
):
    """
    Decorator: retry a function with exponential backoff.
    backoff_base=2.0 → waits 2s, 4s, 8s between attempts.
    """
    def decorator(func: F) -> F:
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            last_exception = None
            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except exceptions as e:
                    last_exception = e
                    if attempt == max_attempts:
                        logger.error(
                            "%s failed after %d attempts: %s",
                            func.__name__, max_attempts, e,
                        )
                        if reraise:
                            raise
                        return None
                    wait = backoff_base ** (attempt - 1)
                    logger.warning(
                        "%s attempt %d/%d failed (%s). Retrying in %.1fs.",
                        func.__name__, attempt, max_attempts, e, wait,
                    )
                    time.sleep(wait)
            return None
        return wrapper  # type: ignore
    return decorator


# Usage
@retry_with_backoff(max_attempts=4, backoff_base=2.0, exceptions=(ConnectionError, TimeoutError))
def fetch_from_api(endpoint: str) -> dict:
    import httpx
    response = httpx.get(endpoint, timeout=10)
    response.raise_for_status()
    return response.json()

Q22: Idempotency patterns for pipeline stages

Python
from hashlib import md5
import json


def make_idempotency_key(pipeline: str, run_date: str, table: str) -> str:
    """Generate a deterministic key for a pipeline run."""
    payload = json.dumps({"pipeline": pipeline, "run_date": run_date, "table": table})
    return md5(payload.encode()).hexdigest()


def is_run_already_completed(conn, idempotency_key: str) -> bool:
    """Check if this exact pipeline run already succeeded."""
    with conn.cursor() as cur:
        cur.execute(
            "SELECT 1 FROM pipeline_runs WHERE idempotency_key = %s AND status = 'success'",
            (idempotency_key,),
        )
        return cur.fetchone() is not None


def record_run_start(conn, idempotency_key: str, metadata: dict) -> None:
    with conn.cursor() as cur:
        cur.execute(
            """
            INSERT INTO pipeline_runs (idempotency_key, status, metadata, started_at)
            VALUES (%s, 'running', %s::jsonb, NOW())
            ON CONFLICT (idempotency_key) DO NOTHING
            """,
            (idempotency_key, json.dumps(metadata)),
        )


def record_run_success(conn, idempotency_key: str, rows_loaded: int) -> None:
    with conn.cursor() as cur:
        cur.execute(
            """
            UPDATE pipeline_runs
            SET status = 'success', rows_loaded = %s, completed_at = NOW()
            WHERE idempotency_key = %s
            """,
            (rows_loaded, idempotency_key),
        )


# Idempotent pipeline runner
def run_idempotent_pipeline(pipeline: str, run_date: str, table: str):
    key = make_idempotency_key(pipeline, run_date, table)

    with db_connection(DSN) as conn:
        if is_run_already_completed(conn, key):
            logger.info("Pipeline already completed for key %s — skipping", key)
            return

        record_run_start(conn, key, {"run_date": run_date, "table": table})

    # ... do the actual work ...
    rows = 0

    with db_connection(DSN) as conn:
        record_run_success(conn, key, rows_loaded=rows)

Q23: Config management with Pydantic and environment variables

Python
from pydantic import BaseSettings, SecretStr, validator
from typing import Optional


class PipelineSettings(BaseSettings):
    """
    Pydantic BaseSettings reads from environment variables automatically.
    Secret fields are masked in logs.
    """
    app_env:          str      = "development"
    source_host:      str
    source_port:      int      = 5432
    source_db:        str
    source_password:  SecretStr
    warehouse_url:    SecretStr
    batch_size:       int      = 50_000
    slack_webhook:    Optional[SecretStr] = None
    log_level:        str      = "INFO"

    @validator("batch_size")
    def validate_batch_size(cls, v):
        if not 1000 <= v <= 500_000:
            raise ValueError(f"batch_size must be between 1000 and 500000, got {v}")
        return v

    @validator("log_level")
    def validate_log_level(cls, v):
        if v not in {"DEBUG", "INFO", "WARNING", "ERROR"}:
            raise ValueError(f"Invalid log_level: {v}")
        return v

    @property
    def source_dsn(self) -> str:
        return (
            f"postgresql://{self.source_host}:{self.source_port}/{self.source_db}"
            f"?password={self.source_password.get_secret_value()}"
        )

    class Config:
        env_file = ".env"
        env_file_encoding = "utf-8"


# Reads from environment or .env file automatically
settings = PipelineSettings()

Q24: Dependency injection in pipelines

Python
from abc import ABC, abstractmethod
import pandas as pd


class BaseExtractor(ABC):
    @abstractmethod
    def extract(self, run_date: date) -> pd.DataFrame: ...


class BaseLoader(ABC):
    @abstractmethod
    def load(self, df: pd.DataFrame, table: str) -> int: ...


class PostgresExtractor(BaseExtractor):
    def __init__(self, dsn: str):
        self._dsn = dsn

    def extract(self, run_date: date) -> pd.DataFrame:
        return pd.read_sql(
            "SELECT * FROM orders WHERE order_date = %(d)s",
            con=create_engine(self._dsn),
            params={"d": run_date},
        )


class SnowflakeLoader(BaseLoader):
    def __init__(self, conn_params: dict):
        self._conn = conn_params

    def load(self, df: pd.DataFrame, table: str) -> int:
        df.to_sql(table, con=create_engine(**self._conn), if_exists="append", index=False)
        return len(df)


class OrdersPipeline:
    """
    Pipeline with injected dependencies — easy to test with mocks.
    """
    def __init__(self, extractor: BaseExtractor, loader: BaseLoader):
        self._extractor = extractor
        self._loader    = loader

    def run(self, run_date: date) -> int:
        df = self._extractor.extract(run_date)
        df = self._transform(df)
        return self._loader.load(df, "orders_warehouse")

    def _transform(self, df: pd.DataFrame) -> pd.DataFrame:
        return df.assign(processed_at=pd.Timestamp.utcnow())


# Production instantiation
pipeline = OrdersPipeline(
    extractor=PostgresExtractor(dsn=settings.source_dsn),
    loader=SnowflakeLoader(conn_params={"url": settings.warehouse_url.get_secret_value()}),
)

Q25-30: Testing Pipeline Code

Python
# Q25: Unit testing a transformation function
import pytest
import pandas as pd
from pandas.testing import assert_frame_equal

def test_transform_clips_negative_revenue():
    input_df = pd.DataFrame({
        "order_id": [1, 2, 3],
        "revenue":  [100.0, -50.0, 0.0],
    })
    result = transform_orders(input_df)
    assert (result["revenue"] >= 0).all(), "Negative revenue should be clipped to 0"


def test_transform_drops_null_customer():
    input_df = pd.DataFrame({
        "order_id":    [1, 2],
        "customer_id": [10, None],
        "revenue":     [100.0, 200.0],
    })
    result = transform_orders(input_df)
    assert len(result) == 1
    assert result.iloc[0]["order_id"] == 1


# Q26: Integration test with a real (in-memory) database
import duckdb

def test_incremental_load_is_idempotent():
    conn = duckdb.connect()
    conn.execute("""
        CREATE TABLE orders_target (
            order_id INT PRIMARY KEY,
            revenue DOUBLE,
            updated_at TIMESTAMP
        )
    """)

    staging_df = pd.DataFrame({
        "order_id":   [1, 2, 3],
        "revenue":    [100.0, 200.0, 300.0],
        "updated_at": pd.to_datetime(["2026-05-07"]*3),
    })

    conn.register("staging", staging_df)
    conn.execute("""
        INSERT OR REPLACE INTO orders_target
        SELECT * FROM staging
    """)

    first_count = conn.execute("SELECT COUNT(*) FROM orders_target").fetchone()[0]

    # Re-run the same load (idempotency test)
    conn.execute("INSERT OR REPLACE INTO orders_target SELECT * FROM staging")
    second_count = conn.execute("SELECT COUNT(*) FROM orders_target").fetchone()[0]

    assert first_count == second_count == 3, "Idempotent load should not create duplicates"


# Q27: Mocking external dependencies in pipeline tests
from unittest.mock import MagicMock, patch

def test_pipeline_retries_on_connection_error():
    mock_extractor = MagicMock(spec=BaseExtractor)
    mock_extractor.extract.side_effect = [
        ConnectionError("DB unavailable"),
        pd.DataFrame({"order_id": [1], "revenue": [100.0]}),
    ]

    mock_loader = MagicMock(spec=BaseLoader)
    mock_loader.load.return_value = 1

    pipeline = OrdersPipeline(extractor=mock_extractor, loader=mock_loader)

    # With retry decorator, the second attempt should succeed
    with patch("time.sleep"):  # don't actually sleep in tests
        result = pipeline.run(date.today())
    assert result == 1


# Q28: Property-based testing with Hypothesis
from hypothesis import given, strategies as st
from hypothesis.extra.pandas import column, data_frames

@given(data_frames([
    column("order_id",   elements=st.integers(min_value=1)),
    column("revenue",    elements=st.floats(min_value=-10000, max_value=100000)),
    column("customer_id", elements=st.one_of(st.integers(min_value=1), st.none())),
]))
def test_transform_never_raises(df):
    """Transform should handle any valid-typed input without raising."""
    try:
        result = transform_orders(df)
        assert (result["revenue"] >= 0).all()
    except Exception as e:
        pytest.fail(f"transform_orders raised unexpectedly: {e}")


# Q29: Testing row count expectations
def test_deduplication_reduces_rows():
    duped_df = pd.DataFrame({
        "order_id":   [1, 1, 2, 2, 3],
        "revenue":    [100, 100, 200, 250, 300],
        "updated_at": pd.to_datetime(["2026-05-06","2026-05-07","2026-05-06",
                                       "2026-05-07","2026-05-07"]),
    })
    result = deduplicate_pipeline_data(duped_df, ["order_id"], "updated_at")
    assert len(result) == 3
    # Verify we kept the latest version of order_id=2 (revenue=250)
    assert result.set_index("order_id").loc[2, "revenue"] == 250


# Q30: End-to-end pipeline test with fixtures
@pytest.fixture
def sample_orders_df():
    return pd.DataFrame({
        "order_id":    range(1, 101),
        "customer_id": [i % 20 + 1 for i in range(100)],
        "revenue":     [float(i * 10) for i in range(100)],
        "updated_at":  pd.date_range("2026-05-01", periods=100, freq="1h"),
    })

def test_full_pipeline_produces_expected_row_count(sample_orders_df, tmp_path):
    output_path = tmp_path / "output.parquet"

    # Write input
    input_path = tmp_path / "input.parquet"
    sample_orders_df.to_parquet(input_path, index=False)

    # Run pipeline
    df = pd.read_parquet(input_path)
    df = transform_orders(df)
    df = deduplicate_pipeline_data(df, ["order_id"], "updated_at")
    df.to_parquet(output_path, index=False)

    result = pd.read_parquet(output_path)
    assert len(result) == 100
    assert (result["revenue"] >= 0).all()

Interview Tips: What Sets Strong Candidates Apart

On generators: Explain memory implications and when to use yield from for delegating to sub-generators.

On context managers: Know both the @contextmanager decorator form and the class-based __enter__/__exit__ form. Know when the class form is better (when you need state across enter/exit).

On pandas: Show you know the difference between loc, iloc, and [] and when each is appropriate. Know that chained indexing (df["a"]["b"] = x) creates a copy and doesn't modify the original.

On testing: Show that you write tests that verify business logic, not just that code runs. Property-based tests (Hypothesis) impress senior interviewers.

On pipeline patterns: Idempotency is the most important concept — be able to explain it, implement it in SQL and Python, and explain what happens when it's violated.