Data Engineering · Advanced

pytest for Complete Data Pipeline Testing

Test pandas transformations with assert_frame_equal, handle exceptions with pytest.raises, test CLI scripts and FastAPI endpoints, run real database tests with testcontainers, measure coverage, and integrate with GitHub Actions CI.

Learnixo · May 7, 2026 · 19 min read
pytest · python · data-engineering · pandas · fastapi · testcontainers · coverage · ci-cd · github-actions
Share:š•


Unit tests for individual functions are essential, but data engineering also requires testing at higher levels: full pipeline runs, CLI entry points, REST APIs that expose pipeline triggers, and integration with real databases. This lesson covers the complete testing stack — from pandas assertion utilities to GitHub Actions CI configuration.

Testing Pandas Transformations with assert_frame_equal

pd.testing.assert_frame_equal is the workhorse of pandas pipeline testing. It provides column-level diffs and handles floating-point comparison, dtype checking, and index alignment.

Python
import pandas as pd
import numpy as np
from pandas.testing import assert_frame_equal, assert_series_equal
import pytest


# --- Source code under test ---
# src/transformers.py

def compute_order_metrics(orders_df: pd.DataFrame, customers_df: pd.DataFrame) -> pd.DataFrame:
    """
    Joins orders to customers and computes per-customer revenue metrics.
    Returns one row per customer with: total_revenue, order_count, avg_order_value,
    first_order_date, last_order_date, revenue_rank.
    """
    metrics = (
        orders_df
        .groupby("customer_id", as_index=False)
        .agg(
            total_revenue=("revenue", "sum"),
            order_count=("order_id", "nunique"),
            avg_order_value=("revenue", "mean"),
            first_order_date=("order_date", "min"),
            last_order_date=("order_date", "max"),
        )
    )

    # Attach region and tier from customers (left join: one row per customer with orders)
    metrics = metrics.merge(
        customers_df[["customer_id", "region", "tier"]],
        on="customer_id",
        how="left",
    )

    # Revenue rank within region (1 = highest revenue)
    metrics["revenue_rank"] = (
        metrics.groupby("region")["total_revenue"]
        .rank(method="dense", ascending=False)
        .astype(int)
    )

    return metrics.sort_values("customer_id").reset_index(drop=True)

assert_frame_equal Options

Python
# tests/unit/test_compute_order_metrics.py
import pytest
import pandas as pd
from pandas.testing import assert_frame_equal


@pytest.fixture
def orders():
    return pd.DataFrame({
        "order_id": ["O1", "O2", "O3", "O4", "O5"],
        "customer_id": ["C001", "C001", "C002", "C003", "C002"],
        "revenue": [100.0, 150.0, 200.0, 300.0, 50.0],
        "order_date": pd.to_datetime(["2026-01-01", "2026-01-15", "2026-01-05",
                                       "2026-01-10", "2026-01-20"]),
    })


@pytest.fixture
def customers():
    return pd.DataFrame({
        "customer_id": ["C001", "C002", "C003"],
        "region": ["NORTH", "NORTH", "SOUTH"],
        "tier": ["gold", "silver", "gold"],
    })


def test_total_revenue_per_customer(orders, customers):
    from src.transformers import compute_order_metrics
    result = compute_order_metrics(orders, customers)

    expected = pd.DataFrame({
        "customer_id": ["C001", "C002", "C003"],
        "total_revenue": [250.0, 250.0, 300.0],
    })

    assert_frame_equal(
        result[["customer_id", "total_revenue"]].reset_index(drop=True),
        expected.reset_index(drop=True),
        check_dtype=False,   # Allow int64 vs float64 differences
        rtol=1e-5,           # Relative tolerance for floats
    )


def test_order_count_per_customer(orders, customers):
    from src.transformers import compute_order_metrics
    result = compute_order_metrics(orders, customers)

    c001 = result.loc[result["customer_id"] == "C001", "order_count"].iloc[0]
    c002 = result.loc[result["customer_id"] == "C002", "order_count"].iloc[0]
    c003 = result.loc[result["customer_id"] == "C003", "order_count"].iloc[0]

    assert c001 == 2
    assert c002 == 2
    assert c003 == 1


def test_first_and_last_order_dates(orders, customers):
    from src.transformers import compute_order_metrics
    result = compute_order_metrics(orders, customers)

    c001_row = result[result["customer_id"] == "C001"].iloc[0]
    assert c001_row["first_order_date"] == pd.Timestamp("2026-01-01")
    assert c001_row["last_order_date"] == pd.Timestamp("2026-01-15")


def test_revenue_rank_within_region(orders, customers):
    from src.transformers import compute_order_metrics
    result = compute_order_metrics(orders, customers)

    # NORTH: C001=250, C002=250 → both rank 1 (tie)
    # SOUTH: C003=300 → rank 1
    north_ranks = result[result["region"] == "NORTH"].set_index("customer_id")["revenue_rank"]
    south_ranks = result[result["region"] == "SOUTH"].set_index("customer_id")["revenue_rank"]

    assert north_ranks["C001"] == 1
    assert north_ranks["C002"] == 1  # Tie
    assert south_ranks["C003"] == 1


def test_output_columns_match_schema(orders, customers):
    from src.transformers import compute_order_metrics
    result = compute_order_metrics(orders, customers)

    expected_columns = {
        "customer_id", "total_revenue", "order_count", "avg_order_value",
        "first_order_date", "last_order_date", "region", "tier", "revenue_rank"
    }
    assert set(result.columns) == expected_columns


def test_output_is_one_row_per_customer(orders, customers):
    from src.transformers import compute_order_metrics
    result = compute_order_metrics(orders, customers)

    assert result["customer_id"].nunique() == len(result)
    assert not result["customer_id"].duplicated().any()


def test_customers_without_orders_are_excluded(orders):
    """The metrics are keyed on orders, so customers with no orders don't appear."""
    customers_with_inactive = pd.DataFrame({
        "customer_id": ["C001", "C002", "C003", "C999"],  # C999 has no orders
        "region": ["NORTH", "NORTH", "SOUTH", "EAST"],
        "tier": ["gold", "silver", "gold", "bronze"],
    })
    from src.transformers import compute_order_metrics
    # Documents current behavior — adjust if inactive customers should appear with zero metrics
    result = compute_order_metrics(orders, customers_with_inactive)
    assert "C999" not in result["customer_id"].values  # Left join from orders side


def test_assert_frame_equal_float_tolerance():
    """Demonstrate float tolerance options in assert_frame_equal."""
    actual = pd.DataFrame({"revenue": [99.99999, 200.000001]})
    expected = pd.DataFrame({"revenue": [100.0, 200.0]})

    # Force exact comparison to demonstrate the failure — the default
    # (rtol=1e-5) would already absorb differences this small
    with pytest.raises(AssertionError):
        assert_frame_equal(actual, expected, check_exact=True)

    # This passes with relative tolerance
    assert_frame_equal(actual, expected, rtol=1e-3)

    # This passes with absolute tolerance
    assert_frame_equal(actual, expected, atol=0.01)
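
Column or row order often differs between a computed result and a hand-written expectation; check_like=True makes the comparison order-insensitive:

Python
def test_assert_frame_equal_ignores_ordering_with_check_like():
    actual = pd.DataFrame({"b": [2, 4], "a": [1, 3]})
    expected = pd.DataFrame({"a": [1, 3], "b": [2, 4]})

    # Fails by default: column order differs
    with pytest.raises(AssertionError):
        assert_frame_equal(actual, expected)

    # Passes: check_like=True reindexes both frames before comparing
    assert_frame_equal(actual, expected, check_like=True)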

Testing Error Cases with pytest.raises

Python
# src/validators.py
import pandas as pd
from typing import List


class PipelineValidationError(Exception):
    """Raised when pipeline input fails validation."""
    def __init__(self, message: str, failed_checks: List[str]):
        super().__init__(message)
        self.failed_checks = failed_checks


def validate_and_raise(df: pd.DataFrame, required_columns: List[str]) -> pd.DataFrame:
    """Validate a DataFrame, raising PipelineValidationError if invalid."""
    failed = []

    missing_cols = [c for c in required_columns if c not in df.columns]
    if missing_cols:
        failed.append(f"Missing columns: {missing_cols}")

    if df.empty:
        failed.append("DataFrame is empty")

    if "order_id" in df.columns and df["order_id"].duplicated().any():
        failed.append("Duplicate order_id values detected")

    if failed:
        raise PipelineValidationError(
            f"Validation failed with {len(failed)} check(s)",
            failed_checks=failed,
        )

    return df
Python
# tests/unit/test_validators.py
import pandas as pd
import pytest
from pandas.testing import assert_frame_equal

from src.validators import PipelineValidationError, validate_and_raise

def test_raises_pipeline_validation_error_for_missing_columns():
    df = pd.DataFrame({"order_id": ["O1"], "revenue": [100.0]})

    with pytest.raises(PipelineValidationError) as exc_info:
        validate_and_raise(df, required_columns=["order_id", "revenue", "customer_id"])

    # Inspect the exception
    error = exc_info.value
    assert "customer_id" in str(error)
    assert any("customer_id" in check for check in error.failed_checks)


def test_raises_for_empty_dataframe():
    df = pd.DataFrame(columns=["order_id", "revenue"])

    with pytest.raises(PipelineValidationError, match="empty"):
        validate_and_raise(df, required_columns=["order_id", "revenue"])


def test_raises_for_duplicate_order_ids():
    df = pd.DataFrame({
        "order_id": ["O1", "O1", "O2"],
        "revenue": [100.0, 150.0, 200.0],
    })

    with pytest.raises(PipelineValidationError) as exc_info:
        validate_and_raise(df, required_columns=["order_id", "revenue"])

    assert any("Duplicate" in check for check in exc_info.value.failed_checks)


def test_accumulates_all_failures_before_raising():
    """Should report all failures, not short-circuit at the first one."""
    df = pd.DataFrame({"x": []})  # Empty AND missing required columns

    with pytest.raises(PipelineValidationError) as exc_info:
        validate_and_raise(df, required_columns=["order_id", "revenue"])

    assert len(exc_info.value.failed_checks) >= 2


def test_returns_dataframe_when_valid():
    df = pd.DataFrame({
        "order_id": ["O1", "O2"],
        "revenue": [100.0, 200.0],
    })
    result = validate_and_raise(df, required_columns=["order_id", "revenue"])
    assert_frame_equal(result, df)


def test_raises_correct_exception_type_not_base_exception():
    """Ensure we raise the specific exception, not a generic one."""
    df = pd.DataFrame()

    # Verify it's specifically PipelineValidationError, not just Exception
    with pytest.raises(PipelineValidationError):
        validate_and_raise(df, required_columns=["order_id"])

    # pytest.raises also matches subclasses — pin the exact type when that matters:
    with pytest.raises(PipelineValidationError) as exc_info:
        validate_and_raise(df, required_columns=["order_id"])
    assert type(exc_info.value) is PipelineValidationError

Testing Sample Data Fixtures vs Production Data Patterns

A common dilemma: test with tiny fixtures (fast, easy to reason about) or with production-scale data (finds real bugs)?

The answer is both, in the right places:

Python
# tests/conftest.py
import numpy as np
import pandas as pd
import pytest

@pytest.fixture(scope="session")
def minimal_orders_df():
    """3–5 rows. Use for logic tests where scale doesn't matter."""
    return pd.DataFrame({
        "order_id": ["O1", "O2", "O3"],
        "customer_id": ["C1", "C2", "C1"],
        "revenue": [100.0, 200.0, 150.0],
        "order_date": pd.to_datetime(["2026-01-01", "2026-01-02", "2026-01-03"]),
    })


@pytest.fixture(scope="session")
def production_scale_orders_df():
    """
    Statistically representative sample: 100k rows, realistic distributions.
    session-scoped — generated once per test run.
    Use for performance, memory, and distribution tests only.
    """
    rng = np.random.default_rng(42)
    n = 100_000

    revenues = np.concatenate([
        rng.lognormal(mean=4, sigma=1.5, size=int(n * 0.9)),  # Typical orders
        rng.uniform(5000, 50000, size=int(n * 0.1)),           # High-value outliers
    ])

    return pd.DataFrame({
        "order_id": [f"ORD-{i:08d}" for i in range(n)],
        "customer_id": [f"C{rng.integers(1, 5000):05d}" for _ in range(n)],
        "revenue": revenues.round(2),
        "order_date": pd.to_datetime(
            rng.integers(
                pd.Timestamp("2023-01-01").value,
                pd.Timestamp("2026-01-01").value,
                size=n,
            )
        ),
        "status": rng.choice(["shipped", "pending", "cancelled"], size=n, p=[0.65, 0.25, 0.10]),
    })


# tests/unit/test_aggregators.py — the tests themselves live in test modules, not conftest.py
@pytest.mark.slow
def test_aggregation_handles_production_scale(production_scale_orders_df):
    from src.aggregators import compute_customer_revenue

    result = compute_customer_revenue(production_scale_orders_df)

    # Basic integrity: one row per customer
    assert result["customer_id"].nunique() == len(result)
    # Revenue should be non-negative (no aggregation bugs)
    assert (result["total_revenue"] >= 0).all()


def test_aggregation_logic_with_minimal_data(minimal_orders_df):
    from src.aggregators import compute_customer_revenue

    result = compute_customer_revenue(minimal_orders_df)

    c1 = result.loc[result["customer_id"] == "C1", "total_revenue"].iloc[0]
    assert c1 == pytest.approx(250.0)  # 100 + 150

Testing Incremental Pipeline Logic

Incremental pipelines only process new data since the last run. Testing requires controlling the watermark:

Python
# src/incremental.py
import pandas as pd
from datetime import date, datetime
from typing import Optional


class IncrementalProcessor:
    def __init__(self, watermark_store):
        self.watermark_store = watermark_store

    def get_watermark(self, pipeline_name: str) -> Optional[date]:
        return self.watermark_store.get(pipeline_name)

    def set_watermark(self, pipeline_name: str, new_watermark: date) -> None:
        self.watermark_store.set(pipeline_name, new_watermark)

    def process(self, df: pd.DataFrame, pipeline_name: str, date_col: str) -> pd.DataFrame:
        watermark = self.get_watermark(pipeline_name)

        if watermark is not None:
            new_records = df[df[date_col] > pd.Timestamp(watermark)]
        else:
            new_records = df.copy()

        if new_records.empty:
            return new_records

        max_date = new_records[date_col].max().date()
        self.set_watermark(pipeline_name, max_date)

        return new_records
Python
# tests/unit/test_incremental.py
from unittest.mock import MagicMock
import pandas as pd
from datetime import date
import pytest


@pytest.fixture
def sample_df():
    return pd.DataFrame({
        "order_id": ["O1", "O2", "O3", "O4"],
        "order_date": pd.to_datetime(["2026-04-01", "2026-04-15", "2026-05-01", "2026-05-07"]),
        "revenue": [100, 200, 300, 400],
    })


@pytest.fixture
def watermark_store():
    store = MagicMock()
    store.get.return_value = None  # Default: no watermark (first run)
    return store


def test_first_run_processes_all_records(sample_df, watermark_store):
    from src.incremental import IncrementalProcessor

    processor = IncrementalProcessor(watermark_store)
    result = processor.process(sample_df, "orders_pipeline", "order_date")

    assert len(result) == 4


def test_incremental_run_only_processes_new_records(sample_df, watermark_store):
    watermark_store.get.return_value = date(2026, 4, 30)

    from src.incremental import IncrementalProcessor
    processor = IncrementalProcessor(watermark_store)
    result = processor.process(sample_df, "orders_pipeline", "order_date")

    assert len(result) == 2
    assert set(result["order_id"]) == {"O3", "O4"}


def test_watermark_is_updated_after_processing(sample_df, watermark_store):
    from src.incremental import IncrementalProcessor
    processor = IncrementalProcessor(watermark_store)
    processor.process(sample_df, "orders_pipeline", "order_date")

    watermark_store.set.assert_called_once_with("orders_pipeline", date(2026, 5, 7))


def test_no_new_records_returns_empty_df(sample_df, watermark_store):
    watermark_store.get.return_value = date(2026, 12, 31)  # Future watermark

    from src.incremental import IncrementalProcessor
    processor = IncrementalProcessor(watermark_store)
    result = processor.process(sample_df, "orders_pipeline", "order_date")

    assert result.empty
    watermark_store.set.assert_not_called()  # Watermark not updated when no new data
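
MagicMock is convenient for asserting calls, but a small dict-backed fake is a good alternative when a test spans several runs and needs real stateful behavior — a sketch:

Python
class FakeWatermarkStore:
    """In-memory stand-in with the same get/set interface as the real store."""
    def __init__(self):
        self._data = {}

    def get(self, pipeline_name):
        return self._data.get(pipeline_name)

    def set(self, pipeline_name, new_watermark):
        self._data[pipeline_name] = new_watermark


def test_second_run_sees_nothing_new(sample_df):
    from src.incremental import IncrementalProcessor

    processor = IncrementalProcessor(FakeWatermarkStore())
    first = processor.process(sample_df, "orders_pipeline", "order_date")
    second = processor.process(sample_df, "orders_pipeline", "order_date")

    assert len(first) == 4   # First run processes everything
    assert second.empty      # Watermark advanced to 2026-05-07 — nothing new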

Testing CLI Scripts with CliRunner

Data engineers build CLI tools for pipeline orchestration. Test them without spawning subprocesses using Typer's or Click's CliRunner.

Python
# src/cli.py
import typer
import pandas as pd
from pathlib import Path

app = typer.Typer()


@app.command()
def process(
    input_file: Path = typer.Argument(..., help="Input CSV file path"),
    output_file: Path = typer.Argument(..., help="Output Parquet file path"),
    drop_nulls: bool = typer.Option(True, help="Drop rows with null customer_id"),
    verbose: bool = typer.Option(False, "--verbose", "-v"),
):
    """Process a sales CSV and write to Parquet."""
    if not input_file.exists():
        typer.echo(f"Error: Input file not found: {input_file}", err=True)
        raise typer.Exit(code=1)

    df = pd.read_csv(input_file)
    initial_count = len(df)

    if drop_nulls and "customer_id" in df.columns:
        df = df.dropna(subset=["customer_id"])

    df.to_parquet(output_file, index=False)

    if verbose:
        typer.echo(f"Processed {initial_count} rows → {len(df)} rows written to {output_file}")
    else:
        typer.echo(f"Done: {len(df)} rows written.")
Python
# tests/unit/test_cli.py
import pytest
import pandas as pd
from typer.testing import CliRunner
from src.cli import app

runner = CliRunner()


@pytest.fixture
def input_csv(tmp_path):
    """Write a sample CSV and return its path."""
    df = pd.DataFrame({
        "order_id": ["O1", "O2", "O3"],
        "customer_id": ["C1", None, "C3"],
        "revenue": [100, 200, 300],
    })
    path = tmp_path / "input.csv"
    df.to_csv(path, index=False)
    return path


def test_cli_processes_file_successfully(input_csv, tmp_path):
    output = tmp_path / "output.parquet"
    result = runner.invoke(app, [str(input_csv), str(output)])

    assert result.exit_code == 0
    assert output.exists()
    df = pd.read_parquet(output)
    assert len(df) == 2  # One null row dropped


def test_cli_verbose_flag_shows_row_counts(input_csv, tmp_path):
    output = tmp_path / "output.parquet"
    result = runner.invoke(app, [str(input_csv), str(output), "--verbose"])

    assert result.exit_code == 0
    assert "3 rows" in result.output  # Initial count
    assert "2 rows" in result.output  # After dropping null


def test_cli_exits_with_error_for_missing_input(tmp_path):
    missing_file = tmp_path / "does_not_exist.csv"
    output = tmp_path / "output.parquet"

    result = runner.invoke(app, [str(missing_file), str(output)])
    assert result.exit_code == 1
    assert "not found" in result.output.lower() or "Error" in result.output


def test_cli_no_drop_nulls_flag_preserves_all_rows(input_csv, tmp_path):
    output = tmp_path / "output.parquet"
    result = runner.invoke(app, [str(input_csv), str(output), "--no-drop-nulls"])

    assert result.exit_code == 0
    df = pd.read_parquet(output)
    assert len(df) == 3  # All rows preserved including null customer_id


Click-based CLIs follow the same pattern with Click's own runner. A minimal sketch, assuming the same command were ported to @click.command in a hypothetical src/cli_click.py:
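
Python
# tests/unit/test_cli_click.py
from click.testing import CliRunner

click_runner = CliRunner()


def test_click_cli_processes_file(input_csv, tmp_path):
    from src.cli_click import process  # Hypothetical Click port of the Typer command

    output = tmp_path / "output.parquet"
    result = click_runner.invoke(process, [str(input_csv), str(output)])

    assert result.exit_code == 0
    assert output.exists()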

Testing FastAPI Endpoints with TestClient

Pipelines increasingly expose HTTP APIs for triggering runs, checking status, or serving transformation results.

Python
# src/api.py
from fastapi import FastAPI, HTTPException, BackgroundTasks
from pydantic import BaseModel
from typing import Optional
import pandas as pd
from datetime import date

app = FastAPI(title="Pipeline API")

# In-memory job store (use Redis/DB in production)
_jobs: dict = {}


class PipelineRunRequest(BaseModel):
    pipeline_name: str
    start_date: date
    end_date: date
    dry_run: bool = False


class PipelineRunResponse(BaseModel):
    job_id: str
    status: str
    message: Optional[str] = None


@app.get("/health")
def health_check():
    return {"status": "healthy", "version": "1.0.0"}


@app.post("/pipeline/run", response_model=PipelineRunResponse)
def trigger_pipeline_run(
    request: PipelineRunRequest,
    background_tasks: BackgroundTasks,
):
    if request.end_date < request.start_date:
        raise HTTPException(
            status_code=422,
            detail="end_date must be >= start_date"
        )

    job_id = f"job-{request.pipeline_name}-{request.start_date}-{request.end_date}"

    if job_id in _jobs:
        return PipelineRunResponse(
            job_id=job_id,
            status="already_running",
            message="A job with these parameters is already active",
        )

    _jobs[job_id] = "queued"
    return PipelineRunResponse(job_id=job_id, status="queued")


@app.get("/pipeline/status/{job_id}", response_model=PipelineRunResponse)
def get_job_status(job_id: str):
    if job_id not in _jobs:
        raise HTTPException(status_code=404, detail=f"Job not found: {job_id}")
    return PipelineRunResponse(job_id=job_id, status=_jobs[job_id])
Python
# tests/unit/test_api.py
import pytest
from fastapi.testclient import TestClient
from src.api import app, _jobs


@pytest.fixture(autouse=True)
def clear_job_store():
    """Clear the in-memory job store before each test."""
    _jobs.clear()
    yield
    _jobs.clear()


@pytest.fixture
def client():
    return TestClient(app)


def test_health_check_returns_200(client):
    response = client.get("/health")
    assert response.status_code == 200
    assert response.json()["status"] == "healthy"


def test_trigger_pipeline_run_returns_job_id(client):
    response = client.post("/pipeline/run", json={
        "pipeline_name": "orders_daily",
        "start_date": "2026-05-01",
        "end_date": "2026-05-07",
    })
    assert response.status_code == 200
    data = response.json()
    assert "job_id" in data
    assert data["status"] == "queued"
    assert "orders_daily" in data["job_id"]


def test_trigger_pipeline_rejects_invalid_date_range(client):
    response = client.post("/pipeline/run", json={
        "pipeline_name": "orders_daily",
        "start_date": "2026-05-07",
        "end_date": "2026-05-01",  # end < start
    })
    assert response.status_code == 422
    assert "end_date" in response.json()["detail"].lower()


def test_trigger_pipeline_dry_run_flag(client):
    response = client.post("/pipeline/run", json={
        "pipeline_name": "orders_daily",
        "start_date": "2026-05-01",
        "end_date": "2026-05-07",
        "dry_run": True,
    })
    assert response.status_code == 200
    assert response.json()["status"] == "queued"


def test_duplicate_job_returns_already_running(client):
    payload = {
        "pipeline_name": "orders_daily",
        "start_date": "2026-05-01",
        "end_date": "2026-05-07",
    }
    # First trigger
    first = client.post("/pipeline/run", json=payload)
    assert first.status_code == 200

    # Second trigger with same params
    second = client.post("/pipeline/run", json=payload)
    assert second.status_code == 200
    assert second.json()["status"] == "already_running"


def test_get_job_status_for_existing_job(client):
    # Create a job first
    create = client.post("/pipeline/run", json={
        "pipeline_name": "customers_daily",
        "start_date": "2026-05-01",
        "end_date": "2026-05-01",
    })
    job_id = create.json()["job_id"]

    # Check status
    status = client.get(f"/pipeline/status/{job_id}")
    assert status.status_code == 200
    assert status.json()["status"] == "queued"


def test_get_job_status_404_for_unknown_job(client):
    response = client.get("/pipeline/status/nonexistent-job-id")
    assert response.status_code == 404
    assert "not found" in response.json()["detail"].lower()


def test_pipeline_run_missing_required_field(client):
    response = client.post("/pipeline/run", json={
        "pipeline_name": "orders_daily",
        # Missing start_date and end_date
    })
    assert response.status_code == 422  # FastAPI validation error
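
The autouse fixture above works because the job store is a module-level dict. If the store were instead injected with FastAPI's Depends, each test could swap it via app.dependency_overrides — a sketch, assuming a hypothetical get_job_store dependency in src/api.py:

Python
# Sketch: assumes the endpoints were refactored to take
# store: dict = Depends(get_job_store) instead of touching _jobs directly.
from fastapi.testclient import TestClient


def test_status_with_overridden_store():
    from src.api import app, get_job_store  # get_job_store is hypothetical

    fake_store = {"job-abc": "running"}
    app.dependency_overrides[get_job_store] = lambda: fake_store
    try:
        client = TestClient(app)
        response = client.get("/pipeline/status/job-abc")
        assert response.status_code == 200
        assert response.json()["status"] == "running"
    finally:
        app.dependency_overrides.clear()  # Never leak overrides between tests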

testcontainers-python: Real Database Tests

testcontainers-python starts real Docker containers for your tests. Use it for integration tests that need to verify SQL logic, index behavior, or stored procedures.

Bash
pip install testcontainers[postgres]
# Requires Docker Desktop running
Python
# tests/integration/test_postgres_pipeline.py
import pytest
import pandas as pd
from testcontainers.postgres import PostgresContainer
# text() wraps raw SQL for SQLAlchemy 2.x; psycopg2 must be installed as the
# driver but never needs importing here
from sqlalchemy import create_engine, text


@pytest.fixture(scope="module")
def postgres_container():
    """
    Start a PostgreSQL container once for the entire module.
    Destroyed after all tests in the module complete.
    """
    with PostgresContainer("postgres:16-alpine") as postgres:
        yield postgres


@pytest.fixture(scope="module")
def db_engine(postgres_container):
    """SQLAlchemy engine connected to the test PostgreSQL container."""
    engine = create_engine(postgres_container.get_connection_url())
    return engine


@pytest.fixture(scope="module")
def initialized_db(db_engine):
    """Create schema and seed reference data once per module."""
    with db_engine.begin() as conn:
        conn.execute("""
            CREATE TABLE IF NOT EXISTS orders (
                order_id VARCHAR(50) PRIMARY KEY,
                customer_id VARCHAR(50) NOT NULL,
                revenue NUMERIC(12, 2) NOT NULL,
                order_date DATE NOT NULL,
                status VARCHAR(20) DEFAULT 'pending',
                created_at TIMESTAMP DEFAULT NOW()
            )
        """)
        conn.execute("""
            CREATE TABLE IF NOT EXISTS customers (
                customer_id VARCHAR(50) PRIMARY KEY,
                name VARCHAR(200),
                region VARCHAR(50),
                tier VARCHAR(20)
            )
        """)
        conn.execute("""
            INSERT INTO customers VALUES
                ('C001', 'Alice Martin', 'NORTH', 'gold'),
                ('C002', 'Bob Chen', 'SOUTH', 'silver'),
                ('C003', 'Carol White', 'NORTH', 'gold')
            ON CONFLICT DO NOTHING
        """)
    return db_engine


@pytest.fixture
def clean_orders(initialized_db):
    """Ensure orders table is empty before and after each test."""
    with initialized_db.begin() as conn:
        conn.execute(text("DELETE FROM orders"))
    yield initialized_db
    with initialized_db.begin() as conn:
        conn.execute(text("DELETE FROM orders"))


@pytest.mark.integration
def test_insert_and_query_orders(clean_orders):
    from src.db_repository import OrderRepository

    repo = OrderRepository(engine=clean_orders)
    orders = [
        {"order_id": "O1", "customer_id": "C001", "revenue": 150.00, "order_date": "2026-05-01", "status": "shipped"},
        {"order_id": "O2", "customer_id": "C002", "revenue": 75.50, "order_date": "2026-05-02", "status": "pending"},
    ]
    repo.bulk_insert(orders)

    result = repo.get_all()
    assert len(result) == 2
    assert set(result["order_id"].tolist()) == {"O1", "O2"}


@pytest.mark.integration
def test_revenue_aggregation_query(clean_orders):
    """Test a specific SQL aggregation query against real PostgreSQL."""
    with clean_orders.begin() as conn:
        conn.execute(text("""
            INSERT INTO orders (order_id, customer_id, revenue, order_date)
            VALUES
                ('O1', 'C001', 100.00, '2026-05-01'),
                ('O2', 'C001', 200.00, '2026-05-02'),
                ('O3', 'C002', 300.00, '2026-05-01')
        """))

    from src.db_repository import OrderRepository
    repo = OrderRepository(engine=clean_orders)
    result = repo.get_customer_revenue_summary()

    c001_revenue = result.loc[result["customer_id"] == "C001", "total_revenue"].iloc[0]
    assert float(c001_revenue) == pytest.approx(300.0)


@pytest.mark.integration
def test_primary_key_constraint_is_enforced(clean_orders):
    """Verify that duplicate order_id raises IntegrityError."""
    import sqlalchemy.exc

    with clean_orders.begin() as conn:
        conn.execute(text("INSERT INTO orders (order_id, customer_id, revenue, order_date) VALUES ('O1', 'C001', 100, '2026-05-01')"))

    with pytest.raises(sqlalchemy.exc.IntegrityError):
        with clean_orders.begin() as conn:
            conn.execute(text("INSERT INTO orders (order_id, customer_id, revenue, order_date) VALUES ('O1', 'C002', 200, '2026-05-02')"))

Test Coverage with pytest-cov

Bash
pip install pytest-cov

Running with Coverage

Bash
# Terminal report: show uncovered lines
pytest --cov=src --cov-report=term-missing

# HTML report: open in browser
pytest --cov=src --cov-report=html:htmlcov

# XML report: for CI tools (SonarQube, Codecov, etc.)
pytest --cov=src --cov-report=xml:coverage.xml

# Fail if coverage drops below threshold
pytest --cov=src --cov-fail-under=80

# Combine all report formats
pytest --cov=src \
       --cov-report=term-missing \
       --cov-report=html:htmlcov \
       --cov-report=xml:coverage.xml \
       --cov-fail-under=80

.coveragerc (or pyproject.toml equivalent)

TOML
# pyproject.toml
[tool.coverage.run]
source = ["src"]
branch = true   # Track branch coverage (if/else arms)
omit = [
    "tests/*",
    "src/migrations/*",
    "src/__init__.py",
    "src/*/conftest.py",
    "src/cli.py",        # CLI entry points — integration tested separately
]

[tool.coverage.report]
precision = 2
show_missing = true
exclude_lines = [
    "pragma: no cover",
    "if TYPE_CHECKING:",
    "raise NotImplementedError",
    "@(abc\\.)?abstractmethod",
    "if __name__ == ['\"]__main__['\"]:",
    "pass",
]

[tool.coverage.html]
directory = "htmlcov"

[tool.coverage.xml]
output = "coverage.xml"

pragma: no cover

Mark code that legitimately cannot be tested (or shouldn't block coverage thresholds):

Python
def debug_dump(df: pd.DataFrame) -> None:  # pragma: no cover
    """Development-only debug function — excluded from coverage."""
    print(df.to_string())


if __name__ == "__main__":  # pragma: no cover
    app()

Test Organization for Data Engineering Projects

Here is a production-ready test directory structure:

tests/
├── conftest.py                     # Root: session/module fixtures, markers
├── fixtures/                       # Static test data files
│   ├── orders_sample.csv
│   ├── customers_sample.csv
│   ├── schemas/
│   │   ├── orders_v1.json
│   │   └── customers_v1.json
│   └── parquet/
│       └── test_orders.parquet
├── unit/                           # Fast, isolated, no I/O
│   ├── conftest.py                 # Unit-specific fixtures
│   ├── test_transformers.py
│   ├── test_validators.py
│   ├── test_aggregators.py
│   ├── test_enrichment.py
│   └── test_incremental.py
├── integration/                    # Slower, external services
│   ├── conftest.py                 # DB connections, container fixtures
│   ├── test_postgres_pipeline.py
│   ├── test_s3_reader.py
│   └── test_snowflake_query.py
├── api/                            # FastAPI endpoint tests
│   ├── conftest.py
│   └── test_pipeline_api.py
├── cli/                            # CLI command tests
│   └── test_process_command.py
└── e2e/                            # Full pipeline runs (slowest)
    ├── conftest.py
    └── test_orders_pipeline_e2e.py
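
The markers used throughout this structure and in the commands below (unit, integration, slow) should be registered; with --strict-markers, a typo in a mark name fails collection instead of silently selecting nothing:

TOML
# pyproject.toml
[tool.pytest.ini_options]
addopts = "--strict-markers"
markers = [
    "unit: fast, isolated tests with no I/O",
    "integration: tests that require external services (Docker, databases)",
    "slow: long-running tests, excluded from the default PR suite",
]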

Makefile Targets for Common Test Commands

MAKEFILE
# Makefile
.PHONY: test test-unit test-integration test-coverage test-ci

# Fast feedback loop during development
test-unit:
	pytest -x -vs -m "unit and not slow" --tb=short

# Run only integration tests (requires services)
test-integration:
	pytest -m integration --tb=short

# Full suite with coverage
test-coverage:
	pytest \
		--cov=src \
		--cov-report=term-missing \
		--cov-report=html:htmlcov \
		--cov-fail-under=80

# What CI runs
test-ci:
	pytest \
		-m "not slow" \
		--junit-xml=test-results/junit.xml \
		--cov=src \
		--cov-report=xml:coverage.xml \
		--cov-fail-under=80

CI Integration: GitHub Actions

YAML
# .github/workflows/test.yml
name: Tests

on:
  push:
    branches: [main, develop]
  pull_request:
    branches: [main]
  workflow_dispatch:  # Manual trigger — required for the slow-tests job below

jobs:
  unit-tests:
    name: Unit Tests (Python ${{ matrix.python-version }})
    runs-on: ubuntu-latest
    strategy:
      matrix:
        python-version: ["3.11", "3.12"]
      fail-fast: false  # Run all versions even if one fails

    steps:
      - uses: actions/checkout@v4

      - name: Set up Python ${{ matrix.python-version }}
        uses: actions/setup-python@v5
        with:
          python-version: ${{ matrix.python-version }}
          cache: pip

      - name: Install dependencies
        run: |
          pip install --upgrade pip
          pip install -e ".[test]"

      - name: Run unit tests
        run: |
          pytest \
            -m "unit and not slow" \
            --junit-xml=test-results/unit-${{ matrix.python-version }}.xml \
            --cov=src \
            --cov-report=xml:coverage-${{ matrix.python-version }}.xml \
            --cov-fail-under=80 \
            -v

      - name: Upload test results
        uses: actions/upload-artifact@v4
        if: always()
        with:
          name: test-results-${{ matrix.python-version }}
          path: test-results/

      - name: Upload coverage to Codecov
        uses: codecov/codecov-action@v4
        with:
          files: coverage-${{ matrix.python-version }}.xml
          flags: unit-tests
          fail_ci_if_error: true

  integration-tests:
    name: Integration Tests
    runs-on: ubuntu-latest
    needs: unit-tests  # Only run if unit tests pass

    services:
      postgres:
        image: postgres:16-alpine
        env:
          POSTGRES_DB: test_pipeline
          POSTGRES_USER: postgres
          POSTGRES_PASSWORD: testpassword
        options: >-
          --health-cmd pg_isready
          --health-interval 10s
          --health-timeout 5s
          --health-retries 5
        ports:
          - 5432:5432

    steps:
      - uses: actions/checkout@v4

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: "3.12"
          cache: pip

      - name: Install dependencies
        run: pip install -e ".[test]"

      - name: Run integration tests
        env:
          TEST_DB_HOST: localhost
          TEST_DB_PORT: 5432
          TEST_DB_NAME: test_pipeline
          TEST_DB_USER: postgres
          TEST_DB_PASSWORD: testpassword
        run: |
          pytest \
            -m integration \
            --junit-xml=test-results/integration.xml \
            -v \
            --tb=short

      - name: Upload integration test results
        uses: actions/upload-artifact@v4
        if: always()
        with:
          name: integration-test-results
          path: test-results/

  slow-tests:
    name: Slow / Nightly Tests
    runs-on: ubuntu-latest
    # Only run on manual trigger — not every PR (scheduled runs live in nightly.yml)
    if: github.event_name == 'workflow_dispatch'

    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: "3.12"
          cache: pip
      - run: pip install -e ".[test]"
      - name: Run slow tests
        run: pytest -m slow --junit-xml=test-results/slow.xml -v

A separate nightly workflow runs the full suite, including slow tests, on a schedule:
YAML
# .github/workflows/nightly.yml
name: Nightly Full Suite

on:
  schedule:
    - cron: "0 2 * * *"  # 2 AM UTC every day
  workflow_dispatch:       # Manual trigger from GitHub UI

jobs:
  full-suite:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: "3.12"
          cache: pip
      - run: pip install -e ".[test]"
      - name: Run full test suite including slow tests
        run: |
          pytest \
            --junit-xml=test-results/full-suite.xml \
            --cov=src \
            --cov-report=html:htmlcov \
            --cov-report=xml \
            -v
      - name: Upload coverage HTML
        uses: actions/upload-artifact@v4
        with:
          name: coverage-report
          path: htmlcov/

pyproject.toml Test Dependencies

TOML
# pyproject.toml
[project.optional-dependencies]
test = [
    "pytest>=8.0",
    "pytest-cov>=5.0",
    "pytest-mock>=3.12",
    "pytest-xdist>=3.5",       # Parallel test execution
    "pytest-asyncio>=0.23",     # Async test support
    "pandas>=2.0",
    "pyarrow>=14.0",
    "freezegun>=1.4",
    "responses>=0.25",
    "respx>=0.21",
    "testcontainers[postgres]>=4.0",
    "httpx>=0.27",              # For FastAPI TestClient
    "typer[all]>=0.12",
]

Parallel Test Execution with pytest-xdist

Bash
pip install pytest-xdist

# Run across 4 workers
pytest -n 4

# Auto-detect CPU count
pytest -n auto

# Distribute by file (good for independent test modules)
pytest -n auto --dist=loadfile

# Distribute by module group
pytest -n auto --dist=loadscope

Important: session-scoped fixtures are NOT shared across workers with xdist. Use scope="session" only for truly independent, read-only fixtures, or use tmp_path_factory for worker-local temp dirs.

Python
@pytest.fixture(scope="session")
def worker_id(tmp_path_factory):
    """Unique ID for each xdist worker — use to avoid resource conflicts."""
    import os
    return os.environ.get("PYTEST_XDIST_WORKER", "master")
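
The worker ID is handy for namespacing shared external resources so parallel workers don't collide — for example, a per-worker schema name in a shared test database (a sketch):

Python
@pytest.fixture(scope="session")
def test_schema_name(worker_id):
    """Per-worker schema name: test_gw0, test_gw1, ... or test_master without xdist."""
    return f"test_{worker_id}"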

Quick Reference: Reading Test Output

Bash
# With -ra (report a short summary of all non-passing outcomes), pytest prints:
# FAILED tests/unit/test_validators.py::test_revenue_positive - AssertionError
# ERROR  tests/integration/test_db.py::test_insert - ConnectionError
# SKIPPED tests/integration/test_api.py::test_export (credentials not set)
# XFAILED tests/unit/test_parsers.py::test_edge_case (known bug JIRA-1234)

# Exit codes:
# 0 — all tests passed
# 1 — some tests failed
# 2 — interrupted (Ctrl+C)
# 3 — internal error
# 4 — command-line usage error
# 5 — no tests collected

# Coverage output:
# Name                     Stmts   Miss Branch BrPart  Cover   Missing
# src/transformers.py         45      3     12      2    93%   42-44, 87
# src/validators.py           28      0      8      0   100%
# TOTAL                       73      3     20      2    95%
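
The same exit codes come back from pytest.main() when pytest is invoked programmatically — useful in orchestration wrappers. A minimal sketch:

Python
import sys

import pytest

# Returns the codes listed above (0 = passed, 1 = failures, 5 = none collected)
exit_code = pytest.main(["-m", "unit and not slow", "--tb=short"])
sys.exit(int(exit_code))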

Summary

  • Use pd.testing.assert_frame_equal for DataFrame comparisons — it gives column-level diffs and supports float tolerances
  • Test exception types and messages with pytest.raises — always check the exception attributes, not just that it was raised
  • Keep test data in two tiers: tiny fixtures for logic tests, production-scale fixtures (session-scoped) for performance tests
  • TestClient (FastAPI) and CliRunner (Typer/Click) test HTTP endpoints and CLI commands without spawning real servers
  • testcontainers gives you real PostgreSQL (and any other Docker-based service) in integration tests
  • Configure pytest-cov with branch coverage and a --cov-fail-under threshold in CI
  • Separate unit, integration, slow, and e2e tests with markers — run them on different schedules
  • Use pytest-xdist for parallel execution when unit tests grow beyond 30 seconds

This completes the pytest series. The four lessons build a complete testing stack for data engineering pipelines: fundamentals → fixtures & parametrize → mocking → full pipeline testing. Apply these patterns from day one, and refactoring your pipelines becomes safe, confident work.
