pytest for Complete Data Pipeline Testing
Test pandas transformations with assert_frame_equal, handle exceptions with pytest.raises, test CLI scripts and FastAPI endpoints, run real database tests with testcontainers, measure coverage, and integrate with GitHub Actions CI.
pytest for Complete Data Pipeline Testing
Unit tests for individual functions are essential, but data engineering also requires testing at higher levels: full pipeline runs, CLI entry points, REST APIs that expose pipeline triggers, and integration with real databases. This lesson covers the complete testing stack — from pandas assertion utilities to GitHub Actions CI configuration.
Testing Pandas Transformations with assert_frame_equal
pd.testing.assert_frame_equal is the workhorse of pandas pipeline testing. It provides column-level diffs and handles floating-point comparison, dtype checking, and index alignment.
import pandas as pd
import numpy as np
from pandas.testing import assert_frame_equal, assert_series_equal
import pytest
# --- Source code under test ---
# src/transformers.py
def compute_order_metrics(orders_df: pd.DataFrame, customers_df: pd.DataFrame) -> pd.DataFrame:
    """
    Compute per-customer revenue metrics enriched with customer attributes.

    Parameters
    ----------
    orders_df : DataFrame with columns order_id, customer_id, revenue, order_date.
    customers_df : DataFrame with columns customer_id, region, tier.

    Returns
    -------
    One row per customer with: total_revenue, order_count, avg_order_value,
    first_order_date, last_order_date, region, tier, revenue_rank
    (1 = highest revenue within the region; dense ranking, so ties share a rank).
    """
    # Aggregate straight from the orders table. The original pre-join of
    # region/tier before the groupby was redundant: agg() discards those
    # columns, and they are merged in again below anyway.
    metrics = (
        orders_df
        .groupby("customer_id", as_index=False)
        .agg(
            total_revenue=("revenue", "sum"),
            order_count=("order_id", "nunique"),
            avg_order_value=("revenue", "mean"),
            first_order_date=("order_date", "min"),
            last_order_date=("order_date", "max"),
        )
    )
    # Attach region and tier; a left join keeps customers missing from the
    # reference table (their region/tier become NaN).
    metrics = metrics.merge(
        customers_df[["customer_id", "region", "tier"]],
        on="customer_id",
        how="left",
    )
    # Revenue rank within region (1 = highest revenue; dense => ties share ranks)
    metrics["revenue_rank"] = (
        metrics.groupby("region")["total_revenue"]
        .rank(method="dense", ascending=False)
        .astype(int)
    )
    return metrics.sort_values("customer_id").reset_index(drop=True)

# --- assert_frame_equal Options ---
# tests/unit/test_compute_order_metrics.py
import pytest
import pandas as pd
from pandas.testing import assert_frame_equal
@pytest.fixture
def orders():
    """Five orders across three customers, all in January 2026."""
    rows = [
        ("O1", "C001", 100.0, "2026-01-01"),
        ("O2", "C001", 150.0, "2026-01-15"),
        ("O3", "C002", 200.0, "2026-01-05"),
        ("O4", "C003", 300.0, "2026-01-10"),
        ("O5", "C002", 50.0, "2026-01-20"),
    ]
    df = pd.DataFrame(rows, columns=["order_id", "customer_id", "revenue", "order_date"])
    df["order_date"] = pd.to_datetime(df["order_date"])
    return df
@pytest.fixture
def customers():
    """Three customers spanning two regions and two tiers."""
    records = [
        {"customer_id": "C001", "region": "NORTH", "tier": "gold"},
        {"customer_id": "C002", "region": "NORTH", "tier": "silver"},
        {"customer_id": "C003", "region": "SOUTH", "tier": "gold"},
    ]
    return pd.DataFrame.from_records(records)
def test_total_revenue_per_customer(orders, customers):
    """Revenue is summed per customer: C001=100+150, C002=200+50, C003=300."""
    from src.transformers import compute_order_metrics
    result = compute_order_metrics(orders, customers)
    expected = pd.DataFrame({
        "customer_id": ["C001", "C002", "C003"],
        "total_revenue": [250.0, 250.0, 300.0],
    })[["customer_id", "total_revenue"]]
    # Compare only the columns under test; reset indexes so alignment
    # differences don't cause spurious failures.
    assert_frame_equal(
        result[["customer_id", "total_revenue"]].reset_index(drop=True),
        expected.reset_index(drop=True),
        check_dtype=False,  # Allow int64 vs float64 differences
        rtol=1e-5,  # Relative tolerance for floats
    )
def test_order_count_per_customer(orders, customers):
    """order_count equals the number of distinct orders per customer."""
    from src.transformers import compute_order_metrics
    result = compute_order_metrics(orders, customers)
    counts = result.set_index("customer_id")["order_count"]
    expected_counts = {"C001": 2, "C002": 2, "C003": 1}
    for customer, expected in expected_counts.items():
        assert counts[customer] == expected
def test_first_and_last_order_dates(orders, customers):
    """Date aggregations pick the min and max order_date per customer."""
    from src.transformers import compute_order_metrics
    result = compute_order_metrics(orders, customers)
    c001_row = result[result["customer_id"] == "C001"].iloc[0]
    assert c001_row["first_order_date"] == pd.Timestamp("2026-01-01")
    assert c001_row["last_order_date"] == pd.Timestamp("2026-01-15")
