Data Engineering Interview Prep · Lesson 3 of 3
Python Interview: 30 Pipeline & Engineering Questions
How to Use This Guide
These questions target mid-to-senior data engineering roles. The code answers are production-quality — not toy examples. Study the patterns, not just the syntax.
Part 1: Python Fundamentals in Data Engineering Context (10 Questions)
Q1: How do you process a 50GB CSV file without loading it into memory?
Question: You have a 50GB CSV file that must be processed row-by-row and loaded to a database. How do you handle this in Python without an OOM error?
import csv
import psycopg2
from typing import Generator, Iterator
def csv_row_generator(filepath: str, batch_size: int = 10_000) -> Generator[list[dict], None, None]:
    """
    Stream a CSV file as lists of row dicts, ``batch_size`` rows per list.

    Rows are pulled lazily from ``csv.DictReader``, so peak memory is bounded
    by one batch, not by the file size. The last list may be shorter than
    ``batch_size``; a file with only a header yields nothing.
    """
    pending: list[dict] = []
    with open(filepath, "r", encoding="utf-8", newline="") as handle:
        for record in csv.DictReader(handle):
            pending.append(record)
            if len(pending) < batch_size:
                continue
            yield pending
            pending = []
        # Flush the leftover partial batch
        if pending:
            yield pending
def load_csv_to_db(filepath: str, connection_string: str) -> int:
    """
    Stream ``filepath`` into ``staging_table`` in batches; return rows loaded.

    Each batch is inserted with ``execute_values`` and committed on its own,
    which keeps transactions small. Consequence: if a later batch fails, the
    earlier batches stay committed — the load is NOT all-or-nothing, so reruns
    need an idempotent target (e.g. truncate the staging table first).

    Raises whatever psycopg2 raises on connect/insert; the connection is
    always closed.
    """
    # `import psycopg2` does not load the `extras` submodule, so the original
    # `psycopg2.extras.execute_values` raised AttributeError.
    from psycopg2.extras import execute_values

    total = 0
    conn = psycopg2.connect(connection_string)
    try:
        with conn.cursor() as cur:
            for batch in csv_row_generator(filepath, batch_size=10_000):
                execute_values(
                    cur,
                    "INSERT INTO staging_table (col1, col2, col3) VALUES %s",
                    [(r["col1"], r["col2"], r["col3"]) for r in batch],
                    page_size=1000,
                )
                total += len(batch)
                conn.commit()  # commit per batch — reduces transaction size
    finally:
        conn.close()  # an uncommitted (failed) batch is discarded here
    return total

# Why this matters: Generators are lazy — they compute one value at a time and
# don't hold the full dataset in memory. Processing in batches (not row by row)
# amortises the network cost of DB inserts. execute_values is ~50x faster than
# individual INSERT statements for PostgreSQL.
Q2: Write a context manager for database connection management
from contextlib import contextmanager
from typing import Generator
import psycopg2
import logging
logger = logging.getLogger(__name__)


@contextmanager
def db_connection(dsn: str, autocommit: bool = False) -> Generator:
    """
    Context manager for PostgreSQL connections.

    - Normal exit: commits (unless ``autocommit``), then closes.
    - Exception in the block or in the commit: rolls back (unless
      ``autocommit``), logs a warning, and re-raises.
    - The connection is always closed once it was opened.
    - Logs the connection lifecycle at DEBUG level, with credentials masked.

    Args:
        dsn: libpq connection string — URL form (``postgresql://user:pw@host/db``)
            or key/value form (``host=... password=...``).
        autocommit: put the connection in autocommit mode; no commit/rollback
            is issued by this manager.
    """
    import re

    # Build a credential-free description for the logs. Splitting on "@" alone
    # only strips "user:password@" from URLs: key/value DSNs and URLs carrying
    # "?password=..." have no "@" and were logged with the password in clear.
    safe_target = re.sub(
        r"(password\s*=\s*)('(?:[^'\\]|\\.)*'|[^\s&]+)",
        r"\1***",
        dsn,
        flags=re.IGNORECASE,
    )
    safe_target = re.sub(r"://[^/@]*@", "://", safe_target)  # drop URL userinfo

    conn = None
    try:
        conn = psycopg2.connect(dsn)
        conn.autocommit = autocommit
        logger.debug("DB connection opened: %s", safe_target)
        yield conn
        if not autocommit:
            conn.commit()
            logger.debug("Transaction committed")
    except Exception as e:
        if conn is not None and not autocommit:
            conn.rollback()
            logger.warning("Transaction rolled back: %s", e)
        raise
    finally:
        if conn is not None:
            conn.close()
            logger.debug("DB connection closed")
# Usage: one `with` statement opens the connection and then a cursor on it;
# the manager commits on success, rolls back on error, and always closes.
with db_connection(DSN) as conn, conn.cursor() as cur:
    cur.execute("INSERT INTO events VALUES (%s, %s)", (event_id, payload))
# commit + close happen automatically

# Class-based alternative (when you need __enter__ and __exit__):
class DatabaseConnection:
    """
    Class-based connection manager: commit on clean exit, roll back when the
    ``with`` block raised, and always close the connection.
    """

    def __init__(self, dsn: str):
        self.dsn = dsn
        self.conn = None

    def __enter__(self):
        self.conn = psycopg2.connect(self.dsn)
        return self.conn

    def __exit__(self, exc_type, exc_val, exc_tb):
        # try/finally: if commit() or rollback() itself raises (e.g. the server
        # went away), the connection must still be closed instead of leaked.
        try:
            if exc_type:
                self.conn.rollback()
            else:
                self.conn.commit()
        finally:
            self.conn.close()
        return False  # don't suppress exceptions

# Q3: When and how do you use functools.lru_cache in a pipeline?
from functools import lru_cache
import psycopg2
# Perfect use case: slow lookup that's called many times per pipeline run
# but the lookup data doesn't change during a single run.
# maxsize must exceed the number of distinct product_ids seen in one run: with
# ~10k products, a 1024-entry LRU keeps evicting entries that are still needed,
# so most lookups miss and the DB-call savings described below never happen.
@lru_cache(maxsize=16_384)
def get_product_category(product_id: str) -> str:
    """
    Return the category for ``product_id``, or ``"unknown"`` if no row exists.

    Results (including "unknown") are memoised per product_id until
    ``get_product_category.cache_clear()`` is called, so catalogue changes made
    mid-run are not seen. Each cache miss opens a connection via
    ``db_connection(DSN)``.
    Without cache: 1M DB calls. With a cache larger than the ~10k unique
    products: ~10k DB calls.
    """
    with db_connection(DSN) as conn:
        with conn.cursor() as cur:
            cur.execute("SELECT category FROM products WHERE product_id = %s", (product_id,))
            row = cur.fetchone()
            return row[0] if row else "unknown"
# Clear the cache between pipeline runs (stale data risk)
def run_pipeline():
    """
    Look up the product category for every row of orders.csv.

    Clears the category cache first, so each run sees the current catalogue.
    """
    get_product_category.cache_clear()
    # csv_row_generator yields *batches* (lists of row dicts), not rows; the
    # original `row["product_id"]` on a list raised TypeError.
    for batch in csv_row_generator("orders.csv"):
        for row in batch:
            category = get_product_category(row["product_id"])  # hits cache after first call
            ...
# Cache info for monitoring
info = get_product_category.cache_info()
lookups = info.hits + info.misses
# Before any lookup (e.g. an empty input file) both counters are 0 — guard the ratio.
hit_rate = info.hits / lookups if lookups else 0.0
print(f"Cache hits: {info.hits}, misses: {info.misses}, hit rate: {hit_rate:.1%}")

# When NOT to use lru_cache: When the function has side effects, when arguments
# aren't hashable (functools.cache has the same restriction — it is simply
# lru_cache(maxsize=None)), or when the cache should be shared across processes
# (use Redis instead).
Q4: Write a type-annotated pipeline function with proper type hints
from typing import Iterator, TypeVar
from collections.abc import Callable
import pandas as pd
from datetime import date
T = TypeVar("T")


def extract_orders(
    start_date: date,
    end_date: date,
    batch_size: int = 50_000,
) -> Iterator[pd.DataFrame]:
    """
    Yield orders with ``order_date`` in [start_date, end_date] as DataFrames.

    Uses keyset pagination (``order_id > last id seen``) instead of
    LIMIT/OFFSET:
    - OFFSET makes the database re-scan and discard every earlier row, so
      total work grows quadratically with the number of batches.
    - With OFFSET, rows inserted during the run shift page boundaries, so rows
      get skipped or repeated.
    Assumes ``order_id`` is unique — TODO confirm it is the primary key.

    Args:
        start_date, end_date: inclusive date range (SQL BETWEEN).
        batch_size: maximum rows per yielded DataFrame.
    """
    engine = get_engine()  # resolve once, not per batch
    params: dict = {"start": start_date, "end": end_date, "limit": batch_size}
    while True:
        keyset = "AND order_id > %(after)s" if "after" in params else ""
        df = pd.read_sql(
            f"""
            SELECT order_id, customer_id, product_id, revenue, order_date
            FROM orders
            WHERE order_date BETWEEN %(start)s AND %(end)s
            {keyset}
            ORDER BY order_id
            LIMIT %(limit)s
            """,
            con=engine,
            params=params,
        )
        if df.empty:
            break
        yield df
        if len(df) < batch_size:
            break  # short page: no rows remain, skip the extra empty query
        # tolist() turns numpy scalars (e.g. int64) into plain Python values the
        # DB driver can bind as a query parameter.
        params["after"] = df["order_id"].iloc[-1:].tolist()[0]
def transform_orders(df: pd.DataFrame) -> pd.DataFrame:
    """
    Apply business transformations. Pure function — returns a new DataFrame.

    - ``revenue``: negative values clipped to 0.
    - ``order_date``: parsed with ``pd.to_datetime``.
    - ``revenue_tier``: micro [0, 50], small (50, 200], medium (200, 1000],
      large (1000, inf).
    - Rows with a null ``customer_id`` or ``product_id`` are dropped.

    Requires columns revenue, order_date, customer_id and product_id
    (KeyError otherwise).
    """
    return (
        df
        .assign(
            revenue=lambda x: x["revenue"].clip(lower=0),
            order_date=lambda x: pd.to_datetime(x["order_date"]),
            # include_lowest=True: bins are right-closed, so without it the
            # first interval is (0, 50] and a revenue of exactly 0 — produced by
            # the clip above for every negative value — got a NaN tier.
            revenue_tier=lambda x: pd.cut(
                x["revenue"],
                bins=[0, 50, 200, 1000, float("inf")],
                labels=["micro", "small", "medium", "large"],
                include_lowest=True,
            ),
        )
        .dropna(subset=["customer_id", "product_id"])
    )
def load_orders(df: pd.DataFrame, target_table: str, engine) -> int:
    """
    Append ``df`` to ``target_table`` and return ``len(df)``.

    Rows are sent as multi-row INSERT statements (``method="multi"``); the
    DataFrame index is not written. ``engine`` is passed straight through as
    ``to_sql``'s ``con``.
    """
    write_options = {"if_exists": "append", "index": False, "method": "multi"}
    df.to_sql(target_table, con=engine, **write_options)
    row_count = len(df)
    return row_count

# Q5: Implement a dataclass for pipeline configuration
from dataclasses import dataclass, field
from typing import Optional
import os
import yaml
@dataclass
class PipelineConfig:
    """
    Strongly-typed pipeline configuration.

    Non-secret settings come from YAML. The two DSNs are always taken from the
    SOURCE_DSN / TARGET_DSN environment variables, overriding any YAML value.
    Field validation runs in ``__post_init__`` and raises ``ValueError``.
    """
    pipeline_name: str
    source_dsn: str
    target_dsn: str
    batch_size: int = 50_000
    max_retries: int = 3
    backoff_base: float = 2.0
    tags: list[str] = field(default_factory=list)
    dry_run: bool = False
    notify_slack: bool = True
    slack_channel: Optional[str] = None

    @classmethod
    def from_yaml(cls, path: str) -> "PipelineConfig":
        """
        Build a config from the YAML mapping at ``path`` plus env secrets.

        Raises:
            KeyError: SOURCE_DSN or TARGET_DSN is not set.
            TypeError: the YAML is not a mapping, has unknown keys, or lacks
                pipeline_name.
            ValueError: a field fails validation.
        """
        # Explicit encoding: the platform default (locale) would mis-decode
        # non-ASCII config on some systems.
        with open(path, encoding="utf-8") as f:
            # safe_load: the file cannot construct arbitrary Python objects.
            # An empty file parses to None — treat it as an empty mapping.
            raw = yaml.safe_load(f) or {}
        if not isinstance(raw, dict):
            raise TypeError(f"{path}: expected a YAML mapping, got {type(raw).__name__}")
        # Inject secrets from environment
        raw["source_dsn"] = os.environ["SOURCE_DSN"]
        raw["target_dsn"] = os.environ["TARGET_DSN"]
        return cls(**raw)

    def __post_init__(self):
        if self.batch_size <= 0:
            raise ValueError(f"batch_size must be positive, got {self.batch_size}")
        if self.max_retries < 0:
            raise ValueError(f"max_retries must be >= 0, got {self.max_retries}")
        if self.notify_slack and not self.slack_channel:
            raise ValueError("slack_channel required when notify_slack=True")
# config.yaml
# pipeline_name: orders_daily
# batch_size: 50000
# max_retries: 3
# notify_slack: true
# slack_channel: "#data-alerts"

# Q6-10: More Python Fundamentals
# Q6: Exception hierarchy for pipeline errors
class PipelineError(Exception):
    """Base exception for all pipeline errors."""


class ExtractionError(PipelineError):
    """Raised when source data cannot be read."""


class TransformationError(PipelineError):
    """
    Raised when transformation logic fails — usually a data quality issue.

    Attributes:
        column: name of the offending column.
        message: the detail text supplied by the raiser.
    """

    def __init__(self, column: str, message: str):
        self.column = column
        self.message = message
        super().__init__(f"Transformation failed on column '{column}': {message}")

    def __reduce__(self):
        # Default exception pickling re-calls cls(*self.args), i.e. with the one
        # formatted string, which fails against this two-argument __init__.
        # Pickling happens whenever the error crosses a process boundary
        # (multiprocessing, ProcessPoolExecutor, task queues).
        return (type(self), (self.column, self.message))


class LoadError(PipelineError):
    """Raised when writing to the target fails."""
# Q7: Using __slots__ for memory efficiency in high-volume event processing
class Event:
    """
    Lightweight event record.

    ``__slots__`` removes the per-instance ``__dict__``: the five attributes
    live in fixed slots, and assigning any other attribute raises
    AttributeError.
    """

    __slots__ = ("event_id", "user_id", "event_type", "timestamp", "payload")

    def __init__(self, event_id, user_id, event_type, timestamp, payload):
        self.event_id, self.user_id, self.event_type = event_id, user_id, event_type
        self.timestamp, self.payload = timestamp, payload

# With __slots__, each Event uses ~40% less memory than a dict-based equivalent
# Critical when you're holding 1M+ events in memory during batch processing
# Q8: Protocol for duck-typed pipeline stages
from typing import Protocol


class Extractor(Protocol):
    """Anything with ``extract(run_date)`` yielding DataFrame batches."""

    def extract(self, run_date: date) -> Iterator[pd.DataFrame]: ...


class Transformer(Protocol):
    """Anything with ``transform(df)`` returning a DataFrame."""

    def transform(self, df: pd.DataFrame) -> pd.DataFrame: ...


class Loader(Protocol):
    """Anything with ``load(df)`` returning the number of rows written."""

    def load(self, df: pd.DataFrame) -> int: ...


def run_etl(extractor: Extractor, transformer: Transformer, loader: Loader) -> int:
    """
    Run extract -> transform -> load for today's date; return total rows loaded.

    Stages only need matching method names (structural typing), no shared base
    class. Batches are handled one at a time, in the order they are extracted.
    """
    return sum(
        loader.load(transformer.transform(frame))
        for frame in extractor.extract(date.today())
    )
# Q9: Using collections.defaultdict for grouping pipeline events
from collections import defaultdict


def group_events_by_user(events: list[dict]) -> dict[str, list[dict]]:
    """
    Bucket events by their ``user_id`` key, preserving input order per bucket.

    Returns a plain dict rather than the defaultdict, so reading a missing
    user raises KeyError instead of silently inserting an empty list.
    Raises KeyError if an event has no ``user_id``.
    """
    buckets = defaultdict(list)
    for evt in events:
        owner = evt["user_id"]
        buckets[owner].append(evt)
    return dict(buckets)
# Q10: Thread-safe singleton for a shared DB connection pool
import threading
from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool
class ConnectionPool:
    """
    Process-wide singleton wrapping one SQLAlchemy engine.

    The engine uses QueuePool with 10 connections + 5 overflow, and pre-pings
    before use. Only the first successful construction uses ``dsn``; later
    calls return the same instance and ignore their ``dsn`` argument. The
    class-level lock serialises first-time construction across threads.
    """

    _instance = None
    _lock = threading.Lock()

    def __new__(cls, dsn: str):
        with cls._lock:
            if cls._instance is None:
                # Build the engine BEFORE publishing the instance. Otherwise, if
                # create_engine raises (bad URL, missing driver), a half-built
                # object without _engine stays cached and every later call
                # returns it. Now _instance stays None and the next call retries.
                engine = create_engine(
                    dsn,
                    poolclass=QueuePool,
                    pool_size=10,
                    max_overflow=5,
                    pool_pre_ping=True,
                )
                instance = super().__new__(cls)
                instance._engine = engine
                cls._instance = instance
        return cls._instance

    @property
    def engine(self):
        """The shared SQLAlchemy engine."""
        return self._engine

# Part 2: Pandas and Data Processing (10 Questions)
Q11: Efficient groupby — what's the difference between transform and agg?
import pandas as pd
import numpy as np
df = pd.DataFrame(
    data={
        "customer_id": [1, 1, 2, 2, 3],
        "revenue": [100, 200, 50, 150, 300],
        "region": ["US", "US", "EU", "EU", "US"],
    }
)

# agg -> one row per group (reduces cardinality): sum / mean / count per customer
customer_totals = df.groupby(by="customer_id")["revenue"].agg(func=["sum", "mean", "count"])

# transform -> one value per ORIGINAL row, so a group-level statistic can be
# attached as a new column without collapsing rows
df["customer_total"] = df.groupby(by="customer_id")["revenue"].transform("sum")
df["region_avg_revenue"] = df.groupby(by="region")["revenue"].transform("mean")
df["pct_of_customer"] = df["revenue"].div(df["customer_total"])
print(df)

# When transform is critical: Feature engineering for ML (add group statistics
# as features without losing the original row granularity).
Q12: Avoiding apply() bottlenecks
# Bad: apply() with a Python function — bypasses vectorisation
# 100x slower than vectorised equivalents for large DataFrames
df["category"] = df["revenue"].apply(
    lambda x: "high" if x > 1000 else ("medium" if x > 100 else "low")
)
# Good: use pd.cut() for binning.
# The lowest edge is -inf so the result matches the apply() version. pd.cut
# bins are right-closed, so with a lowest edge of 0 every revenue <= 0
# (refunds, zero-value orders) fell outside all bins and became NaN instead
# of "low". (NaN revenue still differs: apply() labels it "low", pd.cut keeps NaN.)
df["category"] = pd.cut(
    df["revenue"],
    bins=[float("-inf"), 100, 1000, float("inf")],
    labels=["low", "medium", "high"],
)
# Good: use np.select (np.where for a single condition) for conditional logic.
# Conditions are tested in order and the first match wins, so the highest
# threshold is listed first.
df["discount_pct"] = np.select(
    condlist=[
        df["revenue"] > 10_000,
        df["revenue"] > 1_000,
        df["revenue"] > 100,
    ],
    choicelist=[0.20, 0.10, 0.05],
    default=0.0,
)
# Good: use vectorised string operations (not apply + lambda)
df["email_domain"] = df["email"].str.split("@").str[-1]

# Rule: If you're calling .apply() on a large DataFrame, ask yourself if there's
# a vectorised alternative. 90% of the time, there is.
Q13: Memory-efficient chunked reads from a database
def load_large_table_chunked(
query: str,
engine,
chunksize: int = 100_000,
dtypes: dict | None = None,
) -> pd.DataFrame:
"""
Read a large table in chunks, process each chunk, return one concatenated DataFrame.
For very large data, don't concatenate — process and write each chunk separately.
"""
chunks = []
for chunk in pd.read_sql(query, con=engine, chunksize=chunksize, dtype=dtypes):
# Apply lightweight transformations per chunk
chunk = chunk.assign(
order_date = pd.to_datetime(chunk["order_date"]),
revenue = chunk["revenue"].astype("float32"), # float32 uses half the memory of float64
)
chunk = chunk.dropna(subset=["order_id"])
chunks.append(chunk)
return pd.concat(chunks, ignore_index=True)
# Memory tip: use categorical dtype for low-cardinality string columns
def optimise_dtypes(df: pd.DataFrame) -> pd.DataFrame:
    """
    Shrink ``df``'s memory footprint IN PLACE and return the same object.

    - object columns with a unique-value ratio < 0.5 become ``category``;
    - float64 columns are downcast to the smallest float dtype that fits;
    - int64 columns are downcast to the smallest integer dtype that fits.

    Savings depend on the data (the often-quoted ~60% is typical, not
    guaranteed). On an empty DataFrame, object columns are left unchanged.
    """
    n_rows = len(df)
    for col in df.select_dtypes(include="object").columns:
        # `n_rows and ...`: with zero rows the ratio would divide by zero.
        if n_rows and df[col].nunique() / n_rows < 0.5:  # < 50% unique → categorical
            df[col] = df[col].astype("category")
    for col in df.select_dtypes(include="float64").columns:
        df[col] = pd.to_numeric(df[col], downcast="float")
    for col in df.select_dtypes(include="int64").columns:
        df[col] = pd.to_numeric(df[col], downcast="integer")
    return df

# Q14: Merge pitfalls and how to avoid them
# Common pitfall 1: unexpected row multiplication (duplicate keys in the lookup)
orders = pd.DataFrame({"order_id": [1, 2, 3], "customer_id": [10, 10, 20]})
customers = pd.DataFrame({"customer_id": [10, 10, 20], "segment": ["A", "B", "C"]})
# Bad: each customer-10 order matches BOTH customer-10 rows (2 + 2 + 1):
# 5 rows from 3, with no error or warning.
bad_result = orders.merge(customers, on="customer_id")
print(f"Expected 3 rows, got {len(bad_result)}")  # Prints: 5
# Fix: deduplicate the lookup table before joining, and let pandas enforce the
# expected cardinality — validate="many_to_one" raises MergeError if the
# right-hand keys are still duplicated.
customers_deduped = customers.drop_duplicates(subset=["customer_id"], keep="last")
good_result = orders.merge(customers_deduped, on="customer_id", validate="many_to_one")
print(f"Expected 3 rows, got {len(good_result)}")  # Prints: 3

# Common pitfall 2: join-key representation mismatch
df1 = pd.DataFrame({"id": [1, 2, 3]})  # int64
df2 = pd.DataFrame({"id": ["1", "2", "3"]})  # strings
# int64-vs-string keys are NOT silent: pandas raises ValueError
# ("You are trying to merge on int64 and object columns ...").
try:
    df1.merge(df2, on="id")
except ValueError as err:
    print(f"Refused: {err}")
# The silent variant: same dtype on both sides, different formatting.
df3 = pd.DataFrame({"id": ["001", "002", "003"]})  # zero-padded strings
silent_empty = df2.merge(df3, on="id")
print(f"Rows: {len(silent_empty)}")  # Prints: 0 — silent failure!


def assert_same_key_dtype(left: pd.DataFrame, right: pd.DataFrame, key: str) -> None:
    """Raise TypeError if ``key`` has different dtypes in ``left`` and ``right``.

    Uses an explicit raise rather than ``assert`` (asserts vanish under
    ``python -O``). A dtype check does not catch the formatting mismatch above —
    normalise keys and compare matched row counts against expectations too.
    """
    if left[key].dtype != right[key].dtype:
        raise TypeError(f"Join key type mismatch: {left[key].dtype} vs {right[key].dtype}")


# Fix: check key dtypes before merging
try:
    assert_same_key_dtype(df1, df2, "id")
except TypeError as err:
    print(err)

# Q15: Handling duplicates in pipeline data
def deduplicate_pipeline_data(
    df: pd.DataFrame,
    key_cols: list[str],
    timestamp_col: str = "updated_at",
    strategy: str = "latest",  # "latest" | "first" | "count_check"
) -> pd.DataFrame:
    """
    Deduplicate ``df`` on ``key_cols`` using one of three strategies.

    - "latest": keep the row with the greatest ``timestamp_col`` per key.
    - "first": keep the row with the smallest ``timestamp_col`` per key.
    - "count_check": change nothing; raise if any key appears more than once.

    Ties on ``timestamp_col`` are broken by original row order (stable sort),
    so the surviving row is deterministic. For "latest"/"first":
    - the result is ordered by ``timestamp_col``;
    - a warning is logged when more than 5% of rows were removed.

    Raises:
        ValueError: unknown ``strategy``, or duplicates under "count_check".
    """
    import logging

    n_before = len(df)
    if strategy == "latest":
        # Stable sort: the default quicksort may reorder equal timestamps, making
        # which duplicate survives arbitrary from run to run.
        df = (
            df.sort_values(timestamp_col, ascending=False, kind="stable")
            .drop_duplicates(subset=key_cols, keep="first")
        )
    elif strategy == "first":
        df = (
            df.sort_values(timestamp_col, kind="stable")
            .drop_duplicates(subset=key_cols, keep="first")
        )
    elif strategy == "count_check":
        dup_counts = df.groupby(key_cols).size()
        duplicated_keys = dup_counts[dup_counts > 1]
        if len(duplicated_keys) > 0:
            raise ValueError(
                f"Unexpected duplicates on {key_cols}: "
                f"{len(duplicated_keys)} keys have multiple rows"
            )
        return df
    else:
        # An unknown strategy (e.g. a typo) used to return the input untouched,
        # silently skipping deduplication.
        raise ValueError(
            f"Unknown strategy {strategy!r}; expected 'latest', 'first' or 'count_check'"
        )
    n_after = len(df)
    dup_rate = (n_before - n_after) / n_before if n_before > 0 else 0
    if dup_rate > 0.05:  # alert if >5% of rows were duplicates
        logging.warning("High duplicate rate: %.1f%% (%d rows removed)",
                        dup_rate * 100, n_before - n_after)
    return df

# Q16-20: More Pandas Patterns
# Q16: Efficient pivot without pivot_table overhead
# pivot_table is flexible but slow on large DataFrames
# Prefer groupby + unstack for large data.
# Result: one row per product_id, one column per calendar month, revenue
# summed (0 where a product had no sales that month). The grouping is
# MONTHLY, so the variable is named for that (it was misleadingly "daily").
# "ME" = month-end alias (pandas >= 2.2, which deprecates "M"; use "M" on
# older pandas, which does not recognise "ME").
monthly_revenue = (
    df.groupby(["product_id", pd.Grouper(key="order_date", freq="ME")])["revenue"]
    .sum()
    .unstack(level="order_date")
    .fillna(0)
)
# Q17: Rolling join / asof merge (e.g., apply exchange rates to transactions)
# merge_asof requires both inputs to be sorted by their join key.
rates = pd.DataFrame(
    {
        "date": pd.to_datetime(["2025-01-01", "2025-02-01", "2025-03-01"]),
        "eur_usd": [1.08, 1.09, 1.07],
    }
).sort_values(by="date")
transactions = pd.DataFrame(
    {
        "tx_date": pd.to_datetime(["2025-01-15", "2025-02-10", "2025-03-20"]),
        "amount": [1000, 2000, 1500],
    }
).sort_values(by="tx_date")
# direction="backward": each transaction takes the latest rate dated on or
# before its tx_date (NaN when no earlier rate exists).
merged = pd.merge_asof(
    left=transactions,
    right=rates,
    left_on="tx_date",
    right_on="date",
    direction="backward",
)
# Q18: Efficient string operations — use str accessor, not apply
# Domain = text after the first "@" (NaN when the email has no "@");
# expand=False returns the single capture group directly as a Series.
df["domain"] = df["email"].str.extract(r"@(.+)$", expand=False)
df["is_free_email"] = df["domain"].isin({"gmail.com", "yahoo.com", "hotmail.com"})
# Q19: Memory-mapped files for reading large Parquet without loading fully
import pyarrow.parquet as pq
table = pq.read_table(
    "large_file.parquet",
    columns=["order_id", "revenue"],  # column pruning: only these columns are read
    filters=[("order_date", ">=", "2025-01-01")],  # row-group pruning via statistics + row filtering
    memory_map=True,  # memory-map the file instead of copying it into a read buffer
)
# Only the selected columns / matching rows are materialised — but those are
# materialised in full, and to_pandas() generally copies them again.
# NOTE(review): the filter compares order_date with a string; if the column is
# stored as a date/timestamp, pass a datetime.date instead — confirm the schema.
df = table.to_pandas()
# Q20: Using eval() and query() for performance on large DataFrames
# eval()/query() use the numexpr engine when numexpr is installed (plain Python
# evaluation otherwise); the speedup (often quoted as 2-5x) only shows on large
# frames — on small ones the expression-parsing overhead dominates.
df["profit"] = df.eval("revenue - (unit_cost * quantity)")
# Row filter written as an expression string; "and" is applied element-wise.
high_value = df.query("revenue > 1000 and region == 'US'")

# Part 3: Pipeline Patterns (10 Questions)
Q21: Retry with exponential backoff
import time
import functools
import logging
from typing import TypeVar, Callable
F = TypeVar("F", bound=Callable)
logger = logging.getLogger(__name__)


def retry_with_backoff(
    max_attempts: int = 3,
    backoff_base: float = 2.0,
    exceptions: tuple = (Exception,),
    reraise: bool = True,
    initial_delay: float = 1.0,
):
    """
    Decorator: retry a function with exponential backoff.

    The wait before retry k (k = 1, 2, ...) is
    ``initial_delay * backoff_base ** (k - 1)`` seconds. With the defaults
    (1.0, 2.0) that is 1s, 2s, 4s, ..., with at most ``max_attempts - 1``
    sleeps in total.

    Args:
        max_attempts: total calls including the first; must be >= 1.
        backoff_base: growth factor between consecutive waits.
        exceptions: only these types trigger a retry; anything else propagates
            immediately.
        reraise: after the last failed attempt, re-raise its exception (True)
            or return None (False).
        initial_delay: wait before the first retry, in seconds (0 disables
            sleeping, e.g. in tests).

    Raises:
        ValueError: at decoration time if ``max_attempts < 1``. Without this
            check the wrapped function would never be called and would
            silently return None.
    """
    if max_attempts < 1:
        raise ValueError(f"max_attempts must be >= 1, got {max_attempts}")

    def decorator(func: F) -> F:
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # The loop always returns or raises because max_attempts >= 1.
            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except exceptions as e:
                    if attempt == max_attempts:
                        logger.error(
                            "%s failed after %d attempts: %s",
                            func.__name__, max_attempts, e,
                        )
                        if reraise:
                            raise
                        return None
                    wait = initial_delay * backoff_base ** (attempt - 1)
                    logger.warning(
                        "%s attempt %d/%d failed (%s). Retrying in %.1fs.",
                        func.__name__, attempt, max_attempts, e, wait,
                    )
                    time.sleep(wait)
        return wrapper  # type: ignore
    return decorator
# Usage
# httpx does NOT raise the builtin ConnectionError / TimeoutError. Network
# failures surface as httpx.TransportError subclasses (httpx.ConnectError,
# httpx.ReadTimeout, ...), so retrying on the builtins never fired.
# HTTP error statuses (httpx.HTTPStatusError from raise_for_status) are
# deliberately not retried — a 4xx will not fix itself.
import httpx


@retry_with_backoff(max_attempts=4, backoff_base=2.0, exceptions=(httpx.TransportError,))
def fetch_from_api(endpoint: str) -> dict:
    """GET ``endpoint`` with a 10 s timeout and return the parsed JSON body.

    Raises httpx.HTTPStatusError on a 4xx/5xx response (not retried).
    """
    response = httpx.get(endpoint, timeout=10)
    response.raise_for_status()
    return response.json()

# Q22: Idempotency patterns for pipeline stages
from hashlib import md5
import json
def make_idempotency_key(pipeline: str, run_date: str, table: str) -> str:
    """
    Return a deterministic 32-character hex key identifying one pipeline run.

    The key is the MD5 of a JSON object built from the three arguments in a
    fixed key order, so identical arguments always produce the same key. MD5
    serves as a fingerprint here, not as a security primitive.
    """
    fingerprint_source = json.dumps(
        {"pipeline": pipeline, "run_date": run_date, "table": table}
    )
    digest = md5(fingerprint_source.encode())
    return digest.hexdigest()
def is_run_already_completed(conn, idempotency_key: str) -> bool:
    """
    Return True iff ``pipeline_runs`` holds a row for ``idempotency_key``
    with status 'success'. Issues one parameterised SELECT on ``conn``.
    """
    query = "SELECT 1 FROM pipeline_runs WHERE idempotency_key = %s AND status = 'success'"
    with conn.cursor() as cur:
        cur.execute(query, (idempotency_key,))
        found = cur.fetchone()
    return found is not None
def record_run_start(conn, idempotency_key: str, metadata: dict) -> None:
    """
    Mark the run identified by ``idempotency_key`` as 'running'.

    Inserts a new ``pipeline_runs`` row. If a row already exists from an
    earlier attempt that did not succeed (crashed or failed), it is reset to
    'running' with fresh ``metadata`` and ``started_at`` — the previous
    ON CONFLICT DO NOTHING left such a row stale. A row already marked
    'success' is left untouched. Does not commit; the caller owns the
    transaction.
    """
    with conn.cursor() as cur:
        cur.execute(
            """
            INSERT INTO pipeline_runs (idempotency_key, status, metadata, started_at)
            VALUES (%s, 'running', %s::jsonb, NOW())
            ON CONFLICT (idempotency_key) DO UPDATE
                SET status = 'running',
                    metadata = EXCLUDED.metadata,
                    started_at = EXCLUDED.started_at
                WHERE pipeline_runs.status <> 'success'
            """,
            (idempotency_key, json.dumps(metadata)),
        )
def record_run_success(conn, idempotency_key: str, rows_loaded: int) -> None:
    """
    Mark the run as 'success', storing ``rows_loaded`` and ``completed_at = NOW()``.

    Updates nothing if no row exists for the key. Does not commit; the caller
    owns the transaction.
    """
    statement = """
UPDATE pipeline_runs
SET status = 'success', rows_loaded = %s, completed_at = NOW()
WHERE idempotency_key = %s
"""
    with conn.cursor() as cur:
        cur.execute(statement, (rows_loaded, idempotency_key))
# Idempotent pipeline runner
def run_idempotent_pipeline(pipeline: str, run_date: str, table: str):
    """
    Run ``pipeline`` for (run_date, table) so it succeeds at most once.

    Steps:
    1. Return immediately if ``pipeline_runs`` already records a success for
       this key.
    2. Otherwise record the start (committed when the first ``db_connection``
       block exits).
    3. Do the work, then record success.

    If the work raises, the run row is marked 'failed' before the exception
    propagates. Without that it stayed 'running' forever. A later call retries
    the run, because only 'success' rows are skipped.
    """
    key = make_idempotency_key(pipeline, run_date, table)
    with db_connection(DSN) as conn:
        if is_run_already_completed(conn, key):
            logger.info("Pipeline already completed for key %s — skipping", key)
            return
        record_run_start(conn, key, {"run_date": run_date, "table": table})
    try:
        # ... do the actual work ...
        rows = 0
    except Exception:
        with db_connection(DSN) as conn, conn.cursor() as cur:
            cur.execute(
                "UPDATE pipeline_runs SET status = 'failed', completed_at = NOW() "
                "WHERE idempotency_key = %s",
                (key,),
            )
        raise
    with db_connection(DSN) as conn:
        record_run_success(conn, key, rows_loaded=rows)

# Q23: Config management with Pydantic and environment variables
# pydantic v2 moved BaseSettings into the separate pydantic-settings package;
# importing it from pydantic there raises PydanticImportError (an ImportError
# subclass). Prefer the v2 location and fall back to pydantic v1.
try:
    from pydantic_settings import BaseSettings
except ImportError:
    from pydantic import BaseSettings
from pydantic import SecretStr, validator
from typing import Optional
from urllib.parse import quote


class PipelineSettings(BaseSettings):
    """
    Pydantic BaseSettings reads from environment variables automatically.
    Secret fields are masked in logs.

    Fields without defaults (source_host, source_db, source_password,
    warehouse_url) must be supplied, or construction fails. The
    ``source_dsn`` property returns the password in clear text — never log it.
    """
    app_env: str = "development"
    source_host: str
    source_port: int = 5432
    source_db: str
    source_password: SecretStr
    warehouse_url: SecretStr
    batch_size: int = 50_000
    slack_webhook: Optional[SecretStr] = None
    log_level: str = "INFO"

    @validator("batch_size")
    def validate_batch_size(cls, v):
        if not 1000 <= v <= 500_000:
            raise ValueError(f"batch_size must be between 1000 and 500000, got {v}")
        return v

    @validator("log_level")
    def validate_log_level(cls, v):
        if v not in {"DEBUG", "INFO", "WARNING", "ERROR"}:
            raise ValueError(f"Invalid log_level: {v}")
        return v

    @property
    def source_dsn(self) -> str:
        """
        libpq connection URL for the source database.

        The password is percent-encoded: '&', '#', '%', '/' or spaces in it
        would otherwise corrupt the URL. libpq decodes percent-escapes in
        connection URIs.
        """
        password = quote(self.source_password.get_secret_value(), safe="")
        return (
            f"postgresql://{self.source_host}:{self.source_port}/{self.source_db}"
            f"?password={password}"
        )

    class Config:
        env_file = ".env"
        env_file_encoding = "utf-8"


# Reads from environment or .env file automatically
settings = PipelineSettings()

# Q24: Dependency injection in pipelines
from abc import ABC, abstractmethod
import pandas as pd
class BaseExtractor(ABC):
    """Source side of a pipeline: fetch the rows for one run date."""

    @abstractmethod
    def extract(self, run_date: date) -> pd.DataFrame: ...


class BaseLoader(ABC):
    """Sink side of a pipeline: write a DataFrame to a named table."""

    @abstractmethod
    def load(self, df: pd.DataFrame, table: str) -> int: ...


class PostgresExtractor(BaseExtractor):
    """Reads one day of ``orders`` through a SQLAlchemy engine built from ``dsn``."""

    def __init__(self, dsn: str):
        self._dsn = dsn
        # Created on first extract() and reused. create_engine builds a new
        # connection pool each time, and SQLAlchemy expects one engine per
        # database URL per process — not one per query.
        self._engine = None

    def extract(self, run_date: date) -> pd.DataFrame:
        """Return every ``orders`` row whose order_date equals ``run_date``."""
        if self._engine is None:
            self._engine = create_engine(self._dsn)
        return pd.read_sql(
            "SELECT * FROM orders WHERE order_date = %(d)s",
            con=self._engine,
            params={"d": run_date},
        )


class SnowflakeLoader(BaseLoader):
    """Appends DataFrames to tables through a SQLAlchemy engine."""

    def __init__(self, conn_params: dict):
        # Passed to create_engine as keyword arguments, e.g. {"url": ...}
        self._conn = conn_params
        self._engine = None  # created lazily once, then reused (one pool per process)

    def load(self, df: pd.DataFrame, table: str) -> int:
        """Append ``df`` to ``table`` (index not written); return ``len(df)``."""
        if self._engine is None:
            self._engine = create_engine(**self._conn)
        df.to_sql(table, con=self._engine, if_exists="append", index=False)
        return len(df)


class OrdersPipeline:
    """
    Pipeline with injected dependencies — easy to test with mocks.

    ``run`` extracts one date, stamps ``processed_at``, and loads the result
    into ``orders_warehouse``.
    """

    def __init__(self, extractor: BaseExtractor, loader: BaseLoader):
        self._extractor = extractor
        self._loader = loader

    def run(self, run_date: date) -> int:
        """Process ``run_date``; return what the loader reports as rows loaded."""
        df = self._extractor.extract(run_date)
        df = self._transform(df)
        return self._loader.load(df, "orders_warehouse")

    def _transform(self, df: pd.DataFrame) -> pd.DataFrame:
        """Add a tz-aware UTC ``processed_at`` timestamp to every row."""
        # pd.Timestamp.utcnow() is deprecated (pandas 2.2); now(tz="UTC") is
        # the equivalent tz-aware value.
        return df.assign(processed_at=pd.Timestamp.now(tz="UTC"))
# Production instantiation: concrete stages built from PipelineSettings,
# then injected into the pipeline.
source = PostgresExtractor(dsn=settings.source_dsn)
sink = SnowflakeLoader(conn_params={"url": settings.warehouse_url.get_secret_value()})
pipeline = OrdersPipeline(extractor=source, loader=sink)

# Q25-30: Testing Pipeline Code
# Q25: Unit testing a transformation function
import pytest
import pandas as pd
from pandas.testing import assert_frame_equal
def test_transform_clips_negative_revenue():
    """Negative revenue is clipped to 0; non-negative revenue is unchanged."""
    # transform_orders also parses order_date and drops rows with a null
    # customer_id/product_id, so those columns must exist or it raises KeyError.
    input_df = pd.DataFrame({
        "order_id": [1, 2, 3],
        "customer_id": [10, 11, 12],
        "product_id": [100, 100, 101],
        "revenue": [100.0, -50.0, 0.0],
        "order_date": ["2026-05-07"] * 3,
    })
    result = transform_orders(input_df)
    assert (result["revenue"] >= 0).all(), "Negative revenue should be clipped to 0"
    assert result["revenue"].tolist() == [100.0, 0.0, 0.0]


def test_transform_drops_null_customer():
    """Rows without a customer_id are removed."""
    input_df = pd.DataFrame({
        "order_id": [1, 2],
        "customer_id": [10, None],
        "product_id": [100, 100],
        "revenue": [100.0, 200.0],
        "order_date": ["2026-05-07", "2026-05-07"],
    })
    result = transform_orders(input_df)
    assert len(result) == 1
    assert result.iloc[0]["order_id"] == 1
# Q26: Integration test with a real (in-memory) database
import duckdb
def test_incremental_load_is_idempotent():
    """Loading the same staging batch twice leaves exactly one row per key."""
    conn = duckdb.connect()  # in-memory database
    try:
        conn.execute("""
            CREATE TABLE orders_target (
                order_id INT PRIMARY KEY,
                revenue DOUBLE,
                updated_at TIMESTAMP
            )
        """)
        staging_df = pd.DataFrame({
            "order_id": [1, 2, 3],
            "revenue": [100.0, 200.0, 300.0],
            "updated_at": pd.to_datetime(["2026-05-07"] * 3),
        })
        conn.register("staging", staging_df)
        # SELECT * relies on staging_df's column order matching orders_target.
        upsert = "INSERT OR REPLACE INTO orders_target SELECT * FROM staging"
        conn.execute(upsert)
        first_count = conn.execute("SELECT COUNT(*) FROM orders_target").fetchone()[0]
        # Re-run the same load (idempotency test)
        conn.execute(upsert)
        second_count = conn.execute("SELECT COUNT(*) FROM orders_target").fetchone()[0]
        assert first_count == second_count == 3, "Idempotent load should not create duplicates"
        total = conn.execute("SELECT SUM(revenue) FROM orders_target").fetchone()[0]
        assert total == 600.0, "Re-running the load must not change the data"
    finally:
        conn.close()  # release the connection even when an assertion fails
# Q27: Mocking external dependencies in pipeline tests
from unittest.mock import MagicMock, patch
def test_pipeline_retries_on_connection_error():
    """A transient ConnectionError on extract is retried and the run succeeds."""
    mock_extractor = MagicMock(spec=BaseExtractor)
    mock_extractor.extract.side_effect = [
        ConnectionError("DB unavailable"),
        pd.DataFrame({"order_id": [1], "revenue": [100.0]}),
    ]
    mock_loader = MagicMock(spec=BaseLoader)
    mock_loader.load.return_value = 1
    pipeline = OrdersPipeline(extractor=mock_extractor, loader=mock_loader)
    # OrdersPipeline.run has no retry logic of its own: called directly, the
    # first ConnectionError propagated and this test always failed. Wrap it
    # with the retry decorator, as production code would.
    run_with_retry = retry_with_backoff(max_attempts=2, exceptions=(ConnectionError,))(pipeline.run)
    with patch("time.sleep"):  # don't actually sleep in tests
        result = run_with_retry(date.today())
    assert result == 1
    assert mock_extractor.extract.call_count == 2
    mock_loader.load.assert_called_once()
# Q28: Property-based testing with Hypothesis
from hypothesis import given, strategies as st
from hypothesis.extra.pandas import column, data_frames
@given(data_frames([
    # Bounded ints stay within int64; unbounded ones overflow the column dtype.
    column("order_id", elements=st.integers(min_value=1, max_value=2**31 - 1)),
    column("customer_id", elements=st.one_of(st.integers(min_value=1, max_value=2**31 - 1), st.none())),
    # transform_orders requires product_id and order_date too; without them
    # every example failed with KeyError.
    column("product_id", elements=st.integers(min_value=1, max_value=2**31 - 1)),
    column("revenue", elements=st.floats(min_value=-10000, max_value=100000)),
    # Keep dates inside the range pd.to_datetime supports at ns resolution.
    column("order_date", elements=st.dates(min_value=date(2000, 1, 1),
                                           max_value=date(2100, 12, 31)).map(str)),
]))
def test_transform_never_raises(df):
    """transform_orders must accept any frame with the expected columns and
    never emit negative revenue. Any exception fails the test by itself
    (pytest reports the full traceback), so no try/except wrapper is needed."""
    result = transform_orders(df)
    assert (result["revenue"] >= 0).all()
# Q29: Testing row count expectations
def test_deduplication_reduces_rows():
    """Default "latest" strategy keeps exactly one row per order_id: the newest."""
    timestamps = pd.to_datetime(["2026-05-06", "2026-05-07", "2026-05-06",
                                 "2026-05-07", "2026-05-07"])
    duped_df = pd.DataFrame({
        "order_id": [1, 1, 2, 2, 3],
        "revenue": [100, 100, 200, 250, 300],
        "updated_at": timestamps,
    })
    result = deduplicate_pipeline_data(duped_df, ["order_id"], "updated_at")
    assert len(result) == 3
    revenue_by_order = result.set_index("order_id")["revenue"]
    # order_id=2 has two versions; the 2026-05-07 one (revenue=250) must win
    assert revenue_by_order.loc[2] == 250
# Q30: End-to-end pipeline test with fixtures
@pytest.fixture
def sample_orders_df():
    """100 distinct orders for 20 customers, revenue 0..990, hourly updated_at.

    Includes every column transform_orders requires (product_id, order_date);
    without them the pipeline under test raised KeyError.
    """
    hours = pd.date_range("2026-05-01", periods=100, freq="1h")
    return pd.DataFrame({
        "order_id": range(1, 101),
        "customer_id": [i % 20 + 1 for i in range(100)],
        "product_id": [i % 7 + 1 for i in range(100)],
        "revenue": [float(i * 10) for i in range(100)],
        "order_date": hours.normalize(),
        "updated_at": hours,
    })


def test_full_pipeline_produces_expected_row_count(sample_orders_df, tmp_path):
    """Parquet in -> transform -> dedup -> Parquet out keeps all 100 unique orders."""
    input_path = tmp_path / "input.parquet"
    output_path = tmp_path / "output.parquet"
    # Write input
    sample_orders_df.to_parquet(input_path, index=False)
    # Run pipeline
    df = pd.read_parquet(input_path)
    df = transform_orders(df)
    df = deduplicate_pipeline_data(df, ["order_id"], "updated_at")
    df.to_parquet(output_path, index=False)
    result = pd.read_parquet(output_path)
    assert len(result) == 100
    assert (result["revenue"] >= 0).all()

# Interview Tips: What Sets Strong Candidates Apart
On generators: Explain memory implications and when to use yield from for delegating to sub-generators.
On context managers: Know both the @contextmanager decorator form and the class-based __enter__/__exit__ form. Know when the class form is better (when you need state across enter/exit).
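On pandas: Show you know the difference between loc, iloc, and [] and when each is appropriate. Know that chained assignment (df["a"]["b"] = x) may write to a temporary copy instead of the original. pandas warns (SettingWithCopyWarning) because the outcome is unpredictable, and under Copy-on-Write (the default from pandas 3.0) it never updates df. Use a single indexer instead: df.loc["b", "a"] = x.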
On testing: Show that you write tests that verify business logic, not just that code runs. Property-based tests (Hypothesis) impress senior interviewers.
On pipeline patterns: Idempotency is the most important concept — be able to explain it, implement it in SQL and Python, and explain what happens when it's violated.