Back to blog
Data Engineeringbeginner

pytest Fundamentals for Data Engineers

Master pytest from the ground up: configuration, test discovery, assert rewriting, markers, filtering, and testing real pandas pipelines and ETL classes with production-quality patterns.

LearnixoMay 7, 202616 min read
pytestpythontestingdata-engineeringpandasetl
Share:𝕏

pytest Fundamentals for Data Engineers

Testing data pipelines is not optional. Without tests, every refactor is a gamble, every deployment is a prayer, and every data quality incident is a surprise. This guide builds a solid pytest foundation using examples you will encounter in real data engineering work — pandas transformations, validation functions, and ETL classes.

Why pytest Over unittest

Python ships with unittest, but pytest has become the standard for good reasons:

  • Plain assert statements with rich diff output — no assertEqual, assertIn, assertRaises ceremony
  • Automatic test discovery — no base classes required
  • Fixture system that scales from unit tests to integration suites
  • Plugin ecosystem: pytest-cov, pytest-mock, pytest-xdist, pytest-asyncio, and hundreds more
  • Parameterization as a first-class feature
Python
# unittest style  verbose boilerplate
import unittest

class TestValidation(unittest.TestCase):
    def test_non_null_columns(self):
        import pandas as pd
        df = pd.DataFrame({"id": [1, 2, None]})
        self.assertFalse(df["id"].isnull().any(), "id column contains nulls")

# pytest style  readable, minimal
import pandas as pd

def test_non_null_columns():
    df = pd.DataFrame({"id": [1, 2, None]})
    assert not df["id"].isnull().any(), "id column contains nulls"

Installation

Bash
# Minimal install
pip install pytest

# Data engineering test stack
pip install pytest pytest-cov pytest-mock pytest-xdist pandas pyarrow

# Lock versions in your pyproject.toml

Verify the install:

Bash
pytest --version
# pytest 8.x.x

Configuration: pyproject.toml vs pytest.ini

pyproject.toml (Recommended)

Modern Python projects consolidate all tooling configuration into pyproject.toml. This avoids the sprawl of setup.cfg, pytest.ini, .coveragerc, and friends.

TOML
# pyproject.toml

[tool.pytest.ini_options]
# Where pytest looks for tests
testpaths = ["tests"]

# Minimum pytest version required
minversion = "8.0"

# Default flags applied to every run
addopts = [
    "--strict-markers",     # Error on unknown markers instead of warning
    "--strict-config",      # Error on config warnings
    "-ra",                  # Show short summary for all except passed
    "--tb=short",           # Shorter tracebacks
]

# Register custom markers (required when --strict-markers is set)
markers = [
    "unit: fast, isolated unit tests",
    "integration: tests that hit real databases or external services",
    "slow: tests that take more than 5 seconds",
    "smoke: minimal set to verify basic functionality",
]

# Log capture settings
log_cli = true
log_cli_level = "INFO"
log_format = "%(asctime)s %(levelname)s %(message)s"
log_date_format = "%Y-%m-%d %H:%M:%S"

# Coverage via pytest-cov (when using --cov flag)
# Separate [tool.coverage] section handles .coveragerc equivalent

[tool.coverage.run]
source = ["src"]
omit = ["tests/*", "*/migrations/*", "*/conftest.py"]

[tool.coverage.report]
exclude_lines = [
    "pragma: no cover",
    "if TYPE_CHECKING:",
    "raise NotImplementedError",
    "if __name__ == .__main__.:",
]

pytest.ini (Legacy but still common)

If you are working in a project that predates pyproject.toml adoption:

INI
# pytest.ini
[pytest]
testpaths = tests
minversion = 8.0
addopts = --strict-markers --strict-config -ra --tb=short
markers =
    unit: fast isolated unit tests
    integration: tests requiring external services
    slow: tests taking more than 5 seconds
log_cli = true
log_cli_level = INFO

setup.cfg (Avoid for new projects)

INI
# setup.cfg  still works, but migrate to pyproject.toml
[tool:pytest]
testpaths = tests
addopts = --strict-markers

Test Discovery Rules