def test_revenue_rank_within_region(orders, customers):
    """Dense ranking gives rank 1 to the highest revenue in each region; ties share it."""
    from src.transformers import compute_order_metrics
    result = compute_order_metrics(orders, customers)
    # NORTH: C001=250, C002=250 -> both rank 1 (tie)
    # SOUTH: C003=300 -> rank 1
    north_ranks = result[result["region"] == "NORTH"].set_index("customer_id")["revenue_rank"]
    south_ranks = result[result["region"] == "SOUTH"].set_index("customer_id")["revenue_rank"]
    assert north_ranks["C001"] == 1
    assert north_ranks["C002"] == 1  # Tie
    assert south_ranks["C003"] == 1
def test_output_columns_match_schema(orders, customers):
    """The result exposes exactly the documented columns (order-insensitive)."""
    from src.transformers import compute_order_metrics
    produced = set(compute_order_metrics(orders, customers).columns)
    expected_columns = {
        "customer_id", "total_revenue", "order_count", "avg_order_value",
        "first_order_date", "last_order_date", "region", "tier", "revenue_rank",
    }
    assert produced == expected_columns
def test_output_is_one_row_per_customer(orders, customers):
    """The output grain is one row per customer_id, with no duplicates."""
    from src.transformers import compute_order_metrics
    result = compute_order_metrics(orders, customers)
    assert result["customer_id"].nunique() == len(result)
    assert not result["customer_id"].duplicated().any()
def test_left_join_preserves_unmatched_customers(orders):
    """Customers with no orders do NOT appear: metrics are driven from the orders side."""
    customers_with_inactive = pd.DataFrame({
        "customer_id": ["C001", "C002", "C003", "C999"],  # C999 has no orders
        "region": ["NORTH", "NORTH", "SOUTH", "EAST"],
        "tier": ["gold", "silver", "gold", "bronze"],
    })
    from src.transformers import compute_order_metrics
    # This tests current behavior - adjust if business logic requires
    # surfacing inactive customers with NaN metrics instead.
    result = compute_order_metrics(orders, customers_with_inactive)
    assert "C999" not in result["customer_id"].values  # Left join from orders side
def test_assert_frame_equal_float_tolerance():
    """Demonstrate float tolerance options in assert_frame_equal."""
    actual = pd.DataFrame({"revenue": [99.99999, 200.000001]})
    expected = pd.DataFrame({"revenue": [100.0, 200.0]})
    # Exact comparison fails: the values differ in the far decimal places.
    with pytest.raises(AssertionError):
        assert_frame_equal(actual, expected)
    # This passes with relative tolerance
    assert_frame_equal(actual, expected, rtol=1e-3)
    # This passes with absolute tolerance
    assert_frame_equal(actual, expected, atol=0.01)

# --- Testing Error Cases with pytest.raises ---
# src/validators.py
import pandas as pd
from typing import List
class PipelineValidationError(Exception):
    """Raised when pipeline input fails validation.

    Carries the full list of failed check descriptions so callers can
    report every problem at once instead of only the first.
    """

    def __init__(self, message: str, failed_checks: List[str]):
        super().__init__(message)
        # Retain the individual failures for programmatic inspection.
        self.failed_checks = failed_checks
def validate_and_raise(df: pd.DataFrame, required_columns: List[str]) -> pd.DataFrame:
    """Validate a DataFrame, raising PipelineValidationError if invalid.

    Every check runs before raising, so the exception reports all failures
    at once. Returns the (unmodified) DataFrame when all checks pass.
    """
    failures = []
    # Check 1: all required columns are present.
    absent = [c for c in required_columns if c not in df.columns]
    if absent:
        failures.append(f"Missing columns: {absent}")
    # Check 2: at least one row exists.
    if df.empty:
        failures.append("DataFrame is empty")
    # Check 3: order_id (when present) must be unique.
    if "order_id" in df.columns and df["order_id"].duplicated().any():
        failures.append("Duplicate order_id values detected")
    if failures:
        raise PipelineValidationError(
            f"Validation failed with {len(failures)} check(s)",
            failed_checks=failures,
        )
    return df

# tests/unit/test_validators.py
def test_raises_pipeline_validation_error_for_missing_columns():
    """A missing required column triggers the custom exception with details."""
    df = pd.DataFrame({"order_id": ["O1"], "revenue": [100.0]})
    with pytest.raises(PipelineValidationError) as exc_info:
        validate_and_raise(df, required_columns=["order_id", "revenue", "customer_id"])
    # Inspect the exception: both the message and the structured check list
    # should name the offending column.
    error = exc_info.value
    assert "customer_id" in str(error)
    assert any("customer_id" in check for check in error.failed_checks)
def test_raises_for_empty_dataframe():
    """An empty (zero-row) DataFrame fails validation; match= checks the message."""
    df = pd.DataFrame(columns=["order_id", "revenue"])
    with pytest.raises(PipelineValidationError, match="empty"):
        validate_and_raise(df, required_columns=["order_id", "revenue"])
def test_raises_for_duplicate_order_ids():
    """Duplicate order_id values are reported in the failed_checks list."""
    df = pd.DataFrame({
        "order_id": ["O1", "O1", "O2"],
        "revenue": [100.0, 150.0, 200.0],
    })
    with pytest.raises(PipelineValidationError) as exc_info:
        validate_and_raise(df, required_columns=["order_id", "revenue"])
    assert any("Duplicate" in check for check in exc_info.value.failed_checks)
def test_accumulates_all_failures_before_raising():
    """Should report all failures, not short-circuit at the first one."""
    df = pd.DataFrame({"x": []})  # Empty AND missing required columns
    with pytest.raises(PipelineValidationError) as exc_info:
        validate_and_raise(df, required_columns=["order_id", "revenue"])
    # Both the missing-columns and the empty-DataFrame checks should fire.
    assert len(exc_info.value.failed_checks) >= 2
