Data Engineering Python & Pipeline Interview Questions (30 Questions)
30 Python and pipeline interview questions for data engineering roles — generators, context managers, pandas patterns, retry logic, idempotency, config management, and testing pipelines.
How to Use This Guide
These questions target mid-to-senior data engineering roles. The code answers are production-quality — not toy examples. Study the patterns, not just the syntax.
Part 1: Python Fundamentals in Data Engineering Context (10 Questions)
Q1: How do you process a 50GB CSV file without loading it into memory?
Question: You have a 50GB CSV file that must be processed row-by-row and loaded to a database. How do you handle this in Python without an OOM error?
import csv
import psycopg2
import psycopg2.extras # provides execute_values used in load_csv_to_db below
from typing import Generator, Iterator
def csv_row_generator(filepath: str, batch_size: int = 10_000) -> Generator[list[dict], None, None]:
"""
Memory-efficient CSV reader using a generator.
Yields batches of rows rather than individual rows — reduces DB round trips.
Memory usage: O(batch_size), not O(file_size).
"""
with open(filepath, "r", encoding="utf-8", newline="") as f:
reader = csv.DictReader(f)
batch = []
for row in reader:
batch.append(row)
if len(batch) >= batch_size:
yield batch
batch = []
if batch:
yield batch # yield the final partial batch
def load_csv_to_db(filepath: str, connection_string: str) -> int:
"""Returns total rows loaded."""
total = 0
conn = psycopg2.connect(connection_string)
try:
with conn.cursor() as cur:
for batch in csv_row_generator(filepath, batch_size=10_000):
psycopg2.extras.execute_values(
cur,
"INSERT INTO staging_table (col1, col2, col3) VALUES %s",
[(r["col1"], r["col2"], r["col3"]) for r in batch],
page_size=1000,
)
total += len(batch)
conn.commit() # commit per batch — reduces transaction size
finally:
conn.close()
return total
Why this matters: Generators are lazy — they compute one value at a time and don't hold the full dataset in memory. Processing in batches (not row by row) amortises the network cost of DB inserts. execute_values is ~50x faster than individual INSERT statements for PostgreSQL.
Q2: Write a context manager for database connection management
from contextlib import contextmanager
from typing import Generator
import psycopg2
import logging
logger = logging.getLogger(__name__)
@contextmanager
def db_connection(dsn: str, autocommit: bool = False) -> Generator:
"""
Context manager for PostgreSQL connections.
- Handles commit/rollback automatically
- Closes connection on exit (even on exception)
- Logs connection lifecycle for observability
"""
conn = None
try:
conn = psycopg2.connect(dsn)
conn.autocommit = autocommit
logger.debug("DB connection opened: %s", dsn.split("@")[-1]) # redact credentials
yield conn
if not autocommit:
conn.commit()
logger.debug("Transaction committed")
except Exception as e:
if conn and not autocommit:
conn.rollback()
logger.warning("Transaction rolled back: %s", e)
raise
finally:
if conn:
conn.close()
logger.debug("DB connection closed")
# Usage
with db_connection(DSN) as conn:
with conn.cursor() as cur:
cur.execute("INSERT INTO events VALUES (%s, %s)", (event_id, payload))
# commit + close happen automatically
Class-based alternative (when you need __enter__ and __exit__):
class DatabaseConnection:
def __init__(self, dsn: str):
self.dsn = dsn
self.conn = None
def __enter__(self):
self.conn = psycopg2.connect(self.dsn)
return self.conn
def __exit__(self, exc_type, exc_val, exc_tb):
if exc_type:
self.conn.rollback()
else:
self.conn.commit()
self.conn.close()
return False # don't suppress exceptions
Q3: When and how do you use functools.lru_cache in a pipeline?
from functools import lru_cache
import psycopg2
# Perfect use case: slow lookup that's called many times per pipeline run
# but the lookup data doesn't change during a single run
@lru_cache(maxsize=16_384) # large enough to hold all ~10k unique products without evictions
def get_product_category(product_id: str) -> str:
"""
Cache product category lookups — called millions of times per pipeline run
but the catalogue changes rarely.
Without cache: 1M DB calls. With cache: ~10k unique products → 10k DB calls.
"""
with db_connection(DSN) as conn:
with conn.cursor() as cur:
cur.execute("SELECT category FROM products WHERE product_id = %s", (product_id,))
row = cur.fetchone()
return row[0] if row else "unknown"
# Clear the cache between pipeline runs (stale data risk)
def run_pipeline():
get_product_category.cache_clear()
for batch in csv_row_generator("orders.csv"):
for row in batch:
category = get_product_category(row["product_id"]) # hits cache after the first call per product
...
# Cache info for monitoring
info = get_product_category.cache_info()
print(f"Cache hits: {info.hits}, misses: {info.misses}, hit rate: {info.hits/(info.hits+info.misses):.1%}")When NOT to use lru_cache: When the function has side effects, when arguments aren't hashable (use functools.cache for simpler cases), or when the cache should be shared across processes (use Redis instead).
Q4: Write a type-annotated pipeline function with proper type hints
from typing import Iterator, TypeVar
from collections.abc import Callable
import pandas as pd
from datetime import date
T = TypeVar("T")
def extract_orders(
start_date: date,
end_date: date,
batch_size: int = 50_000,
) -> Iterator[pd.DataFrame]:
"""Extract orders from source in batches. Yields DataFrames."""
offset = 0
while True:
df = pd.read_sql(
"""
SELECT order_id, customer_id, product_id, revenue, order_date
FROM orders
WHERE order_date BETWEEN %(start)s AND %(end)s
ORDER BY order_id
LIMIT %(limit)s OFFSET %(offset)s
""",
con=get_engine(),
params={"start": start_date, "end": end_date,
"limit": batch_size, "offset": offset},
)
if df.empty:
break
yield df
offset += batch_size
def transform_orders(df: pd.DataFrame) -> pd.DataFrame:
"""Apply business transformations. Pure function — no side effects."""
return (
df
.assign(
revenue = lambda x: x["revenue"].clip(lower=0),
order_date = lambda x: pd.to_datetime(x["order_date"]),
revenue_tier = lambda x: pd.cut(
x["revenue"],
bins=[0, 50, 200, 1000, float("inf")],
labels=["micro", "small", "medium", "large"],
),
)
.dropna(subset=["customer_id", "product_id"])
)
def load_orders(df: pd.DataFrame, target_table: str, engine) -> int:
"""Load to warehouse. Returns row count loaded."""
df.to_sql(target_table, con=engine, if_exists="append", index=False, method="multi")
return len(df)
Q5: Implement a dataclass for pipeline configuration
from dataclasses import dataclass, field
from typing import Optional
import os
import yaml
@dataclass
class PipelineConfig:
"""
Strongly-typed pipeline configuration.
Loaded from YAML; secrets injected from environment variables.
"""
pipeline_name: str
source_dsn: str
target_dsn: str
batch_size: int = 50_000
max_retries: int = 3
backoff_base: float = 2.0
tags: list[str] = field(default_factory=list)
dry_run: bool = False
notify_slack: bool = True
slack_channel: Optional[str] = None
@classmethod
def from_yaml(cls, path: str) -> "PipelineConfig":
with open(path) as f:
raw = yaml.safe_load(f)
# Inject secrets from environment
raw["source_dsn"] = os.environ["SOURCE_DSN"]
raw["target_dsn"] = os.environ["TARGET_DSN"]
return cls(**raw)
def __post_init__(self):
if self.batch_size <= 0:
raise ValueError(f"batch_size must be positive, got {self.batch_size}")
if self.max_retries < 0:
raise ValueError(f"max_retries must be >= 0, got {self.max_retries}")
if self.notify_slack and not self.slack_channel:
raise ValueError("slack_channel required when notify_slack=True")
# config.yaml
# pipeline_name: orders_daily
# batch_size: 50000
# max_retries: 3
# notify_slack: true
# slack_channel: "#data-alerts"
Q6-10: More Python Fundamentals
# Q6: Exception hierarchy for pipeline errors
class PipelineError(Exception):
"""Base exception for all pipeline errors."""
pass
class ExtractionError(PipelineError):
"""Raised when source data cannot be read."""
pass
class TransformationError(PipelineError):
"""Raised when transformation logic fails — usually a data quality issue."""
def __init__(self, column: str, message: str):
self.column = column
super().__init__(f"Transformation failed on column '{column}': {message}")
class LoadError(PipelineError):
"""Raised when writing to the target fails."""
pass
# Q7: Using __slots__ for memory efficiency in high-volume event processing
class Event:
__slots__ = ("event_id", "user_id", "event_type", "timestamp", "payload")
def __init__(self, event_id, user_id, event_type, timestamp, payload):
self.event_id = event_id
self.user_id = user_id
self.event_type = event_type
self.timestamp = timestamp
self.payload = payload
# With __slots__, each Event uses ~40% less memory than a dict-based equivalent
# Critical when you're holding 1M+ events in memory during batch processing
# Q8: Protocol for duck-typed pipeline stages
from typing import Protocol
class Extractor(Protocol):
def extract(self, run_date: date) -> Iterator[pd.DataFrame]: ...
class Transformer(Protocol):
def transform(self, df: pd.DataFrame) -> pd.DataFrame: ...
class Loader(Protocol):
def load(self, df: pd.DataFrame) -> int: ...
def run_etl(extractor: Extractor, transformer: Transformer, loader: Loader) -> int:
"""Runs any ETL combination without inheritance."""
total = 0
for batch in extractor.extract(date.today()):
transformed = transformer.transform(batch)
total += loader.load(transformed)
return total
# Q9: Using collections.defaultdict for grouping pipeline events
from collections import defaultdict
def group_events_by_user(events: list[dict]) -> dict[str, list[dict]]:
grouped = defaultdict(list)
for event in events:
grouped[event["user_id"]].append(event)
return dict(grouped)
# Q10: Thread-safe singleton for a shared DB connection pool
import threading
from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool
class ConnectionPool:
_instance = None
_lock = threading.Lock()
def __new__(cls, dsn: str):
with cls._lock:
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance._engine = create_engine(
dsn,
poolclass=QueuePool,
pool_size=10,
max_overflow=5,
pool_pre_ping=True,
)
return cls._instance
@property
def engine(self):
return self._engine
Part 2: Pandas and Data Processing (10 Questions)
Q11: Efficient groupby — what's the difference between transform and agg?
import pandas as pd
import numpy as np
df = pd.DataFrame({
"customer_id": [1, 1, 2, 2, 3],
"revenue": [100, 200, 50, 150, 300],
"region": ["US", "US", "EU", "EU", "US"],
})
# agg: returns one row per group (reduces cardinality)
customer_totals = df.groupby("customer_id")["revenue"].agg(["sum", "mean", "count"])
# transform: returns a value for every row in the original DataFrame
# Use when you want to add a group-level stat as a new column
df["customer_total"] = df.groupby("customer_id")["revenue"].transform("sum")
df["region_avg_revenue"] = df.groupby("region")["revenue"].transform("mean")
df["pct_of_customer"] = df["revenue"] / df["customer_total"]
print(df)
When transform is critical: Feature engineering for ML (add group statistics as features without losing the original row granularity).
Q12: Avoiding apply() bottlenecks
# Bad: apply() with a Python function — bypasses vectorisation
# 100x slower than vectorised equivalents for large DataFrames
df["category"] = df["revenue"].apply(
lambda x: "high" if x > 1000 else ("medium" if x > 100 else "low")
)
# Good: use pd.cut() for binning
df["category"] = pd.cut(
df["revenue"],
bins=[0, 100, 1000, float("inf")],
labels=["low", "medium", "high"],
)
# Good: use np.where / np.select for conditional logic
df["discount_pct"] = np.select(
condlist=[
df["revenue"] > 10_000,
df["revenue"] > 1_000,
df["revenue"] > 100,
],
choicelist=[0.20, 0.10, 0.05],
default=0.0,
)
# Good: use vectorised string operations (not apply + lambda)
df["email_domain"] = df["email"].str.split("@").str[-1]Rule: If you're calling .apply() on a large DataFrame, ask yourself if there's a vectorised alternative. 90% of the time, there is.
Q13: Memory-efficient chunked reads from a database
def load_large_table_chunked(
query: str,
engine,
chunksize: int = 100_000,
dtypes: dict | None = None,
) -> pd.DataFrame:
"""
Read a large table in chunks, process each chunk, return one concatenated DataFrame.
For very large data, don't concatenate — process and write each chunk separately.
"""
chunks = []
for chunk in pd.read_sql(query, con=engine, chunksize=chunksize, dtype=dtypes):
# Apply lightweight transformations per chunk
chunk = chunk.assign(
order_date = pd.to_datetime(chunk["order_date"]),
revenue = chunk["revenue"].astype("float32"), # float32 uses half the memory of float64
)
chunk = chunk.dropna(subset=["order_id"])
chunks.append(chunk)
return pd.concat(chunks, ignore_index=True)
# Memory tip: use categorical dtype for low-cardinality string columns
def optimise_dtypes(df: pd.DataFrame) -> pd.DataFrame:
"""Reduce DataFrame memory footprint by ~60% for typical pipeline data."""
for col in df.select_dtypes(include="object").columns:
cardinality = df[col].nunique()
if cardinality / len(df) < 0.5: # < 50% unique → use categorical
df[col] = df[col].astype("category")
for col in df.select_dtypes(include="float64").columns:
df[col] = pd.to_numeric(df[col], downcast="float")
for col in df.select_dtypes(include="int64").columns:
df[col] = pd.to_numeric(df[col], downcast="integer")
return df
Q14: Merge pitfalls and how to avoid them
# Common pitfall 1: unexpected row multiplication (many-to-many join)
orders = pd.DataFrame({"order_id": [1,2,3], "customer_id": [10,10,20]})
customers = pd.DataFrame({"customer_id": [10,10,20], "segment": ["A","B","C"]})
# Bad: this creates 5 rows from 3 — silent data multiplication
bad_result = orders.merge(customers, on="customer_id")
print(f"Expected 3 rows, got {len(bad_result)}") # Prints: 5
# Fix: deduplicate the lookup table before joining
customers_deduped = customers.drop_duplicates(subset=["customer_id"], keep="last")
good_result = orders.merge(customers_deduped, on="customer_id")
print(f"Expected 3 rows, got {len(good_result)}") # Prints: 3
# Common pitfall 2: type mismatch on the join key
df1 = pd.DataFrame({"id": [1, 2, 3]}) # int64
df2 = pd.DataFrame({"id": ["1", "2", "3"]}) # object
# Older pandas silently returns 0 rows here; newer versions raise a ValueError
# about merging int64 and object columns. Either way, nothing joins until the dtypes agree.
silent_empty = df1.merge(df2, on="id")
print(f"Rows: {len(silent_empty)}") # Prints: 0 on older pandas — silent failure!
# Fix: always assert dtypes before merging
assert df1["id"].dtype == df2["id"].dtype, \
f"Join key type mismatch: {df1['id'].dtype} vs {df2['id'].dtype}"Q15: Handling duplicates in pipeline data
def deduplicate_pipeline_data(
df: pd.DataFrame,
key_cols: list[str],
timestamp_col: str = "updated_at",
strategy: str = "latest", # "latest" | "first" | "count_check"
) -> pd.DataFrame:
"""
Production-grade deduplication with multiple strategies.
"""
n_before = len(df)
if strategy == "latest":
# Keep the most recently updated version of each key
df = (
df.sort_values(timestamp_col, ascending=False)
.drop_duplicates(subset=key_cols, keep="first")
)
elif strategy == "first":
df = df.sort_values(timestamp_col).drop_duplicates(subset=key_cols, keep="first")
elif strategy == "count_check":
dup_counts = df.groupby(key_cols).size()
duplicated_keys = dup_counts[dup_counts > 1]
if len(duplicated_keys) > 0:
raise ValueError(
f"Unexpected duplicates on {key_cols}: "
f"{len(duplicated_keys)} keys have multiple rows"
)
return df
n_after = len(df)
dup_rate = (n_before - n_after) / n_before if n_before > 0 else 0
if dup_rate > 0.05: # alert if >5% of rows were duplicates
import logging
logging.warning("High duplicate rate: %.1f%% (%d rows removed)",
dup_rate * 100, n_before - n_after)
return df
Q16-20: More Pandas Patterns
# Q16: Efficient pivot without pivot_table overhead
# pivot_table is flexible but slow on large DataFrames
# Prefer groupby + unstack for large data
monthly_revenue = ( # pd.Grouper(freq="M") buckets by month, so this is monthly, not daily
df.groupby(["product_id", pd.Grouper(key="order_date", freq="M")])["revenue"]
.sum()
.unstack(level="order_date")
.fillna(0)
)
# Q17: Rolling join / asof merge (e.g., apply exchange rates to transactions)
rates = pd.DataFrame({
"date": pd.to_datetime(["2025-01-01", "2025-02-01", "2025-03-01"]),
"eur_usd": [1.08, 1.09, 1.07],
}).sort_values("date")
transactions = pd.DataFrame({
"tx_date": pd.to_datetime(["2025-01-15", "2025-02-10", "2025-03-20"]),
"amount": [1000, 2000, 1500],
}).sort_values("tx_date")
# pd.merge_asof: join each transaction to the most recent exchange rate
merged = pd.merge_asof(transactions, rates,
left_on="tx_date", right_on="date",
direction="backward")
# Q18: Efficient string operations — use str accessor, not apply
df["domain"] = df["email"].str.extract(r"@(.+)$")[0]
df["is_free_email"] = df["domain"].isin(["gmail.com", "yahoo.com", "hotmail.com"])
# Q19: Memory-mapped files for reading large Parquet without loading fully
import pyarrow.parquet as pq
table = pq.read_table(
"large_file.parquet",
columns=["order_id", "revenue"], # column pruning
filters=[("order_date", ">=", "2025-01-01")], # row group pruning
)
df = table.to_pandas()
# Q20: Using eval() and query() for performance on large DataFrames
# eval() uses numexpr under the hood — 2-5x faster for large arrays
df["profit"] = df.eval("revenue - (unit_cost * quantity)")
high_value = df.query("revenue > 1000 and region == 'US'")
Part 3: Pipeline Patterns (10 Questions)
Q21: Retry with exponential backoff
import time
import functools
import logging
from typing import TypeVar, Callable
F = TypeVar("F", bound=Callable)
logger = logging.getLogger(__name__)
def retry_with_backoff(
max_attempts: int = 3,
backoff_base: float = 2.0,
exceptions: tuple = (Exception,),
reraise: bool = True,
):
"""
Decorator: retry a function with exponential backoff.
backoff_base=2.0 → waits 1s, 2s, 4s after the first, second, and third failed attempts (backoff_base ** (attempt - 1)).
"""
def decorator(func: F) -> F:
@functools.wraps(func)
def wrapper(*args, **kwargs):
last_exception = None
for attempt in range(1, max_attempts + 1):
try:
return func(*args, **kwargs)
except exceptions as e:
last_exception = e
if attempt == max_attempts:
logger.error(
"%s failed after %d attempts: %s",
func.__name__, max_attempts, e,
)
if reraise:
raise
return None
wait = backoff_base ** (attempt - 1)
logger.warning(
"%s attempt %d/%d failed (%s). Retrying in %.1fs.",
func.__name__, attempt, max_attempts, e, wait,
)
time.sleep(wait)
return None
return wrapper # type: ignore
return decorator
# Usage
@retry_with_backoff(max_attempts=4, backoff_base=2.0, exceptions=(ConnectionError, TimeoutError))
def fetch_from_api(endpoint: str) -> dict:
import httpx
response = httpx.get(endpoint, timeout=10)
response.raise_for_status()
return response.json()
Q22: Idempotency patterns for pipeline stages
from hashlib import md5
import json
def make_idempotency_key(pipeline: str, run_date: str, table: str) -> str:
"""Generate a deterministic key for a pipeline run."""
payload = json.dumps({"pipeline": pipeline, "run_date": run_date, "table": table})
return md5(payload.encode()).hexdigest()
def is_run_already_completed(conn, idempotency_key: str) -> bool:
"""Check if this exact pipeline run already succeeded."""
with conn.cursor() as cur:
cur.execute(
"SELECT 1 FROM pipeline_runs WHERE idempotency_key = %s AND status = 'success'",
(idempotency_key,),
)
return cur.fetchone() is not None
def record_run_start(conn, idempotency_key: str, metadata: dict) -> None:
with conn.cursor() as cur:
cur.execute(
"""
INSERT INTO pipeline_runs (idempotency_key, status, metadata, started_at)
VALUES (%s, 'running', %s::jsonb, NOW())
ON CONFLICT (idempotency_key) DO NOTHING
""",
(idempotency_key, json.dumps(metadata)),
)
def record_run_success(conn, idempotency_key: str, rows_loaded: int) -> None:
with conn.cursor() as cur:
cur.execute(
"""
UPDATE pipeline_runs
SET status = 'success', rows_loaded = %s, completed_at = NOW()
WHERE idempotency_key = %s
""",
(rows_loaded, idempotency_key),
)
# Idempotent pipeline runner
def run_idempotent_pipeline(pipeline: str, run_date: str, table: str):
key = make_idempotency_key(pipeline, run_date, table)
with db_connection(DSN) as conn:
if is_run_already_completed(conn, key):
logger.info("Pipeline already completed for key %s — skipping", key)
return
record_run_start(conn, key, {"run_date": run_date, "table": table})
# ... do the actual work ...
rows = 0
with db_connection(DSN) as conn:
record_run_success(conn, key, rows_loaded=rows)
Q23: Config management with Pydantic and environment variables
# Pydantic v1 API; in Pydantic v2, BaseSettings moved to the pydantic-settings package and validator becomes field_validator
from pydantic import BaseSettings, SecretStr, validator
from typing import Optional
class PipelineSettings(BaseSettings):
"""
Pydantic BaseSettings reads from environment variables automatically.
Secret fields are masked in logs.
"""
app_env: str = "development"
source_host: str
source_port: int = 5432
source_db: str
source_password: SecretStr
warehouse_url: SecretStr
batch_size: int = 50_000
slack_webhook: Optional[SecretStr] = None
log_level: str = "INFO"
@validator("batch_size")
def validate_batch_size(cls, v):
if not 1000 <= v <= 500_000:
raise ValueError(f"batch_size must be between 1000 and 500000, got {v}")
return v
@validator("log_level")
def validate_log_level(cls, v):
if v not in {"DEBUG", "INFO", "WARNING", "ERROR"}:
raise ValueError(f"Invalid log_level: {v}")
return v
@property
def source_dsn(self) -> str:
return (
f"postgresql://{self.source_host}:{self.source_port}/{self.source_db}"
f"?password={self.source_password.get_secret_value()}"
)
class Config:
env_file = ".env"
env_file_encoding = "utf-8"
# Reads from environment or .env file automatically
settings = PipelineSettings()
Q24: Dependency injection in pipelines
from abc import ABC, abstractmethod
import pandas as pd
class BaseExtractor(ABC):
@abstractmethod
def extract(self, run_date: date) -> pd.DataFrame: ...
class BaseLoader(ABC):
@abstractmethod
def load(self, df: pd.DataFrame, table: str) -> int: ...
class PostgresExtractor(BaseExtractor):
def __init__(self, dsn: str):
self._dsn = dsn
def extract(self, run_date: date) -> pd.DataFrame:
return pd.read_sql(
"SELECT * FROM orders WHERE order_date = %(d)s",
con=create_engine(self._dsn),
params={"d": run_date},
)
class SnowflakeLoader(BaseLoader):
def __init__(self, conn_params: dict):
self._conn = conn_params
def load(self, df: pd.DataFrame, table: str) -> int:
df.to_sql(table, con=create_engine(**self._conn), if_exists="append", index=False)
return len(df)
class OrdersPipeline:
"""
Pipeline with injected dependencies — easy to test with mocks.
"""
def __init__(self, extractor: BaseExtractor, loader: BaseLoader):
self._extractor = extractor
self._loader = loader
def run(self, run_date: date) -> int:
df = self._extractor.extract(run_date)
df = self._transform(df)
return self._loader.load(df, "orders_warehouse")
def _transform(self, df: pd.DataFrame) -> pd.DataFrame:
return df.assign(processed_at=pd.Timestamp.utcnow())
# Production instantiation
pipeline = OrdersPipeline(
extractor=PostgresExtractor(dsn=settings.source_dsn),
loader=SnowflakeLoader(conn_params={"url": settings.warehouse_url.get_secret_value()}),
)
Q25-30: Testing Pipeline Code
# Q25: Unit testing a transformation function
import pytest
import pandas as pd
from pandas.testing import assert_frame_equal
def test_transform_clips_negative_revenue():
input_df = pd.DataFrame({
"order_id": [1, 2, 3],
"revenue": [100.0, -50.0, 0.0],
})
result = transform_orders(input_df)
assert (result["revenue"] >= 0).all(), "Negative revenue should be clipped to 0"
def test_transform_drops_null_customer():
input_df = pd.DataFrame({
"order_id": [1, 2],
"customer_id": [10, None],
"revenue": [100.0, 200.0],
})
result = transform_orders(input_df)
assert len(result) == 1
assert result.iloc[0]["order_id"] == 1
# Q26: Integration test with a real (in-memory) database
import duckdb
def test_incremental_load_is_idempotent():
conn = duckdb.connect()
conn.execute("""
CREATE TABLE orders_target (
order_id INT PRIMARY KEY,
revenue DOUBLE,
updated_at TIMESTAMP
)
""")
staging_df = pd.DataFrame({
"order_id": [1, 2, 3],
"revenue": [100.0, 200.0, 300.0],
"updated_at": pd.to_datetime(["2026-05-07"]*3),
})
conn.register("staging", staging_df)
conn.execute("""
INSERT OR REPLACE INTO orders_target
SELECT * FROM staging
""")
first_count = conn.execute("SELECT COUNT(*) FROM orders_target").fetchone()[0]
# Re-run the same load (idempotency test)
conn.execute("INSERT OR REPLACE INTO orders_target SELECT * FROM staging")
second_count = conn.execute("SELECT COUNT(*) FROM orders_target").fetchone()[0]
assert first_count == second_count == 3, "Idempotent load should not create duplicates"
# Q27: Mocking external dependencies in pipeline tests
from unittest.mock import MagicMock, patch
def test_pipeline_retries_on_connection_error():
mock_extractor = MagicMock(spec=BaseExtractor)
mock_extractor.extract.side_effect = [
ConnectionError("DB unavailable"),
pd.DataFrame({"order_id": [1], "revenue": [100.0]}),
]
mock_loader = MagicMock(spec=BaseLoader)
mock_loader.load.return_value = 1
pipeline = OrdersPipeline(extractor=mock_extractor, loader=mock_loader)
# Assumes the extract call is wrapped with retry_with_backoff (Q21); with the retry in place, the second attempt succeeds
with patch("time.sleep"): # don't actually sleep in tests
result = pipeline.run(date.today())
assert result == 1
# Q28: Property-based testing with Hypothesis
from hypothesis import given, strategies as st
from hypothesis.extra.pandas import column, data_frames
# Generate every column transform_orders touches (order_date and product_id included) so the strategy matches the function's contract
@given(data_frames([
column("order_id", elements=st.integers(min_value=1)),
column("product_id", elements=st.integers(min_value=1)),
column("order_date", elements=st.dates()),
column("revenue", elements=st.floats(min_value=-10000, max_value=100000)),
column("customer_id", elements=st.one_of(st.integers(min_value=1), st.none())),
]))
def test_transform_never_raises(df):
"""Transform should handle any valid-typed input without raising."""
try:
result = transform_orders(df)
assert (result["revenue"] >= 0).all()
except Exception as e:
pytest.fail(f"transform_orders raised unexpectedly: {e}")
# Q29: Testing row count expectations
def test_deduplication_reduces_rows():
duped_df = pd.DataFrame({
"order_id": [1, 1, 2, 2, 3],
"revenue": [100, 100, 200, 250, 300],
"updated_at": pd.to_datetime(["2026-05-06","2026-05-07","2026-05-06",
"2026-05-07","2026-05-07"]),
})
result = deduplicate_pipeline_data(duped_df, ["order_id"], "updated_at")
assert len(result) == 3
# Verify we kept the latest version of order_id=2 (revenue=250)
assert result.set_index("order_id").loc[2, "revenue"] == 250
# Q30: End-to-end pipeline test with fixtures
@pytest.fixture
def sample_orders_df():
return pd.DataFrame({
"order_id": range(1, 101),
"customer_id": [i % 20 + 1 for i in range(100)],
"revenue": [float(i * 10) for i in range(100)],
"updated_at": pd.date_range("2026-05-01", periods=100, freq="1h"),
})
def test_full_pipeline_produces_expected_row_count(sample_orders_df, tmp_path):
output_path = tmp_path / "output.parquet"
# Write input
input_path = tmp_path / "input.parquet"
sample_orders_df.to_parquet(input_path, index=False)
# Run pipeline
df = pd.read_parquet(input_path)
df = transform_orders(df)
df = deduplicate_pipeline_data(df, ["order_id"], "updated_at")
df.to_parquet(output_path, index=False)
result = pd.read_parquet(output_path)
assert len(result) == 100
assert (result["revenue"] >= 0).all()Interview Tips: What Sets Strong Candidates Apart
On generators: Explain memory implications and when to use yield from for delegating to sub-generators (see the first sketch after these tips).
On context managers: Know both the @contextmanager decorator form and the class-based __enter__/__exit__ form. Know when the class form is better (when you need state across enter/exit).
On pandas: Show you know the difference between loc, iloc, and [] and when each is appropriate. Know that chained indexing (df["a"]["b"] = x) often assigns into a temporary copy, so the original may not be modified (see the indexing sketch after these tips).
On testing: Show that you write tests that verify business logic, not just that code runs. Property-based tests (Hypothesis) impress senior interviewers.
On pipeline patterns: Idempotency is the most important concept — be able to explain it, implement it in SQL and Python, and explain what happens when it's violated (a minimal SQL-side sketch appears after these tips).
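A few quick sketches of the points above. First, on generators: a minimal example of yield from delegating to sub-generators, reusing the csv module from Q1 (file names are illustrative):
import csv
def csv_rows(filepath: str):
    """Stream individual rows from one file."""
    with open(filepath, newline="", encoding="utf-8") as f:
        yield from csv.DictReader(f)  # delegate iteration instead of writing an inner for-loop
def rows_from_many_files(filepaths: list[str]):
    """Flatten several per-file generators into one lazy stream."""
    for path in filepaths:
        yield from csv_rows(path)
# Usage: memory stays at one row at a time no matter how many files are chained
# for row in rows_from_many_files(["jan.csv", "feb.csv"]): ...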
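On the pandas indexing point, a small sketch of the three access styles and the chained-indexing trap (column names are illustrative):
import pandas as pd
df = pd.DataFrame({"region": ["US", "EU"], "revenue": [100.0, 200.0]})
df.loc[df["region"] == "US", "revenue"] = 150.0  # .loc: label/boolean based, writes to the original
first_revenue = df.iloc[0, 1]                    # .iloc: purely positional access
us_rows = df[df["region"] == "US"]               # []: convenient filtering, returns a new object
# Chained indexing may assign into a temporary copy and triggers SettingWithCopyWarning:
# df[df["region"] == "US"]["revenue"] = 150.0    # avoid this; use the .loc form above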
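And on idempotency, a minimal SQL-side sketch of the delete-then-insert (partition overwrite) pattern, assuming a Postgres-style target table with illustrative table and column names; execute_values comes from psycopg2.extras as in Q1:
import psycopg2.extras
def load_day_idempotently(conn, rows: list[tuple], run_date: str) -> int:
    """Re-running the same day replaces that day's rows instead of duplicating them,
    because the delete and the insert commit together in one transaction."""
    with conn.cursor() as cur:
        cur.execute("DELETE FROM orders_warehouse WHERE run_date = %s", (run_date,))
        psycopg2.extras.execute_values(
            cur,
            "INSERT INTO orders_warehouse (order_id, revenue, run_date) VALUES %s",
            [(order_id, revenue, run_date) for order_id, revenue in rows],
        )
    conn.commit()
    return len(rows)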
Enjoyed this article?
Explore the Data Engineering learning path for more.