pytest discovers tests by walking directories and applying these rules in order:

  1. Start from testpaths (or current directory if not set)
  2. Recurse into directories that match norecursedirs exclusion patterns
  3. Collect files matching python_files pattern (default: test_*.py or *_test.py)
  4. Inside those files, collect functions matching python_functions (default: test_*)
  5. Inside those files, collect classes matching python_classes (default: Test*)
  6. Inside Test* classes, collect methods matching test_*
tests/
├── conftest.py               # Shared fixtures — auto-loaded
├── unit/
│   ├── __init__.py           # Not required, but common
│   ├── test_validators.py    # Discovered
│   └── test_transformers.py  # Discovered
├── integration/
│   ├── test_database.py      # Discovered
│   └── test_api.py           # Discovered
└── fixtures/
    └── sample_data.csv       # NOT collected (no test_ prefix)

Customize discovery in pyproject.toml:

TOML
[tool.pytest.ini_options]
python_files = ["test_*.py", "*_test.py", "check_*.py"]
python_classes = ["Test*", "Check*"]
python_functions = ["test_*", "check_*"]
norecursedirs = [".git", "node_modules", ".venv", "dist", "build", "__pycache__"]

Assert Rewriting: The pytest Superpower

pytest rewrites assert statements at import time to provide detailed failure messages without any extra code. This is one of the most underappreciated features.

Python
import pandas as pd
import numpy as np

def test_assert_rewriting_demo():
    expected = pd.DataFrame({
        "customer_id": [1, 2, 3],
        "revenue": [100.0, 200.0, 300.0],
    })
    actual = pd.DataFrame({
        "customer_id": [1, 2, 3],
        "revenue": [100.0, 250.0, 300.0],  # 200  250 intentional mismatch
    })

    # Plain assert  pytest makes this informative
    assert list(actual["revenue"]) == list(expected["revenue"])
    # Output:
    # AssertionError: assert [100.0, 250.0, 300.0] == [100.0, 200.0, 300.0]
    # At index 1 diff: 250.0 != 200.0

For DataFrame comparisons, use pd.testing.assert_frame_equal which gives column-level diffs:

Python
def test_dataframe_equality():
    expected = pd.DataFrame({"a": [1, 2], "b": [3, 4]})
    actual = pd.DataFrame({"a": [1, 2], "b": [3, 5]})  # b[1] differs

    pd.testing.assert_frame_equal(actual, expected)
    # AssertionError: DataFrame.iloc[:, 1] are different
    # Column values are different (50.0 %)
    # [left]:  [3, 5]
    # [right]: [3, 4]

Test Functions vs Test Classes

Test Functions (Preferred for most cases)

Python
# tests/unit/test_validators.py

def test_revenue_is_positive():
    assert validate_revenue(100.0) is True

def test_revenue_rejects_negative():
    assert validate_revenue(-1.0) is False

def test_revenue_rejects_zero():
    assert validate_revenue(0.0) is False

Test Classes (Use for grouping related tests with shared setup)

Test classes in pytest do NOT inherit from unittest.TestCase. They are plain classes.

Python
class TestRevenueValidator:
    """Group tests for the revenue validation function."""

    def test_accepts_positive_integer(self):
        assert validate_revenue(100) is True

    def test_accepts_positive_float(self):
        assert validate_revenue(99.99) is True

    def test_rejects_negative(self):
        assert validate_revenue(-1) is False

    def test_rejects_zero(self):
        assert validate_revenue(0) is False

    def test_rejects_none(self):
        assert validate_revenue(None) is False

    def test_rejects_string(self):
        assert validate_revenue("100") is False

Key rule: Do NOT use __init__ in test classes. pytest does not call __init__ and will refuse to collect classes that define it.

Markers

Markers let you label tests for selective execution. Register all custom markers in pyproject.toml when using --strict-markers.

Built-in Markers

Python
import pytest

# Skip a test unconditionally
@pytest.mark.skip(reason="API endpoint not yet implemented")
def test_export_api():
    ...

# Skip conditionally
@pytest.mark.skipif(
    condition=not is_snowflake_available(),
    reason="Snowflake credentials not configured"
)
def test_snowflake_query():
    ...