def test_returns_dataframe_when_valid():
    """A valid DataFrame passes through unchanged."""
    df = pd.DataFrame({
        "order_id": ["O1", "O2"],
        "revenue": [100.0, 200.0],
    })
    result = validate_and_raise(df, required_columns=["order_id", "revenue"])
    assert_frame_equal(result, df)
def test_raises_correct_exception_type_not_base_exception():
    """Ensure we raise the specific exception, not a generic one."""
    df = pd.DataFrame()
    # Verify it's specifically PipelineValidationError, not just Exception
    with pytest.raises(PipelineValidationError):
        validate_and_raise(df, required_columns=["order_id"])
    # pytest.raises also passes for subclasses - pin the exact type:
    with pytest.raises(PipelineValidationError) as exc_info:
        validate_and_raise(df, required_columns=["order_id"])
    assert type(exc_info.value) is PipelineValidationError

# --- Testing Sample Data Fixtures vs Production Data Patterns ---
A common dilemma: test with tiny fixtures (fast, easy to reason about) or with production-scale data (finds real bugs)?
The answer is both, in the right places:
# tests/conftest.py
@pytest.fixture(scope="session")
def minimal_orders_df():
    """3-5 rows. Use for logic tests where scale doesn't matter."""
    return pd.DataFrame({
        "order_id": ["O1", "O2", "O3"],
        "customer_id": ["C1", "C2", "C1"],
        "revenue": [100.0, 200.0, 150.0],
        "order_date": pd.to_datetime(["2026-01-01", "2026-01-02", "2026-01-03"]),
    })
@pytest.fixture(scope="session")
def production_scale_orders_df():
    """
    Statistically representative sample: 100k rows, realistic distributions.
    session-scoped - generated once per test run.
    Use for performance, memory, and distribution tests only.
    """
    import numpy as np
    rng = np.random.default_rng(42)
    n = 100_000
    revenues = np.concatenate([
        rng.lognormal(mean=4, sigma=1.5, size=int(n * 0.9)),  # Typical orders
        rng.uniform(5000, 50000, size=int(n * 0.1)),  # High-value outliers
    ])
    # Draw all customer ids in one vectorized call instead of 100k separate
    # rng.integers() calls - same distribution, far faster fixture setup.
    customer_nums = rng.integers(1, 5000, size=n)
    return pd.DataFrame({
        "order_id": [f"ORD-{i:08d}" for i in range(n)],
        "customer_id": [f"C{c:05d}" for c in customer_nums],
        "revenue": revenues.round(2),
        "order_date": pd.to_datetime(
            rng.integers(
                pd.Timestamp("2023-01-01").value,
                pd.Timestamp("2026-01-01").value,
                size=n,
            )
        ),
        "status": rng.choice(["shipped", "pending", "cancelled"], size=n, p=[0.65, 0.25, 0.10]),
    })
@pytest.mark.slow
def test_aggregation_handles_production_scale(production_scale_orders_df):
    """Integrity checks at realistic scale; marked slow so CI can defer it."""
    from src.aggregators import compute_customer_revenue
    result = compute_customer_revenue(production_scale_orders_df)
    # Basic integrity: one row per customer
    assert result["customer_id"].nunique() == len(result)
    # Revenue should be non-negative (no aggregation bugs)
    assert (result["total_revenue"] >= 0).all()
def test_aggregation_logic_with_minimal_data(minimal_orders_df):
    """Exact-value check on the tiny fixture, where the math is obvious."""
    from src.aggregators import compute_customer_revenue
    result = compute_customer_revenue(minimal_orders_df)
    c1 = result.loc[result["customer_id"] == "C1", "total_revenue"].iloc[0]
    assert c1 == pytest.approx(250.0)  # 100 + 150

# --- Testing Incremental Pipeline Logic ---
Incremental pipelines only process new data since the last run. Testing requires controlling the watermark:
# src/incremental.py
import pandas as pd
from datetime import date, datetime
from typing import Optional
class IncrementalProcessor:
    """Filters a DataFrame to records newer than a stored watermark.

    The watermark store is any object exposing get(name) and set(name, date);
    the processor advances the watermark after each non-empty batch.
    """

    def __init__(self, watermark_store):
        self.watermark_store = watermark_store

    def get_watermark(self, pipeline_name: str) -> Optional[date]:
        """Return the last processed date for this pipeline, or None on first run."""
        return self.watermark_store.get(pipeline_name)

    def set_watermark(self, pipeline_name: str, new_watermark: date) -> None:
        """Persist the new high-water mark for this pipeline."""
        self.watermark_store.set(pipeline_name, new_watermark)

    def process(self, df: pd.DataFrame, pipeline_name: str, date_col: str) -> pd.DataFrame:
        """Return rows strictly newer than the watermark, then advance it."""
        watermark = self.get_watermark(pipeline_name)
        if watermark is None:
            fresh = df.copy()  # First run: everything is new.
        else:
            fresh = df[df[date_col] > pd.Timestamp(watermark)]
        if fresh.empty:
            # Nothing new: leave the stored watermark untouched.
            return fresh
        # Advance the watermark to the newest date just processed.
        self.set_watermark(pipeline_name, fresh[date_col].max().date())
        return fresh

