pytest: Testing Python Pipelines · Lesson 2 of 4

Fixtures & Parametrize: Reusable Test Infrastructure

pytest Fixtures and Parametrization for Data Pipelines

Fixtures and parametrization are what let a test suite grow from 50 tests to 500 without 500 lines of duplicated setup code. Data engineering tests constantly need the same things: sample DataFrames, database connections, temporary directories for output files, and exhaustive edge-case coverage for cleaning functions. This lesson covers each of these with production-oriented patterns.

What Is a Fixture?

A fixture is a function decorated with @pytest.fixture. pytest automatically discovers and injects fixtures into test functions by matching parameter names.

Python
import pytest
import pandas as pd

@pytest.fixture
def sample_df():
    return pd.DataFrame({
        "customer_id": ["C001", "C002", "C003"],
        "revenue": [100.0, 250.0, 75.0],
        "region": ["NORTH", "SOUTH", "EAST"],
    })

def test_revenue_sum(sample_df):
    # sample_df is injected automatically; no instantiation needed
    assert sample_df["revenue"].sum() == 425.0

def test_customer_count(sample_df):
    assert len(sample_df) == 3

This looks trivial, but fixtures can depend on one another, be scoped to control their lifetime, and be shared across files. That is where the power lies.
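
To make the first point concrete, here is a minimal sketch of one fixture building on another. The helpers `build_orders` and `add_line_totals` are hypothetical, and plain dictionaries stand in for a DataFrame so the example has no dependencies beyond pytest. A fixture requests another fixture the same way a test does, by naming it as a parameter:

```python
import pytest


def build_orders():
    """Hypothetical helper: raw order rows as plain dictionaries."""
    return [
        {"order_id": "O1", "quantity": 2, "unit_price": 10.0},
        {"order_id": "O2", "quantity": 1, "unit_price": 25.0},
    ]


def add_line_totals(orders):
    """Hypothetical transformation: add quantity * unit_price to each row."""
    return [{**o, "line_total": o["quantity"] * o["unit_price"]} for o in orders]


@pytest.fixture
def orders():
    return build_orders()


@pytest.fixture
def orders_with_totals(orders):
    # pytest resolves `orders` first, then passes its value in here
    return add_line_totals(orders)


def test_grand_total(orders_with_totals):
    assert sum(o["line_total"] for o in orders_with_totals) == 45.0
```

The test only names `orders_with_totals`; pytest works out that `orders` has to be created first.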

Fixture Scope

The scope parameter controls how often pytest creates and destroys a fixture:

| Scope | Created once per... | Use for |
|-------|---------------------|---------|
| function | Test function (default) | Mutable state, DataFrames |
| class | Test class | Shared setup across methods |
| module | Test file | Expensive setup reused in one file |
| session | Entire test run | Database connections, Docker containers |

function scope (Default)

Each test gets a fresh copy. Safe for mutable objects.

Python
@pytest.fixture(scope="function")
def clean_dataframe():
    """Fresh DataFrame for each test — mutations in one test don't bleed into others."""
    return pd.DataFrame({
        "id": [1, 2, 3],
        "value": [10.0, 20.0, 30.0],
    })

class scope

Created once for all methods in the class, destroyed after the last method runs.

Python
@pytest.fixture(scope="class")
def schema_validator():
    """Expensive to construct — reuse within a test class."""
    from src.validators import SchemaValidator
    return SchemaValidator.from_config("tests/fixtures/schema.json")

class TestSchemaValidation:
    def test_valid_record_passes(self, schema_validator):
        result = schema_validator.validate({"id": 1, "name": "Alice"})
        assert result.is_valid

    def test_missing_id_fails(self, schema_validator):
        result = schema_validator.validate({"name": "Alice"})
        assert not result.is_valid

module scope

Shared across all tests in a single file. Good for an in-memory database loaded from a fixture CSV.

Python
@pytest.fixture(scope="module")
def loaded_reference_data():
    """Load reference tables once per test module — these are read-only."""
    import pandas as pd
    return {
        "country_codes": pd.read_csv("tests/fixtures/country_codes.csv"),
        "currency_rates": pd.read_csv("tests/fixtures/currency_rates.csv"),
    }

session scope

The fixture lives for the entire pytest run. Use for shared infrastructure: a Docker-started PostgreSQL, a DuckDB connection, or a large dataset loaded once.

Python
@pytest.fixture(scope="session")
def duckdb_connection():
    """
    Single DuckDB in-memory connection shared across all tests.
    Treat the seeded table as read-only: a write made by one test
    is visible to every test that runs after it.
    """
    import duckdb
    conn = duckdb.connect(":memory:")
    # Seed with test data
    conn.execute("""
        CREATE TABLE orders AS
        SELECT * FROM read_csv_auto('tests/fixtures/orders.csv')
    """)
    yield conn
    conn.close()
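
Session scope and mutable data do not mix well, because a change made by one test is visible to every later test. One common way around this, sketched below under the assumption that copying is much cheaper than loading, is to cache the expensive object in a session-scoped fixture and hand each test its own copy through a function-scoped fixture. `load_reference_rows` is a hypothetical stand-in for the expensive load:

```python
import copy

import pytest


def load_reference_rows():
    """Hypothetical stand-in for an expensive load (file, database, API)."""
    return [
        {"country": "NO", "currency": "NOK"},
        {"country": "SE", "currency": "SEK"},
    ]


@pytest.fixture(scope="session")
def reference_rows_cached():
    # Runs once per test session
    return load_reference_rows()


@pytest.fixture
def reference_rows(reference_rows_cached):
    # Runs for every test: a deep copy keeps the cached original untouched
    return copy.deepcopy(reference_rows_cached)


def test_mutation_stays_local(reference_rows):
    reference_rows[0]["currency"] = "EUR"
    assert reference_rows[0]["currency"] == "EUR"


def test_sees_original_values(reference_rows):
    assert reference_rows[0]["currency"] == "NOK"
```

With pandas, `df.copy()` plays the role of `copy.deepcopy`.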

conftest.py: Sharing Fixtures Across Files

conftest.py files are automatically loaded by pytest. Any fixture defined there is available to all tests in the same directory and its subdirectories — no import required.

tests/
├── conftest.py           ← fixtures available to ALL tests
├── unit/
│   ├── conftest.py       ← fixtures available to tests/unit/** only
│   └── test_transformers.py
└── integration/
    ├── conftest.py       ← fixtures available to tests/integration/** only
    └── test_database.py

Root conftest.py (shared infrastructure)

Python
# tests/conftest.py
import pytest
import pandas as pd
from pathlib import Path


@pytest.fixture(scope="session")
def test_data_dir() -> Path:
    """Absolute path to the test fixtures directory."""
    return Path(__file__).parent / "fixtures"


@pytest.fixture(scope="session")
def sample_orders_csv(test_data_dir) -> Path:
    """Path to the sample orders CSV file."""
    path = test_data_dir / "orders.csv"
    assert path.exists(), f"Fixture file not found: {path}"
    return path


@pytest.fixture
def sample_orders_df() -> pd.DataFrame:
    """
    A small, representative orders DataFrame.
    function-scoped so each test gets a fresh mutable copy.
    """
    return pd.DataFrame({
        "order_id": ["ORD-001", "ORD-002", "ORD-003", "ORD-004", "ORD-005"],
        "customer_id": ["C001", "C002", "C001", "C003", "C002"],
        "product_sku": ["SKU-A", "SKU-B", "SKU-A", "SKU-C", "SKU-B"],
        "quantity": [2, 1, 3, 1, 2],
        "unit_price": [29.99, 149.99, 29.99, 299.99, 149.99],
        "order_date": pd.to_datetime([
            "2026-01-01", "2026-01-02", "2026-01-03",
            "2026-01-04", "2026-01-05"
        ]),
        "status": ["shipped", "pending", "shipped", "cancelled", "shipped"],
    })


@pytest.fixture
def sample_customers_df() -> pd.DataFrame:
    return pd.DataFrame({
        "customer_id": ["C001", "C002", "C003"],
        "name": ["Alice Martin", "Bob Chen", "Carol White"],
        "region": ["NORTH", "SOUTH", "EAST"],
        "tier": ["gold", "silver", "gold"],
    })


@pytest.fixture
def temp_output_dir(tmp_path) -> Path:
    """
    A temporary directory for pipeline output.
    tmp_path is a built-in pytest fixture that provides a unique dir per test.
    """
    output = tmp_path / "pipeline_output"
    output.mkdir()
    return output
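
A test that writes into a temporary directory might look like the sketch below. `write_orders_csv` is a hypothetical pipeline step, written with the standard-library `csv` module so the example runs without pandas, and the test requests the built-in `tmp_path` fixture directly:

```python
import csv
from pathlib import Path


def write_orders_csv(rows, output_dir: Path) -> Path:
    """Hypothetical pipeline step: write rows to orders.csv and return its path."""
    target = output_dir / "orders.csv"
    with target.open("w", newline="") as handle:
        writer = csv.DictWriter(handle, fieldnames=["order_id", "revenue"])
        writer.writeheader()
        writer.writerows(rows)
    return target


def test_pipeline_writes_csv(tmp_path):
    output_dir = tmp_path / "pipeline_output"
    output_dir.mkdir()

    written = write_orders_csv(
        [{"order_id": "O1", "revenue": 100.0}, {"order_id": "O2", "revenue": 200.0}],
        output_dir,
    )

    # Read the file back to check what actually landed on disk
    assert written.exists()
    with written.open(newline="") as handle:
        rows = list(csv.DictReader(handle))
    assert [row["order_id"] for row in rows] == ["O1", "O2"]
```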

Integration conftest.py

Python
# tests/integration/conftest.py
import pytest
import os


def pytest_configure(config):
    """Register integration-specific markers."""
    config.addinivalue_line("markers", "requires_db: test requires database connection")


@pytest.fixture(scope="session")
def db_config():
    """
    Database config from environment variables.
    In CI, these are set as secrets. Locally, use a .env file.
    """
    host = os.environ.get("TEST_DB_HOST", "localhost")
    port = int(os.environ.get("TEST_DB_PORT", "5432"))
    database = os.environ.get("TEST_DB_NAME", "test_pipeline")
    user = os.environ.get("TEST_DB_USER", "postgres")
    password = os.environ.get("TEST_DB_PASSWORD", "")

    return {
        "host": host,
        "port": port,
        "database": database,
        "user": user,
        "password": password,
    }


@pytest.fixture(scope="session")
def pg_connection(db_config):
    """
    Live PostgreSQL connection — session-scoped for performance.
    Skips if database is not reachable.
    """
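    # Skip (rather than error) when the driver is not installed
    psycopg2 = pytest.importorskip("psycopg2")
    try:
        conn = psycopg2.connect(**db_config)
    except psycopg2.OperationalError as e:
        pytest.skip(f"Cannot connect to test database: {e}")
    # Keep the yield outside the try block so errors raised during
    # teardown are reported instead of being turned into a skip
    conn.autocommit = False
    yield conn
    conn.close()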

Yield Fixtures: Setup and Teardown

yield in a fixture separates setup (before yield) from teardown (after yield). This is the pytest equivalent of setUp/tearDown, but scoped precisely.

Python
@pytest.fixture
def temp_parquet_file(tmp_path) -> str:
    """
    Create a temporary parquet file for testing loaders.
    The file lives under tmp_path, so pytest manages its cleanup
    whether the test passes or fails.
    """
    import pandas as pd

    path = tmp_path / "test_data.parquet"
    df = pd.DataFrame({
        "id": [1, 2, 3],
        "value": ["a", "b", "c"],
    })
    df.to_parquet(path)

    yield str(path)  # Test receives the path string

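    # Teardown: nothing needed here. pytest manages tmp_path directories and
    # prunes old ones automatically (by default it keeps the last three runs).
    # If you had created a database table, you would drop it here.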


@pytest.fixture(scope="module")
def duckdb_with_orders():
    """
    DuckDB in-memory DB seeded with orders data.
    Created once per module, dropped after all module tests complete.
    """
    import duckdb

    conn = duckdb.connect(":memory:")
    conn.execute("""
        CREATE TABLE orders (
            order_id VARCHAR,
            customer_id VARCHAR,
            revenue DOUBLE,
            order_date DATE
        )
    """)
    conn.execute("""
        INSERT INTO orders VALUES
            ('O1', 'C1', 100.0, '2026-01-01'),
            ('O2', 'C2', 200.0, '2026-01-02'),
            ('O3', 'C1', 150.0, '2026-01-03')
    """)

    yield conn

    # Teardown — drop tables to release resources
    conn.execute("DROP TABLE IF EXISTS orders")
    conn.close()


def test_orders_table_row_count(duckdb_with_orders):
    count = duckdb_with_orders.execute("SELECT COUNT(*) FROM orders").fetchone()[0]
    assert count == 3

def test_total_revenue(duckdb_with_orders):
    total = duckdb_with_orders.execute("SELECT SUM(revenue) FROM orders").fetchone()[0]
    assert total == 450.0

Transactional Teardown (PostgreSQL Pattern)

For integration tests hitting a real database, wrap each test in a transaction and roll back:

Python
@pytest.fixture
def db_transaction(pg_connection):
    """
    Wrap each test in a transaction that is rolled back after.
    This keeps tests isolated without recreating the schema.
    """
    pg_connection.autocommit = False
    yield pg_connection
    pg_connection.rollback()  # Always roll back so test data disappears


def test_insert_order(db_transaction):
    cursor = db_transaction.cursor()
    cursor.execute(
        "INSERT INTO orders (order_id, customer_id, revenue) VALUES (%s, %s, %s)",
        ("TEST-001", "C999", 999.99)
    )
    cursor.execute("SELECT revenue FROM orders WHERE order_id = %s", ("TEST-001",))
    row = cursor.fetchone()
    assert row[0] == pytest.approx(999.99)
    # After test: rollback removes TEST-001 from the DB
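
The same pattern can be tried without a PostgreSQL server. The sketch below swaps in the standard-library `sqlite3` module, which is an assumption made for portability and not part of this lesson's stack; the fixture names and the `orders` schema are illustrative. Both tests insert a row and expect to see exactly two rows, which only holds if each test's insert is rolled back:

```python
import sqlite3

import pytest


def open_seeded_db():
    """Create an in-memory database with one committed seed row."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE orders (order_id TEXT PRIMARY KEY, revenue REAL)")
    conn.execute("INSERT INTO orders VALUES ('O1', 100.0)")
    conn.commit()
    return conn


def count_orders(conn):
    return conn.execute("SELECT COUNT(*) FROM orders").fetchone()[0]


@pytest.fixture(scope="module")
def sqlite_connection():
    conn = open_seeded_db()
    yield conn
    conn.close()


@pytest.fixture
def sqlite_transaction(sqlite_connection):
    yield sqlite_connection
    sqlite_connection.rollback()  # Discard whatever the test wrote


def test_first_insert(sqlite_transaction):
    sqlite_transaction.execute("INSERT INTO orders VALUES ('TEST-001', 999.99)")
    assert count_orders(sqlite_transaction) == 2  # seed row + this test's row


def test_second_insert(sqlite_transaction):
    sqlite_transaction.execute("INSERT INTO orders VALUES ('TEST-002', 1.0)")
    assert count_orders(sqlite_transaction) == 2  # TEST-001 is gone again
```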

Fixture Factories (Returning Functions)

Sometimes you need a fixture that creates multiple instances with different parameters. Return a factory function:

Python
@pytest.fixture
def make_dataframe():
    """
    Factory fixture: returns a function that creates DataFrames on demand.
    Each call to the returned function builds a new DataFrame from the given parameters.
    """
    def _factory(
        n_rows: int = 10,
        null_fraction: float = 0.0,
        include_revenue: bool = True,
        revenue_range: tuple = (10.0, 1000.0),
    ) -> pd.DataFrame:
        import numpy as np
        rng = np.random.default_rng(seed=42)

        data = {
            "customer_id": [f"C{i:04d}" for i in range(n_rows)],
            "order_date": pd.date_range("2026-01-01", periods=n_rows, freq="D"),
            "region": rng.choice(["NORTH", "SOUTH", "EAST", "WEST"], size=n_rows),
        }

        if include_revenue:
            revenues = rng.uniform(*revenue_range, size=n_rows)
            if null_fraction > 0:
                null_mask = rng.random(n_rows) < null_fraction
                revenues = revenues.astype(object)
                revenues[null_mask] = None
            data["revenue"] = revenues

        return pd.DataFrame(data)

    return _factory


def test_pipeline_handles_large_input(make_dataframe):
    large_df = make_dataframe(n_rows=10_000)
    assert len(large_df) == 10_000

def test_pipeline_handles_sparse_revenue(make_dataframe):
    sparse_df = make_dataframe(n_rows=100, null_fraction=0.3)
    null_pct = sparse_df["revenue"].isnull().mean()
    # Approximately 30% nulls (seeded RNG gives deterministic results)
    assert 0.2 <= null_pct <= 0.4

def test_pipeline_without_revenue_column(make_dataframe):
    df = make_dataframe(include_revenue=False)
    assert "revenue" not in df.columns
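
A factory fixture can also own the cleanup for everything it hands out. In this sketch, which uses `sqlite3` as a stand-in resource and a hypothetical `ConnectionFactory` helper, the fixture yields the factory and closes every connection it opened once the test finishes:

```python
import sqlite3

import pytest


class ConnectionFactory:
    """Hypothetical helper that remembers every connection it opens."""

    def __init__(self):
        self.opened = []

    def __call__(self, table, rows):
        conn = sqlite3.connect(":memory:")
        conn.execute(f"CREATE TABLE {table} (id INTEGER, value TEXT)")
        conn.executemany(f"INSERT INTO {table} VALUES (?, ?)", rows)
        self.opened.append(conn)
        return conn

    def close_all(self):
        for conn in self.opened:
            conn.close()
        self.opened.clear()


@pytest.fixture
def make_connection():
    factory = ConnectionFactory()
    yield factory
    factory.close_all()  # Teardown covers every connection the test created


def test_two_independent_databases(make_connection):
    orders = make_connection("orders", [(1, "A"), (2, "B")])
    customers = make_connection("customers", [(1, "Alice")])
    assert orders.execute("SELECT COUNT(*) FROM orders").fetchone()[0] == 2
    assert customers.execute("SELECT COUNT(*) FROM customers").fetchone()[0] == 1
```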

Mock Connection Factory

Python
@pytest.fixture
def mock_db_connection_factory():
    """
    Returns a factory for creating mock database connections.
    Each call configures the mock to return specific query results.
    """
    from unittest.mock import MagicMock

    def _factory(query_results: dict):
        """
        query_results: mapping of SQL substring → list of rows to return
        e.g. {"SELECT * FROM orders": [("O1", 100), ("O2", 200)]}
        """
        conn = MagicMock()
        cursor = MagicMock()
        conn.cursor.return_value = cursor

        def mock_execute(sql, *args, **kwargs):
            for pattern, rows in query_results.items():
                if pattern in sql:
                    cursor.fetchall.return_value = rows
                    cursor.fetchone.return_value = rows[0] if rows else None
                    return

        cursor.execute.side_effect = mock_execute
        return conn

    return _factory


def test_extract_uses_correct_query(mock_db_connection_factory):
    from src.extractors import OrderExtractor

    conn = mock_db_connection_factory({
        "SELECT * FROM orders": [
            ("O1", "C1", 100.0),
            ("O2", "C2", 200.0),
        ]
    })
    extractor = OrderExtractor(conn)
    df = extractor.extract()
    assert len(df) == 2

@pytest.mark.parametrize

Parametrize runs a single test function with multiple argument sets. It replaces copy-paste test functions.

Single Parameter

Python
from src.transformers import clean_phone_number
import pytest

@pytest.mark.parametrize("raw_input,expected", [
    ("+1 (555) 867-5309", "15558675309"),
    ("555.867.5309", "5558675309"),
    ("555-867-5309", "5558675309"),
    ("(555)8675309", "5558675309"),
    ("5558675309", "5558675309"),
    ("  5558675309  ", "5558675309"),  # Whitespace
])
def test_clean_phone_number(raw_input, expected):
    assert clean_phone_number(raw_input) == expected

Running this produces 6 separate test cases, each of which passes or fails independently:

test_transformers.py::test_clean_phone_number[+1 (555) 867-5309-15558675309] PASSED
test_transformers.py::test_clean_phone_number[555.867.5309-5558675309] PASSED
...

Multiple Parameters

Python
from src.transformers import normalize_currency


@pytest.mark.parametrize("currency_string,expected_value,expected_currency", [
    ("$1,234.56", 1234.56, "USD"),
    ("€500.00", 500.00, "EUR"),
    ("£99.99", 99.99, "GBP"),
    ("1000", 1000.0, None),       # No currency symbol
    ("USD 250", 250.0, "USD"),    # Prefix code
    ("250 EUR", 250.0, "EUR"),    # Suffix code
])
def test_normalize_currency(currency_string, expected_value, expected_currency):
    value, currency = normalize_currency(currency_string)
    assert value == pytest.approx(expected_value)
    assert currency == expected_currency
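
When every combination of two inputs should be checked, parametrize decorators can be stacked and pytest generates the cross product. A minimal sketch, with `apply_discount` as a hypothetical function under test:

```python
import pytest


def apply_discount(price: float, rate: float) -> float:
    """Hypothetical function under test."""
    return round(price * (1 - rate), 2)


@pytest.mark.parametrize("rate", [0.0, 0.1, 0.5])
@pytest.mark.parametrize("price", [10.0, 99.99])
def test_discount_never_increases_price(price, rate):
    # 2 prices x 3 rates = 6 generated test cases
    discounted = apply_discount(price, rate)
    assert 0 <= discounted <= price
```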

Parametrize with IDs

Custom IDs make the test output readable:

Python
@pytest.mark.parametrize("df,expected_error_count", [
    pytest.param(
        pd.DataFrame({"order_id": ["O1"], "revenue": [100.0]}),
        0,
        id="valid_single_row",
    ),
    pytest.param(
        pd.DataFrame({"order_id": ["O1", "O1"], "revenue": [100.0, 200.0]}),
        1,
        id="duplicate_order_id",
    ),
    pytest.param(
        pd.DataFrame({"revenue": [100.0]}),
        1,
        id="missing_order_id_column",
    ),
    pytest.param(
        pd.DataFrame(columns=["order_id", "revenue"]),
        1,
        id="empty_dataframe",
    ),
])
def test_validation_error_count(df, expected_error_count):
    from src.validators import validate_pipeline_output
    result = validate_pipeline_output(df, required_columns=["order_id", "revenue"])
    assert len(result.errors) == expected_error_count
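
When there are many cases, IDs can also be generated. The `ids` argument accepts a callable that pytest invokes once per parameter value; returning `None` falls back to the default ID for that value. A sketch with a hypothetical `describe_case` helper:

```python
import pytest


def describe_case(value):
    """Build a readable ID for dict parameters; None keeps pytest's default ID."""
    if isinstance(value, dict):
        return "-".join(f"{key}={val}" for key, val in sorted(value.items()))
    return None


@pytest.mark.parametrize("config,expected_batches", [
    ({"rows": 10, "batch_size": 5}, 2),
    ({"rows": 11, "batch_size": 5}, 3),
], ids=describe_case)
def test_batch_count(config, expected_batches):
    # Ceiling division: a partial final batch still counts as a batch
    batches = -(-config["rows"] // config["batch_size"])
    assert batches == expected_batches
```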

pytest.param with skip and xfail

Python
@pytest.mark.parametrize("input_value,expected", [
    pytest.param(None, None, id="null_input"),
    pytest.param("", None, id="empty_string"),
    pytest.param("2026-01-15", pd.Timestamp("2026-01-15"), id="iso_format"),
    pytest.param("15/01/2026", pd.Timestamp("2026-01-15"), id="uk_format"),
    pytest.param("Jan 15 2026", pd.Timestamp("2026-01-15"), id="verbose_format"),
    pytest.param(
        "32/01/2026",  # Invalid day
        None,
        id="invalid_day",
    ),
    pytest.param(
        "2026-13-01",  # Invalid month
        None,
        id="invalid_month",
        marks=pytest.mark.xfail(reason="Parser doesn't yet handle invalid months — JIRA-5678"),
    ),
    pytest.param(
        "yesterday",
        None,
        id="natural_language_date",
        marks=pytest.mark.skip(reason="Natural language parsing not implemented"),
    ),
])
def test_parse_date(input_value, expected):
    from src.transformers import parse_date
    result = parse_date(input_value)
    if expected is None:
        assert result is None
    else:
        assert result == expected
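
By default, an `xfail` case that unexpectedly passes is reported as XPASS without failing the run. Passing `strict=True` turns that unexpected pass into a failure, which flags stale marks once the underlying bug is fixed. A sketch, using a hypothetical `parse_iso_date` built on `datetime.date.fromisoformat`:

```python
from datetime import date

import pytest


def parse_iso_date(text):
    """Hypothetical parser: ISO dates only, None for anything else."""
    try:
        return date.fromisoformat(text)
    except (TypeError, ValueError):
        return None


@pytest.mark.parametrize("text,expected", [
    pytest.param("2026-01-15", date(2026, 1, 15), id="iso_format"),
    pytest.param("2026-13-01", None, id="invalid_month"),
    pytest.param(None, None, id="null_input"),
    pytest.param(
        "15/01/2026",
        date(2026, 1, 15),
        id="uk_format",
        # strict=True: if this case ever passes, the run fails until the mark is removed
        marks=pytest.mark.xfail(strict=True, reason="UK format not supported yet"),
    ),
])
def test_parse_iso_date(text, expected):
    assert parse_iso_date(text) == expected
```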

Parametrize on a Class

Python
@pytest.mark.parametrize("region,expected_tier", [
    ("NORTH", "premium"),
    ("SOUTH", "standard"),
    ("EAST", "standard"),
    ("WEST", "premium"),
])
class TestRegionTierMapping:
    def test_tier_assignment(self, region, expected_tier):
        from src.enrichment import get_region_tier
        assert get_region_tier(region) == expected_tier

    def test_tier_is_string(self, region, expected_tier):
        from src.enrichment import get_region_tier
        result = get_region_tier(region)
        assert isinstance(result, str)

Indirect Parametrize

indirect=True passes the parameter value to a fixture function instead of directly to the test. This is useful when you want to parametrize fixture construction.

Python
@pytest.fixture
def configured_etl(request):
    """
    Fixture that accepts its configuration from parametrize via indirect.
    request.param contains the value passed from parametrize.
    """
    from src.etl import SalesETL
    config = request.param
    return SalesETL(config=config)


@pytest.mark.parametrize("configured_etl", [
    {"env": "development", "batch_size": 100},
    {"env": "staging", "batch_size": 1000},
    {"env": "production", "batch_size": 10000},
], indirect=True)
def test_etl_initializes_with_config(configured_etl):
    assert configured_etl.config is not None
    assert "env" in configured_etl.config
    assert "batch_size" in configured_etl.config
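
If every test that uses a fixture should run once per configuration, the values can live on the fixture itself through `params`; `request.param` works the same way. A sketch with two illustrative serializers built from the standard library (the names `CODECS`, `to_csv`, and `from_csv` are hypothetical):

```python
import csv
import io
import json

import pytest

ROWS = [{"id": "1", "value": "a"}, {"id": "2", "value": "b"}]


def to_csv(rows):
    buffer = io.StringIO(newline="")
    writer = csv.DictWriter(buffer, fieldnames=["id", "value"])
    writer.writeheader()
    writer.writerows(rows)
    return buffer.getvalue()


def from_csv(text):
    return list(csv.DictReader(io.StringIO(text, newline="")))


# name -> (dump, load)
CODECS = {"csv": (to_csv, from_csv), "json": (json.dumps, json.loads)}


@pytest.fixture(params=sorted(CODECS))
def codec(request):
    # request.param is "csv" on one run and "json" on the other
    return CODECS[request.param]


def test_round_trip(codec):
    dump, load = codec
    assert load(dump(ROWS)) == ROWS
```

Every test that requests `codec` runs twice, with no parametrize decorator on the test itself.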

Partial Indirect (mix of direct and fixture)

Python
@pytest.fixture
def database_with_data(request):
    """Fixture that pre-loads a DuckDB with the requested table data."""
    import duckdb
    table_name, rows = request.param
    conn = duckdb.connect(":memory:")
    conn.execute(f"CREATE TABLE {table_name} (id INT, value VARCHAR)")
    for row in rows:
        conn.execute(f"INSERT INTO {table_name} VALUES (?, ?)", row)
    yield conn
    conn.close()


@pytest.mark.parametrize(
    "database_with_data,expected_count",
    [
        (("orders", [(1, "A"), (2, "B")]), 2),
        (("orders", [(1, "A")]), 1),
        (("orders", []), 0),
    ],
    indirect=["database_with_data"],  # Only database_with_data goes through the fixture
)
def test_row_count(database_with_data, expected_count):
    count = database_with_data.execute("SELECT COUNT(*) FROM orders").fetchone()[0]
    assert count == expected_count

Combining Fixtures and Parametrize

Real power comes from layering fixtures and parametrize:

Python
# conftest.py
@pytest.fixture
def dirty_dataframe():
    """A DataFrame with various data quality issues for cleaning tests."""
    return pd.DataFrame({
        "customer_id": ["C001", "  C002  ", None, "C004", "C004"],
        "email": ["alice@x.com", "BOB@X.COM", "carol@x.com", None, "dave@x.com"],
        "revenue": ["$100.00", "€200", "invalid", "300", "400"],
        "signup_date": ["2026-01-01", "2026-02-30", "2026-03-15", "not-a-date", "2026-04-01"],
    })


@pytest.mark.parametrize("column,null_fraction_threshold", [
    ("customer_id", 0.0),   # No nulls allowed after cleaning
    ("email", 0.3),          # Up to 30% nulls tolerated
    ("revenue", 0.2),        # Up to 20% parse failures tolerated
])
def test_cleaning_null_rates(dirty_dataframe, column, null_fraction_threshold):
    from src.transformers import clean_dataframe

    result = clean_dataframe(dirty_dataframe)

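    # Fail loudly if cleaning dropped the column instead of silently passing
    assert column in result.columns, f"Column '{column}' missing after cleaning"

    null_rate = result[column].isnull().mean()
    assert null_rate <= null_fraction_threshold, (
        f"Column '{column}' has {null_rate:.1%} nulls, "
        f"exceeding threshold of {null_fraction_threshold:.1%}"
    )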

Realistic Data Engineering Fixtures

Large Sample DataFrame for Performance Tests

Python
@pytest.fixture(scope="session")
def large_orders_df() -> pd.DataFrame:
    """
    100k-row orders DataFrame for performance and memory tests.
    Session-scoped — expensive to create, safe to share (read-only in tests).
    """
    import numpy as np

    rng = np.random.default_rng(seed=0)
    n = 100_000

    return pd.DataFrame({
        "order_id": [f"ORD-{i:07d}" for i in range(n)],
        # Draw all customer numbers in one vectorized call instead of n separate calls
        "customer_id": [f"C{i:06d}" for i in rng.integers(1, 10001, size=n)],
        "product_sku": rng.choice(
            [f"SKU-{c}" for c in "ABCDEFGHIJ"], size=n
        ),
        "quantity": rng.integers(1, 20, size=n),
        "unit_price": rng.uniform(1.0, 500.0, size=n).round(2),
        "order_date": pd.to_datetime(
            rng.integers(
                pd.Timestamp("2024-01-01").value,
                pd.Timestamp("2026-01-01").value,
                size=n,
            )
        ),
        "status": rng.choice(["shipped", "pending", "cancelled"], size=n, p=[0.7, 0.2, 0.1]),
    })


def test_aggregation_completes_in_reasonable_time(large_orders_df):
    import time
    from src.aggregators import compute_customer_revenue

    start = time.monotonic()
    result = compute_customer_revenue(large_orders_df)
    elapsed = time.monotonic() - start

    assert elapsed < 5.0, f"Aggregation took {elapsed:.2f}s — optimize the implementation"
    assert len(result) > 0

Mock S3 / File System Fixture

Python
@pytest.fixture
def mock_s3_bucket(tmp_path):
    """
    Simulates an S3 bucket structure using a local temp directory.
    Tests that write to / read from 'S3' use this fixture via monkeypatching.
    """
    bucket_root = tmp_path / "mock-s3" / "data-lake-bucket"
    bucket_root.mkdir(parents=True)

    # Pre-populate with sample data
    raw_dir = bucket_root / "raw" / "orders" / "2026" / "01"
    raw_dir.mkdir(parents=True)

    pd.DataFrame({
        "order_id": ["O1", "O2"],
        "revenue": [100.0, 200.0],
    }).to_parquet(raw_dir / "part-00001.parquet")

    return bucket_root


def test_pipeline_reads_partitioned_data(mock_s3_bucket, monkeypatch):
    from src.readers import S3ParquetReader

    # Monkeypatch the S3 base path to use our local mock
    monkeypatch.setenv("S3_DATA_LAKE_PATH", str(mock_s3_bucket))

    reader = S3ParquetReader(bucket="data-lake-bucket")
    df = reader.read_partition(table="orders", year=2026, month=1)

    assert len(df) == 2
    assert "order_id" in df.columns
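
For the environment variable to have any effect, the reader has to consult it when resolving paths. How `S3ParquetReader` does this is not shown in this lesson; the sketch below is one assumed approach, with a hypothetical `resolve_partition_path` helper that prefers the local override and otherwise builds an `s3://` URI:

```python
import os
from pathlib import Path


def resolve_partition_path(bucket: str, table: str, year: int, month: int) -> str:
    """Hypothetical helper: local directory when overridden, S3 URI otherwise."""
    relative = f"raw/{table}/{year:04d}/{month:02d}"
    override = os.environ.get("S3_DATA_LAKE_PATH")
    if override:
        # Tests point this at the mock bucket root created under tmp_path
        return str(Path(override) / relative)
    return f"s3://{bucket}/{relative}"


def test_override_redirects_to_local_dir(tmp_path, monkeypatch):
    monkeypatch.setenv("S3_DATA_LAKE_PATH", str(tmp_path))
    resolved = resolve_partition_path("data-lake-bucket", "orders", 2026, 1)
    assert Path(resolved) == tmp_path / "raw" / "orders" / "2026" / "01"


def test_default_is_s3_uri(monkeypatch):
    monkeypatch.delenv("S3_DATA_LAKE_PATH", raising=False)
    resolved = resolve_partition_path("data-lake-bucket", "orders", 2026, 1)
    assert resolved == "s3://data-lake-bucket/raw/orders/2026/01"
```

Because `monkeypatch` restores the environment after each test, the override never leaks into other tests.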

Temp Directory with Schema Files

Python
@pytest.fixture
def schema_directory(tmp_path):
    """
    Creates a temporary directory with JSON schema files for schema registry tests.
    """
    import json

    schemas = {
        "orders_v1.json": {
            "version": "1.0",
            "required_columns": ["order_id", "customer_id", "revenue"],
            "column_types": {
                "order_id": "string",
                "customer_id": "string",
                "revenue": "float64",
            }
        },
        "customers_v1.json": {
            "version": "1.0",
            "required_columns": ["customer_id", "name", "email"],
            "column_types": {
                "customer_id": "string",
                "name": "string",
                "email": "string",
            }
        }
    }

    for filename, content in schemas.items():
        (tmp_path / filename).write_text(json.dumps(content))

    return tmp_path


def test_schema_registry_loads_all_schemas(schema_directory):
    from src.registry import SchemaRegistry

    registry = SchemaRegistry(schema_dir=str(schema_directory))
    assert registry.get("orders_v1") is not None
    assert registry.get("customers_v1") is not None
    assert registry.get("nonexistent") is None

Parametrized Tests for Data Cleaning Edge Cases

Here is a complete, production-quality parametrized test module for a clean_email function:

Python
# tests/unit/test_email_cleaner.py
import pytest
from src.transformers import clean_email


# (raw_input, expected_output)
VALID_EMAIL_CASES = [
    ("alice@example.com", "alice@example.com"),
    ("ALICE@EXAMPLE.COM", "alice@example.com"),  # Lowercased
    ("  alice@example.com  ", "alice@example.com"),  # Stripped
    ("Alice.Martin+tag@Example.COM", "alice.martin+tag@example.com"),
    ("user@subdomain.example.co.uk", "user@subdomain.example.co.uk"),
]

INVALID_EMAIL_CASES = [
    (None, None),
    ("", None),
    ("   ", None),
    ("not-an-email", None),
    ("missing@", None),
    ("@missinglocal.com", None),
    ("double@@domain.com", None),
    ("spaces in@email.com", None),
]


@pytest.mark.parametrize("raw,expected", VALID_EMAIL_CASES,
    ids=[f"valid_{i}" for i in range(len(VALID_EMAIL_CASES))])
def test_cleans_valid_email(raw, expected):
    assert clean_email(raw) == expected


@pytest.mark.parametrize("raw,expected", INVALID_EMAIL_CASES,
    # repr() keeps IDs distinct: None, '' and '   ' would otherwise collide or be invisible
    ids=[repr(case[0]) for case in INVALID_EMAIL_CASES])
def test_returns_none_for_invalid_email(raw, expected):
    assert clean_email(raw) is None


@pytest.mark.parametrize("domain", [
    "gmail.com",
    "outlook.com",
    "company.io",
    "data-corp.co.uk",
    "subdomain.enterprise.example.com",
])
def test_preserves_various_domains(domain):
    email = f"user@{domain}"
    result = clean_email(email)
    assert result is not None
    assert result.endswith(f"@{domain}")
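
The module above imports `clean_email` without showing it. For readers who want to run the tests, here is one minimal implementation that satisfies every case listed. It is an assumption for illustration only; the regular expression is deliberately simple and does not attempt full RFC 5322 validation:

```python
import re

# One or more non-space, non-@ characters, a single @, then a domain with at least one dot
_EMAIL_RE = re.compile(r"[^@\s]+@[^@\s]+\.[^@\s]+")


def clean_email(raw):
    """Return a stripped, lowercased email, or None if the input is not email-shaped."""
    if raw is None:
        return None
    candidate = raw.strip().lower()
    if _EMAIL_RE.fullmatch(candidate):
        return candidate
    return None
```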

Fixture Debugging Tips

See fixture setup order

Bash
pytest --setup-show tests/unit/test_transformers.py -v
# Output shows:
# SETUP    S session_fixtures (session scope)
# SETUP    M module_fixtures (module scope)
# SETUP    F function_fixture (function scope)
# tests/unit/test_transformers.py::test_something PASSED
# TEARDOWN F function_fixture

List all available fixtures

Bash
pytest --fixtures  # All available fixtures, including built-ins, with their docstrings
pytest --fixtures -v  # Also list fixtures whose names start with an underscore

Fixture finalization order

Fixtures are torn down in reverse order of their setup. If fixture B depends on fixture A, then B is torn down before A.

Python
@pytest.fixture
def database():
    conn = create_connection()  # Placeholder for your own connection helper
    yield conn
    conn.close()  # Runs last

@pytest.fixture
def populated_database(database):
    database.execute("INSERT INTO ...")
    yield database
    database.execute("DELETE FROM ...")  # Runs first (reverse order)

Summary

  • Use conftest.py to share fixtures without imports — scope them appropriately
  • function scope for mutable data, session scope for expensive infrastructure
  • yield fixtures for setup/teardown — always clean up after integration fixtures
  • Fixture factories let one fixture create multiple independent instances
  • parametrize eliminates copy-paste test functions — use pytest.param for per-case marks
  • Combine fixtures and parametrize for powerful, DRY test suites
  • indirect parametrize passes values through fixtures for dynamic setup
  • Use --setup-show to debug fixture lifecycle issues

The next lesson covers mocking — patching external dependencies so your tests don't need real Snowflake credentials or S3 buckets.