# Mark as expected failure
@pytest.mark.xfail(reason="Known bug in date parsing — JIRA-1234")
def test_date_parsing_edge_case():
    ...

# Mark as expected failure, fail if it unexpectedly passes
@pytest.mark.xfail(strict=True, reason="Should fail until fix is merged")
def test_future_feature():
    ...

Custom Markers for Data Engineering

Python
# tests/unit/test_transformers.py
import pytest

@pytest.mark.unit
def test_clean_phone_numbers():
    from src.transformers import clean_phone
    assert clean_phone("+1 (555) 867-5309") == "15558675309"
    assert clean_phone("555.867.5309") == "5558675309"
    assert clean_phone(None) is None

@pytest.mark.unit
def test_normalize_currency():
    from src.transformers import normalize_currency
    assert normalize_currency("$1,234.56") == 1234.56
    assert normalize_currency("EUR 500") == 500.0

@pytest.mark.integration
def test_pipeline_reads_from_s3():
    # Hits real S3  only run in CI with credentials
    ...

@pytest.mark.slow
def test_full_historical_backfill():
    # Processes 10M rows  only run nightly
    ...

Apply markers at the class level to tag all methods:

Python
@pytest.mark.unit
class TestPhoneNormalizer:
    def test_us_format(self): ...
    def test_international_format(self): ...
    def test_strips_whitespace(self): ...
    # All three tests get the 'unit' marker

Running Tests: Flags That Matter

Basic Execution

Bash
# Run all tests
pytest

# Run with verbose output (shows each test name)
pytest -v

# Run with extra verbose (shows fixture setup/teardown)
pytest -vv

# Show print() output and logging.info() during tests
pytest -s

# Combine: verbose + show output
pytest -vs

# Stop at first failure
pytest -x

# Stop after N failures
pytest --maxfail=3

Filtering with -k

The -k flag filters tests by name expression. It supports and, or, not, and substring matching.

Bash
# Run tests whose name contains "revenue"
pytest -k revenue

# Run unit tests only
pytest -k unit

# Run tests matching either pattern
pytest -k "phone or currency"

# Exclude slow tests
pytest -k "not slow"

# Complex expression
pytest -k "unit and not database"

# Run a specific test file
pytest tests/unit/test_validators.py

# Run a specific test function
pytest tests/unit/test_validators.py::test_revenue_is_positive

# Run a specific test class
pytest tests/unit/test_validators.py::TestRevenueValidator

# Run a specific method in a class
pytest tests/unit/test_validators.py::TestRevenueValidator::test_accepts_positive_float

Running by Marker

Bash
# Run only unit tests
pytest -m unit

# Run unit and smoke tests
pytest -m "unit or smoke"

# Run everything except integration and slow
pytest -m "not integration and not slow"

Output Formats

Bash
# Short traceback (good default)
pytest --tb=short

# Long traceback (most context)
pytest --tb=long

# Single line per failure
pytest --tb=line

# No traceback, just failure names
pytest --tb=no

# Native Python traceback style
pytest --tb=native

# JUnit XML for CI systems
pytest --junit-xml=test-results/junit.xml

# JSON report (requires pytest-json-report)
pytest --json-report --json-report-file=test-results/report.json

Real Example: Testing a Pandas Pipeline Function

Here is the source code under test:

Python
# src/transformers.py
import pandas as pd
import numpy as np
from typing import Optional


def normalize_sales_data(df: pd.DataFrame) -> pd.DataFrame:
    """
    Normalize raw sales data from CRM export.

    Transformations applied:
    - Strip whitespace from string columns
    - Normalize revenue: remove currency symbols, convert to float
    - Parse date columns to datetime
    - Drop rows where customer_id is null
    - Clip revenue to [0, 1_000_000] range (outlier handling)
    """
    if df.empty:
        return df

    result = df.copy()

    # Strip whitespace from object columns
    str_cols = result.select_dtypes(include="object").columns
    result[str_cols] = result[str_cols].apply(lambda col: col.str.strip())

    # Normalize revenue
    if "revenue" in result.columns:
        result["revenue"] = (
            result["revenue"]
            .astype(str)
            .str.replace(r"[$,€£]", "", regex=True)
            .str.strip()
            .replace("", np.nan)
            .astype(float)
            .clip(lower=0, upper=1_000_000)
        )

    # Parse sale_date
    if "sale_date" in result.columns:
        result["sale_date"] = pd.to_datetime(result["sale_date"], errors="coerce")

    # Drop rows with null customer_id
    if "customer_id" in result.columns:
        result = result.dropna(subset=["customer_id"])

    return result.reset_index(drop=True)