# tests/unit/test_incremental.py
from unittest.mock import MagicMock
import pandas as pd
from datetime import date
import pytest
@pytest.fixture
def sample_df():
    """Four orders spanning April-May 2026, for watermark-boundary tests."""
    return pd.DataFrame({
        "order_id": ["O1", "O2", "O3", "O4"],
        "order_date": pd.to_datetime(["2026-04-01", "2026-04-15", "2026-05-01", "2026-05-07"]),
        "revenue": [100, 200, 300, 400],
    })
@pytest.fixture
def watermark_store():
    """Mock watermark store; get() returning None simulates a first run."""
    store = MagicMock()
    store.get.return_value = None  # Default: no watermark (first run)
    return store
def test_first_run_processes_all_records(sample_df, watermark_store):
    """With no stored watermark, every record is treated as new."""
    from src.incremental import IncrementalProcessor
    processor = IncrementalProcessor(watermark_store)
    result = processor.process(sample_df, "orders_pipeline", "order_date")
    assert len(result) == 4
def test_incremental_run_only_processes_new_records(sample_df, watermark_store):
    """Only rows strictly after the watermark date are returned."""
    watermark_store.get.return_value = date(2026, 4, 30)
    from src.incremental import IncrementalProcessor
    processor = IncrementalProcessor(watermark_store)
    result = processor.process(sample_df, "orders_pipeline", "order_date")
    assert len(result) == 2
    assert set(result["order_id"]) == {"O3", "O4"}
def test_watermark_is_updated_after_processing(sample_df, watermark_store):
    """After processing, the watermark advances to the max date seen."""
    from src.incremental import IncrementalProcessor
    processor = IncrementalProcessor(watermark_store)
    processor.process(sample_df, "orders_pipeline", "order_date")
    watermark_store.set.assert_called_once_with("orders_pipeline", date(2026, 5, 7))
def test_no_new_records_returns_empty_df(sample_df, watermark_store):
    """A watermark beyond all data yields an empty result and no state write."""
    watermark_store.get.return_value = date(2026, 12, 31)  # Future watermark
    from src.incremental import IncrementalProcessor
    processor = IncrementalProcessor(watermark_store)
    result = processor.process(sample_df, "orders_pipeline", "order_date")
    assert result.empty
    watermark_store.set.assert_not_called()  # Watermark not updated when no new data

# --- Testing CLI Scripts with CliRunner ---
Data engineers build CLI tools for pipeline orchestration. Test them without spawning subprocesses using Typer's or Click's CliRunner.
# src/cli.py
import typer
import pandas as pd
from pathlib import Path
app = typer.Typer()
@app.command()
def process(
    input_file: Path = typer.Argument(..., help="Input CSV file path"),
    output_file: Path = typer.Argument(..., help="Output Parquet file path"),
    drop_nulls: bool = typer.Option(True, help="Drop rows with null customer_id"),
    verbose: bool = typer.Option(False, "--verbose", "-v"),
):
    """Process a sales CSV and write to Parquet."""
    # Fail fast with exit code 1 if the input is missing, writing to stderr.
    if not input_file.exists():
        typer.echo(f"Error: Input file not found: {input_file}", err=True)
        raise typer.Exit(code=1)
    df = pd.read_csv(input_file)
    initial_count = len(df)  # Kept for the verbose before/after report
    if drop_nulls and "customer_id" in df.columns:
        df = df.dropna(subset=["customer_id"])
    df.to_parquet(output_file, index=False)
    if verbose:
        typer.echo(f"Processed {initial_count} rows ā {len(df)} rows written to {output_file}")
    else:
        typer.echo(f"Done: {len(df)} rows written.")

# tests/unit/test_cli.py
import pytest
import pandas as pd
from typer.testing import CliRunner
from src.cli import app
runner = CliRunner()
@pytest.fixture
def input_csv(tmp_path):
    """Write a sample CSV (one row with null customer_id) and return its path."""
    df = pd.DataFrame({
        "order_id": ["O1", "O2", "O3"],
        "customer_id": ["C1", None, "C3"],
        "revenue": [100, 200, 300],
    })
    path = tmp_path / "input.csv"
    df.to_csv(path, index=False)
    return path
def test_cli_processes_file_successfully(input_csv, tmp_path):
    """Happy path: exit code 0, output written, null row dropped by default."""
    output = tmp_path / "output.parquet"
    result = runner.invoke(app, [str(input_csv), str(output)])
    assert result.exit_code == 0
    assert output.exists()
    df = pd.read_parquet(output)
    assert len(df) == 2  # One null row dropped
def test_cli_verbose_flag_shows_row_counts(input_csv, tmp_path):
    """--verbose reports both the before and after row counts."""
    output = tmp_path / "output.parquet"
    result = runner.invoke(app, [str(input_csv), str(output), "--verbose"])
    assert result.exit_code == 0
    assert "3 rows" in result.output  # Initial count
    assert "2 rows" in result.output  # After dropping null
def test_cli_exits_with_error_for_missing_input(tmp_path):
    """A missing input file produces exit code 1 and an error message."""
    missing_file = tmp_path / "does_not_exist.csv"
    output = tmp_path / "output.parquet"
    result = runner.invoke(app, [str(missing_file), str(output)])
    assert result.exit_code == 1
    assert "not found" in result.output.lower() or "Error" in result.output
def test_cli_no_drop_nulls_flag_preserves_all_rows(input_csv, tmp_path):
    """--no-drop-nulls keeps rows whose customer_id is null."""
    output = tmp_path / "output.parquet"
    result = runner.invoke(app, [str(input_csv), str(output), "--no-drop-nulls"])
    assert result.exit_code == 0
    df = pd.read_parquet(output)
    assert len(df) == 3  # All rows preserved including null customer_id

