pytest: Testing Python Pipelines · Lesson 4 of 4
Pipeline Testing: DataFrames, CLIs, APIs & Testcontainers
pytest for Complete Data Pipeline Testing
Unit tests for individual functions are essential, but data engineering also requires testing at higher levels: full pipeline runs, CLI entry points, REST APIs that expose pipeline triggers, and integration with real databases. This lesson covers the complete testing stack — from pandas assertion utilities to GitHub Actions CI configuration.
Testing Pandas Transformations with assert_frame_equal
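pd.testing.assert_frame_equal is the workhorse of pandas pipeline testing. On failure it reports which column differs and how, and it handles floating-point tolerance, dtype checking, and index/column-label checking. Note that it does not align indexes: both frames must have identical indexes. That is why the tests below often call reset_index(drop=True) before comparing.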
import pandas as pd
import numpy as np
from pandas.testing import assert_frame_equal, assert_series_equal
import pytest
# --- Source code under test ---
# src/transformers.py
def compute_order_metrics(orders_df: pd.DataFrame, customers_df: pd.DataFrame) -> pd.DataFrame:
    """
    Aggregate orders into one row per customer and attach customer attributes.

    Output columns: customer_id, total_revenue, order_count, avg_order_value,
    first_order_date, last_order_date, region, tier, revenue_rank.

    Rows come from ``orders_df``. Customers with no orders do not appear. An
    order whose customer_id is absent from ``customers_df`` still produces a
    row, with NaN region/tier.

    Args:
        orders_df: Orders with customer_id, order_id, revenue and order_date.
        customers_df: Customer dimension with customer_id, region and tier.

    Returns:
        Metrics sorted by customer_id, with a fresh RangeIndex.
    """
    # Aggregate straight from the orders. Customer attributes play no part in
    # the metrics, and a join before the groupby only adds work. It would also
    # fan out order rows (inflating sums) if customers_df held a customer_id
    # twice.
    metrics = (
        orders_df
        .groupby("customer_id", as_index=False)
        .agg(
            total_revenue=("revenue", "sum"),
            order_count=("order_id", "nunique"),
            avg_order_value=("revenue", "mean"),
            first_order_date=("order_date", "min"),
            last_order_date=("order_date", "max"),
        )
    )

    # Attach region and tier once, after aggregation.
    metrics = metrics.merge(
        customers_df[["customer_id", "region", "tier"]],
        on="customer_id",
        how="left",
    )

    # Dense revenue rank within region (1 = highest revenue). dropna=False
    # puts customers with an unknown region in their own group. Without it,
    # their rank would be NaN and the int cast would raise.
    metrics["revenue_rank"] = (
        metrics.groupby("region", dropna=False)["total_revenue"]
        .rank(method="dense", ascending=False)
        .astype(int)
    )
    return metrics.sort_values("customer_id").reset_index(drop=True)
# assert_frame_equal Options
# tests/unit/test_compute_order_metrics.py
import pytest
import pandas as pd
from pandas.testing import assert_frame_equal
@pytest.fixture
def orders():
    """Five January-2026 orders: C001 and C002 place two each, C003 one."""
    order_dates = pd.to_datetime(
        ["2026-01-01", "2026-01-15", "2026-01-05", "2026-01-10", "2026-01-20"]
    )
    columns = {
        "order_id": ["O1", "O2", "O3", "O4", "O5"],
        "customer_id": ["C001", "C001", "C002", "C003", "C002"],
        "revenue": [100.0, 150.0, 200.0, 300.0, 50.0],
        "order_date": order_dates,
    }
    return pd.DataFrame(columns)
@pytest.fixture
def customers():
    """Customer dimension: C001 and C002 in NORTH, C003 in SOUTH."""
    rows = [
        ("C001", "NORTH", "gold"),
        ("C002", "NORTH", "silver"),
        ("C003", "SOUTH", "gold"),
    ]
    return pd.DataFrame(rows, columns=["customer_id", "region", "tier"])
def test_total_revenue_per_customer(orders, customers):
    """Revenue is summed per customer: C001=100+150, C002=200+50, C003=300."""
    from src.transformers import compute_order_metrics

    compared = ["customer_id", "total_revenue"]
    actual = compute_order_metrics(orders, customers)[compared].reset_index(drop=True)
    expected = pd.DataFrame(
        {
            "customer_id": ["C001", "C002", "C003"],
            "total_revenue": [250.0, 250.0, 300.0],
        }
    )[compared].reset_index(drop=True)

    # check_dtype=False tolerates an integer vs float total column.
    # rtol absorbs floating-point summation noise.
    assert_frame_equal(actual, expected, check_dtype=False, rtol=1e-5)
def test_order_count_per_customer(orders, customers):
    """order_count counts distinct order_ids per customer."""
    from src.transformers import compute_order_metrics

    metrics = compute_order_metrics(orders, customers)

    def count_for(customer_id):
        mask = metrics["customer_id"] == customer_id
        return metrics.loc[mask, "order_count"].iloc[0]

    c001, c002, c003 = (count_for(cid) for cid in ("C001", "C002", "C003"))
    assert c001 == 2
    assert c002 == 2
    assert c003 == 1
def test_first_and_last_order_dates(orders, customers):
    """C001 ordered on 2026-01-01 and 2026-01-15, its min and max dates."""
    from src.transformers import compute_order_metrics

    metrics = compute_order_metrics(orders, customers)
    row = metrics[metrics["customer_id"] == "C001"].iloc[0]

    assert row["first_order_date"] == pd.Timestamp("2026-01-01")
    assert row["last_order_date"] == pd.Timestamp("2026-01-15")
def test_revenue_rank_within_region(orders, customers):
    """Dense revenue rank is computed per region, and ties share a rank.

    NORTH: C001=250 and C002=250 tie, so both are rank 1.
    SOUTH: C003=300 is alone, so it is rank 1.
    """
    from src.transformers import compute_order_metrics

    metrics = compute_order_metrics(orders, customers)

    def ranks_for(region):
        in_region = metrics[metrics["region"] == region]
        return in_region.set_index("customer_id")["revenue_rank"]

    north = ranks_for("NORTH")
    south = ranks_for("SOUTH")

    assert north["C001"] == 1
    assert north["C002"] == 1  # tie
    assert south["C003"] == 1
def test_output_columns_match_schema(orders, customers):
    """The output has exactly the documented columns, no more and no fewer."""
    from src.transformers import compute_order_metrics

    actual_columns = set(compute_order_metrics(orders, customers).columns)
    assert actual_columns == {
        "customer_id",
        "total_revenue",
        "order_count",
        "avg_order_value",
        "first_order_date",
        "last_order_date",
        "region",
        "tier",
        "revenue_rank",
    }
def test_output_is_one_row_per_customer(orders, customers):
    """The grain of the output is one row per customer_id."""
    from src.transformers import compute_order_metrics

    metrics = compute_order_metrics(orders, customers)
    ids = metrics["customer_id"]

    assert ids.nunique() == len(metrics)
    assert not ids.duplicated().any()
def test_left_join_preserves_unmatched_customers(orders):
    """Customers with no orders do NOT appear in the output.

    compute_order_metrics builds its rows from the orders side. A customer
    that exists only in the customer table (C999 here) has nothing to
    aggregate. The customer join only attaches region/tier to those rows and
    never adds new ones. Adjust this test if the business logic ever requires
    listing inactive customers.
    """
    customers_with_inactive = pd.DataFrame({
        "customer_id": ["C001", "C002", "C003", "C999"],  # C999 has no orders
        "region": ["NORTH", "NORTH", "SOUTH", "EAST"],
        "tier": ["gold", "silver", "gold", "bronze"],
    })
    from src.transformers import compute_order_metrics

    result = compute_order_metrics(orders, customers_with_inactive)

    # No orders, no row
    assert "C999" not in result["customer_id"].values
    # The extra customer record does not disturb the active customers
    assert set(result["customer_id"]) == {"C001", "C002", "C003"}
def test_assert_frame_equal_float_tolerance():
"""Demonstrate float tolerance options in assert_frame_equal."""
actual = pd.DataFrame({"revenue": [99.99999, 200.000001]})
expected = pd.DataFrame({"revenue": [100.0, 200.0]})
# This fails without tolerance
with pytest.raises(AssertionError):
assert_frame_equal(actual, expected)
# This passes with relative tolerance
assert_frame_equal(actual, expected, rtol=1e-3)
# This passes with absolute tolerance
assert_frame_equal(actual, expected, atol=0.01)Testing Error Cases with pytest.raises
# src/validators.py
import pandas as pd
from typing import List
class PipelineValidationError(Exception):
    """Raised when pipeline input fails validation.

    Attributes:
        failed_checks: One human-readable entry per failed check, exactly as
            supplied by the code that raised the error.
    """

    def __init__(self, message: str, failed_checks: List[str]):
        super().__init__(message)
        self.failed_checks = failed_checks


def validate_and_raise(df: pd.DataFrame, required_columns: List[str]) -> pd.DataFrame:
    """Validate a DataFrame, raising PipelineValidationError if invalid.

    Every check runs before anything is raised, so one exception reports all
    problems at once:

    * each name in ``required_columns`` is a column of ``df``;
    * ``df`` is not empty (``DataFrame.empty`` is False);
    * if an ``order_id`` column exists, its values are unique.

    Args:
        df: Frame to validate. It is not modified.
        required_columns: Column names that must be present.

    Returns:
        ``df`` itself (the same object) when every check passes.

    Raises:
        PipelineValidationError: If any check fails. The message states how
            many checks failed and lists them. ``failed_checks`` holds the
            individual entries.
    """
    failed = []

    missing_cols = [c for c in required_columns if c not in df.columns]
    if missing_cols:
        failed.append(f"Missing columns: {missing_cols}")

    if df.empty:
        failed.append("DataFrame is empty")

    # Only checked when the column exists. A missing order_id is already
    # reported above if it was required.
    if "order_id" in df.columns and df["order_id"].duplicated().any():
        failed.append("Duplicate order_id values detected")

    if failed:
        # Put the individual failures in the message itself so str(exc), logs
        # and pytest.raises(match=...) can see what went wrong. The
        # "Validation failed with N check(s)" prefix is unchanged.
        raise PipelineValidationError(
            f"Validation failed with {len(failed)} check(s): {'; '.join(failed)}",
            failed_checks=failed,
        )
    return df
# tests/unit/test_validators.py
def test_raises_pipeline_validation_error_for_missing_columns():
    """A missing required column is named in both the message and failed_checks."""
    frame = pd.DataFrame({"order_id": ["O1"], "revenue": [100.0]})
    required = ["order_id", "revenue", "customer_id"]

    with pytest.raises(PipelineValidationError) as exc_info:
        validate_and_raise(frame, required_columns=required)

    # Inspect the raised exception, not just its type
    err = exc_info.value
    assert "customer_id" in str(err)
    assert any("customer_id" in check for check in err.failed_checks)
def test_raises_for_empty_dataframe():
    """Having all columns but zero rows is still a validation failure."""
    no_rows = pd.DataFrame(columns=["order_id", "revenue"])
    with pytest.raises(PipelineValidationError, match="empty"):
        validate_and_raise(no_rows, required_columns=["order_id", "revenue"])
def test_raises_for_duplicate_order_ids():
    """Repeated order_id values are reported as a failed check."""
    frame = pd.DataFrame(
        {"order_id": ["O1", "O1", "O2"], "revenue": [100.0, 150.0, 200.0]}
    )

    with pytest.raises(PipelineValidationError) as exc_info:
        validate_and_raise(frame, required_columns=["order_id", "revenue"])

    checks = exc_info.value.failed_checks
    assert any("Duplicate" in entry for entry in checks)
def test_accumulates_all_failures_before_raising():
    """Should report all failures, not short-circuit at the first one."""
    # Zero rows AND neither required column present: two independent failures
    frame = pd.DataFrame({"x": []})

    with pytest.raises(PipelineValidationError) as exc_info:
        validate_and_raise(frame, required_columns=["order_id", "revenue"])

    failures = exc_info.value.failed_checks
    assert len(failures) >= 2
def test_returns_dataframe_when_valid():
    """A frame that passes every check comes back unchanged."""
    valid = pd.DataFrame({"order_id": ["O1", "O2"], "revenue": [100.0, 200.0]})
    returned = validate_and_raise(valid, required_columns=["order_id", "revenue"])
    assert_frame_equal(returned, valid)
def test_raises_correct_exception_type_not_base_exception():
    """Ensure we raise exactly PipelineValidationError, not a parent or subclass.

    ``pytest.raises(PipelineValidationError)`` already rejects unrelated
    exception types, but it also accepts subclasses. The ``type(...) is``
    check pins the exact class.
    """
    df = pd.DataFrame()
    with pytest.raises(PipelineValidationError) as exc_info:
        validate_and_raise(df, required_columns=["order_id"])
    assert type(exc_info.value) is PipelineValidationError
# Testing Sample Data Fixtures vs Production Data Patterns
A common dilemma: test with tiny fixtures (fast, easy to reason about) or with production-scale data (finds real bugs)?
The answer is both, in the right places:
# tests/conftest.py
@pytest.fixture(scope="session")
def minimal_orders_df():
    """3–5 rows. Use for logic tests where scale doesn't matter.

    Session-scoped, so every test in the session receives the same DataFrame
    object. Treat it as read-only, and call ``.copy()`` before mutating.
    """
    dates = pd.to_datetime(["2026-01-01", "2026-01-02", "2026-01-03"])
    return pd.DataFrame(
        {
            "order_id": ["O1", "O2", "O3"],
            "customer_id": ["C1", "C2", "C1"],
            "revenue": [100.0, 200.0, 150.0],
            "order_date": dates,
        }
    )
@pytest.fixture(scope="session")
def production_scale_orders_df():
    """
    Statistically representative sample: 100k rows, realistic distributions.
    session-scoped — generated once per test run.
    Use for performance, memory, and distribution tests only.

    Revenue mixes a lognormal body (90%) with uniform high-value outliers
    (10%). The values are shuffled so outliers are spread across order_ids.
    The generator is seeded, so the frame is identical on every run.
    """
    import numpy as np

    rng = np.random.default_rng(42)
    n = 100_000
    n_outliers = int(n * 0.1)
    # Derive the typical count from n so the two parts always sum to exactly
    # n rows. int(n * 0.9) + int(n * 0.1) falls short for some n (e.g. 15).
    n_typical = n - n_outliers

    revenues = np.concatenate([
        rng.lognormal(mean=4, sigma=1.5, size=n_typical),  # Typical orders
        rng.uniform(5000, 50000, size=n_outliers),  # High-value outliers
    ])
    # Without a shuffle every outlier sits in the last 10% of rows (the
    # highest order_ids), an ordering artefact real data does not have.
    rng.shuffle(revenues)

    # One vectorized draw instead of one scalar rng.integers() call per row.
    customer_numbers = rng.integers(1, 5000, size=n)
    order_ts_ns = rng.integers(
        pd.Timestamp("2023-01-01").value,
        pd.Timestamp("2026-01-01").value,
        size=n,
    )

    return pd.DataFrame({
        "order_id": [f"ORD-{i:08d}" for i in range(n)],
        "customer_id": [f"C{num:05d}" for num in customer_numbers],
        "revenue": revenues.round(2),
        "order_date": pd.to_datetime(order_ts_ns),
        "status": rng.choice(["shipped", "pending", "cancelled"], size=n, p=[0.65, 0.25, 0.10]),
    })
@pytest.mark.slow
def test_aggregation_handles_production_scale(production_scale_orders_df):
    """Integrity checks that only make sense at realistic volumes."""
    from src.aggregators import compute_customer_revenue

    summary = compute_customer_revenue(production_scale_orders_df)

    # Exactly one row per customer
    assert summary["customer_id"].nunique() == len(summary)
    # Totals must not go negative (guards against aggregation bugs)
    assert (summary["total_revenue"] >= 0).all()
def test_aggregation_logic_with_minimal_data(minimal_orders_df):
    """C1 placed orders of 100 and 150, so its total must be 250."""
    from src.aggregators import compute_customer_revenue

    summary = compute_customer_revenue(minimal_orders_df)
    is_c1 = summary["customer_id"] == "C1"
    assert summary.loc[is_c1, "total_revenue"].iloc[0] == pytest.approx(250.0)
# Testing Incremental Pipeline Logic
Incremental pipelines only process new data since the last run. Testing requires controlling the watermark:
# src/incremental.py
import pandas as pd
from datetime import date, datetime
from typing import Optional
class IncrementalProcessor:
    """Select only the rows newer than a per-pipeline watermark.

    The watermark store is any object with ``get(name)`` and
    ``set(name, value)``. A ``get`` result of None is treated as "no
    watermark yet", meaning every row is processed.
    """

    def __init__(self, watermark_store):
        self.watermark_store = watermark_store

    def get_watermark(self, pipeline_name: str) -> Optional[date]:
        """Return whatever the store holds for *pipeline_name*."""
        return self.watermark_store.get(pipeline_name)

    def set_watermark(self, pipeline_name: str, new_watermark: date) -> None:
        """Persist *new_watermark* for *pipeline_name* in the store."""
        self.watermark_store.set(pipeline_name, new_watermark)

    def process(self, df: pd.DataFrame, pipeline_name: str, date_col: str) -> pd.DataFrame:
        """Return the rows of *df* newer than the watermark and advance it.

        Rows are kept when ``date_col`` is strictly greater than midnight of
        the watermark date. The new watermark is the calendar date of the
        latest kept row, so with intra-day timestamps any rows later on that
        same day are selected again on the next run.
        NOTE(review): confirm downstream writes are idempotent for that case.

        The watermark is left untouched when nothing new is found.
        """
        last_seen = self.get_watermark(pipeline_name)
        if last_seen is None:
            # First run: everything is new
            batch = df.copy()
        else:
            cutoff = pd.Timestamp(last_seen)
            batch = df[df[date_col] > cutoff]

        if not batch.empty:
            self.set_watermark(pipeline_name, batch[date_col].max().date())
        return batch
# tests/unit/test_incremental.py
from unittest.mock import MagicMock
import pandas as pd
from datetime import date
import pytest
@pytest.fixture
def sample_df():
    """Four orders across April and May 2026. The latest is on 2026-05-07."""
    order_dates = pd.to_datetime(
        ["2026-04-01", "2026-04-15", "2026-05-01", "2026-05-07"]
    )
    return pd.DataFrame(
        {
            "order_id": ["O1", "O2", "O3", "O4"],
            "order_date": order_dates,
            "revenue": [100, 200, 300, 400],
        }
    )
@pytest.fixture
def watermark_store():
    """Mock store whose get() reports "no watermark yet" unless a test overrides it."""
    mock_store = MagicMock()
    mock_store.get.return_value = None  # first-run default
    return mock_store
def test_first_run_processes_all_records(sample_df, watermark_store):
    """With no stored watermark, every row counts as new."""
    from src.incremental import IncrementalProcessor

    processed = IncrementalProcessor(watermark_store).process(
        sample_df, "orders_pipeline", "order_date"
    )
    assert len(processed) == 4
def test_incremental_run_only_processes_new_records(sample_df, watermark_store):
    """Only rows strictly after the 2026-04-30 watermark (O3, O4) are returned."""
    watermark_store.get.return_value = date(2026, 4, 30)
    from src.incremental import IncrementalProcessor

    processed = IncrementalProcessor(watermark_store).process(
        sample_df, "orders_pipeline", "order_date"
    )
    assert len(processed) == 2
    assert set(processed["order_id"]) == {"O3", "O4"}
def test_watermark_is_updated_after_processing(sample_df, watermark_store):
    """The new watermark is the date of the latest processed row."""
    from src.incremental import IncrementalProcessor

    IncrementalProcessor(watermark_store).process(
        sample_df, "orders_pipeline", "order_date"
    )
    watermark_store.set.assert_called_once_with("orders_pipeline", date(2026, 5, 7))
def test_no_new_records_returns_empty_df(sample_df, watermark_store):
    """A watermark past every row yields an empty frame and no watermark update."""
    watermark_store.get.return_value = date(2026, 12, 31)  # after all sample rows
    from src.incremental import IncrementalProcessor

    processed = IncrementalProcessor(watermark_store).process(
        sample_df, "orders_pipeline", "order_date"
    )
    assert processed.empty
    watermark_store.set.assert_not_called()  # nothing new, so no update
# Testing CLI Scripts with CliRunner
Data engineers build CLI tools for pipeline orchestration. Test them without spawning subprocesses using Typer's or Click's CliRunner.
# src/cli.py
import typer
import pandas as pd
from pathlib import Path
app = typer.Typer()


@app.command()
def process(
    input_file: Path = typer.Argument(..., help="Input CSV file path"),
    output_file: Path = typer.Argument(..., help="Output Parquet file path"),
    drop_nulls: bool = typer.Option(True, help="Drop rows with null customer_id"),
    verbose: bool = typer.Option(False, "--verbose", "-v"),
):
    """Process a sales CSV and write to Parquet."""
    # (The docstring above is the CLI --help text; keep it short.)
    # Fail fast with exit code 1 and a message on stderr when the input is absent.
    if not input_file.exists():
        typer.echo(f"Error: Input file not found: {input_file}", err=True)
        raise typer.Exit(code=1)

    frame = pd.read_csv(input_file)
    rows_in = len(frame)
    # Dropping is skipped silently when the CSV has no customer_id column.
    if drop_nulls and "customer_id" in frame.columns:
        frame = frame.dropna(subset=["customer_id"])
    frame.to_parquet(output_file, index=False)

    rows_out = len(frame)
    message = (
        f"Processed {rows_in} rows → {rows_out} rows written to {output_file}"
        if verbose
        else f"Done: {rows_out} rows written."
    )
    typer.echo(message)
# tests/unit/test_cli.py
import pytest
import pandas as pd
from typer.testing import CliRunner
from src.cli import app
# One shared runner. invoke() runs the app in-process and captures its output.
runner = CliRunner()


@pytest.fixture
def input_csv(tmp_path):
    """Write a three-row sample CSV (one null customer_id) and return its path."""
    csv_path = tmp_path / "input.csv"
    sample = pd.DataFrame(
        {
            "order_id": ["O1", "O2", "O3"],
            "customer_id": ["C1", None, "C3"],
            "revenue": [100, 200, 300],
        }
    )
    sample.to_csv(csv_path, index=False)
    return csv_path
def test_cli_processes_file_successfully(input_csv, tmp_path):
    """A default run drops the single null-customer row and writes Parquet."""
    parquet_path = tmp_path / "output.parquet"
    outcome = runner.invoke(app, [str(input_csv), str(parquet_path)])

    assert outcome.exit_code == 0
    assert parquet_path.exists()
    written = pd.read_parquet(parquet_path)
    assert len(written) == 2  # One null row dropped
def test_cli_verbose_flag_shows_row_counts(input_csv, tmp_path):
    """--verbose reports both the input and the output row counts."""
    parquet_path = tmp_path / "output.parquet"
    outcome = runner.invoke(app, [str(input_csv), str(parquet_path), "--verbose"])

    assert outcome.exit_code == 0
    printed = outcome.output
    assert "3 rows" in printed  # rows read
    assert "2 rows" in printed  # rows kept after dropping the null
def test_cli_exits_with_error_for_missing_input(tmp_path):
    """A non-existent input file gives exit code 1 and an error message."""
    absent = tmp_path / "does_not_exist.csv"
    parquet_path = tmp_path / "output.parquet"
    outcome = runner.invoke(app, [str(absent), str(parquet_path)])

    assert outcome.exit_code == 1
    printed = outcome.output
    assert "not found" in printed.lower() or "Error" in printed
def test_cli_no_drop_nulls_flag_preserves_all_rows(input_csv, tmp_path):
    """--no-drop-nulls keeps every row, including the null customer_id."""
    parquet_path = tmp_path / "output.parquet"
    cli_args = [str(input_csv), str(parquet_path), "--no-drop-nulls"]
    outcome = runner.invoke(app, cli_args)

    assert outcome.exit_code == 0
    assert len(pd.read_parquet(parquet_path)) == 3  # nothing dropped
# Click-based CLI testing (same pattern, different runner import)
# from click.testing import CliRunner as ClickRunner
Testing FastAPI Endpoints with TestClient
Pipelines increasingly expose HTTP APIs for triggering runs, checking status, or serving transformation results.
# src/api.py
from fastapi import FastAPI, HTTPException, BackgroundTasks
from pydantic import BaseModel
from typing import Optional
import pandas as pd
from datetime import date
app = FastAPI(title="Pipeline API")

# Process-local, in-memory registry mapping job_id to a status string.
# It is not shared between processes and is lost on restart
# (use Redis/DB in production).
_jobs: dict[str, str] = {}
class PipelineRunRequest(BaseModel):
    # Request body for POST /pipeline/run.
    pipeline_name: str
    start_date: date
    end_date: date  # must be >= start_date; enforced in the endpoint, not by this model
    dry_run: bool = False  # optional in the request body; defaults to False
class PipelineRunResponse(BaseModel):
    # Response body for both /pipeline/run and /pipeline/status/{job_id}.
    job_id: str
    status: str  # values set in this module: "queued" or "already_running"
    message: Optional[str] = None  # only set on the duplicate-job path in this module
@app.get("/health")
def health_check():
    # Liveness probe: returns a static payload and checks no dependencies.
    payload = {"status": "healthy", "version": "1.0.0"}
    return payload
@app.post("/pipeline/run", response_model=PipelineRunResponse)
def trigger_pipeline_run(
    request: PipelineRunRequest,
    background_tasks: BackgroundTasks,
):
    # Queue a pipeline run for [start_date, end_date]. Identical requests are
    # de-duplicated and answered with status "already_running".

    # The model validates each date on its own; the ordering between them is
    # checked here.
    if request.end_date < request.start_date:
        raise HTTPException(
            status_code=422,
            detail="end_date must be >= start_date"
        )

    # The job id doubles as the de-duplication key. Dry runs get their own
    # suffix, so a dry run can neither block nor be mistaken for a real run
    # over the same window. Ids for real runs are unchanged.
    job_id = f"job-{request.pipeline_name}-{request.start_date}-{request.end_date}"
    if request.dry_run:
        job_id += "-dry-run"

    if job_id in _jobs:
        return PipelineRunResponse(
            job_id=job_id,
            status="already_running",
            message="A job with these parameters is already active",
        )

    # NOTE(review): background_tasks is accepted but nothing is scheduled yet.
    # Jobs only ever reach the "queued" state in this in-memory store.
    _jobs[job_id] = "queued"
    return PipelineRunResponse(job_id=job_id, status="queued")
@app.get("/pipeline/status/{job_id}", response_model=PipelineRunResponse)
def get_job_status(job_id: str):
    # Look up a job's current status. Unknown ids are a 404.
    if job_id in _jobs:
        return PipelineRunResponse(job_id=job_id, status=_jobs[job_id])
    raise HTTPException(status_code=404, detail=f"Job not found: {job_id}")
# tests/unit/test_api.py
import pytest
from fastapi.testclient import TestClient
from src.api import app, _jobs
@pytest.fixture(autouse=True)
def clear_job_store():
    """Give every test an empty job store, and leave one behind afterwards."""
    reset = _jobs.clear
    reset()  # before the test
    yield
    reset()  # after the test
@pytest.fixture
def client():
    """In-process HTTP client bound to the FastAPI app (no real server)."""
    test_client = TestClient(app)
    return test_client
def test_health_check_returns_200(client):
    """GET /health answers 200 with status "healthy"."""
    resp = client.get("/health")
    assert resp.status_code == 200
    body = resp.json()
    assert body["status"] == "healthy"
def test_trigger_pipeline_run_returns_job_id(client):
    """A valid request is queued, and its job_id embeds the pipeline name."""
    payload = {
        "pipeline_name": "orders_daily",
        "start_date": "2026-05-01",
        "end_date": "2026-05-07",
    }
    resp = client.post("/pipeline/run", json=payload)
    assert resp.status_code == 200

    body = resp.json()
    assert "job_id" in body
    assert body["status"] == "queued"
    assert "orders_daily" in body["job_id"]
def test_trigger_pipeline_rejects_invalid_date_range(client):
    """An end date before the start date is rejected with 422."""
    inverted = {
        "pipeline_name": "orders_daily",
        "start_date": "2026-05-07",
        "end_date": "2026-05-01",  # end < start
    }
    resp = client.post("/pipeline/run", json=inverted)
    assert resp.status_code == 422
    detail = resp.json()["detail"]
    assert "end_date" in detail.lower()
def test_trigger_pipeline_dry_run_flag(client):
    """A dry-run request is accepted and queued like a normal run."""
    payload = {
        "pipeline_name": "orders_daily",
        "start_date": "2026-05-01",
        "end_date": "2026-05-07",
        "dry_run": True,
    }
    resp = client.post("/pipeline/run", json=payload)
    assert resp.status_code == 200
    assert resp.json()["status"] == "queued"
def test_duplicate_job_returns_already_running(client):
    """Re-submitting identical parameters is de-duplicated, not re-queued."""
    payload = {
        "pipeline_name": "orders_daily",
        "start_date": "2026-05-01",
        "end_date": "2026-05-07",
    }

    initial = client.post("/pipeline/run", json=payload)
    assert initial.status_code == 200

    repeat = client.post("/pipeline/run", json=payload)  # identical params
    assert repeat.status_code == 200
    assert repeat.json()["status"] == "already_running"
def test_get_job_status_for_existing_job(client):
    """A job created through /pipeline/run is visible as "queued" via /status."""
    created = client.post(
        "/pipeline/run",
        json={
            "pipeline_name": "customers_daily",
            "start_date": "2026-05-01",
            "end_date": "2026-05-01",
        },
    )
    new_job_id = created.json()["job_id"]

    lookup = client.get(f"/pipeline/status/{new_job_id}")
    assert lookup.status_code == 200
    assert lookup.json()["status"] == "queued"
def test_get_job_status_404_for_unknown_job(client):
    """An unknown job id gives 404 with a "not found" detail."""
    resp = client.get("/pipeline/status/nonexistent-job-id")
    assert resp.status_code == 404
    detail = resp.json()["detail"]
    assert "not found" in detail.lower()
def test_pipeline_run_missing_required_field(client):
    """Omitting start_date and end_date fails body validation with 422."""
    incomplete = {"pipeline_name": "orders_daily"}  # no start_date / end_date
    resp = client.post("/pipeline/run", json=incomplete)
    assert resp.status_code == 422  # FastAPI validation error
# testcontainers-python: Real Database Tests
testcontainers-python starts real Docker containers for your tests. Use it for integration tests that need to verify SQL logic, index behavior, or stored procedures.
pip install "testcontainers[postgres]"   # quotes stop shells like zsh from globbing the brackets
# Requires a running Docker daemon (e.g. Docker Desktop)
# tests/integration/test_postgres_pipeline.py
import pytest
import pandas as pd
import psycopg2
from testcontainers.postgres import PostgresContainer
from sqlalchemy import create_engine
@pytest.fixture(scope="module")
def postgres_container():
    """Run one PostgreSQL 16 container for every test in this module.

    The ``with`` block stops and removes the container once the last test in
    the module has finished.
    """
    container = PostgresContainer("postgres:16-alpine")
    with container as postgres:
        yield postgres
@pytest.fixture(scope="module")
def db_engine(postgres_container):
    """SQLAlchemy engine connected to the test PostgreSQL container.

    The engine is disposed on teardown, so its pooled connections are closed
    before the container fixture (torn down after this one) stops the
    database.
    """
    engine = create_engine(postgres_container.get_connection_url())
    yield engine
    engine.dispose()
@pytest.fixture(scope="module")
def initialized_db(db_engine):
    """Create schema and seed reference data once per module.

    Raw SQL is wrapped in ``sqlalchemy.text``: SQLAlchemy 2.x no longer
    accepts plain strings in ``Connection.execute``. All statements run in
    one transaction via ``engine.begin()``.
    """
    from sqlalchemy import text

    with db_engine.begin() as conn:
        conn.execute(text("""
            CREATE TABLE IF NOT EXISTS orders (
                order_id VARCHAR(50) PRIMARY KEY,
                customer_id VARCHAR(50) NOT NULL,
                revenue NUMERIC(12, 2) NOT NULL,
                order_date DATE NOT NULL,
                status VARCHAR(20) DEFAULT 'pending',
                created_at TIMESTAMP DEFAULT NOW()
            )
        """))
        conn.execute(text("""
            CREATE TABLE IF NOT EXISTS customers (
                customer_id VARCHAR(50) PRIMARY KEY,
                name VARCHAR(200),
                region VARCHAR(50),
                tier VARCHAR(20)
            )
        """))
        # ON CONFLICT DO NOTHING keeps the seed idempotent if rows already exist
        conn.execute(text("""
            INSERT INTO customers VALUES
                ('C001', 'Alice Martin', 'NORTH', 'gold'),
                ('C002', 'Bob Chen', 'SOUTH', 'silver'),
                ('C003', 'Carol White', 'NORTH', 'gold')
            ON CONFLICT DO NOTHING
        """))
    return db_engine
@pytest.fixture
def clean_orders(initialized_db):
    """Ensure the orders table is empty before and after each test.

    Uses ``sqlalchemy.text`` because SQLAlchemy 2.x rejects plain SQL strings.
    """
    from sqlalchemy import text

    delete_all = text("DELETE FROM orders")
    with initialized_db.begin() as conn:
        conn.execute(delete_all)
    yield initialized_db
    with initialized_db.begin() as conn:
        conn.execute(delete_all)
@pytest.mark.integration
def test_insert_and_query_orders(clean_orders):
    """Round-trip two orders through OrderRepository against real Postgres."""
    from src.db_repository import OrderRepository

    repo = OrderRepository(engine=clean_orders)
    repo.bulk_insert(
        [
            {"order_id": "O1", "customer_id": "C001", "revenue": 150.00, "order_date": "2026-05-01", "status": "shipped"},
            {"order_id": "O2", "customer_id": "C002", "revenue": 75.50, "order_date": "2026-05-02", "status": "pending"},
        ]
    )

    fetched = repo.get_all()
    assert len(fetched) == 2
    assert set(fetched["order_id"].tolist()) == {"O1", "O2"}
@pytest.mark.integration
def test_revenue_aggregation_query(clean_orders):
    """Test a specific SQL aggregation query against real PostgreSQL.

    C001 has orders of 100 and 200, so its summed revenue must be 300.
    """
    from sqlalchemy import text

    with clean_orders.begin() as conn:
        conn.execute(text("""
            INSERT INTO orders (order_id, customer_id, revenue, order_date)
            VALUES
                ('O1', 'C001', 100.00, '2026-05-01'),
                ('O2', 'C001', 200.00, '2026-05-02'),
                ('O3', 'C002', 300.00, '2026-05-01')
        """))
    from src.db_repository import OrderRepository
    repo = OrderRepository(engine=clean_orders)
    result = repo.get_customer_revenue_summary()
    c001_revenue = result.loc[result["customer_id"] == "C001", "total_revenue"].iloc[0]
    # NUMERIC comes back as Decimal; convert before comparing with a float
    assert float(c001_revenue) == pytest.approx(300.0)
@pytest.mark.integration
def test_primary_key_constraint_is_enforced(clean_orders):
    """Verify that duplicate order_id raises IntegrityError.

    The SQL is wrapped in ``text()``. With a plain string, SQLAlchemy 2.x
    raises ObjectNotExecutableError on the first insert, and the test never
    reaches the constraint.
    """
    import sqlalchemy.exc
    from sqlalchemy import text

    with clean_orders.begin() as conn:
        conn.execute(text(
            "INSERT INTO orders (order_id, customer_id, revenue, order_date) "
            "VALUES ('O1', 'C001', 100, '2026-05-01')"
        ))
    # The second insert reuses order_id 'O1'; begin() rolls it back on error.
    with pytest.raises(sqlalchemy.exc.IntegrityError):
        with clean_orders.begin() as conn:
            conn.execute(text(
                "INSERT INTO orders (order_id, customer_id, revenue, order_date) "
                "VALUES ('O1', 'C002', 200, '2026-05-02')"
            ))
# Test Coverage with pytest-cov
pip install pytest-cov
Running with Coverage
# Terminal report: show uncovered lines
pytest --cov=src --cov-report=term-missing
# HTML report: open in browser
pytest --cov=src --cov-report=html:htmlcov
# XML report: for CI tools (SonarQube, Codecov, etc.)
pytest --cov=src --cov-report=xml:coverage.xml
# Fail if coverage drops below threshold
pytest --cov=src --cov-fail-under=80
# Combine all report formats
pytest --cov=src \
--cov-report=term-missing \
--cov-report=html:htmlcov \
--cov-report=xml:coverage.xml \
  --cov-fail-under=80
.coveragerc (or pyproject.toml equivalent)
# pyproject.toml
[tool.coverage.run]
source = ["src"]
branch = true  # Track branch coverage (if/else arms)
omit = [
    "tests/*",
    "src/migrations/*",
    "src/__init__.py",
    "src/*/conftest.py",
    # NOTE(review): tests/unit/test_cli.py exercises this module with
    # CliRunner; consider removing it from omit so that work counts.
    "src/cli.py",  # CLI entry points — integration tested separately
]

[tool.coverage.report]
precision = 2
show_missing = true
# Each entry is a regex matched against source lines. A match excludes that
# line (or the whole block it opens). Setting exclude_lines REPLACES
# coverage.py's default list, so "pragma: no cover" must be listed explicitly
# (exclude_also, in coverage.py 7.2+, appends to the defaults instead).
exclude_lines = [
    "pragma: no cover",
    "if TYPE_CHECKING:",
    "raise NotImplementedError",
    "@(abc\\.)?abstractmethod",
    "if __name__ == ['\"]__main__['\"]:",
    # Anchored to a bare `pass` statement. The unanchored "pass" also excluded
    # any line merely containing those letters (password, bypass, passenger...).
    "^\\s*pass\\s*$",
]

[tool.coverage.html]
directory = "htmlcov"

[tool.coverage.xml]
output = "coverage.xml"
# pragma: no cover
Mark code that legitimately cannot be tested (or shouldn't block coverage thresholds):
def debug_dump(df: pd.DataFrame) -> None:  # pragma: no cover
    """Print every row of *df* (development aid; the pragma excludes it from coverage)."""
    rendered = df.to_string()
    print(rendered)


if __name__ == "__main__":  # pragma: no cover
    app()
# Test Organization for Data Engineering Projects
Here is a production-ready test directory structure:
tests/
├── conftest.py # Root: session/module fixtures, markers
├── fixtures/ # Static test data files
│ ├── orders_sample.csv
│ ├── customers_sample.csv
│ ├── schemas/
│ │ ├── orders_v1.json
│ │ └── customers_v1.json
│ └── parquet/
│ └── test_orders.parquet
├── unit/ # Fast, isolated, no I/O
│ ├── conftest.py # Unit-specific fixtures
│ ├── test_transformers.py
│ ├── test_validators.py
│ ├── test_aggregators.py
│ ├── test_enrichment.py
│ └── test_incremental.py
├── integration/ # Slower, external services
│ ├── conftest.py # DB connections, container fixtures
│ ├── test_postgres_pipeline.py
│ ├── test_s3_reader.py
│ └── test_snowflake_query.py
├── api/ # FastAPI endpoint tests
│ ├── conftest.py
│ └── test_pipeline_api.py
├── cli/ # CLI command tests
│ └── test_process_command.py
└── e2e/ # Full pipeline runs (slowest)
├── conftest.py
    └── test_orders_pipeline_e2e.py
Makefile Targets for Common Test Commands
# Makefile
.PHONY: test test-unit test-integration test-coverage test-ci

# `test` is declared phony above, so give it a rule (it previously had none
# and `make test` failed). It defaults to the fast unit suite.
test: test-unit

# Fast feedback loop during development.
# Unit tests are selected by directory (tests/unit). The tests carry no
# `unit` marker, so `-m "unit and not slow"` collected nothing and pytest
# exited with code 5 ("no tests collected").
test-unit:
	pytest tests/unit -x -vs -m "not slow" --tb=short

# Run only integration tests (requires services)
test-integration:
	pytest -m integration --tb=short

# Full suite with coverage
test-coverage:
	pytest \
		--cov=src \
		--cov-report=term-missing \
		--cov-report=html:htmlcov \
		--cov-fail-under=80

# What CI runs
test-ci:
	pytest \
		-m "not slow" \
		--junit-xml=test-results/junit.xml \
		--cov=src \
		--cov-report=xml:coverage.xml \
		--cov-fail-under=80
# CI Integration: GitHub Actions
# .github/workflows/test.yml
name: Tests

on:
  push:
    branches: [main, develop]
  pull_request:
    branches: [main]
  # Manual trigger. Without it the slow-tests job below can never run: its
  # `if:` only matches schedule/workflow_dispatch events, and this workflow
  # had neither trigger. (Scheduled runs live in nightly.yml.)
  workflow_dispatch:

jobs:
  unit-tests:
    name: Unit Tests (Python ${{ matrix.python-version }})
    runs-on: ubuntu-latest
    strategy:
      matrix:
        python-version: ["3.11", "3.12"]
      fail-fast: false  # Run all versions even if one fails
    steps:
      - uses: actions/checkout@v4

      - name: Set up Python ${{ matrix.python-version }}
        uses: actions/setup-python@v5
        with:
          python-version: ${{ matrix.python-version }}
          cache: pip

      - name: Install dependencies
        run: |
          pip install --upgrade pip
          pip install -e ".[test]"

      # Unit tests are selected by directory. The tests carry no `unit`
      # marker, so `-m "unit and not slow"` collected nothing and pytest
      # exited with code 5, failing the job.
      - name: Run unit tests
        run: |
          pytest tests/unit \
            -m "not slow" \
            --junit-xml=test-results/unit-${{ matrix.python-version }}.xml \
            --cov=src \
            --cov-report=xml:coverage-${{ matrix.python-version }}.xml \
            --cov-fail-under=80 \
            -v

      - name: Upload test results
        uses: actions/upload-artifact@v4
        if: always()
        with:
          name: test-results-${{ matrix.python-version }}
          path: test-results/

      # codecov-action v4 takes `files` (`file` is deprecated) and generally
      # needs an upload token. With fail_ci_if_error, a rejected upload fails
      # the job.
      - name: Upload coverage to Codecov
        uses: codecov/codecov-action@v4
        with:
          files: coverage-${{ matrix.python-version }}.xml
          flags: unit-tests
          token: ${{ secrets.CODECOV_TOKEN }}
          fail_ci_if_error: true

  integration-tests:
    name: Integration Tests
    runs-on: ubuntu-latest
    needs: unit-tests  # Only run if unit tests pass
    # NOTE(review): the testcontainers fixtures in
    # tests/integration/test_postgres_pipeline.py start their own Postgres via
    # Docker and never read TEST_DB_*. This service and those variables only
    # matter for tests that connect to it explicitly.
    services:
      postgres:
        image: postgres:16-alpine
        env:
          POSTGRES_DB: test_pipeline
          POSTGRES_USER: postgres
          POSTGRES_PASSWORD: testpassword
        options: >-
          --health-cmd pg_isready
          --health-interval 10s
          --health-timeout 5s
          --health-retries 5
        ports:
          - 5432:5432
    steps:
      - uses: actions/checkout@v4

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: "3.12"
          cache: pip

      - name: Install dependencies
        run: pip install -e ".[test]"

      - name: Run integration tests
        env:
          TEST_DB_HOST: localhost
          TEST_DB_PORT: 5432
          TEST_DB_NAME: test_pipeline
          TEST_DB_USER: postgres
          TEST_DB_PASSWORD: testpassword
        run: |
          pytest \
            -m integration \
            --junit-xml=test-results/integration.xml \
            -v \
            --tb=short

      - name: Upload integration test results
        uses: actions/upload-artifact@v4
        if: always()
        with:
          name: integration-test-results
          path: test-results/

  slow-tests:
    name: Slow / Nightly Tests
    runs-on: ubuntu-latest
    # Only on schedule or manual trigger — not every PR. This workflow has no
    # schedule trigger, so in practice the job runs via workflow_dispatch.
    if: github.event_name == 'schedule' || github.event_name == 'workflow_dispatch'
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: "3.12"
          cache: pip
      - run: pip install -e ".[test]"
      - name: Run slow tests
        run: pytest -m slow --junit-xml=test-results/slow.xml -v
# Separate nightly workflow for slow tests
# .github/workflows/nightly.yml
name: Nightly Full Suite

on:
  schedule:
    - cron: "0 2 * * *"  # 2 AM UTC every day
  workflow_dispatch:  # Manual trigger from GitHub UI

jobs:
  full-suite:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: "3.12"
          cache: pip
      - run: pip install -e ".[test]"

      - name: Run full test suite including slow tests
        run: |
          pytest \
            --junit-xml=test-results/full-suite.xml \
            --cov=src \
            --cov-report=html:htmlcov \
            --cov-report=xml \
            -v

      # The JUnit report was written above but never published.
      - name: Upload test results
        uses: actions/upload-artifact@v4
        if: always()
        with:
          name: nightly-test-results
          path: test-results/

      # Upload even when tests fail: that is when the report matters most.
      - name: Upload coverage HTML
        uses: actions/upload-artifact@v4
        if: always()
        with:
          name: coverage-report
          path: htmlcov/
# pyproject.toml Test Dependencies
# pyproject.toml
[project.optional-dependencies]
test = [
    "pytest>=8.0",
    "pytest-cov>=5.0",
    "pytest-mock>=3.12",
    "pytest-xdist>=3.5",  # Parallel test execution
    "pytest-asyncio>=0.23",  # Async test support
    "pandas>=2.0",
    "pyarrow>=14.0",  # Parquet read/write in the CLI tests
    "freezegun>=1.4",
    "responses>=0.25",
    "respx>=0.21",
    "testcontainers[postgres]>=4.0",
    # Imported directly by tests/integration/test_postgres_pipeline.py, so
    # declared explicitly rather than relying on transitive installs.
    "sqlalchemy>=2.0",
    "psycopg2-binary>=2.9",
    "httpx>=0.27",  # For FastAPI TestClient
    "typer[all]>=0.12",
]
# Parallel Test Execution with pytest-xdist
pip install pytest-xdist
# Run across 4 workers
pytest -n 4
# Auto-detect CPU count
pytest -n auto
# Distribute by file (good for independent test modules)
pytest -n auto --dist=loadfile
# Distribute by module group
pytest -n auto --dist=loadscope
Important: session-scoped fixtures are NOT shared across workers with xdist. Each worker process builds its own instance, so a session fixture runs once per worker, not once per test run. Use scope="session" only for truly independent, read-only fixtures, or use tmp_path_factory for worker-local temp dirs.
@pytest.fixture(scope="session")
def worker_id():
    """Return this process's xdist worker id ("gw0", "gw1", ...) or "master".

    pytest-xdist sets PYTEST_XDIST_WORKER in each worker process. Without
    xdist the variable is absent and "master" is returned. Use the id to give
    each worker its own resources (DB schema, bucket prefix, port) and avoid
    conflicts.

    Note: pytest-xdist already ships a fixture with this name and the same
    return values. Defining it here overrides the plugin's version.
    """
    import os

    return os.environ.get("PYTEST_XDIST_WORKER", "master")
# The -ra flag (show all non-passed) output summary:
# FAILED tests/unit/test_validators.py::test_revenue_positive - AssertionError
# ERROR tests/integration/test_db.py::test_insert - ConnectionError
# SKIPPED tests/integration/test_api.py::test_export (credentials not set)
# XFAILED tests/unit/test_parsers.py::test_edge_case (known bug JIRA-1234)
# Exit codes:
# 0 — all tests passed
# 1 — some tests failed
# 2 — interrupted (Ctrl+C)
# 3 — internal error
# 4 — command-line usage error
# 5 — no tests collected
# Coverage output:
# Name Stmts Miss Branch BrPart Cover Missing
# src/transformers.py 45 3 12 2 91% 42-44, 87
# src/validators.py 28 0 8 0 100%
# TOTAL 73 3 20 2 95%
Summary
- Use `pd.testing.assert_frame_equal` for DataFrame comparisons. It gives column-level diffs and supports float tolerances.
- Test exception types and messages with `pytest.raises`. Always check the exception's attributes and message, not just that it was raised.
- Keep test data in two tiers: tiny fixtures for logic tests, and production-scale fixtures (session-scoped) for performance tests.
- `TestClient` (FastAPI) and `CliRunner` (Typer/Click) test HTTP endpoints and CLI commands in-process, without starting real servers or spawning subprocesses.
- `testcontainers` gives you a real PostgreSQL (or any other Docker-based service) in integration tests.
- Configure `pytest-cov` with branch coverage and a `--cov-fail-under` threshold in CI.
- Separate unit, integration, slow, and e2e tests with markers and directories, and run them on different schedules.
- Use `pytest-xdist` for parallel execution when unit tests grow beyond 30 seconds.
This completes the pytest series. The four lessons build a complete testing stack for data engineering pipelines: fundamentals → fixtures & parametrize → mocking → full pipeline testing. Apply these patterns from day one, and refactoring your pipelines becomes safe, confident work.