Now the tests:

Python
# tests/unit/test_transformers.py
import pytest
import pandas as pd
import numpy as np
from pandas.testing import assert_frame_equal

from src.transformers import normalize_sales_data


@pytest.mark.unit
class TestNormalizeSalesData:
    """Tests for the normalize_sales_data transformation."""

    def test_returns_empty_dataframe_unchanged(self):
        empty = pd.DataFrame()
        result = normalize_sales_data(empty)
        assert result.empty

    def test_strips_whitespace_from_string_columns(self):
        df = pd.DataFrame({
            "customer_id": ["C001", "  C002  ", "C003"],
            "region": [" NORTH", "SOUTH ", " EAST "],
        })
        result = normalize_sales_data(df)
        assert result["customer_id"].tolist() == ["C001", "C002", "C003"]
        assert result["region"].tolist() == ["NORTH", "SOUTH", "EAST"]

    def test_normalizes_revenue_with_dollar_sign(self):
        df = pd.DataFrame({
            "customer_id": ["C001"],
            "revenue": ["$1,234.56"],
        })
        result = normalize_sales_data(df)
        assert result["revenue"].iloc[0] == pytest.approx(1234.56)

    def test_normalizes_revenue_with_euro_sign(self):
        df = pd.DataFrame({
            "customer_id": ["C001"],
            "revenue": ["€500"],
        })
        result = normalize_sales_data(df)
        assert result["revenue"].iloc[0] == pytest.approx(500.0)

    def test_clips_revenue_at_upper_bound(self):
        df = pd.DataFrame({
            "customer_id": ["C001", "C002"],
            "revenue": ["2000000", "500000"],
        })
        result = normalize_sales_data(df)
        assert result.loc[result["customer_id"] == "C001", "revenue"].iloc[0] == 1_000_000
        assert result.loc[result["customer_id"] == "C002", "revenue"].iloc[0] == 500_000

    def test_clips_negative_revenue_to_zero(self):
        df = pd.DataFrame({
            "customer_id": ["C001"],
            "revenue": ["-500"],
        })
        result = normalize_sales_data(df)
        assert result["revenue"].iloc[0] == 0.0

    def test_drops_rows_with_null_customer_id(self):
        df = pd.DataFrame({
            "customer_id": ["C001", None, "C003"],
            "revenue": ["100", "200", "300"],
        })
        result = normalize_sales_data(df)
        assert len(result) == 2
        assert "C001" in result["customer_id"].values
        assert "C003" in result["customer_id"].values

    def test_resets_index_after_dropping_rows(self):
        df = pd.DataFrame({
            "customer_id": [None, "C002", "C003"],
            "revenue": ["100", "200", "300"],
        })
        result = normalize_sales_data(df)
        assert list(result.index) == [0, 1]

    def test_parses_sale_date_to_datetime(self):
        df = pd.DataFrame({
            "customer_id": ["C001"],
            "sale_date": ["2026-01-15"],
        })
        result = normalize_sales_data(df)
        assert pd.api.types.is_datetime64_any_dtype(result["sale_date"])
        assert result["sale_date"].iloc[0] == pd.Timestamp("2026-01-15")

    def test_coerces_invalid_dates_to_nat(self):
        df = pd.DataFrame({
            "customer_id": ["C001", "C002"],
            "sale_date": ["2026-01-15", "not-a-date"],
        })
        result = normalize_sales_data(df)
        assert pd.isna(result["sale_date"].iloc[1])

    def test_does_not_modify_original_dataframe(self):
        df = pd.DataFrame({
            "customer_id": ["  C001  "],
            "revenue": ["$100"],
        })
        original_customer_id = df["customer_id"].iloc[0]
        normalize_sales_data(df)
        # Original should be unchanged
        assert df["customer_id"].iloc[0] == original_customer_id

    def test_handles_dataframe_without_optional_columns(self):
        """Transformation should not fail if revenue or sale_date are absent."""
        df = pd.DataFrame({
            "customer_id": ["C001", "C002"],
            "name": ["Alice", "Bob"],
        })
        result = normalize_sales_data(df)
        assert list(result.columns) == ["customer_id", "name"]
        assert len(result) == 2