# Click-based CLI testing (same pattern, different runner import)
# from click.testing import CliRunner as ClickRunner

# --- Testing FastAPI Endpoints with TestClient ---
Pipelines increasingly expose HTTP APIs for triggering runs, checking status, or serving transformation results.
# src/api.py
from fastapi import FastAPI, HTTPException, BackgroundTasks
from pydantic import BaseModel
from typing import Optional
import pandas as pd
from datetime import date
app = FastAPI(title="Pipeline API")
# In-memory job store (use Redis/DB in production)
_jobs: dict = {}
class PipelineRunRequest(BaseModel):
    """Request body for triggering a pipeline run."""
    pipeline_name: str
    start_date: date
    end_date: date
    # NOTE(review): dry_run is accepted but not acted on in
    # trigger_pipeline_run below - confirm intended behavior.
    dry_run: bool = False
class PipelineRunResponse(BaseModel):
    """Response payload for both the run-trigger and status endpoints."""
    job_id: str
    status: str
    # Only populated for duplicate-job responses in this implementation.
    message: Optional[str] = None
@app.get("/health")
def health_check():
    """Liveness probe: returns a static payload, touches no dependencies."""
    return {"status": "healthy", "version": "1.0.0"}
@app.post("/pipeline/run", response_model=PipelineRunResponse)
def trigger_pipeline_run(
    request: PipelineRunRequest,
    background_tasks: BackgroundTasks,
):
    """Queue a pipeline run, deduplicating on (name, start_date, end_date).

    NOTE(review): background_tasks is injected but never used here - confirm
    whether actual execution is scheduled elsewhere.
    """
    if request.end_date < request.start_date:
        raise HTTPException(
            status_code=422,
            detail="end_date must be >= start_date"
        )
    # Deterministic job id doubles as the dedup key in the job store.
    job_id = f"job-{request.pipeline_name}-{request.start_date}-{request.end_date}"
    if job_id in _jobs:
        return PipelineRunResponse(
            job_id=job_id,
            status="already_running",
            message="A job with these parameters is already active",
        )
    _jobs[job_id] = "queued"
    return PipelineRunResponse(job_id=job_id, status="queued")
@app.get("/pipeline/status/{job_id}", response_model=PipelineRunResponse)
def get_job_status(job_id: str):
    """Return the current status of a previously submitted job, or 404."""
    if job_id not in _jobs:
        raise HTTPException(status_code=404, detail=f"Job not found: {job_id}")
    return PipelineRunResponse(job_id=job_id, status=_jobs[job_id])

# tests/unit/test_api.py
import pytest
from fastapi.testclient import TestClient
from src.api import app, _jobs
@pytest.fixture(autouse=True)
def clear_job_store():
    """Clear the in-memory job store before each test."""
    _jobs.clear()
    yield
    # Clear again on teardown so a failing test cannot leak state forward.
    _jobs.clear()
@pytest.fixture
def client():
    """Synchronous TestClient bound to the app; no real server is started."""
    return TestClient(app)
def test_health_check_returns_200(client):
    """The liveness endpoint responds 200 with the expected status field."""
    response = client.get("/health")
    assert response.status_code == 200
    assert response.json()["status"] == "healthy"
def test_trigger_pipeline_run_returns_job_id(client):
    """A valid run request is queued and returns a job id naming the pipeline."""
    response = client.post("/pipeline/run", json={
        "pipeline_name": "orders_daily",
        "start_date": "2026-05-01",
        "end_date": "2026-05-07",
    })
    assert response.status_code == 200
    data = response.json()
    assert "job_id" in data
    assert data["status"] == "queued"
    assert "orders_daily" in data["job_id"]
def test_trigger_pipeline_rejects_invalid_date_range(client):
    """end_date before start_date is rejected with a 422 and explanatory detail."""
    response = client.post("/pipeline/run", json={
        "pipeline_name": "orders_daily",
        "start_date": "2026-05-07",
        "end_date": "2026-05-01",  # end < start
    })
    assert response.status_code == 422
    assert "end_date" in response.json()["detail"].lower()
def test_trigger_pipeline_dry_run_flag(client):
    """dry_run=True is accepted; the job is still queued like a normal run."""
    response = client.post("/pipeline/run", json={
        "pipeline_name": "orders_daily",
        "start_date": "2026-05-01",
        "end_date": "2026-05-07",
        "dry_run": True,
    })
    assert response.status_code == 200
    assert response.json()["status"] == "queued"
def test_duplicate_job_returns_already_running(client):
    """Resubmitting identical parameters is deduplicated, not re-queued."""
    payload = {
        "pipeline_name": "orders_daily",
        "start_date": "2026-05-01",
        "end_date": "2026-05-07",
    }
    # First trigger
    first = client.post("/pipeline/run", json=payload)
    assert first.status_code == 200
    # Second trigger with same params
    second = client.post("/pipeline/run", json=payload)
    assert second.status_code == 200
    assert second.json()["status"] == "already_running"
def test_get_job_status_for_existing_job(client):
    """Status lookup on a freshly created job reports it as queued."""
    # Create a job first
    create = client.post("/pipeline/run", json={
        "pipeline_name": "customers_daily",
        "start_date": "2026-05-01",
        "end_date": "2026-05-01",
    })
    job_id = create.json()["job_id"]
    # Check status
    status = client.get(f"/pipeline/status/{job_id}")
    assert status.status_code == 200
    assert status.json()["status"] == "queued"
def test_get_job_status_404_for_unknown_job(client):
    """Unknown job ids produce a 404 with a descriptive detail message."""
    response = client.get("/pipeline/status/nonexistent-job-id")
    assert response.status_code == 404
    assert "not found" in response.json()["detail"].lower()
