Data Engineering · Advanced

Data Engineering Python & Pipeline Interview Questions (30 Questions)

30 Python and pipeline interview questions for data engineering roles — generators, context managers, pandas patterns, retry logic, idempotency, config management, and testing pipelines.

Learnixo · May 7, 2026 · 17 min read
Python · Interview Prep · Data Engineering · Pandas · Pipeline Patterns · Testing

How to Use This Guide

These questions target mid-to-senior data engineering roles. The code answers are production-quality — not toy examples. Study the patterns, not just the syntax.


Part 1: Python Fundamentals in Data Engineering Context (10 Questions)

Q1: How do you process a 50GB CSV file without loading it into memory?

Question: You have a 50GB CSV file that must be processed row-by-row and loaded to a database. How do you handle this in Python without an OOM error?

Python
import csv

import psycopg2
import psycopg2.extras  # execute_values lives in the extras submodule, not the top-level package
from typing import Generator


def csv_row_generator(filepath: str, batch_size: int = 10_000) -> Generator[list[dict], None, None]:
    """
    Memory-efficient CSV reader using a generator.
    Yields batches of rows rather than individual rows — reduces DB round trips.
    Memory usage: O(batch_size), not O(file_size).
    """
    with open(filepath, "r", encoding="utf-8", newline="") as f:
        reader = csv.DictReader(f)
        batch  = []
        for row in reader:
            batch.append(row)
            if len(batch) >= batch_size:
                yield batch
                batch = []
        if batch:
            yield batch  # yield the final partial batch


def load_csv_to_db(filepath: str, connection_string: str) -> int:
    """Returns total rows loaded."""
    total = 0
    conn  = psycopg2.connect(connection_string)
    try:
        with conn.cursor() as cur:
            for batch in csv_row_generator(filepath, batch_size=10_000):
                psycopg2.extras.execute_values(
                    cur,
                    "INSERT INTO staging_table (col1, col2, col3) VALUES %s",
                    [(r["col1"], r["col2"], r["col3"]) for r in batch],
                    page_size=1000,
                )
                total += len(batch)
                conn.commit()  # commit per batch → keeps each transaction small
    finally:
        conn.close()
    return total

Why this matters: Generators are lazy — they compute one value at a time and don't hold the full dataset in memory. Processing in batches (not row by row) amortises the network cost of DB inserts. execute_values is ~50x faster than individual INSERT statements for PostgreSQL.


Q2: Write a context manager for database connection management

Python
from contextlib import contextmanager
from typing import Generator
import psycopg2
import logging

logger = logging.getLogger(__name__)


@contextmanager
def db_connection(dsn: str, autocommit: bool = False) -> Generator:
    """
    Context manager for PostgreSQL connections.
    - Handles commit/rollback automatically
    - Closes connection on exit (even on exception)
    - Logs connection lifecycle for observability
    """
    conn = None
    try:
        conn = psycopg2.connect(dsn)
        conn.autocommit = autocommit
        logger.debug("DB connection opened: %s", dsn.split("@")[-1])  # redact credentials
        yield conn
        if not autocommit:
            conn.commit()
            logger.debug("Transaction committed")
    except Exception as e:
        if conn and not autocommit:
            conn.rollback()
            logger.warning("Transaction rolled back: %s", e)
        raise
    finally:
        if conn:
            conn.close()
            logger.debug("DB connection closed")


# Usage
with db_connection(DSN) as conn:
    with conn.cursor() as cur:
        cur.execute("INSERT INTO events VALUES (%s, %s)", (event_id, payload))
# commit + close happen automatically

Class-based alternative (better when you need to hold state across __enter__ and __exit__, or want a reusable object):

Python
class DatabaseConnection:
    def __init__(self, dsn: str):
        self.dsn  = dsn
        self.conn = None

    def __enter__(self):
        self.conn = psycopg2.connect(self.dsn)
        return self.conn

    def __exit__(self, exc_type, exc_val, exc_tb):
        if exc_type:
            self.conn.rollback()
        else:
            self.conn.commit()
        self.conn.close()
        return False  # don't suppress exceptions

Q3: When and how do you use functools.lru_cache in a pipeline?

Python
from functools import lru_cache
import psycopg2

# Perfect use case: slow lookup that's called many times per pipeline run
# but the lookup data doesn't change during a single run

@lru_cache(maxsize=1024)
def get_product_category(product_id: str) -> str:
    """
    Cache product category lookups — called millions of times per pipeline run
    but the catalogue changes rarely.
    Without cache: 1M DB calls. With cache: ~10k unique products → 10k DB calls.
    """
    with db_connection(DSN) as conn:
        with conn.cursor() as cur:
            cur.execute("SELECT category FROM products WHERE product_id = %s", (product_id,))
            row = cur.fetchone()
            return row[0] if row else "unknown"


# Clear the cache between pipeline runs (stale data risk)
def run_pipeline():
    get_product_category.cache_clear()
    for batch in csv_row_generator("orders.csv"):  # the Q1 generator yields batches, not rows
        for row in batch:
            category = get_product_category(row["product_id"])  # hits cache after first call
            ...


# Cache info for monitoring
info = get_product_category.cache_info()
print(f"Cache hits: {info.hits}, misses: {info.misses}, hit rate: {info.hits/(info.hits+info.misses):.1%}")

When NOT to use lru_cache: When the function has side effects, when arguments aren't hashable (lru_cache hashes its arguments, so lists and dicts won't work), or when the cache must be shared across processes (use Redis instead). For an unbounded cache, functools.cache is shorthand for lru_cache(maxsize=None).
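
A minimal sketch of those caveats (function names are illustrative):

Python
from functools import cache, lru_cache

@cache  # unbounded; shorthand for lru_cache(maxsize=None)
def get_fx_rate(currency_pair: str) -> float:
    ...

@lru_cache(maxsize=128)
def summarise(rows: list) -> dict:
    ...

# summarise([1, 2, 3])  # TypeError: unhashable type: 'list'
# Workaround: pass a tuple, or derive a hashable key before the cached call.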


Q4: Write a type-annotated pipeline function with proper type hints

Python
from typing import Iterator
import pandas as pd
from datetime import date


def extract_orders(
    start_date: date,
    end_date: date,
    batch_size: int = 50_000,
) -> Iterator[pd.DataFrame]:
    """Extract orders from source in batches. Yields DataFrames."""
    offset = 0
    while True:
        df = pd.read_sql(
            """
            SELECT order_id, customer_id, product_id, revenue, order_date
            FROM orders
            WHERE order_date BETWEEN %(start)s AND %(end)s
            ORDER BY order_id
            LIMIT %(limit)s OFFSET %(offset)s
            """,
            con=get_engine(),  # get_engine(): assumed helper returning a SQLAlchemy engine
            params={"start": start_date, "end": end_date,
                    "limit": batch_size, "offset": offset},
        )
        if df.empty:
            break
        yield df
        offset += batch_size


def transform_orders(df: pd.DataFrame) -> pd.DataFrame:
    """Apply business transformations. Pure function — no side effects."""
    return (
        df
        .assign(
            revenue       = lambda x: x["revenue"].clip(lower=0),
            order_date    = lambda x: pd.to_datetime(x["order_date"]),
            revenue_tier  = lambda x: pd.cut(
                x["revenue"],
                bins=[0, 50, 200, 1000, float("inf")],
                labels=["micro", "small", "medium", "large"],
            ),
        )
        .dropna(subset=["customer_id", "product_id"])
    )


def load_orders(df: pd.DataFrame, target_table: str, engine) -> int:
    """Load to warehouse. Returns row count loaded."""
    df.to_sql(target_table, con=engine, if_exists="append", index=False, method="multi")
    return len(df)
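
A sketch of how the three stages compose into one run (again assuming the get_engine() helper):

Python
def run(start: date, end: date) -> int:
    """Stream batches through extract → transform → load; returns total rows."""
    engine = get_engine()
    return sum(
        load_orders(transform_orders(batch), "orders_clean", engine)
        for batch in extract_orders(start, end)
    )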

Q5: Implement a dataclass for pipeline configuration

Python
from dataclasses import dataclass, field
from typing import Optional
import os
import yaml


@dataclass
class PipelineConfig:
    """
    Strongly-typed pipeline configuration.
    Loaded from YAML; secrets injected from environment variables.
    """
    pipeline_name: str
    source_dsn:    str
    target_dsn:    str
    batch_size:    int             = 50_000
    max_retries:   int             = 3
    backoff_base:  float           = 2.0
    tags:          list[str]       = field(default_factory=list)
    dry_run:       bool            = False
    notify_slack:  bool            = True
    slack_channel: Optional[str]   = None

    @classmethod
    def from_yaml(cls, path: str) -> "PipelineConfig":
        with open(path) as f:
            raw = yaml.safe_load(f)
        # Inject secrets from environment
        raw["source_dsn"] = os.environ["SOURCE_DSN"]
        raw["target_dsn"] = os.environ["TARGET_DSN"]
        return cls(**raw)

    def __post_init__(self):
        if self.batch_size <= 0:
            raise ValueError(f"batch_size must be positive, got {self.batch_size}")
        if self.max_retries < 0:
            raise ValueError(f"max_retries must be >= 0, got {self.max_retries}")
        if self.notify_slack and not self.slack_channel:
            raise ValueError("slack_channel required when notify_slack=True")


# config.yaml
# pipeline_name: orders_daily
# batch_size: 50000
# max_retries: 3
# notify_slack: true
# slack_channel: "#data-alerts"

Q6-10: More Python Fundamentals

Python
# Q6: Exception hierarchy for pipeline errors
class PipelineError(Exception):
    """Base exception for all pipeline errors."""
    pass

class ExtractionError(PipelineError):
    """Raised when source data cannot be read."""
    pass

class TransformationError(PipelineError):
    """Raised when transformation logic fails — usually a data quality issue."""
    def __init__(self, column: str, message: str):
        self.column = column
        super().__init__(f"Transformation failed on column '{column}': {message}")

class LoadError(PipelineError):
    """Raised when writing to the target fails."""
    pass


# Q7: Using __slots__ for memory efficiency in high-volume event processing
class Event:
    __slots__ = ("event_id", "user_id", "event_type", "timestamp", "payload")

    def __init__(self, event_id, user_id, event_type, timestamp, payload):
        self.event_id   = event_id
        self.user_id    = user_id
        self.event_type = event_type
        self.timestamp  = timestamp
        self.payload    = payload
# With __slots__, each Event uses ~40% less memory than a dict-based equivalent
# Critical when you're holding 1M+ events in memory during batch processing
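
# Rough way to see the slots saving (sketch; exact numbers vary by Python version):
import sys

class DictEvent:  # same fields as Event, but dict-based
    def __init__(self, event_id, user_id, event_type, timestamp, payload):
        self.event_id, self.user_id, self.event_type = event_id, user_id, event_type
        self.timestamp, self.payload = timestamp, payload

slotted = Event(1, 2, "click", 0, {})
plain   = DictEvent(1, 2, "click", 0, {})
print(sys.getsizeof(slotted))                                # slotted instance
print(sys.getsizeof(plain) + sys.getsizeof(plain.__dict__))  # instance + its attribute dict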


# Q8: Protocol for duck-typed pipeline stages
from typing import Protocol

class Extractor(Protocol):
    def extract(self, run_date: date) -> Iterator[pd.DataFrame]: ...

class Transformer(Protocol):
    def transform(self, df: pd.DataFrame) -> pd.DataFrame: ...

class Loader(Protocol):
    def load(self, df: pd.DataFrame) -> int: ...

def run_etl(extractor: Extractor, transformer: Transformer, loader: Loader) -> int:
    """Runs any ETL combination without inheritance."""
    total = 0
    for batch in extractor.extract(date.today()):
        transformed = transformer.transform(batch)
        total += loader.load(transformed)
    return total


# Q9: Using collections.defaultdict for grouping pipeline events
from collections import defaultdict

def group_events_by_user(events: list[dict]) -> dict[str, list[dict]]:
    grouped = defaultdict(list)
    for event in events:
        grouped[event["user_id"]].append(event)
    return dict(grouped)


# Q10: Thread-safe singleton for a shared DB connection pool
import threading
from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool

class ConnectionPool:
    _instance = None
    _lock      = threading.Lock()

    def __new__(cls, dsn: str):  # note: the first caller's dsn wins; later dsn values are ignored
        with cls._lock:
            if cls._instance is None:
                cls._instance = super().__new__(cls)
                cls._instance._engine = create_engine(
                    dsn,
                    poolclass=QueuePool,
                    pool_size=10,
                    max_overflow=5,
                    pool_pre_ping=True,
                )
        return cls._instance

    @property
    def engine(self):
        return self._engine

Part 2: Pandas and Data Processing (10 Questions)

Q11: Efficient groupby — what's the difference between transform and agg?

Python
import pandas as pd
import numpy as np

df = pd.DataFrame({
    "customer_id": [1, 1, 2, 2, 3],
    "revenue":     [100, 200, 50, 150, 300],
    "region":      ["US", "US", "EU", "EU", "US"],
})

# agg: returns one row per group (reduces cardinality)
customer_totals = df.groupby("customer_id")["revenue"].agg(["sum", "mean", "count"])

# transform: returns a value for every row in the original DataFrame
# Use when you want to add a group-level stat as a new column
df["customer_total"]     = df.groupby("customer_id")["revenue"].transform("sum")
df["region_avg_revenue"] = df.groupby("region")["revenue"].transform("mean")
df["pct_of_customer"]    = df["revenue"] / df["customer_total"]

print(df)

When transform is critical: Feature engineering for ML (add group statistics as features without losing the original row granularity).
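
A small sketch of that pattern on the df above:

Python
# Group statistics as row-level ML features, keeping one row per order
features = df.assign(
    revenue_vs_region_avg  = df["revenue"] / df.groupby("region")["revenue"].transform("mean"),
    revenue_rank_in_region = df.groupby("region")["revenue"].rank(ascending=False),
)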


Q12: Avoiding apply() bottlenecks

Python
# Bad: apply() with a Python function → bypasses vectorisation
# 100x slower than vectorised equivalents for large DataFrames
df["category"] = df["revenue"].apply(
    lambda x: "high" if x > 1000 else ("medium" if x > 100 else "low")
)

# Good: use pd.cut() for binning
df["category"] = pd.cut(
    df["revenue"],
    bins=[0, 100, 1000, float("inf")],
    labels=["low", "medium", "high"],
)

# Good: use np.where / np.select for conditional logic
df["discount_pct"] = np.select(
    condlist=[
        df["revenue"] > 10_000,
        df["revenue"] > 1_000,
        df["revenue"] > 100,
    ],
    choicelist=[0.20, 0.10, 0.05],
    default=0.0,
)

# Good: use vectorised string operations (not apply + lambda)
df["email_domain"] = df["email"].str.split("@").str[-1]

Rule: If you're calling .apply() on a large DataFrame, ask yourself if there's a vectorised alternative. 90% of the time, there is.
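
If you want to demonstrate the gap in an interview, a quick illustrative micro-benchmark (exact speedups vary by data and hardware):

Python
import time
import numpy as np
import pandas as pd

big = pd.DataFrame({"revenue": np.random.uniform(0, 5_000, 1_000_000)})

t0 = time.perf_counter()
big["revenue"].apply(lambda x: "high" if x > 1000 else "low")
t1 = time.perf_counter()
np.where(big["revenue"] > 1000, "high", "low")
t2 = time.perf_counter()
print(f"apply: {t1 - t0:.2f}s   np.where: {t2 - t1:.3f}s")  # expect an order-of-magnitude gap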


Q13: Memory-efficient chunked reads from a database

Python
def load_large_table_chunked(
    query: str,
    engine,
    chunksize: int = 100_000,
    dtypes: dict | None = None,
) -> pd.DataFrame:
    """
    Read a large table in chunks, process each chunk, return one concatenated DataFrame.
    For very large data, don't concatenate — process and write each chunk separately.
    """
    chunks = []

    for chunk in pd.read_sql(query, con=engine, chunksize=chunksize, dtype=dtypes):
        # Apply lightweight transformations per chunk
        chunk = chunk.assign(
            order_date = pd.to_datetime(chunk["order_date"]),
            revenue    = chunk["revenue"].astype("float32"),  # float32 uses half the memory of float64
        )
        chunk = chunk.dropna(subset=["order_id"])
        chunks.append(chunk)

    return pd.concat(chunks, ignore_index=True)


# Memory tip: use categorical dtype for low-cardinality string columns
def optimise_dtypes(df: pd.DataFrame) -> pd.DataFrame:
    """Reduce DataFrame memory footprint by ~60% for typical pipeline data."""
    for col in df.select_dtypes(include="object").columns:
        cardinality = df[col].nunique()
        if cardinality / len(df) < 0.5:  # < 50% unique → use categorical
            df[col] = df[col].astype("category")
    for col in df.select_dtypes(include="float64").columns:
        df[col] = pd.to_numeric(df[col], downcast="float")
    for col in df.select_dtypes(include="int64").columns:
        df[col] = pd.to_numeric(df[col], downcast="integer")
    return df
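
Usage sketch to verify the savings:

Python
before = df.memory_usage(deep=True).sum() / 1e6
df = optimise_dtypes(df)
after = df.memory_usage(deep=True).sum() / 1e6
print(f"Memory: {before:.1f} MB → {after:.1f} MB")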

Q14: Merge pitfalls and how to avoid them

Python
# Common pitfall 1: unexpected row multiplication (many-to-many join)
orders      = pd.DataFrame({"order_id": [1,2,3], "customer_id": [10,10,20]})
customers   = pd.DataFrame({"customer_id": [10,10,20], "segment": ["A","B","C"]})

# Bad: this creates 4 rows from 3 → silent data multiplication
bad_result = orders.merge(customers, on="customer_id")
print(f"Expected 3 rows, got {len(bad_result)}")  # Prints: 4

# Fix: deduplicate the lookup table before joining
customers_deduped = customers.drop_duplicates(subset=["customer_id"], keep="last")
good_result = orders.merge(customers_deduped, on="customer_id")
print(f"Expected 3 rows, got {len(good_result)}")  # Prints: 3


# Common pitfall 2: silent type mismatch on join key
df1 = pd.DataFrame({"id": [1, 2, 3]})            # int64
df2 = pd.DataFrame({"id": ["1", "2", "3"]})       # object
silent_empty = df1.merge(df2, on="id")
print(f"Rows: {len(silent_empty)}")  # Prints: 0  silent failure!

# Fix: always assert dtypes before merging
assert df1["id"].dtype == df2["id"].dtype, \
    f"Join key type mismatch: {df1['id'].dtype} vs {df2['id'].dtype}"

Q15: Handling duplicates in pipeline data

Python
def deduplicate_pipeline_data(
    df: pd.DataFrame,
    key_cols: list[str],
    timestamp_col: str = "updated_at",
    strategy: str = "latest",  # "latest" | "first" | "count_check"
) -> pd.DataFrame:
    """
    Production-grade deduplication with multiple strategies.
    """
    n_before = len(df)

    if strategy == "latest":
        # Keep the most recently updated version of each key
        df = (
            df.sort_values(timestamp_col, ascending=False)
              .drop_duplicates(subset=key_cols, keep="first")
        )
    elif strategy == "first":
        df = df.sort_values(timestamp_col).drop_duplicates(subset=key_cols, keep="first")
    elif strategy == "count_check":
        dup_counts = df.groupby(key_cols).size()
        duplicated_keys = dup_counts[dup_counts > 1]
        if len(duplicated_keys) > 0:
            raise ValueError(
                f"Unexpected duplicates on {key_cols}: "
                f"{len(duplicated_keys)} keys have multiple rows"
            )
        return df

    n_after = len(df)
    dup_rate = (n_before - n_after) / n_before if n_before > 0 else 0
    if dup_rate > 0.05:  # alert if >5% of rows were duplicates
        import logging
        logging.warning("High duplicate rate: %.1f%% (%d rows removed)",
                        dup_rate * 100, n_before - n_after)
    return df

Q16-20: More Pandas Patterns

Python
# Q16: Efficient pivot without pivot_table overhead
# pivot_table is flexible but slow on large DataFrames
# Prefer groupby + unstack for large data

daily_revenue = (
    df.groupby(["product_id", pd.Grouper(key="order_date", freq="M")])["revenue"]
      .sum()
      .unstack(level="order_date")
      .fillna(0)
)


# Q17: Rolling join / asof merge (e.g., apply exchange rates to transactions)
rates = pd.DataFrame({
    "date":     pd.to_datetime(["2025-01-01", "2025-02-01", "2025-03-01"]),
    "eur_usd":  [1.08, 1.09, 1.07],
}).sort_values("date")

transactions = pd.DataFrame({
    "tx_date":  pd.to_datetime(["2025-01-15", "2025-02-10", "2025-03-20"]),
    "amount":   [1000, 2000, 1500],
}).sort_values("tx_date")

# pd.merge_asof: join each transaction to the most recent exchange rate
merged = pd.merge_asof(transactions, rates,
                        left_on="tx_date", right_on="date",
                        direction="backward")


# Q18: Efficient string operations → use the str accessor, not apply
df["domain"] = df["email"].str.extract(r"@(.+)$")[0]
df["is_free_email"] = df["domain"].isin(["gmail.com", "yahoo.com", "hotmail.com"])


# Q19: Reading large Parquet efficiently: prune columns and row groups,
# and memory-map the file instead of loading it fully
import pyarrow.parquet as pq

table = pq.read_table(
    "large_file.parquet",
    columns=["order_id", "revenue"],               # column pruning
    filters=[("order_date", ">=", "2025-01-01")],  # row-group pruning
    memory_map=True,                               # map the file rather than read it into RAM
)
df = table.to_pandas()


# Q20: Using eval() and query() for performance on large DataFrames
# eval() uses numexpr under the hood (if installed) → roughly 2-5x faster for large arrays
df["profit"] = df.eval("revenue - (unit_cost * quantity)")
high_value   = df.query("revenue > 1000 and region == 'US'")

Part 3: Pipeline Patterns (10 Questions)

Q21: Retry with exponential backoff

Python
import time
import functools
import logging
from typing import TypeVar, Callable

F = TypeVar("F", bound=Callable)
logger = logging.getLogger(__name__)


def retry_with_backoff(
    max_attempts: int = 3,
    backoff_base: float = 2.0,
    exceptions: tuple = (Exception,),
    reraise: bool = True,
):
    """
    Decorator: retry a function with exponential backoff.
    backoff_base=2.0 → waits 2s, 4s, 8s between attempts.
    """
    def decorator(func: F) -> F:
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            last_exception = None
            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except exceptions as e:
                    last_exception = e
                    if attempt == max_attempts:
                        logger.error(
                            "%s failed after %d attempts: %s",
                            func.__name__, max_attempts, e,
                        )
                        if reraise:
                            raise
                        return None
                    wait = backoff_base ** attempt  # base 2.0 → 2s, 4s, 8s
                    logger.warning(
                        "%s attempt %d/%d failed (%s). Retrying in %.1fs.",
                        func.__name__, attempt, max_attempts, e, wait,
                    )
                    time.sleep(wait)
            return None
        return wrapper  # type: ignore
    return decorator


# Usage
@retry_with_backoff(max_attempts=4, backoff_base=2.0, exceptions=(ConnectionError, TimeoutError))
def fetch_from_api(endpoint: str) -> dict:
    import httpx
    response = httpx.get(endpoint, timeout=10)
    response.raise_for_status()
    return response.json()

Q22: Idempotency patterns for pipeline stages

Python
from hashlib import md5
import json


def make_idempotency_key(pipeline: str, run_date: str, table: str) -> str:
    """Generate a deterministic key for a pipeline run."""
    payload = json.dumps({"pipeline": pipeline, "run_date": run_date, "table": table})
    return md5(payload.encode()).hexdigest()


def is_run_already_completed(conn, idempotency_key: str) -> bool:
    """Check if this exact pipeline run already succeeded."""
    with conn.cursor() as cur:
        cur.execute(
            "SELECT 1 FROM pipeline_runs WHERE idempotency_key = %s AND status = 'success'",
            (idempotency_key,),
        )
        return cur.fetchone() is not None


def record_run_start(conn, idempotency_key: str, metadata: dict) -> None:
    with conn.cursor() as cur:
        cur.execute(
            """
            INSERT INTO pipeline_runs (idempotency_key, status, metadata, started_at)
            VALUES (%s, 'running', %s::jsonb, NOW())
            ON CONFLICT (idempotency_key) DO NOTHING
            """,
            (idempotency_key, json.dumps(metadata)),
        )


def record_run_success(conn, idempotency_key: str, rows_loaded: int) -> None:
    with conn.cursor() as cur:
        cur.execute(
            """
            UPDATE pipeline_runs
            SET status = 'success', rows_loaded = %s, completed_at = NOW()
            WHERE idempotency_key = %s
            """,
            (rows_loaded, idempotency_key),
        )


# Idempotent pipeline runner
def run_idempotent_pipeline(pipeline: str, run_date: str, table: str):
    key = make_idempotency_key(pipeline, run_date, table)

    with db_connection(DSN) as conn:
        if is_run_already_completed(conn, key):
            logger.info("Pipeline already completed for key %s — skipping", key)
            return

        record_run_start(conn, key, {"run_date": run_date, "table": table})

    # ... do the actual work ...
    rows = 0

    with db_connection(DSN) as conn:
        record_run_success(conn, key, rows_loaded=rows)
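
The runner above assumes a small run-ledger table; a minimal PostgreSQL-flavoured sketch of that schema (adjust types to your warehouse):

Python
# Assumed schema for the run ledger
PIPELINE_RUNS_DDL = """
CREATE TABLE IF NOT EXISTS pipeline_runs (
    idempotency_key TEXT PRIMARY KEY,
    status          TEXT NOT NULL,   -- 'running' | 'success' | 'failed'
    metadata        JSONB,
    rows_loaded     BIGINT,
    started_at      TIMESTAMPTZ,
    completed_at    TIMESTAMPTZ
);
"""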

Q23: Config management with Pydantic and environment variables

Python
from pydantic import SecretStr, field_validator
from pydantic_settings import BaseSettings, SettingsConfigDict
from typing import Optional


class PipelineSettings(BaseSettings):
    """
    Pydantic BaseSettings reads from environment variables automatically.
    Secret fields are masked in logs.
    (Pydantic v2: BaseSettings lives in the separate pydantic-settings package.)
    """
    model_config = SettingsConfigDict(env_file=".env", env_file_encoding="utf-8")

    app_env:          str      = "development"
    source_host:      str
    source_port:      int      = 5432
    source_db:        str
    source_password:  SecretStr
    warehouse_url:    SecretStr
    batch_size:       int      = 50_000
    slack_webhook:    Optional[SecretStr] = None
    log_level:        str      = "INFO"

    @field_validator("batch_size")
    @classmethod
    def validate_batch_size(cls, v):
        if not 1000 <= v <= 500_000:
            raise ValueError(f"batch_size must be between 1000 and 500000, got {v}")
        return v

    @field_validator("log_level")
    @classmethod
    def validate_log_level(cls, v):
        if v not in {"DEBUG", "INFO", "WARNING", "ERROR"}:
            raise ValueError(f"Invalid log_level: {v}")
        return v

    @property
    def source_dsn(self) -> str:
        return (
            f"postgresql://{self.source_host}:{self.source_port}/{self.source_db}"
            f"?password={self.source_password.get_secret_value()}"
        )


# Reads from environment or .env file automatically
settings = PipelineSettings()

Q24: Dependency injection in pipelines

Python
from abc import ABC, abstractmethod
from datetime import date

import pandas as pd
from sqlalchemy import create_engine


class BaseExtractor(ABC):
    @abstractmethod
    def extract(self, run_date: date) -> pd.DataFrame: ...


class BaseLoader(ABC):
    @abstractmethod
    def load(self, df: pd.DataFrame, table: str) -> int: ...


class PostgresExtractor(BaseExtractor):
    def __init__(self, dsn: str):
        self._dsn = dsn

    def extract(self, run_date: date) -> pd.DataFrame:
        return pd.read_sql(
            "SELECT * FROM orders WHERE order_date = %(d)s",
            con=create_engine(self._dsn),
            params={"d": run_date},
        )


class SnowflakeLoader(BaseLoader):
    def __init__(self, conn_params: dict):
        self._conn = conn_params

    def load(self, df: pd.DataFrame, table: str) -> int:
        df.to_sql(table, con=create_engine(**self._conn), if_exists="append", index=False)
        return len(df)


class OrdersPipeline:
    """
    Pipeline with injected dependencies — easy to test with mocks.
    """
    def __init__(self, extractor: BaseExtractor, loader: BaseLoader):
        self._extractor = extractor
        self._loader    = loader

    def run(self, run_date: date) -> int:
        df = self._extractor.extract(run_date)
        df = self._transform(df)
        return self._loader.load(df, "orders_warehouse")

    def _transform(self, df: pd.DataFrame) -> pd.DataFrame:
        return df.assign(processed_at=pd.Timestamp.now(tz="UTC"))  # Timestamp.utcnow() is deprecated


# Production instantiation
pipeline = OrdersPipeline(
    extractor=PostgresExtractor(dsn=settings.source_dsn),
    loader=SnowflakeLoader(conn_params={"url": settings.warehouse_url.get_secret_value()}),
)

Q25-30: Testing Pipeline Code

Python
# Q25: Unit testing a transformation function
import pytest
import pandas as pd
from pandas.testing import assert_frame_equal

def test_transform_clips_negative_revenue():
    input_df = pd.DataFrame({
        "order_id": [1, 2, 3],
        "revenue":  [100.0, -50.0, 0.0],
    })
    result = transform_orders(input_df)
    assert (result["revenue"] >= 0).all(), "Negative revenue should be clipped to 0"


def test_transform_drops_null_customer():
    input_df = pd.DataFrame({
        "order_id":    [1, 2],
        "customer_id": [10, None],
        "revenue":     [100.0, 200.0],
    })
    result = transform_orders(input_df)
    assert len(result) == 1
    assert result.iloc[0]["order_id"] == 1


# Q26: Integration test with a real (in-memory) database
import duckdb

def test_incremental_load_is_idempotent():
    conn = duckdb.connect()
    conn.execute("""
        CREATE TABLE orders_target (
            order_id INT PRIMARY KEY,
            revenue DOUBLE,
            updated_at TIMESTAMP
        )
    """)

    staging_df = pd.DataFrame({
        "order_id":   [1, 2, 3],
        "revenue":    [100.0, 200.0, 300.0],
        "updated_at": pd.to_datetime(["2026-05-07"]*3),
    })

    conn.register("staging", staging_df)
    conn.execute("""
        INSERT OR REPLACE INTO orders_target
        SELECT * FROM staging
    """)

    first_count = conn.execute("SELECT COUNT(*) FROM orders_target").fetchone()[0]

    # Re-run the same load (idempotency test)
    conn.execute("INSERT OR REPLACE INTO orders_target SELECT * FROM staging")
    second_count = conn.execute("SELECT COUNT(*) FROM orders_target").fetchone()[0]

    assert first_count == second_count == 3, "Idempotent load should not create duplicates"


# Q27: Mocking external dependencies in pipeline tests
from unittest.mock import MagicMock, patch

def test_pipeline_retries_on_connection_error():
    mock_extractor = MagicMock(spec=BaseExtractor)
    mock_extractor.extract.side_effect = [
        ConnectionError("DB unavailable"),
        pd.DataFrame({"order_id": [1], "revenue": [100.0]}),
    ]

    mock_loader = MagicMock(spec=BaseLoader)
    mock_loader.load.return_value = 1

    pipeline = OrdersPipeline(extractor=mock_extractor, loader=mock_loader)

    # run() itself isn't retried, so wrap it with the Q21 decorator for this test
    retrying_run = retry_with_backoff(max_attempts=2, exceptions=(ConnectionError,))(pipeline.run)

    with patch("time.sleep"):  # don't actually sleep in tests
        result = retrying_run(date.today())
    assert result == 1


# Q28: Property-based testing with Hypothesis
from hypothesis import given, strategies as st
from hypothesis.extra.pandas import column, data_frames

@given(data_frames([
    column("order_id",    elements=st.integers(min_value=1)),
    column("customer_id", elements=st.one_of(st.integers(min_value=1), st.none())),
    column("product_id",  elements=st.integers(min_value=1)),
    column("revenue",     elements=st.floats(min_value=-10000, max_value=100000)),
    column("order_date",  elements=st.dates()),  # transform_orders expects these columns
]))
def test_transform_never_raises(df):
    """Transform should handle any valid-typed input without raising."""
    try:
        result = transform_orders(df)
        assert (result["revenue"] >= 0).all()
    except Exception as e:
        pytest.fail(f"transform_orders raised unexpectedly: {e}")


# Q29: Testing row count expectations
def test_deduplication_reduces_rows():
    duped_df = pd.DataFrame({
        "order_id":   [1, 1, 2, 2, 3],
        "revenue":    [100, 100, 200, 250, 300],
        "updated_at": pd.to_datetime(["2026-05-06","2026-05-07","2026-05-06",
                                       "2026-05-07","2026-05-07"]),
    })
    result = deduplicate_pipeline_data(duped_df, ["order_id"], "updated_at")
    assert len(result) == 3
    # Verify we kept the latest version of order_id=2 (revenue=250)
    assert result.set_index("order_id").loc[2, "revenue"] == 250


# Q30: End-to-end pipeline test with fixtures
@pytest.fixture
def sample_orders_df():
    # Includes every column transform_orders touches (product_id, order_date)
    return pd.DataFrame({
        "order_id":    range(1, 101),
        "customer_id": [i % 20 + 1 for i in range(100)],
        "product_id":  [i % 10 + 1 for i in range(100)],
        "revenue":     [float(i * 10) for i in range(100)],
        "order_date":  pd.date_range("2026-05-01", periods=100, freq="1h"),
        "updated_at":  pd.date_range("2026-05-01", periods=100, freq="1h"),
    })

def test_full_pipeline_produces_expected_row_count(sample_orders_df, tmp_path):
    output_path = tmp_path / "output.parquet"

    # Write input
    input_path = tmp_path / "input.parquet"
    sample_orders_df.to_parquet(input_path, index=False)

    # Run pipeline
    df = pd.read_parquet(input_path)
    df = transform_orders(df)
    df = deduplicate_pipeline_data(df, ["order_id"], "updated_at")
    df.to_parquet(output_path, index=False)

    result = pd.read_parquet(output_path)
    assert len(result) == 100
    assert (result["revenue"] >= 0).all()

Interview Tips: What Sets Strong Candidates Apart

On generators: Explain memory implications and when to use yield from for delegating to sub-generators.
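
A minimal yield from sketch, delegating to the Q1 generator:

Python
def read_many_files(paths: list[str], batch_size: int = 10_000):
    """Chain several CSVs into one stream of batches."""
    for path in paths:
        yield from csv_row_generator(path, batch_size)  # from Q1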

On context managers: Know both the @contextmanager decorator form and the class-based __enter__/__exit__ form. Know when the class form is better (when you need state across enter/exit).

On pandas: Show you know the difference between loc, iloc, and [] and when each is appropriate. Know that chained indexing (e.g. df[df["a"] > 0]["b"] = x) can assign to a temporary copy and silently leave the original unchanged; use a single .loc call instead.
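
A compact illustration of the difference:

Python
df.loc[df["revenue"] > 1000, "tier"] = "high"  # label-based, single indexing op: safe
first_row = df.iloc[0]                         # position-based
# df[df["revenue"] > 1000]["tier"] = "high"    # chained: may write to a temporary copy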

On testing: Show that you write tests that verify business logic, not just that code runs. Property-based tests (Hypothesis) impress senior interviewers.

On pipeline patterns: Idempotency is the most important concept — be able to explain it, implement it in SQL and Python, and explain what happens when it's violated.
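
On the SQL side, the canonical idempotent write is an upsert keyed on the business key; a PostgreSQL-flavoured sketch:

Python
# Running this twice with the same rows leaves the target unchanged
UPSERT_SQL = """
INSERT INTO orders_target (order_id, revenue, updated_at)
VALUES (%s, %s, %s)
ON CONFLICT (order_id) DO UPDATE
SET revenue = EXCLUDED.revenue, updated_at = EXCLUDED.updated_at;
"""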

Enjoyed this article?

Explore the Data Engineering learning path for more.