Real Example: Testing a Data Validation Function

Python
# src/validators.py
from typing import List
import pandas as pd


class ValidationResult:
    def __init__(self):
        self.errors: List[str] = []

    @property
    def is_valid(self) -> bool:
        return len(self.errors) == 0

    def add_error(self, message: str) -> None:
        self.errors.append(message)

    def __repr__(self) -> str:
        if self.is_valid:
            return "ValidationResult(valid)"
        return f"ValidationResult(errors={self.errors})"


def validate_pipeline_output(df: pd.DataFrame, required_columns: List[str]) -> ValidationResult:
    """Validate a pipeline output DataFrame against a schema."""
    result = ValidationResult()

    # Check required columns exist
    missing = [col for col in required_columns if col not in df.columns]
    if missing:
        result.add_error(f"Missing required columns: {missing}")

    # Check for completely empty DataFrame
    if df.empty:
        result.add_error("DataFrame is empty")
        return result

    # Check for duplicate primary key (assume first required column is PK)
    if required_columns and required_columns[0] in df.columns:
        pk_col = required_columns[0]
        duplicates = df[pk_col].duplicated().sum()
        if duplicates > 0:
            result.add_error(f"Found {duplicates} duplicate values in '{pk_col}'")

    # Check for all-null columns
    for col in required_columns:
        if col in df.columns and df[col].isnull().all():
            result.add_error(f"Column '{col}' is entirely null")

    return result
Python
# tests/unit/test_validators.py
import pytest
import pandas as pd

from src.validators import validate_pipeline_output, ValidationResult


@pytest.mark.unit
def test_valid_dataframe_passes_validation():
    df = pd.DataFrame({
        "order_id": ["O1", "O2", "O3"],
        "customer_id": ["C1", "C2", "C3"],
        "amount": [100, 200, 300],
    })
    result = validate_pipeline_output(df, required_columns=["order_id", "customer_id", "amount"])
    assert result.is_valid
    assert result.errors == []


@pytest.mark.unit
def test_fails_on_missing_required_column():
    df = pd.DataFrame({
        "order_id": ["O1", "O2"],
        "amount": [100, 200],
        # customer_id is missing
    })
    result = validate_pipeline_output(df, required_columns=["order_id", "customer_id", "amount"])
    assert not result.is_valid
    assert any("customer_id" in e for e in result.errors)


@pytest.mark.unit
def test_fails_on_empty_dataframe():
    df = pd.DataFrame(columns=["order_id", "customer_id"])
    result = validate_pipeline_output(df, required_columns=["order_id", "customer_id"])
    assert not result.is_valid
    assert any("empty" in e.lower() for e in result.errors)


@pytest.mark.unit
def test_fails_on_duplicate_primary_key():
    df = pd.DataFrame({
        "order_id": ["O1", "O1", "O3"],  # O1 duplicated
        "amount": [100, 200, 300],
    })
    result = validate_pipeline_output(df, required_columns=["order_id", "amount"])
    assert not result.is_valid
    assert any("duplicate" in e.lower() for e in result.errors)


@pytest.mark.unit
def test_fails_on_all_null_required_column():
    df = pd.DataFrame({
        "order_id": ["O1", "O2"],
        "customer_id": [None, None],
    })
    result = validate_pipeline_output(df, required_columns=["order_id", "customer_id"])
    assert not result.is_valid
    assert any("entirely null" in e for e in result.errors)


@pytest.mark.unit
def test_accumulates_multiple_errors():
    df = pd.DataFrame({
        "order_id": ["O1", "O1"],  # duplicate
        "customer_id": [None, None],  # all null
        # amount missing
    })
    result = validate_pipeline_output(df, required_columns=["order_id", "customer_id", "amount"])
    assert not result.is_valid
    assert len(result.errors) >= 2