def test_pipeline_run_missing_required_field(client):
    """Requests missing required body fields fail pydantic validation with 422."""
    response = client.post("/pipeline/run", json={
        "pipeline_name": "orders_daily",
        # Missing start_date and end_date
    })
    assert response.status_code == 422  # FastAPI validation error

# --- testcontainers-python: Real Database Tests ---
testcontainers-python starts real Docker containers for your tests. Use it for integration tests that need to verify SQL logic, index behavior, or stored procedures.
pip install testcontainers[postgres]
# Requires Docker Desktop running

# tests/integration/test_postgres_pipeline.py
import pytest
import pandas as pd
import psycopg2
from testcontainers.postgres import PostgresContainer
from sqlalchemy import create_engine
@pytest.fixture(scope="module")
def postgres_container():
    """
    Start a PostgreSQL container once for the entire module.
    Destroyed after all tests in the module complete.
    """
    # The context manager guarantees the container is stopped even if
    # a test in the module errors out.
    with PostgresContainer("postgres:16-alpine") as postgres:
        yield postgres
@pytest.fixture(scope="module")
def db_engine(postgres_container):
    """SQLAlchemy engine connected to the test PostgreSQL container."""
    # NOTE(review): the engine is never disposed; fine for module-scoped test
    # use, but add engine.dispose() teardown if connections accumulate.
    engine = create_engine(postgres_container.get_connection_url())
    return engine
@pytest.fixture(scope="module")
def initialized_db(db_engine):
    """Create schema and seed reference data once per module."""
    # SQLAlchemy 2.x requires textual SQL to be wrapped in text();
    # passing a raw string to Connection.execute() raises ArgumentError.
    from sqlalchemy import text
    with db_engine.begin() as conn:
        conn.execute(text("""
            CREATE TABLE IF NOT EXISTS orders (
                order_id VARCHAR(50) PRIMARY KEY,
                customer_id VARCHAR(50) NOT NULL,
                revenue NUMERIC(12, 2) NOT NULL,
                order_date DATE NOT NULL,
                status VARCHAR(20) DEFAULT 'pending',
                created_at TIMESTAMP DEFAULT NOW()
            )
        """))
        conn.execute(text("""
            CREATE TABLE IF NOT EXISTS customers (
                customer_id VARCHAR(50) PRIMARY KEY,
                name VARCHAR(200),
                region VARCHAR(50),
                tier VARCHAR(20)
            )
        """))
        # ON CONFLICT DO NOTHING keeps the seed idempotent across reruns.
        conn.execute(text("""
            INSERT INTO customers VALUES
                ('C001', 'Alice Martin', 'NORTH', 'gold'),
                ('C002', 'Bob Chen', 'SOUTH', 'silver'),
                ('C003', 'Carol White', 'NORTH', 'gold')
            ON CONFLICT DO NOTHING
        """))
    return db_engine
@pytest.fixture
def clean_orders(initialized_db):
    """Ensure orders table is empty before each test."""
    # SQLAlchemy 2.x requires text() around raw SQL strings.
    from sqlalchemy import text
    with initialized_db.begin() as conn:
        conn.execute(text("DELETE FROM orders"))
    yield initialized_db
    # Teardown: leave the table empty for whichever test runs next.
    with initialized_db.begin() as conn:
        conn.execute(text("DELETE FROM orders"))
@pytest.mark.integration
def test_insert_and_query_orders(clean_orders):
    """Round-trip: bulk-inserted rows come back intact from the repository."""
    from src.db_repository import OrderRepository
    repo = OrderRepository(engine=clean_orders)
    orders = [
        {"order_id": "O1", "customer_id": "C001", "revenue": 150.00, "order_date": "2026-05-01", "status": "shipped"},
        {"order_id": "O2", "customer_id": "C002", "revenue": 75.50, "order_date": "2026-05-02", "status": "pending"},
    ]
    repo.bulk_insert(orders)
    result = repo.get_all()
    assert len(result) == 2
    assert set(result["order_id"].tolist()) == {"O1", "O2"}
@pytest.mark.integration
def test_revenue_aggregation_query(clean_orders):
    """Test a specific SQL aggregation query against real PostgreSQL."""
    # SQLAlchemy 2.x requires text() around raw SQL strings.
    from sqlalchemy import text
    with clean_orders.begin() as conn:
        conn.execute(text("""
            INSERT INTO orders (order_id, customer_id, revenue, order_date)
            VALUES
                ('O1', 'C001', 100.00, '2026-05-01'),
                ('O2', 'C001', 200.00, '2026-05-02'),
                ('O3', 'C002', 300.00, '2026-05-01')
        """))
    from src.db_repository import OrderRepository
    repo = OrderRepository(engine=clean_orders)
    result = repo.get_customer_revenue_summary()
    # NUMERIC comes back as Decimal; cast to float before comparing.
    c001_revenue = result.loc[result["customer_id"] == "C001", "total_revenue"].iloc[0]
    assert float(c001_revenue) == pytest.approx(300.0)
@pytest.mark.integration
def test_primary_key_constraint_is_enforced(clean_orders):
    """Verify that duplicate order_id raises IntegrityError."""
    import sqlalchemy.exc
    # SQLAlchemy 2.x requires text() around raw SQL strings.
    from sqlalchemy import text
    with clean_orders.begin() as conn:
        conn.execute(text("INSERT INTO orders (order_id, customer_id, revenue, order_date) VALUES ('O1', 'C001', 100, '2026-05-01')"))
    # A second insert with the same primary key must violate the constraint.
    with pytest.raises(sqlalchemy.exc.IntegrityError):
        with clean_orders.begin() as conn:
            conn.execute(text("INSERT INTO orders (order_id, customer_id, revenue, order_date) VALUES ('O1', 'C002', 200, '2026-05-02')"))

