pytest Fixtures and Parametrization for Data Pipelines
Build reusable test infrastructure with fixtures at every scope level, conftest.py, yield fixtures for teardown, fixture factories, and parametrize for exhaustive edge-case coverage in data engineering pipelines.
Fixtures and parametrization are the two features that separate a 50-test suite from a 500-test suite — without 500 lines of duplicated setup code. In data engineering, you constantly need: sample DataFrames, database connections, temp directories for output files, and exhaustive edge-case coverage for cleaning functions. This lesson covers all of it with production patterns.
What Is a Fixture?
A fixture is a function decorated with @pytest.fixture. pytest automatically discovers and injects fixtures into test functions by matching parameter names.
import pytest
import pandas as pd
@pytest.fixture
def sample_df():
return pd.DataFrame({
"customer_id": ["C001", "C002", "C003"],
"revenue": [100.0, 250.0, 75.0],
"region": ["NORTH", "SOUTH", "EAST"],
})
def test_revenue_sum(sample_df):
# sample_df is automatically injected — no instantiation needed
assert sample_df["revenue"].sum() == 425.0
def test_customer_count(sample_df):
    assert len(sample_df) == 3
This looks trivial, but fixtures compose, scope, and share across files — that is where the power lies.
Fixture Scope
The scope parameter controls how often pytest creates and destroys a fixture:
| Scope | Created once per... | Use for |
|-------|---------------------|---------|
| function | Test function (default) | Mutable state, DataFrames |
| class | Test class | Shared setup across methods |
| module | Test file | Expensive setup reused in one file |
| session | Entire test run | Database connections, Docker containers |
function scope (Default)
Each test gets a fresh copy. Safe for mutable objects.
@pytest.fixture(scope="function")
def clean_dataframe():
"""Fresh DataFrame for each test — mutations in one test don't bleed into others."""
return pd.DataFrame({
"id": [1, 2, 3],
"value": [10.0, 20.0, 30.0],
    })
class scope
Created once for all methods in the class, destroyed after the last method runs.
@pytest.fixture(scope="class")
def schema_validator():
"""Expensive to construct — reuse within a test class."""
from src.validators import SchemaValidator
return SchemaValidator.from_config("tests/fixtures/schema.json")
class TestSchemaValidation:
def test_valid_record_passes(self, schema_validator):
result = schema_validator.validate({"id": 1, "name": "Alice"})
assert result.is_valid
def test_missing_id_fails(self, schema_validator):
result = schema_validator.validate({"name": "Alice"})
        assert not result.is_valid
module scope
Shared across all tests in a single file. Good for an in-memory database loaded from a fixture CSV.
@pytest.fixture(scope="module")
def loaded_reference_data():
"""Load reference tables once per test module — these are read-only."""
import pandas as pd
return {
"country_codes": pd.read_csv("tests/fixtures/country_codes.csv"),
"currency_rates": pd.read_csv("tests/fixtures/currency_rates.csv"),
    }
session scope
The fixture lives for the entire pytest run. Use for shared infrastructure: a Docker-started PostgreSQL, a DuckDB connection, or a large dataset loaded once.
@pytest.fixture(scope="session")
def duckdb_connection():
"""
Single DuckDB in-memory connection shared across all tests.
    pytest runs tests sequentially by default, so sharing one connection is safe
    (under pytest-xdist, each worker process builds its own copy).
"""
import duckdb
conn = duckdb.connect(":memory:")
# Seed with test data
conn.execute("""
CREATE TABLE orders AS
SELECT * FROM read_csv_auto('tests/fixtures/orders.csv')
""")
yield conn
    conn.close()
conftest.py: Sharing Fixtures Across Files
conftest.py files are automatically loaded by pytest. Any fixture defined there is available to all tests in the same directory and its subdirectories — no import required.
tests/
├── conftest.py              ← fixtures available to ALL tests
├── unit/
│   ├── conftest.py          ← fixtures available to tests/unit/** only
│   └── test_transformers.py
└── integration/
    ├── conftest.py          ← fixtures available to tests/integration/** only
    └── test_database.py
Root conftest.py (shared infrastructure)
# tests/conftest.py
import pytest
import pandas as pd
from pathlib import Path
@pytest.fixture(scope="session")
def test_data_dir() -> Path:
"""Absolute path to the test fixtures directory."""
return Path(__file__).parent / "fixtures"
@pytest.fixture(scope="session")
def sample_orders_csv(test_data_dir) -> Path:
"""Path to the sample orders CSV file."""
path = test_data_dir / "orders.csv"
assert path.exists(), f"Fixture file not found: {path}"
return path
@pytest.fixture
def sample_orders_df() -> pd.DataFrame:
"""
A small, representative orders DataFrame.
function-scoped so each test gets a fresh mutable copy.
"""
return pd.DataFrame({
"order_id": ["ORD-001", "ORD-002", "ORD-003", "ORD-004", "ORD-005"],
"customer_id": ["C001", "C002", "C001", "C003", "C002"],
"product_sku": ["SKU-A", "SKU-B", "SKU-A", "SKU-C", "SKU-B"],
"quantity": [2, 1, 3, 1, 2],
"unit_price": [29.99, 149.99, 29.99, 299.99, 149.99],
"order_date": pd.to_datetime([
"2026-01-01", "2026-01-02", "2026-01-03",
"2026-01-04", "2026-01-05"
]),
"status": ["shipped", "pending", "shipped", "cancelled", "shipped"],
})
@pytest.fixture
def sample_customers_df() -> pd.DataFrame:
return pd.DataFrame({
"customer_id": ["C001", "C002", "C003"],
"name": ["Alice Martin", "Bob Chen", "Carol White"],
"region": ["NORTH", "SOUTH", "EAST"],
"tier": ["gold", "silver", "gold"],
})
@pytest.fixture
def temp_output_dir(tmp_path) -> Path:
"""
A temporary directory for pipeline output.
tmp_path is a built-in pytest fixture that provides a unique dir per test.
"""
output = tmp_path / "pipeline_output"
output.mkdir()
    return output
Integration conftest.py
# tests/integration/conftest.py
import pytest
import os
def pytest_configure(config):
"""Register integration-specific markers."""
config.addinivalue_line("markers", "requires_db: test requires database connection")
@pytest.fixture(scope="session")
def db_config():
"""
Database config from environment variables.
In CI, these are set as secrets. Locally, use a .env file.
"""
host = os.environ.get("TEST_DB_HOST", "localhost")
port = int(os.environ.get("TEST_DB_PORT", "5432"))
database = os.environ.get("TEST_DB_NAME", "test_pipeline")
user = os.environ.get("TEST_DB_USER", "postgres")
password = os.environ.get("TEST_DB_PASSWORD", "")
return {
"host": host,
"port": port,
"database": database,
"user": user,
"password": password,
}
@pytest.fixture(scope="session")
def pg_connection(db_config):
"""
Live PostgreSQL connection — session-scoped for performance.
Skips if database is not reachable.
"""
    try:
        import psycopg2
        conn = psycopg2.connect(**db_config)
    except Exception as e:
        # Guard only the connection attempt; yielding inside the try
        # would turn real test failures into skips.
        pytest.skip(f"Cannot connect to test database: {e}")
    conn.autocommit = False
    yield conn
    conn.close()
Yield Fixtures: Setup and Teardown
yield in a fixture separates setup (before yield) from teardown (after yield). This is the pytest equivalent of setUp/tearDown, but scoped precisely.
@pytest.fixture
def temp_parquet_file(tmp_path) -> str:
"""
Create a temporary parquet file for testing loaders.
File is removed after the test regardless of pass/fail.
"""
import pandas as pd
path = tmp_path / "test_data.parquet"
df = pd.DataFrame({
"id": [1, 2, 3],
"value": ["a", "b", "c"],
})
df.to_parquet(path)
yield str(path) # Test receives the path string
# Teardown: nothing needed — tmp_path cleans itself
# But if you had a database table, you'd drop it here
@pytest.fixture(scope="module")
def duckdb_with_orders():
"""
DuckDB in-memory DB seeded with orders data.
Created once per module, dropped after all module tests complete.
"""
import duckdb
conn = duckdb.connect(":memory:")
conn.execute("""
CREATE TABLE orders (
order_id VARCHAR,
customer_id VARCHAR,
revenue DOUBLE,
order_date DATE
)
""")
conn.execute("""
INSERT INTO orders VALUES
('O1', 'C1', 100.0, '2026-01-01'),
('O2', 'C2', 200.0, '2026-01-02'),
('O3', 'C1', 150.0, '2026-01-03')
""")
yield conn
# Teardown — drop tables to release resources
conn.execute("DROP TABLE IF EXISTS orders")
conn.close()
def test_orders_table_row_count(duckdb_with_orders):
count = duckdb_with_orders.execute("SELECT COUNT(*) FROM orders").fetchone()[0]
assert count == 3
def test_total_revenue(duckdb_with_orders):
total = duckdb_with_orders.execute("SELECT SUM(revenue) FROM orders").fetchone()[0]
    assert total == 450.0
Transactional Teardown (PostgreSQL Pattern)
For integration tests hitting a real database, wrap each test in a transaction and roll back:
@pytest.fixture
def db_transaction(pg_connection):
"""
Wrap each test in a transaction that is rolled back after.
This keeps tests isolated without recreating the schema.
"""
pg_connection.autocommit = False
yield pg_connection
pg_connection.rollback() # Always roll back — test data disappears
def test_insert_order(db_transaction):
cursor = db_transaction.cursor()
cursor.execute(
"INSERT INTO orders (order_id, customer_id, revenue) VALUES (%s, %s, %s)",
("TEST-001", "C999", 999.99)
)
cursor.execute("SELECT revenue FROM orders WHERE order_id = %s", ("TEST-001",))
row = cursor.fetchone()
assert row[0] == pytest.approx(999.99)
    # After test: rollback removes TEST-001 from the DB
Fixture Factories (Returning Functions)
Sometimes you need a fixture that creates multiple instances with different parameters. Return a factory function:
@pytest.fixture
def make_dataframe():
"""
Factory fixture: returns a function that creates DataFrames on demand.
Each call to make_dataframe() creates a new DataFrame with custom parameters.
"""
def _factory(
n_rows: int = 10,
null_fraction: float = 0.0,
include_revenue: bool = True,
revenue_range: tuple = (10.0, 1000.0),
) -> pd.DataFrame:
import numpy as np
rng = np.random.default_rng(seed=42)
data = {
"customer_id": [f"C{i:04d}" for i in range(n_rows)],
"order_date": pd.date_range("2026-01-01", periods=n_rows, freq="D"),
"region": rng.choice(["NORTH", "SOUTH", "EAST", "WEST"], size=n_rows),
}
if include_revenue:
revenues = rng.uniform(*revenue_range, size=n_rows)
if null_fraction > 0:
null_mask = rng.random(n_rows) < null_fraction
revenues = revenues.astype(object)
revenues[null_mask] = None
data["revenue"] = revenues
return pd.DataFrame(data)
return _factory
def test_pipeline_handles_large_input(make_dataframe):
large_df = make_dataframe(n_rows=10_000)
assert len(large_df) == 10_000
def test_pipeline_handles_sparse_revenue(make_dataframe):
sparse_df = make_dataframe(n_rows=100, null_fraction=0.3)
null_pct = sparse_df["revenue"].isnull().mean()
# Approximately 30% nulls (seeded RNG gives deterministic results)
assert 0.2 <= null_pct <= 0.4
def test_pipeline_without_revenue_column(make_dataframe):
df = make_dataframe(include_revenue=False)
    assert "revenue" not in df.columns
Mock Connection Factory
@pytest.fixture
def mock_db_connection_factory():
"""
Returns a factory for creating mock database connections.
Each call configures the mock to return specific query results.
"""
from unittest.mock import MagicMock
def _factory(query_results: dict):
"""
query_results: mapping of SQL substring → list of rows to return
e.g. {"SELECT * FROM orders": [("O1", 100), ("O2", 200)]}
"""
conn = MagicMock()
cursor = MagicMock()
conn.cursor.return_value = cursor
def mock_execute(sql, *args, **kwargs):
for pattern, rows in query_results.items():
if pattern in sql:
cursor.fetchall.return_value = rows
cursor.fetchone.return_value = rows[0] if rows else None
return
cursor.execute.side_effect = mock_execute
return conn
return _factory
def test_extract_uses_correct_query(mock_db_connection_factory):
from src.extractors import OrderExtractor
conn = mock_db_connection_factory({
"SELECT * FROM orders": [
("O1", "C1", 100.0),
("O2", "C2", 200.0),
]
})
extractor = OrderExtractor(conn)
df = extractor.extract()
    assert len(df) == 2
@pytest.mark.parametrize
Parametrize runs a single test function with multiple argument sets. It replaces copy-paste test functions.
Single Parameter
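The tests in this section target clean_phone_number in src.transformers, which the lesson never shows. A minimal sketch consistent with the cases below (a hypothetical implementation, not the lesson's real one) is simply:

```python
import re

def clean_phone_number(raw: str) -> str:
    """Strip every non-digit character. Hypothetical sketch only."""
    return re.sub(r"\D", "", raw)
```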
from src.transformers import clean_phone_number
import pytest
@pytest.mark.parametrize("raw_input,expected", [
("+1 (555) 867-5309", "15558675309"),
("555.867.5309", "5558675309"),
("555-867-5309", "5558675309"),
("(555)8675309", "5558675309"),
("5558675309", "5558675309"),
(" 5558675309 ", "5558675309"), # Whitespace
])
def test_clean_phone_number(raw_input, expected):
    assert clean_phone_number(raw_input) == expected
Running this produces 6 separate test cases, each passing or failing independently:
test_transformers.py::test_clean_phone_number[+1 (555) 867-5309-15558675309] PASSED
test_transformers.py::test_clean_phone_number[555.867.5309-5558675309] PASSED
...
Multiple Parameters
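Again, normalize_currency lives in src.transformers and is not shown in the lesson. A hedged sketch consistent with the cases below, assuming only $/€/£ symbols and three-letter ISO codes need handling:

```python
import re

# Hypothetical sketch of normalize_currency, not the lesson's real code.
_SYMBOLS = {"$": "USD", "€": "EUR", "£": "GBP"}

def normalize_currency(raw: str):
    """Return (value, currency_code_or_None) parsed from a currency string."""
    text = raw.strip()
    currency = None
    for symbol, code in _SYMBOLS.items():
        if symbol in text:
            currency = code
            text = text.replace(symbol, "")
    # Prefix/suffix ISO codes such as "USD 250" or "250 EUR"
    match = re.search(r"\b([A-Z]{3})\b", text)
    if match:
        currency = match.group(1)
        text = text.replace(match.group(1), "")
    value = float(text.replace(",", "").strip())
    return value, currency
```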
from src.transformers import normalize_currency
@pytest.mark.parametrize("currency_string,expected_value,expected_currency", [
("$1,234.56", 1234.56, "USD"),
("€500.00", 500.00, "EUR"),
("£99.99", 99.99, "GBP"),
("1000", 1000.0, None), # No currency symbol
("USD 250", 250.0, "USD"), # Prefix code
("250 EUR", 250.0, "EUR"), # Suffix code
])
def test_normalize_currency(currency_string, expected_value, expected_currency):
value, currency = normalize_currency(currency_string)
assert value == pytest.approx(expected_value)
assert currency == expected_currencyParametrize with IDs
Custom IDs make the test output readable:
@pytest.mark.parametrize("df,expected_error_count", [
pytest.param(
pd.DataFrame({"order_id": ["O1"], "revenue": [100.0]}),
0,
id="valid_single_row",
),
pytest.param(
pd.DataFrame({"order_id": ["O1", "O1"], "revenue": [100.0, 200.0]}),
1,
id="duplicate_order_id",
),
pytest.param(
pd.DataFrame({"revenue": [100.0]}),
1,
id="missing_order_id_column",
),
pytest.param(
pd.DataFrame(columns=["order_id", "revenue"]),
1,
id="empty_dataframe",
),
])
def test_validation_error_count(df, expected_error_count):
from src.validators import validate_pipeline_output
result = validate_pipeline_output(df, required_columns=["order_id", "revenue"])
    assert len(result.errors) == expected_error_count
pytest.param with skip and xfail
@pytest.mark.parametrize("input_value,expected", [
pytest.param(None, None, id="null_input"),
pytest.param("", None, id="empty_string"),
pytest.param("2026-01-15", pd.Timestamp("2026-01-15"), id="iso_format"),
pytest.param("15/01/2026", pd.Timestamp("2026-01-15"), id="uk_format"),
pytest.param("Jan 15 2026", pd.Timestamp("2026-01-15"), id="verbose_format"),
pytest.param(
"32/01/2026", # Invalid day
None,
id="invalid_day",
),
pytest.param(
"2026-13-01", # Invalid month
None,
id="invalid_month",
marks=pytest.mark.xfail(reason="Parser doesn't yet handle invalid months — JIRA-5678"),
),
pytest.param(
"yesterday",
None,
id="natural_language_date",
marks=pytest.mark.skip(reason="Natural language parsing not implemented"),
),
])
def test_parse_date(input_value, expected):
from src.transformers import parse_date
result = parse_date(input_value)
if expected is None:
assert result is None
else:
        assert result == expected
Parametrize on a Class
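For reference, a get_region_tier consistent with the mapping exercised below could be as simple as a dict lookup (a hypothetical sketch of src.enrichment, not the lesson's code):

```python
# Hypothetical region-to-tier mapping matching the parametrized cases.
_REGION_TIERS = {
    "NORTH": "premium",
    "SOUTH": "standard",
    "EAST": "standard",
    "WEST": "premium",
}

def get_region_tier(region: str) -> str:
    """Map a region code to its pricing tier. Sketch only."""
    return _REGION_TIERS[region]
```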
@pytest.mark.parametrize("region,expected_tier", [
("NORTH", "premium"),
("SOUTH", "standard"),
("EAST", "standard"),
("WEST", "premium"),
])
class TestRegionTierMapping:
def test_tier_assignment(self, region, expected_tier):
from src.enrichment import get_region_tier
assert get_region_tier(region) == expected_tier
def test_tier_is_string(self, region, expected_tier):
from src.enrichment import get_region_tier
result = get_region_tier(region)
        assert isinstance(result, str)
Indirect Parametrize
indirect=True passes the parameter value to a fixture function instead of directly to the test. This is useful when you want to parametrize fixture construction.
@pytest.fixture
def configured_etl(request):
"""
Fixture that accepts its configuration from parametrize via indirect.
request.param contains the value passed from parametrize.
"""
from src.etl import SalesETL
config = request.param
return SalesETL(config=config)
@pytest.mark.parametrize("configured_etl", [
{"env": "development", "batch_size": 100},
{"env": "staging", "batch_size": 1000},
{"env": "production", "batch_size": 10000},
], indirect=True)
def test_etl_initializes_with_config(configured_etl):
assert configured_etl.config is not None
assert "env" in configured_etl.config
    assert "batch_size" in configured_etl.config
Partial Indirect (mix of direct and fixture)
@pytest.fixture
def database_with_data(request):
"""Fixture that pre-loads a DuckDB with the requested table data."""
import duckdb
table_name, rows = request.param
conn = duckdb.connect(":memory:")
conn.execute(f"CREATE TABLE {table_name} (id INT, value VARCHAR)")
for row in rows:
conn.execute(f"INSERT INTO {table_name} VALUES (?, ?)", row)
yield conn
conn.close()
@pytest.mark.parametrize(
"database_with_data,expected_count",
[
(("orders", [(1, "A"), (2, "B")]), 2),
(("orders", [(1, "A")]), 1),
(("orders", []), 0),
],
indirect=["database_with_data"], # Only database_with_data goes through the fixture
)
def test_row_count(database_with_data, expected_count):
count = database_with_data.execute("SELECT COUNT(*) FROM orders").fetchone()[0]
    assert count == expected_count
Combining Fixtures and Parametrize
Real power comes from layering fixtures and parametrize:
# conftest.py
@pytest.fixture
def dirty_dataframe():
"""A DataFrame with various data quality issues for cleaning tests."""
return pd.DataFrame({
"customer_id": ["C001", " C002 ", None, "C004", "C004"],
"email": ["alice@x.com", "BOB@X.COM", "carol@x.com", None, "dave@x.com"],
"revenue": ["$100.00", "€200", "invalid", "300", "400"],
"signup_date": ["2026-01-01", "2026-02-30", "2026-03-15", "not-a-date", "2026-04-01"],
})
@pytest.mark.parametrize("column,null_fraction_threshold", [
("customer_id", 0.0), # No nulls allowed after cleaning
("email", 0.3), # Up to 30% nulls tolerated
("revenue", 0.2), # Up to 20% parse failures tolerated
])
def test_cleaning_null_rates(dirty_dataframe, column, null_fraction_threshold):
from src.transformers import clean_dataframe
result = clean_dataframe(dirty_dataframe)
if column in result.columns:
null_rate = result[column].isnull().mean()
assert null_rate <= null_fraction_threshold, (
f"Column '{column}' has {null_rate:.1%} nulls, "
f"exceeding threshold of {null_fraction_threshold:.1%}"
        )
Realistic Data Engineering Fixtures
Large Sample DataFrame for Performance Tests
@pytest.fixture(scope="session")
def large_orders_df() -> pd.DataFrame:
"""
100k-row orders DataFrame for performance and memory tests.
Session-scoped — expensive to create, safe to share (read-only in tests).
"""
import numpy as np
rng = np.random.default_rng(seed=0)
n = 100_000
return pd.DataFrame({
"order_id": [f"ORD-{i:07d}" for i in range(n)],
"customer_id": [f"C{rng.integers(1, 10001):06d}" for _ in range(n)],
"product_sku": rng.choice(
[f"SKU-{c}" for c in "ABCDEFGHIJ"], size=n
),
"quantity": rng.integers(1, 20, size=n),
"unit_price": rng.uniform(1.0, 500.0, size=n).round(2),
"order_date": pd.to_datetime(
rng.integers(
pd.Timestamp("2024-01-01").value,
pd.Timestamp("2026-01-01").value,
size=n,
)
),
"status": rng.choice(["shipped", "pending", "cancelled"], size=n, p=[0.7, 0.2, 0.1]),
})
def test_aggregation_completes_in_reasonable_time(large_orders_df):
import time
from src.aggregators import compute_customer_revenue
start = time.monotonic()
result = compute_customer_revenue(large_orders_df)
elapsed = time.monotonic() - start
assert elapsed < 5.0, f"Aggregation took {elapsed:.2f}s — optimize the implementation"
    assert len(result) > 0
Mock S3 / File System Fixture
@pytest.fixture
def mock_s3_bucket(tmp_path):
"""
Simulates an S3 bucket structure using a local temp directory.
Tests that write to / read from 'S3' use this fixture via monkeypatching.
"""
bucket_root = tmp_path / "mock-s3" / "data-lake-bucket"
bucket_root.mkdir(parents=True)
# Pre-populate with sample data
raw_dir = bucket_root / "raw" / "orders" / "2026" / "01"
raw_dir.mkdir(parents=True)
pd.DataFrame({
"order_id": ["O1", "O2"],
"revenue": [100.0, 200.0],
}).to_parquet(raw_dir / "part-00001.parquet")
return bucket_root
def test_pipeline_reads_partitioned_data(mock_s3_bucket, monkeypatch):
from src.readers import S3ParquetReader
# Monkeypatch the S3 base path to use our local mock
monkeypatch.setenv("S3_DATA_LAKE_PATH", str(mock_s3_bucket))
reader = S3ParquetReader(bucket="data-lake-bucket")
df = reader.read_partition(table="orders", year=2026, month=1)
assert len(df) == 2
    assert "order_id" in df.columns
Temp Directory with Schema Files
@pytest.fixture
def schema_directory(tmp_path):
"""
Creates a temporary directory with JSON schema files for schema registry tests.
"""
import json
schemas = {
"orders_v1.json": {
"version": "1.0",
"required_columns": ["order_id", "customer_id", "revenue"],
"column_types": {
"order_id": "string",
"customer_id": "string",
"revenue": "float64",
}
},
"customers_v1.json": {
"version": "1.0",
"required_columns": ["customer_id", "name", "email"],
"column_types": {
"customer_id": "string",
"name": "string",
"email": "string",
}
}
}
for filename, content in schemas.items():
(tmp_path / filename).write_text(json.dumps(content))
return tmp_path
def test_schema_registry_loads_all_schemas(schema_directory):
from src.registry import SchemaRegistry
registry = SchemaRegistry(schema_dir=str(schema_directory))
assert registry.get("orders_v1") is not None
assert registry.get("customers_v1") is not None
    assert registry.get("nonexistent") is None
Parametrized Tests for Data Cleaning Edge Cases
Here is a complete, production-quality parametrized test module for a clean_email function:
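(The module targets clean_email in src.transformers, which the lesson does not show. A minimal hypothetical sketch consistent with the cases in the module is:

```python
import re

# Hypothetical sketch of clean_email; the real src.transformers
# implementation may differ.
_EMAIL_RE = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$")

def clean_email(raw):
    """Normalize an email address, or return None if it is invalid."""
    if raw is None:
        return None
    cleaned = raw.strip().lower()
    if not _EMAIL_RE.match(cleaned):
        return None
    return cleaned
```

The sketch lowercases, strips whitespace, and rejects anything without exactly one local part, one @, and a dotted domain.)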
# tests/unit/test_email_cleaner.py
import pytest
from src.transformers import clean_email
# (raw_input, expected_output)
VALID_EMAIL_CASES = [
("alice@example.com", "alice@example.com"),
("ALICE@EXAMPLE.COM", "alice@example.com"), # Lowercased
(" alice@example.com ", "alice@example.com"), # Stripped
("Alice.Martin+tag@Example.COM", "alice.martin+tag@example.com"),
("user@subdomain.example.co.uk", "user@subdomain.example.co.uk"),
]
INVALID_EMAIL_CASES = [
(None, None),
("", None),
(" ", None),
("not-an-email", None),
("missing@", None),
("@missinglocal.com", None),
("double@@domain.com", None),
("spaces in@email.com", None),
]
@pytest.mark.parametrize("raw,expected", VALID_EMAIL_CASES,
ids=[f"valid_{i}" for i in range(len(VALID_EMAIL_CASES))])
def test_cleans_valid_email(raw, expected):
assert clean_email(raw) == expected
@pytest.mark.parametrize("raw,expected", INVALID_EMAIL_CASES,
    ids=["none", "empty", "whitespace", "no_at", "missing_domain",
         "missing_local", "double_at", "internal_space"])
def test_returns_none_for_invalid_email(raw, expected):
assert clean_email(raw) is None
@pytest.mark.parametrize("domain", [
"gmail.com",
"outlook.com",
"company.io",
"data-corp.co.uk",
"subdomain.enterprise.example.com",
])
def test_preserves_various_domains(domain):
email = f"user@{domain}"
result = clean_email(email)
assert result is not None
    assert result.endswith(f"@{domain}")
Fixture Debugging Tips
See fixture setup order
pytest --setup-show tests/unit/test_transformers.py -v
# Output shows:
# SETUP S session_fixtures (session scope)
# SETUP M module_fixtures (module scope)
# SETUP F function_fixture (function scope)
# tests/unit/test_transformers.py::test_something PASSED
# TEARDOWN F function_fixture
List all available fixtures
pytest --fixtures # All fixtures including built-ins
pytest --fixtures -v   # With descriptions from docstrings
Fixture finalization order
Fixtures are torn down in reverse order of their setup. If fixture B depends on fixture A, then B is torn down before A.
@pytest.fixture
def database(request):
conn = create_connection()
yield conn
conn.close() # Runs last
@pytest.fixture
def populated_database(database):
database.execute("INSERT INTO ...")
yield database
    database.execute("DELETE FROM ...")  # Runs first (reverse order)
Summary
- Use conftest.py to share fixtures without imports — scope them appropriately
- function scope for mutable data, session scope for expensive infrastructure
- yield fixtures for setup/teardown — always clean up after integration fixtures
- Fixture factories let one fixture create multiple independent instances
- parametrize eliminates copy-paste test functions — use pytest.param for per-case marks
- Combine fixtures and parametrize for powerful, DRY test suites
- indirect parametrize passes values through fixtures for dynamic setup
- Use --setup-show to debug fixture lifecycle issues
The next lesson covers mocking — patching external dependencies so your tests don't need real Snowflake credentials or S3 buckets.