Real Example: Testing an ETL Class

Python
# src/etl.py
import logging
from dataclasses import dataclass, field
from typing import Dict, Any
import pandas as pd

logger = logging.getLogger(__name__)


@dataclass
class ETLMetrics:
    rows_extracted: int = 0
    rows_transformed: int = 0
    rows_loaded: int = 0
    rows_dropped: int = 0
    errors: list = field(default_factory=list)


class SalesETL:
    """
    Extract → Transform → Load pipeline for sales data.

    Extract:  reads raw CSV
    Transform: normalizes, validates, enriches
    Load:     writes to Parquet
    """

    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.metrics = ETLMetrics()

    def extract(self, path: str) -> pd.DataFrame:
        logger.info(f"Extracting from {path}")
        df = pd.read_csv(path)
        self.metrics.rows_extracted = len(df)
        return df

    def transform(self, df: pd.DataFrame) -> pd.DataFrame:
        initial_count = len(df)

        # Drop rows missing required fields
        df = df.dropna(subset=["order_id", "customer_id"])

        # Normalize revenue
        df["revenue"] = pd.to_numeric(df["revenue"], errors="coerce").fillna(0.0)

        # Add derived column
        df["revenue_tier"] = pd.cut(
            df["revenue"],
            bins=[0, 100, 1000, float("inf")],
            labels=["low", "medium", "high"],
            include_lowest=True,
        )

        self.metrics.rows_transformed = len(df)
        self.metrics.rows_dropped = initial_count - len(df)
        return df

    def load(self, df: pd.DataFrame, output_path: str) -> None:
        logger.info(f"Loading {len(df)} rows to {output_path}")
        df.to_parquet(output_path, index=False)
        self.metrics.rows_loaded = len(df)

    def run(self, input_path: str, output_path: str) -> ETLMetrics:
        df = self.extract(input_path)
        df = self.transform(df)
        self.load(df, output_path)
        return self.metrics
Python
# tests/unit/test_etl.py
import pytest
import pandas as pd
import tempfile
import os
from pathlib import Path

from src.etl import SalesETL, ETLMetrics


@pytest.fixture
def etl():
    """Create a SalesETL instance with default config."""
    return SalesETL(config={"env": "test"})


@pytest.fixture
def sample_sales_df():
    """Sample sales DataFrame for transformation tests."""
    return pd.DataFrame({
        "order_id": ["O1", "O2", "O3", "O4"],
        "customer_id": ["C1", "C2", None, "C4"],
        "revenue": [50.0, 500.0, 200.0, None],
        "region": ["NORTH", "SOUTH", "EAST", "WEST"],
    })


@pytest.mark.unit
def test_transform_drops_null_customer_id(etl, sample_sales_df):
    result = etl.transform(sample_sales_df)
    assert len(result) == 3  # Row with customer_id=None dropped
    assert result["customer_id"].isnull().sum() == 0


@pytest.mark.unit
def test_transform_fills_null_revenue_with_zero(etl, sample_sales_df):
    result = etl.transform(sample_sales_df)
    # Row O4 had None revenue but survived (has customer_id)
    assert result["revenue"].isnull().sum() == 0
    o4_revenue = result.loc[result["order_id"] == "O4", "revenue"].iloc[0]
    assert o4_revenue == 0.0


@pytest.mark.unit
def test_transform_adds_revenue_tier_column(etl, sample_sales_df):
    result = etl.transform(sample_sales_df)
    assert "revenue_tier" in result.columns
    assert set(result["revenue_tier"].dropna().unique()).issubset({"low", "medium", "high"})


@pytest.mark.unit
def test_transform_classifies_revenue_tiers_correctly(etl):
    df = pd.DataFrame({
        "order_id": ["O1", "O2", "O3"],
        "customer_id": ["C1", "C2", "C3"],
        "revenue": [50.0, 500.0, 5000.0],
    })
    result = etl.transform(df)
    tiers = result.set_index("order_id")["revenue_tier"].astype(str)
    assert tiers["O1"] == "low"
    assert tiers["O2"] == "medium"
    assert tiers["O3"] == "high"