# --- Test Coverage with pytest-cov ---
pip install pytest-cov

Running with Coverage
# Terminal report: show uncovered lines
pytest --cov=src --cov-report=term-missing
# HTML report: open in browser
pytest --cov=src --cov-report=html:htmlcov
# XML report: for CI tools (SonarQube, Codecov, etc.)
pytest --cov=src --cov-report=xml:coverage.xml
# Fail if coverage drops below threshold
pytest --cov=src --cov-fail-under=80
# Combine all report formats
pytest --cov=src \
--cov-report=term-missing \
--cov-report=html:htmlcov \
--cov-report=xml:coverage.xml \
--cov-fail-under=80.coveragerc (or pyproject.toml equivalent)
# pyproject.toml
[tool.coverage.run]
source = ["src"]
branch = true # Track branch coverage (if/else arms)
omit = [
"tests/*",
"src/migrations/*",
"src/__init__.py",
"src/*/conftest.py",
"src/cli.py", # CLI entry points ā integration tested separately
]
[tool.coverage.report]
precision = 2
show_missing = true
exclude_lines = [
"pragma: no cover",
"if TYPE_CHECKING:",
"raise NotImplementedError",
"@(abc\\.)?abstractmethod",
"if __name__ == ['\"]__main__['\"]:",
"pass",
]
[tool.coverage.html]
directory = "htmlcov"
[tool.coverage.xml]
output = "coverage.xml"

pragma: no cover
Mark code that legitimately cannot be tested (or shouldn't block coverage thresholds):
def debug_dump(df: pd.DataFrame) -> None:  # pragma: no cover
    """Development-only debug function - excluded from coverage."""
    # Prints the full frame; intended for interactive troubleshooting only.
    print(df.to_string())
# Entry-point guard: excluded from coverage since tests import, not execute.
if __name__ == "__main__":  # pragma: no cover
    app()

# --- Test Organization for Data Engineering Projects ---
Here is a production-ready test directory structure:
tests/
├── conftest.py                  # Root: session/module fixtures, markers
├── fixtures/                    # Static test data files
│   ├── orders_sample.csv
│   ├── customers_sample.csv
│   ├── schemas/
│   │   ├── orders_v1.json
│   │   └── customers_v1.json
│   └── parquet/
│       └── test_orders.parquet
├── unit/                        # Fast, isolated, no I/O
│   ├── conftest.py              # Unit-specific fixtures
│   ├── test_transformers.py
│   ├── test_validators.py
│   ├── test_aggregators.py
│   ├── test_enrichment.py
│   └── test_incremental.py
├── integration/                 # Slower, external services
│   ├── conftest.py              # DB connections, container fixtures
│   ├── test_postgres_pipeline.py
│   ├── test_s3_reader.py
│   └── test_snowflake_query.py
├── api/                         # FastAPI endpoint tests
│   ├── conftest.py
│   └── test_pipeline_api.py
├── cli/                         # CLI command tests
│   └── test_process_command.py
└── e2e/                         # Full pipeline runs (slowest)
    ├── conftest.py
    └── test_orders_pipeline_e2e.py

Makefile Targets for Common Test Commands
# Makefile
.PHONY: test test-unit test-integration test-coverage test-ci
# Fast feedback loop during development
test-unit:
pytest -x -vs -m "unit and not slow" --tb=short
# Run only integration tests (requires services)
test-integration:
pytest -m integration --tb=short
# Full suite with coverage
test-coverage:
pytest \
--cov=src \
--cov-report=term-missing \
--cov-report=html:htmlcov \
--cov-fail-under=80
# What CI runs
test-ci:
pytest \
-m "not slow" \
--junit-xml=test-results/junit.xml \
--cov=src \
--cov-report=xml:coverage.xml \
  --cov-fail-under=80

CI Integration: GitHub Actions
# .github/workflows/test.yml
name: Tests
on:
push:
branches: [main, develop]
pull_request:
branches: [main]
jobs:
unit-tests:
name: Unit Tests (Python ${{ matrix.python-version }})
runs-on: ubuntu-latest
strategy:
matrix:
python-version: ["3.11", "3.12"]
fail-fast: false # Run all versions even if one fails
steps:
- uses: actions/checkout@v4
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}
cache: pip
- name: Install dependencies
run: |
pip install --upgrade pip
pip install -e ".[test]"
- name: Run unit tests
run: |
pytest \
-m "unit and not slow" \
--junit-xml=test-results/unit-${{ matrix.python-version }}.xml \
--cov=src \
--cov-report=xml:coverage-${{ matrix.python-version }}.xml \
--cov-fail-under=80 \
-v
- name: Upload test results
uses: actions/upload-artifact@v4
if: always()
with:
name: test-results-${{ matrix.python-version }}
path: test-results/
- name: Upload coverage to Codecov
uses: codecov/codecov-action@v4
with:
file: coverage-${{ matrix.python-version }}.xml
flags: unit-tests
fail_ci_if_error: true
integration-tests:
name: Integration Tests
runs-on: ubuntu-latest
needs: unit-tests # Only run if unit tests pass
services:
postgres:
image: postgres:16-alpine
env:
POSTGRES_DB: test_pipeline
POSTGRES_USER: postgres
POSTGRES_PASSWORD: testpassword
options: >-
--health-cmd pg_isready
--health-interval 10s
--health-timeout 5s
--health-retries 5
ports:
- 5432:5432
steps:
- uses: actions/checkout@v4
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: "3.12"
cache: pip
- name: Install dependencies
run: pip install -e ".[test]"
- name: Run integration tests
env:
TEST_DB_HOST: localhost
TEST_DB_PORT: 5432
TEST_DB_NAME: test_pipeline
TEST_DB_USER: postgres
TEST_DB_PASSWORD: testpassword
run: |
pytest \
-m integration \
--junit-xml=test-results/integration.xml \
-v \
--tb=short
- name: Upload integration test results
uses: actions/upload-artifact@v4
if: always()
with:
name: integration-test-results
path: test-results/
slow-tests:
name: Slow / Nightly Tests
runs-on: ubuntu-latest
# Only run on schedule or manual trigger ā not every PR
if: github.event_name == 'schedule' || github.event_name == 'workflow_dispatch'
steps:
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
with:
python-version: "3.12"
cache: pip
- run: pip install -e ".[test]"
- name: Run slow tests
run: pytest -m slow --junit-xml=test-results/slow.xml -v
# Separate nightly workflow for slow tests

