Data Engineering · Intermediate

pytest Fixtures and Parametrization for Data Pipelines

Build reusable test infrastructure with fixtures at every scope level, conftest.py, yield fixtures for teardown, fixture factories, and parametrize for exhaustive edge-case coverage in data engineering pipelines.

Learnixo · May 7, 2026 · 14 min read
pytest · python · fixtures · parametrize · data-engineering · pandas · testing


Fixtures and parametrization are the two features that separate a 50-test suite from a 500-test suite — without 500 lines of duplicated setup code. In data engineering, you constantly need: sample DataFrames, database connections, temp directories for output files, and exhaustive edge-case coverage for cleaning functions. This lesson covers all of it with production patterns.

What Is a Fixture?

A fixture is a function decorated with @pytest.fixture. pytest automatically discovers and injects fixtures into test functions by matching parameter names.

Python
import pytest
import pandas as pd

@pytest.fixture
def sample_df():
    return pd.DataFrame({
        "customer_id": ["C001", "C002", "C003"],
        "revenue": [100.0, 250.0, 75.0],
        "region": ["NORTH", "SOUTH", "EAST"],
    })

def test_revenue_sum(sample_df):
    # sample_df is automatically injected — no instantiation needed
    assert sample_df["revenue"].sum() == 425.0

def test_customer_count(sample_df):
    assert len(sample_df) == 3

This looks trivial, but fixtures compose, scope, and share across files — that is where the power lies.
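Composition means one fixture can request another simply by naming it as a parameter; pytest resolves the whole dependency chain. A minimal sketch building on `sample_df` above:

```python
import pandas as pd
import pytest


@pytest.fixture
def sample_df():
    return pd.DataFrame({
        "customer_id": ["C001", "C002", "C003"],
        "revenue": [100.0, 250.0, 75.0],
    })


@pytest.fixture
def high_value_df(sample_df):
    # This fixture depends on sample_df — pytest builds sample_df first,
    # then passes the result in here
    return sample_df[sample_df["revenue"] >= 100.0]


def test_high_value_count(high_value_df):
    assert len(high_value_df) == 2
```

Tests that request `high_value_df` never mention `sample_df`, yet both fixtures run in the right order.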

Fixture Scope

The scope parameter controls how often pytest creates and destroys a fixture:

| Scope | Created once per... | Use for |
|-------|---------------------|---------|
| function | Test function (default) | Mutable state, DataFrames |
| class | Test class | Shared setup across methods |
| module | Test file | Expensive setup reused in one file |
| session | Entire test run | Database connections, Docker containers |

function scope (Default)

Each test gets a fresh copy. Safe for mutable objects.

Python
@pytest.fixture(scope="function")
def clean_dataframe():
    """Fresh DataFrame for each test — mutations in one test don't bleed into others."""
    return pd.DataFrame({
        "id": [1, 2, 3],
        "value": [10.0, 20.0, 30.0],
    })

class scope

Created once for all methods in the class, destroyed after the last method runs.

Python
@pytest.fixture(scope="class")
def schema_validator():
    """Expensive to construct — reuse within a test class."""
    from src.validators import SchemaValidator
    return SchemaValidator.from_config("tests/fixtures/schema.json")

class TestSchemaValidation:
    def test_valid_record_passes(self, schema_validator):
        result = schema_validator.validate({"id": 1, "name": "Alice"})
        assert result.is_valid

    def test_missing_id_fails(self, schema_validator):
        result = schema_validator.validate({"name": "Alice"})
        assert not result.is_valid

module scope

Shared across all tests in a single file. Good for an in-memory database loaded from a fixture CSV.

Python
@pytest.fixture(scope="module")
def loaded_reference_data():
    """Load reference tables once per test module — these are read-only."""
    import pandas as pd
    return {
        "country_codes": pd.read_csv("tests/fixtures/country_codes.csv"),
        "currency_rates": pd.read_csv("tests/fixtures/currency_rates.csv"),
    }

session scope

The fixture lives for the entire pytest run. Use for shared infrastructure: a Docker-started PostgreSQL, a DuckDB connection, or a large dataset loaded once.

Python
@pytest.fixture(scope="session")
def duckdb_connection():
    """
    Single DuckDB in-memory connection shared across all tests.
    DuckDB handles concurrent reads safely for in-memory connections.
    """
    import duckdb
    conn = duckdb.connect(":memory:")
    # Seed with test data
    conn.execute("""
        CREATE TABLE orders AS
        SELECT * FROM read_csv_auto('tests/fixtures/orders.csv')
    """)
    yield conn
    conn.close()

conftest.py: Sharing Fixtures Across Files

conftest.py files are automatically loaded by pytest. Any fixture defined there is available to all tests in the same directory and its subdirectories — no import required.

tests/
├── conftest.py           ← fixtures available to ALL tests
├── unit/
│   ├── conftest.py       ← fixtures available to tests/unit/** only
│   └── test_transformers.py
└── integration/
    ├── conftest.py       ← fixtures available to tests/integration/** only
    └── test_database.py

Root conftest.py (shared infrastructure)

Python
# tests/conftest.py
import pytest
import pandas as pd
import tempfile
import os
from pathlib import Path


@pytest.fixture(scope="session")
def test_data_dir() -> Path:
    """Absolute path to the test fixtures directory."""
    return Path(__file__).parent / "fixtures"


@pytest.fixture(scope="session")
def sample_orders_csv(test_data_dir) -> Path:
    """Path to the sample orders CSV file."""
    path = test_data_dir / "orders.csv"
    assert path.exists(), f"Fixture file not found: {path}"
    return path


@pytest.fixture
def sample_orders_df() -> pd.DataFrame:
    """
    A small, representative orders DataFrame.
    function-scoped so each test gets a fresh mutable copy.
    """
    return pd.DataFrame({
        "order_id": ["ORD-001", "ORD-002", "ORD-003", "ORD-004", "ORD-005"],
        "customer_id": ["C001", "C002", "C001", "C003", "C002"],
        "product_sku": ["SKU-A", "SKU-B", "SKU-A", "SKU-C", "SKU-B"],
        "quantity": [2, 1, 3, 1, 2],
        "unit_price": [29.99, 149.99, 29.99, 299.99, 149.99],
        "order_date": pd.to_datetime([
            "2026-01-01", "2026-01-02", "2026-01-03",
            "2026-01-04", "2026-01-05"
        ]),
        "status": ["shipped", "pending", "shipped", "cancelled", "shipped"],
    })


@pytest.fixture
def sample_customers_df() -> pd.DataFrame:
    return pd.DataFrame({
        "customer_id": ["C001", "C002", "C003"],
        "name": ["Alice Martin", "Bob Chen", "Carol White"],
        "region": ["NORTH", "SOUTH", "EAST"],
        "tier": ["gold", "silver", "gold"],
    })


@pytest.fixture
def temp_output_dir(tmp_path) -> Path:
    """
    A temporary directory for pipeline output.
    tmp_path is a built-in pytest fixture that provides a unique dir per test.
    """
    output = tmp_path / "pipeline_output"
    output.mkdir()
    return output

Integration conftest.py

Python
# tests/integration/conftest.py
import pytest
import os


def pytest_configure(config):
    """Register integration-specific markers."""
    config.addinivalue_line("markers", "requires_db: test requires database connection")


@pytest.fixture(scope="session")
def db_config():
    """
    Database config from environment variables.
    In CI, these are set as secrets. Locally, use a .env file.
    """
    host = os.environ.get("TEST_DB_HOST", "localhost")
    port = int(os.environ.get("TEST_DB_PORT", "5432"))
    database = os.environ.get("TEST_DB_NAME", "test_pipeline")
    user = os.environ.get("TEST_DB_USER", "postgres")
    password = os.environ.get("TEST_DB_PASSWORD", "")

    return {
        "host": host,
        "port": port,
        "database": database,
        "user": user,
        "password": password,
    }


@pytest.fixture(scope="session")
def pg_connection(db_config):
    """
    Live PostgreSQL connection — session-scoped for performance.
    Skips if database is not reachable.
    """
    try:
        import psycopg2
        conn = psycopg2.connect(**db_config)
    except Exception as e:
        pytest.skip(f"Cannot connect to test database: {e}")
    conn.autocommit = False
    yield conn
    conn.close()

Yield Fixtures: Setup and Teardown

yield in a fixture separates setup (before yield) from teardown (after yield). This is the pytest equivalent of setUp/tearDown, but scoped precisely.

Python
@pytest.fixture
def temp_parquet_file(tmp_path) -> str:
    """
    Create a temporary parquet file for testing loaders.
    File is removed after the test regardless of pass/fail.
    """
    import pandas as pd

    path = tmp_path / "test_data.parquet"
    df = pd.DataFrame({
        "id": [1, 2, 3],
        "value": ["a", "b", "c"],
    })
    df.to_parquet(path)

    yield str(path)  # Test receives the path string

    # Teardown: nothing needed — tmp_path cleans itself
    # But if you had a database table, you'd drop it here


@pytest.fixture(scope="module")
def duckdb_with_orders():
    """
    DuckDB in-memory DB seeded with orders data.
    Created once per module, dropped after all module tests complete.
    """
    import duckdb

    conn = duckdb.connect(":memory:")
    conn.execute("""
        CREATE TABLE orders (
            order_id VARCHAR,
            customer_id VARCHAR,
            revenue DOUBLE,
            order_date DATE
        )
    """)
    conn.execute("""
        INSERT INTO orders VALUES
            ('O1', 'C1', 100.0, '2026-01-01'),
            ('O2', 'C2', 200.0, '2026-01-02'),
            ('O3', 'C1', 150.0, '2026-01-03')
    """)

    yield conn

    # Teardown — drop tables to release resources
    conn.execute("DROP TABLE IF EXISTS orders")
    conn.close()


def test_orders_table_row_count(duckdb_with_orders):
    count = duckdb_with_orders.execute("SELECT COUNT(*) FROM orders").fetchone()[0]
    assert count == 3

def test_total_revenue(duckdb_with_orders):
    total = duckdb_with_orders.execute("SELECT SUM(revenue) FROM orders").fetchone()[0]
    assert total == 450.0

Transactional Teardown (PostgreSQL Pattern)

For integration tests hitting a real database, wrap each test in a transaction and roll back:

Python
@pytest.fixture
def db_transaction(pg_connection):
    """
    Wrap each test in a transaction that is rolled back after.
    This keeps tests isolated without recreating the schema.
    """
    pg_connection.autocommit = False
    yield pg_connection
    pg_connection.rollback()  # Always roll back — test data disappears


def test_insert_order(db_transaction):
    cursor = db_transaction.cursor()
    cursor.execute(
        "INSERT INTO orders (order_id, customer_id, revenue) VALUES (%s, %s, %s)",
        ("TEST-001", "C999", 999.99)
    )
    cursor.execute("SELECT revenue FROM orders WHERE order_id = %s", ("TEST-001",))
    row = cursor.fetchone()
    assert row[0] == pytest.approx(999.99)
    # After test: rollback removes TEST-001 from the DB

Fixture Factories (Returning Functions)

Sometimes you need a fixture that creates multiple instances with different parameters. Return a factory function:

Python
@pytest.fixture
def make_dataframe():
    """
    Factory fixture: returns a function that creates DataFrames on demand.
    Each call to make_dataframe() creates a new DataFrame with custom parameters.
    """
    def _factory(
        n_rows: int = 10,
        null_fraction: float = 0.0,
        include_revenue: bool = True,
        revenue_range: tuple = (10.0, 1000.0),
    ) -> pd.DataFrame:
        import numpy as np
        rng = np.random.default_rng(seed=42)

        data = {
            "customer_id": [f"C{i:04d}" for i in range(n_rows)],
            "order_date": pd.date_range("2026-01-01", periods=n_rows, freq="D"),
            "region": rng.choice(["NORTH", "SOUTH", "EAST", "WEST"], size=n_rows),
        }

        if include_revenue:
            revenues = rng.uniform(*revenue_range, size=n_rows)
            if null_fraction > 0:
                null_mask = rng.random(n_rows) < null_fraction
                revenues = revenues.astype(object)
                revenues[null_mask] = None
            data["revenue"] = revenues

        return pd.DataFrame(data)

    return _factory


def test_pipeline_handles_large_input(make_dataframe):
    large_df = make_dataframe(n_rows=10_000)
    assert len(large_df) == 10_000

def test_pipeline_handles_sparse_revenue(make_dataframe):
    sparse_df = make_dataframe(n_rows=100, null_fraction=0.3)
    null_pct = sparse_df["revenue"].isnull().mean()
    # Approximately 30% nulls (seeded RNG gives deterministic results)
    assert 0.2 <= null_pct <= 0.4

def test_pipeline_without_revenue_column(make_dataframe):
    df = make_dataframe(include_revenue=False)
    assert "revenue" not in df.columns

Mock Connection Factory

Python
@pytest.fixture
def mock_db_connection_factory():
    """
    Returns a factory for creating mock database connections.
    Each call configures the mock to return specific query results.
    """
    from unittest.mock import MagicMock

    def _factory(query_results: dict):
        """
        query_results: mapping of SQL substring → list of rows to return
        e.g. {"SELECT * FROM orders": [("O1", 100), ("O2", 200)]}
        """
        conn = MagicMock()
        cursor = MagicMock()
        conn.cursor.return_value = cursor

        def mock_execute(sql, *args, **kwargs):
            for pattern, rows in query_results.items():
                if pattern in sql:
                    cursor.fetchall.return_value = rows
                    cursor.fetchone.return_value = rows[0] if rows else None
                    return

        cursor.execute.side_effect = mock_execute
        return conn

    return _factory


def test_extract_uses_correct_query(mock_db_connection_factory):
    from src.extractors import OrderExtractor

    conn = mock_db_connection_factory({
        "SELECT * FROM orders": [
            ("O1", "C1", 100.0),
            ("O2", "C2", 200.0),
        ]
    })
    extractor = OrderExtractor(conn)
    df = extractor.extract()
    assert len(df) == 2

@pytest.mark.parametrize

Parametrize runs a single test function with multiple argument sets. It replaces copy-paste test functions.

Single Parameter

Python
from src.transformers import clean_phone_number
import pytest

@pytest.mark.parametrize("raw_input,expected", [
    ("+1 (555) 867-5309", "15558675309"),
    ("555.867.5309", "5558675309"),
    ("555-867-5309", "5558675309"),
    ("(555)8675309", "5558675309"),
    ("5558675309", "5558675309"),
    ("  5558675309  ", "5558675309"),  # Whitespace
])
def test_clean_phone_number(raw_input, expected):
    assert clean_phone_number(raw_input) == expected

Running this produces 6 separate test cases, each independently pass/fail:

test_transformers.py::test_clean_phone_number[+1 (555) 867-5309-15558675309] PASSED
test_transformers.py::test_clean_phone_number[555.867.5309-5558675309] PASSED
...

Multiple Parameters

Python
from src.transformers import normalize_currency


@pytest.mark.parametrize("currency_string,expected_value,expected_currency", [
    ("$1,234.56", 1234.56, "USD"),
    ("€500.00", 500.00, "EUR"),
    ("£99.99", 99.99, "GBP"),
    ("1000", 1000.0, None),       # No currency symbol
    ("USD 250", 250.0, "USD"),    # Prefix code
    ("250 EUR", 250.0, "EUR"),    # Suffix code
])
def test_normalize_currency(currency_string, expected_value, expected_currency):
    value, currency = normalize_currency(currency_string)
    assert value == pytest.approx(expected_value)
    assert currency == expected_currency

Parametrize with IDs

Custom IDs make the test output readable:

Python
@pytest.mark.parametrize("df,expected_error_count", [
    pytest.param(
        pd.DataFrame({"order_id": ["O1"], "revenue": [100.0]}),
        0,
        id="valid_single_row",
    ),
    pytest.param(
        pd.DataFrame({"order_id": ["O1", "O1"], "revenue": [100.0, 200.0]}),
        1,
        id="duplicate_order_id",
    ),
    pytest.param(
        pd.DataFrame({"revenue": [100.0]}),
        1,
        id="missing_order_id_column",
    ),
    pytest.param(
        pd.DataFrame(columns=["order_id", "revenue"]),
        1,
        id="empty_dataframe",
    ),
])
def test_validation_error_count(df, expected_error_count):
    from src.validators import validate_pipeline_output
    result = validate_pipeline_output(df, required_columns=["order_id", "revenue"])
    assert len(result.errors) == expected_error_count

pytest.param with skip and xfail

Python
@pytest.mark.parametrize("input_value,expected", [
    pytest.param(None, None, id="null_input"),
    pytest.param("", None, id="empty_string"),
    pytest.param("2026-01-15", pd.Timestamp("2026-01-15"), id="iso_format"),
    pytest.param("15/01/2026", pd.Timestamp("2026-01-15"), id="uk_format"),
    pytest.param("Jan 15 2026", pd.Timestamp("2026-01-15"), id="verbose_format"),
    pytest.param(
        "32/01/2026",  # Invalid day
        None,
        id="invalid_day",
    ),
    pytest.param(
        "2026-13-01",  # Invalid month
        None,
        id="invalid_month",
        marks=pytest.mark.xfail(reason="Parser doesn't yet handle invalid months — JIRA-5678"),
    ),
    pytest.param(
        "yesterday",
        None,
        id="natural_language_date",
        marks=pytest.mark.skip(reason="Natural language parsing not implemented"),
    ),
])
def test_parse_date(input_value, expected):
    from src.transformers import parse_date
    result = parse_date(input_value)
    if expected is None:
        assert result is None
    else:
        assert result == expected

Parametrize on a Class

Python
@pytest.mark.parametrize("region,expected_tier", [
    ("NORTH", "premium"),
    ("SOUTH", "standard"),
    ("EAST", "standard"),
    ("WEST", "premium"),
])
class TestRegionTierMapping:
    def test_tier_assignment(self, region, expected_tier):
        from src.enrichment import get_region_tier
        assert get_region_tier(region) == expected_tier

    def test_tier_is_string(self, region, expected_tier):
        from src.enrichment import get_region_tier
        result = get_region_tier(region)
        assert isinstance(result, str)

Indirect Parametrize

indirect=True passes the parameter value to a fixture function instead of directly to the test. This is useful when you want to parametrize fixture construction.

Python
@pytest.fixture
def configured_etl(request):
    """
    Fixture that accepts its configuration from parametrize via indirect.
    request.param contains the value passed from parametrize.
    """
    from src.etl import SalesETL
    config = request.param
    return SalesETL(config=config)


@pytest.mark.parametrize("configured_etl", [
    {"env": "development", "batch_size": 100},
    {"env": "staging", "batch_size": 1000},
    {"env": "production", "batch_size": 10000},
], indirect=True)
def test_etl_initializes_with_config(configured_etl):
    assert configured_etl.config is not None
    assert "env" in configured_etl.config
    assert "batch_size" in configured_etl.config

Partial Indirect (mix of direct and fixture)

Python
@pytest.fixture
def database_with_data(request):
    """Fixture that pre-loads a DuckDB with the requested table data."""
    import duckdb
    table_name, rows = request.param
    conn = duckdb.connect(":memory:")
    conn.execute(f"CREATE TABLE {table_name} (id INT, value VARCHAR)")
    for row in rows:
        conn.execute(f"INSERT INTO {table_name} VALUES (?, ?)", row)
    yield conn
    conn.close()


@pytest.mark.parametrize(
    "database_with_data,expected_count",
    [
        (("orders", [(1, "A"), (2, "B")]), 2),
        (("orders", [(1, "A")]), 1),
        (("orders", []), 0),
    ],
    indirect=["database_with_data"],  # Only database_with_data goes through the fixture
)
def test_row_count(database_with_data, expected_count):
    count = database_with_data.execute("SELECT COUNT(*) FROM orders").fetchone()[0]
    assert count == expected_count

Combining Fixtures and Parametrize

Real power comes from layering fixtures and parametrize:

Python
# conftest.py
@pytest.fixture
def dirty_dataframe():
    """A DataFrame with various data quality issues for cleaning tests."""
    return pd.DataFrame({
        "customer_id": ["C001", "  C002  ", None, "C004", "C004"],
        "email": ["alice@x.com", "BOB@X.COM", "carol@x.com", None, "dave@x.com"],
        "revenue": ["$100.00", "€200", "invalid", "300", "400"],
        "signup_date": ["2026-01-01", "2026-02-30", "2026-03-15", "not-a-date", "2026-04-01"],
    })


@pytest.mark.parametrize("column,null_fraction_threshold", [
    ("customer_id", 0.0),   # No nulls allowed after cleaning
    ("email", 0.3),          # Up to 30% nulls tolerated
    ("revenue", 0.2),        # Up to 20% parse failures tolerated
])
def test_cleaning_null_rates(dirty_dataframe, column, null_fraction_threshold):
    from src.transformers import clean_dataframe

    result = clean_dataframe(dirty_dataframe)

    if column in result.columns:
        null_rate = result[column].isnull().mean()
        assert null_rate <= null_fraction_threshold, (
            f"Column '{column}' has {null_rate:.1%} nulls, "
            f"exceeding threshold of {null_fraction_threshold:.1%}"
        )

Realistic Data Engineering Fixtures

Large Sample DataFrame for Performance Tests

Python
@pytest.fixture(scope="session")
def large_orders_df() -> pd.DataFrame:
    """
    100k-row orders DataFrame for performance and memory tests.
    Session-scoped — expensive to create, safe to share (read-only in tests).
    """
    import numpy as np

    rng = np.random.default_rng(seed=0)
    n = 100_000

    return pd.DataFrame({
        "order_id": [f"ORD-{i:07d}" for i in range(n)],
        "customer_id": [f"C{rng.integers(1, 10001):06d}" for _ in range(n)],
        "product_sku": rng.choice(
            [f"SKU-{c}" for c in "ABCDEFGHIJ"], size=n
        ),
        "quantity": rng.integers(1, 20, size=n),
        "unit_price": rng.uniform(1.0, 500.0, size=n).round(2),
        "order_date": pd.to_datetime(
            rng.integers(
                pd.Timestamp("2024-01-01").value,
                pd.Timestamp("2026-01-01").value,
                size=n,
            )
        ),
        "status": rng.choice(["shipped", "pending", "cancelled"], size=n, p=[0.7, 0.2, 0.1]),
    })


def test_aggregation_completes_in_reasonable_time(large_orders_df):
    import time
    from src.aggregators import compute_customer_revenue

    start = time.monotonic()
    result = compute_customer_revenue(large_orders_df)
    elapsed = time.monotonic() - start

    assert elapsed < 5.0, f"Aggregation took {elapsed:.2f}s — optimize the implementation"
    assert len(result) > 0

Mock S3 / File System Fixture

Python
@pytest.fixture
def mock_s3_bucket(tmp_path):
    """
    Simulates an S3 bucket structure using a local temp directory.
    Tests that write to / read from 'S3' use this fixture via monkeypatching.
    """
    bucket_root = tmp_path / "mock-s3" / "data-lake-bucket"
    bucket_root.mkdir(parents=True)

    # Pre-populate with sample data
    raw_dir = bucket_root / "raw" / "orders" / "2026" / "01"
    raw_dir.mkdir(parents=True)

    pd.DataFrame({
        "order_id": ["O1", "O2"],
        "revenue": [100.0, 200.0],
    }).to_parquet(raw_dir / "part-00001.parquet")

    return bucket_root


def test_pipeline_reads_partitioned_data(mock_s3_bucket, monkeypatch):
    from src.readers import S3ParquetReader

    # Monkeypatch the S3 base path to use our local mock
    monkeypatch.setenv("S3_DATA_LAKE_PATH", str(mock_s3_bucket))

    reader = S3ParquetReader(bucket="data-lake-bucket")
    df = reader.read_partition(table="orders", year=2026, month=1)

    assert len(df) == 2
    assert "order_id" in df.columns

Temp Directory with Schema Files

Python
@pytest.fixture
def schema_directory(tmp_path):
    """
    Creates a temporary directory with JSON schema files for schema registry tests.
    """
    import json

    schemas = {
        "orders_v1.json": {
            "version": "1.0",
            "required_columns": ["order_id", "customer_id", "revenue"],
            "column_types": {
                "order_id": "string",
                "customer_id": "string",
                "revenue": "float64",
            }
        },
        "customers_v1.json": {
            "version": "1.0",
            "required_columns": ["customer_id", "name", "email"],
            "column_types": {
                "customer_id": "string",
                "name": "string",
                "email": "string",
            }
        }
    }

    for filename, content in schemas.items():
        (tmp_path / filename).write_text(json.dumps(content))

    return tmp_path


def test_schema_registry_loads_all_schemas(schema_directory):
    from src.registry import SchemaRegistry

    registry = SchemaRegistry(schema_dir=str(schema_directory))
    assert registry.get("orders_v1") is not None
    assert registry.get("customers_v1") is not None
    assert registry.get("nonexistent") is None

Parametrized Tests for Data Cleaning Edge Cases

Here is a complete, production-quality parametrized test module for a clean_email function:

Python
# tests/unit/test_email_cleaner.py
import pytest
from src.transformers import clean_email


# (raw_input, expected_output)
VALID_EMAIL_CASES = [
    ("alice@example.com", "alice@example.com"),
    ("ALICE@EXAMPLE.COM", "alice@example.com"),  # Lowercased
    ("  alice@example.com  ", "alice@example.com"),  # Stripped
    ("Alice.Martin+tag@Example.COM", "alice.martin+tag@example.com"),
    ("user@subdomain.example.co.uk", "user@subdomain.example.co.uk"),
]

INVALID_EMAIL_CASES = [
    (None, None),
    ("", None),
    ("   ", None),
    ("not-an-email", None),
    ("missing@", None),
    ("@missinglocal.com", None),
    ("double@@domain.com", None),
    ("spaces in@email.com", None),
]


@pytest.mark.parametrize("raw,expected", VALID_EMAIL_CASES,
    ids=[f"valid_{i}" for i in range(len(VALID_EMAIL_CASES))])
def test_cleans_valid_email(raw, expected):
    assert clean_email(raw) == expected


@pytest.mark.parametrize("raw,expected", INVALID_EMAIL_CASES,
    ids=[case[0] or "none" for case in INVALID_EMAIL_CASES])
def test_returns_none_for_invalid_email(raw, expected):
    assert clean_email(raw) is None


@pytest.mark.parametrize("domain", [
    "gmail.com",
    "outlook.com",
    "company.io",
    "data-corp.co.uk",
    "subdomain.enterprise.example.com",
])
def test_preserves_various_domains(domain):
    email = f"user@{domain}"
    result = clean_email(email)
    assert result is not None
    assert result.endswith(f"@{domain}")

Fixture Debugging Tips

See fixture setup order

Bash
pytest --setup-show tests/unit/test_transformers.py -v
# Output shows:
# SETUP    S session_fixtures (session scope)
# SETUP    M module_fixtures (module scope)
# SETUP    F function_fixture (function scope)
# tests/unit/test_transformers.py::test_something PASSED
# TEARDOWN F function_fixture

List all available fixtures

Bash
pytest --fixtures  # All fixtures including built-ins
pytest --fixtures -v  # With descriptions from docstrings

Fixture finalization order

Fixtures are torn down in reverse order of their setup. If fixture B depends on fixture A, then B is torn down before A.

Python
@pytest.fixture
def database():
    conn = create_connection()
    yield conn
    conn.close()  # Runs last

@pytest.fixture
def populated_database(database):
    database.execute("INSERT INTO ...")
    yield database
    database.execute("DELETE FROM ...")  # Runs first (reverse order)

Summary

  • Use conftest.py to share fixtures without imports — scope them appropriately
  • function scope for mutable data, session scope for expensive infrastructure
  • yield fixtures for setup/teardown — always clean up after integration fixtures
  • Fixture factories let one fixture create multiple independent instances
  • parametrize eliminates copy-paste test functions — use pytest.param for per-case marks
  • Combine fixtures and parametrize for powerful, DRY test suites
  • indirect parametrize passes values through fixtures for dynamic setup
  • Use --setup-show to debug fixture lifecycle issues

The next lesson covers mocking — patching external dependencies so your tests don't need real Snowflake credentials or S3 buckets.