@pytest.mark.unit
def test_transform_updates_metrics(etl, sample_sales_df):
    etl.transform(sample_sales_df)
    assert etl.metrics.rows_transformed == 3
    assert etl.metrics.rows_dropped == 1


@pytest.mark.unit
def test_load_writes_parquet_file(etl):
    df = pd.DataFrame({
        "order_id": ["O1", "O2"],
        "customer_id": ["C1", "C2"],
        "revenue": [100.0, 200.0],
        "revenue_tier": ["low", "medium"],
    })
    with tempfile.TemporaryDirectory() as tmpdir:
        output_path = os.path.join(tmpdir, "output.parquet")
        etl.load(df, output_path)

        assert os.path.exists(output_path)
        loaded = pd.read_parquet(output_path)
        pd.testing.assert_frame_equal(loaded, df)


@pytest.mark.unit
def test_load_updates_rows_loaded_metric(etl):
    df = pd.DataFrame({"order_id": ["O1", "O2"], "revenue": [1.0, 2.0]})
    with tempfile.TemporaryDirectory() as tmpdir:
        etl.load(df, os.path.join(tmpdir, "out.parquet"))
    assert etl.metrics.rows_loaded == 2


@pytest.mark.integration
def test_full_run_produces_valid_output(etl, tmp_path):
    """Full extract-transform-load cycle using real files."""
    # Write sample CSV input
    input_csv = tmp_path / "sales.csv"
    pd.DataFrame({
        "order_id": ["O1", "O2", "O3"],
        "customer_id": ["C1", None, "C3"],
        "revenue": [100, 200, 300],
        "region": ["NORTH", "SOUTH", "EAST"],
    }).to_csv(input_csv, index=False)

    output_parquet = tmp_path / "output.parquet"
    metrics = etl.run(str(input_csv), str(output_parquet))

    assert metrics.rows_extracted == 3
    assert metrics.rows_loaded == 2  # One dropped due to null customer_id

    output = pd.read_parquet(output_parquet)
    assert len(output) == 2
    assert "revenue_tier" in output.columns

pytest.ini vs pyproject.toml: Decision Guide

| Scenario | Use | |----------|-----| | New project, Python 3.8+ | pyproject.toml | | Monorepo with multiple packages | pyproject.toml per package | | Legacy project, cannot touch pyproject.toml | pytest.ini | | Want to share config with setuptools/mypy/black | pyproject.toml | | CI environment with config injection | Either (env var PYTEST_ADDOPTS overrides both) |

Environment Variable Override

Bash
# Override addopts for a specific CI job without changing config files
PYTEST_ADDOPTS="--tb=long -v" pytest -m unit

# Set markers for a specific environment
PYTEST_CURRENT_ENV=ci pytest --co  # --co: collect only, don't run

Quick Reference: Most Used Commands

Bash
# Development loop: fast, verbose, stop on first failure
pytest -x -vs -m "not slow and not integration"

# Pre-commit: full unit suite with coverage
pytest -m unit --cov=src --cov-report=term-missing

# CI: all tests, JUnit output
pytest --junit-xml=results.xml --cov=src --cov-report=xml

# Debug a specific failing test
pytest tests/unit/test_etl.py::test_transform_drops_null_customer_id -vs --tb=long

# List all tests without running them
pytest --collect-only

# List all tests matching a marker
pytest --collect-only -m integration

# Run last failed tests
pytest --lf

# Run last failed then remaining
pytest --lf --ff

Summary

  • Use pyproject.toml for all pytest configuration in new projects
  • Register all custom markers with --strict-markers to catch typos early
  • Rely on pytest's assert rewriting — plain assert statements are expressive enough
  • Use pd.testing.assert_frame_equal for DataFrame equality checks
  • Separate unit tests (fast, no I/O) from integration tests (slow, real services) with markers
  • Use tmp_path (a built-in pytest fixture) for any test that needs temporary files
  • Run with --lf during development to iterate only on failing tests

The next lesson covers fixtures and parametrization — the tools that eliminate test duplication and make your test suite scale gracefully.

Enjoyed this article?

Explore the Data Engineering learning path for more.

Found this helpful?

Share:𝕏

Leave a comment

Have a question, correction, or just found this helpful? Leave a note below.