# .github/workflows/nightly.yml
name: Nightly Full Suite
on:
schedule:
- cron: "0 2 * * *" # 2 AM UTC every day
workflow_dispatch: # Manual trigger from GitHub UI
jobs:
full-suite:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
with:
python-version: "3.12"
cache: pip
- run: pip install -e ".[test]"
- name: Run full test suite including slow tests
run: |
pytest \
--junit-xml=test-results/full-suite.xml \
--cov=src \
--cov-report=html:htmlcov \
--cov-report=xml \
-v
- name: Upload coverage HTML
uses: actions/upload-artifact@v4
with:
name: coverage-report
path: htmlcov/

pyproject.toml Test Dependencies
# pyproject.toml
[project.optional-dependencies]
test = [
"pytest>=8.0",
"pytest-cov>=5.0",
"pytest-mock>=3.12",
"pytest-xdist>=3.5", # Parallel test execution
"pytest-asyncio>=0.23", # Async test support
"pandas>=2.0",
"pyarrow>=14.0",
"freezegun>=1.4",
"responses>=0.25",
"respx>=0.21",
"testcontainers[postgres]>=4.0",
"httpx>=0.27", # For FastAPI TestClient
"typer[all]>=0.12",
]

Parallel Test Execution with pytest-xdist
pip install pytest-xdist
# Run across 4 workers
pytest -n 4
# Auto-detect CPU count
pytest -n auto
# Distribute by file (good for independent test modules)
pytest -n auto --dist=loadfile
# Distribute by module group
pytest -n auto --dist=loadscope

Important: session-scoped fixtures are NOT shared across workers with xdist. Use scope="session" only for truly independent, read-only fixtures, or use tmp_path_factory for worker-local temp dirs.
@pytest.fixture(scope="session")
def worker_id(tmp_path_factory):
    """Unique ID for each xdist worker - use to avoid resource conflicts.

    NOTE(review): tmp_path_factory is injected but unused here; it can likely
    be dropped from the signature - confirm no callers rely on it.
    """
    import os
    # xdist sets PYTEST_XDIST_WORKER (gw0, gw1, ...); absent when not parallel.
    return os.environ.get("PYTEST_XDIST_WORKER", "master")

# --- Quick Reference: Reading Test Output ---
# The -ra flag (show all non-passed) output summary:
# FAILED tests/unit/test_validators.py::test_revenue_positive - AssertionError
# ERROR tests/integration/test_db.py::test_insert - ConnectionError
# SKIPPED tests/integration/test_api.py::test_export (credentials not set)
# XFAILED tests/unit/test_parsers.py::test_edge_case (known bug JIRA-1234)
# Exit codes:
#   0 — all tests passed
#   1 — some tests failed
#   2 — interrupted (Ctrl+C)
#   3 — internal error
#   4 — command-line usage error
#   5 — no tests collected
# Coverage output:
# Name                   Stmts   Miss Branch BrPart  Cover   Missing
# src/transformers.py       45      3     12      2    93%   42-44, 87
# src/validators.py         28      0      8      0   100%
# TOTAL                     73      3     20      2    95%

Summary
- Use `pd.testing.assert_frame_equal` for DataFrame comparisons — it gives column-level diffs and supports float tolerances.
- Test exception types and messages with `pytest.raises` — always check the exception attributes, not just that it was raised.
- Keep test data in two tiers: tiny fixtures for logic tests, production-scale fixtures (session-scoped) for performance tests.
- `TestClient` (FastAPI) and `CliRunner` (Typer/Click) test HTTP endpoints and CLI commands without spawning real servers.
- `testcontainers` gives you real PostgreSQL (and any other Docker-based service) in integration tests.
- Configure `pytest-cov` with branch coverage and a `--cov-fail-under` threshold in CI.
- Separate unit, integration, slow, and e2e tests with markers — run them on different schedules.
- Use `pytest-xdist` for parallel execution when unit tests grow beyond 30 seconds.

This completes the pytest series. The four lessons build a complete testing stack for data engineering pipelines: fundamentals → fixtures & parametrize → mocking → full pipeline testing. Apply these patterns from day one, and refactoring your pipelines becomes safe, confident work.
Enjoyed this article?
Explore the Data Engineering learning path for more.
Found this helpful?
Leave a comment
Have a question, correction, or just found this helpful? Leave a note below.