pytest Fundamentals for Data Engineers
Master pytest from the ground up: configuration, test discovery, assert rewriting, markers, filtering, and testing real pandas pipelines and ETL classes with production-quality patterns.
Testing data pipelines is not optional. Without tests, every refactor is a gamble, every deployment is a prayer, and every data quality incident is a surprise. This guide builds a solid pytest foundation using examples you will encounter in real data engineering work — pandas transformations, validation functions, and ETL classes.
Why pytest Over unittest
Python ships with unittest, but pytest has become the standard for good reasons:
- Plain assert statements with rich diff output — no assertEqual, assertIn, assertRaises ceremony
- Automatic test discovery — no base classes required
- Fixture system that scales from unit tests to integration suites
- Plugin ecosystem: pytest-cov, pytest-mock, pytest-xdist, pytest-asyncio, and hundreds more
- Parametrization as a first-class feature (see the sketch right after this list)
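As a quick taste of that last point, here is a minimal parametrized test. It uses a tiny inline helper so the snippet is self-contained; parametrization itself is covered in depth in the next lesson.
import pytest
def normalize_region(value: str) -> str:
    # Tiny inline helper so the example is self-contained
    return value.strip().upper()
# One test function expands into three independently reported test cases —
# no loops, no unittest subTest boilerplate
@pytest.mark.parametrize(
    "raw, expected",
    [
        (" north ", "NORTH"),
        ("South", "SOUTH"),
        ("EAST", "EAST"),
    ],
)
def test_normalize_region(raw, expected):
    assert normalize_region(raw) == expected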
# unittest style — verbose boilerplate
import unittest
class TestValidation(unittest.TestCase):
def test_non_null_columns(self):
import pandas as pd
df = pd.DataFrame({"id": [1, 2, None]})
self.assertFalse(df["id"].isnull().any(), "id column contains nulls")
# pytest style — readable, minimal
import pandas as pd
def test_non_null_columns():
df = pd.DataFrame({"id": [1, 2, None]})
assert not df["id"].isnull().any(), "id column contains nulls"
Installation
# Minimal install
pip install pytest
# Data engineering test stack
pip install pytest pytest-cov pytest-mock pytest-xdist pandas pyarrow
# Lock versions in your pyproject.toml
Verify the install:
pytest --version
# pytest 8.x.x
Configuration: pyproject.toml vs pytest.ini
pyproject.toml (Recommended)
Modern Python projects consolidate all tooling configuration into pyproject.toml. This avoids the sprawl of setup.cfg, pytest.ini, .coveragerc, and friends.
# pyproject.toml
[tool.pytest.ini_options]
# Where pytest looks for tests
testpaths = ["tests"]
# Minimum pytest version required
minversion = "8.0"
# Default flags applied to every run
addopts = [
"--strict-markers", # Error on unknown markers instead of warning
"--strict-config", # Error on config warnings
"-ra", # Show short summary for all except passed
"--tb=short", # Shorter tracebacks
]
# Register custom markers (required when --strict-markers is set)
markers = [
"unit: fast, isolated unit tests",
"integration: tests that hit real databases or external services",
"slow: tests that take more than 5 seconds",
"smoke: minimal set to verify basic functionality",
]
# Log capture settings
log_cli = true
log_cli_level = "INFO"
log_format = "%(asctime)s %(levelname)s %(message)s"
log_date_format = "%Y-%m-%d %H:%M:%S"
# Coverage via pytest-cov (when using --cov flag)
# Separate [tool.coverage] section handles .coveragerc equivalent
[tool.coverage.run]
source = ["src"]
omit = ["tests/*", "*/migrations/*", "*/conftest.py"]
[tool.coverage.report]
exclude_lines = [
"pragma: no cover",
"if TYPE_CHECKING:",
"raise NotImplementedError",
"if __name__ == .__main__.:",
]
pytest.ini (Legacy but still common)
If you are working in a project that predates pyproject.toml adoption:
# pytest.ini
[pytest]
testpaths = tests
minversion = 8.0
addopts = --strict-markers --strict-config -ra --tb=short
markers =
unit: fast isolated unit tests
integration: tests requiring external services
slow: tests taking more than 5 seconds
log_cli = true
log_cli_level = INFO
setup.cfg (Avoid for new projects)
# setup.cfg — still works, but migrate to pyproject.toml
[tool:pytest]
testpaths = tests
addopts = --strict-markers
Test Discovery Rules
pytest discovers tests by walking directories and applying these rules in order:
- Start from testpaths (or the current directory if not set)
- Recurse into subdirectories, skipping any matched by the norecursedirs exclusion patterns
- Collect files matching the python_files pattern (default: test_*.py or *_test.py)
- Inside those files, collect functions matching python_functions (default: test_*)
- Inside those files, collect classes matching python_classes (default: Test*)
- Inside Test* classes, collect methods matching test_*
tests/
├── conftest.py # Shared fixtures — auto-loaded
├── unit/
│ ├── __init__.py # Not required, but common
│ ├── test_validators.py # Discovered
│ └── test_transformers.py # Discovered
├── integration/
│ ├── test_database.py # Discovered
│ └── test_api.py # Discovered
└── fixtures/
└── sample_data.csv # NOT collected (no test_ prefix)
Customize discovery in pyproject.toml:
[tool.pytest.ini_options]
python_files = ["test_*.py", "*_test.py", "check_*.py"]
python_classes = ["Test*", "Check*"]
python_functions = ["test_*", "check_*"]
norecursedirs = [".git", "node_modules", ".venv", "dist", "build", "__pycache__"]
Assert Rewriting: The pytest Superpower
pytest rewrites assert statements at import time to provide detailed failure messages without any extra code. This is one of the most underappreciated features.
import pandas as pd
import numpy as np
def test_assert_rewriting_demo():
expected = pd.DataFrame({
"customer_id": [1, 2, 3],
"revenue": [100.0, 200.0, 300.0],
})
actual = pd.DataFrame({
"customer_id": [1, 2, 3],
"revenue": [100.0, 250.0, 300.0], # 200 → 250 intentional mismatch
})
# Plain assert — pytest makes this informative
assert list(actual["revenue"]) == list(expected["revenue"])
# Output:
# AssertionError: assert [100.0, 250.0, 300.0] == [100.0, 200.0, 300.0]
# At index 1 diff: 250.0 != 200.0
For DataFrame comparisons, use pd.testing.assert_frame_equal, which gives column-level diffs:
def test_dataframe_equality():
expected = pd.DataFrame({"a": [1, 2], "b": [3, 4]})
actual = pd.DataFrame({"a": [1, 2], "b": [3, 5]}) # b[1] differs
pd.testing.assert_frame_equal(actual, expected)
# AssertionError: DataFrame.iloc[:, 1] are different
# Column values are different (50.0 %)
# [left]: [3, 5]
# [right]: [3, 4]
Test Functions vs Test Classes
Test Functions (Preferred for most cases)
# tests/unit/test_validators.py
def test_revenue_is_positive():
assert validate_revenue(100.0) is True
def test_revenue_rejects_negative():
assert validate_revenue(-1.0) is False
def test_revenue_rejects_zero():
assert validate_revenue(0.0) is False
Test Classes (Use for grouping related tests with shared setup)
Test classes in pytest do NOT inherit from unittest.TestCase. They are plain classes.
class TestRevenueValidator:
"""Group tests for the revenue validation function."""
def test_accepts_positive_integer(self):
assert validate_revenue(100) is True
def test_accepts_positive_float(self):
assert validate_revenue(99.99) is True
def test_rejects_negative(self):
assert validate_revenue(-1) is False
def test_rejects_zero(self):
assert validate_revenue(0) is False
def test_rejects_none(self):
assert validate_revenue(None) is False
def test_rejects_string(self):
assert validate_revenue("100") is False
Key rule: Do NOT use __init__ in test classes. pytest does not call __init__ and will refuse to collect classes that define it.
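To make the rule concrete, here is a sketch of the anti-pattern; pytest emits a collection warning (roughly "cannot collect test class 'TestBadSetup' because it has a __init__ constructor") and skips every test inside:
class TestBadSetup:
    def __init__(self):            # pytest never calls this
        self.threshold = 100       # state set here is unreachable in tests
    def test_threshold(self):      # silently NOT collected
        assert self.threshold == 100
Shared setup belongs in fixtures (covered in the next lesson) or plain class attributes instead.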
Markers
Markers let you label tests for selective execution. Register all custom markers in pyproject.toml when using --strict-markers.
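The payoff is that a typo in a marker name fails loudly at collection time instead of silently creating a new, never-selected marker. A hypothetical example:
import pytest
# With --strict-markers, collection fails with an error along the lines of
# "'unti' not found in `markers` configuration option".
# Without it, pytest only warns — and the test quietly drops out of every
# pytest -m unit run.
@pytest.mark.unti   # typo for "unit"
def test_revenue_is_positive():
    assert True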
Built-in Markers
import pytest
# Skip a test unconditionally
@pytest.mark.skip(reason="API endpoint not yet implemented")
def test_export_api():
...
# Skip conditionally
@pytest.mark.skipif(
condition=not is_snowflake_available(),
reason="Snowflake credentials not configured"
)
def test_snowflake_query():
...
# Mark as expected failure
@pytest.mark.xfail(reason="Known bug in date parsing — JIRA-1234")
def test_date_parsing_edge_case():
...
# Mark as expected failure, fail if it unexpectedly passes
@pytest.mark.xfail(strict=True, reason="Should fail until fix is merged")
def test_future_feature():
...
Custom Markers for Data Engineering
# tests/unit/test_transformers.py
import pytest
@pytest.mark.unit
def test_clean_phone_numbers():
from src.transformers import clean_phone
assert clean_phone("+1 (555) 867-5309") == "15558675309"
assert clean_phone("555.867.5309") == "5558675309"
assert clean_phone(None) is None
@pytest.mark.unit
def test_normalize_currency():
from src.transformers import normalize_currency
assert normalize_currency("$1,234.56") == 1234.56
assert normalize_currency("EUR 500") == 500.0
@pytest.mark.integration
def test_pipeline_reads_from_s3():
# Hits real S3 — only run in CI with credentials
...
@pytest.mark.slow
def test_full_historical_backfill():
# Processes 10M rows — only run nightly
...
Apply markers at the class level to tag all methods:
@pytest.mark.unit
class TestPhoneNormalizer:
def test_us_format(self): ...
def test_international_format(self): ...
def test_strips_whitespace(self): ...
# All three tests get the 'unit' marker
Running Tests: Flags That Matter
Basic Execution
# Run all tests
pytest
# Run with verbose output (shows each test name)
pytest -v
# Run with extra verbose (shows fixture setup/teardown)
pytest -vv
# Show print() output and logging.info() during tests
pytest -s
# Combine: verbose + show output
pytest -vs
# Stop at first failure
pytest -x
# Stop after N failures
pytest --maxfail=3
Filtering with -k
The -k flag filters tests by name expression. It supports and, or, not, and substring matching.
# Run tests whose name contains "revenue"
pytest -k revenue
# Run tests with "unit" in their name or path (note: -k matches names, not markers — use -m for markers)
pytest -k unit
# Run tests matching either pattern
pytest -k "phone or currency"
# Exclude slow tests
pytest -k "not slow"
# Complex expression
pytest -k "unit and not database"
# Run a specific test file
pytest tests/unit/test_validators.py
# Run a specific test function
pytest tests/unit/test_validators.py::test_revenue_is_positive
# Run a specific test class
pytest tests/unit/test_validators.py::TestRevenueValidator
# Run a specific method in a class
pytest tests/unit/test_validators.py::TestRevenueValidator::test_accepts_positive_float
Running by Marker
# Run only unit tests
pytest -m unit
# Run unit and smoke tests
pytest -m "unit or smoke"
# Run everything except integration and slow
pytest -m "not integration and not slow"
Output Formats
# Short traceback (good default)
pytest --tb=short
# Long traceback (most context)
pytest --tb=long
# Single line per failure
pytest --tb=line
# No traceback, just failure names
pytest --tb=no
# Native Python traceback style
pytest --tb=native
# JUnit XML for CI systems
pytest --junit-xml=test-results/junit.xml
# JSON report (requires pytest-json-report)
pytest --json-report --json-report-file=test-results/report.json
Real Example: Testing a Pandas Pipeline Function
Here is the source code under test:
# src/transformers.py
import pandas as pd
import numpy as np
from typing import Optional
def normalize_sales_data(df: pd.DataFrame) -> pd.DataFrame:
"""
Normalize raw sales data from CRM export.
Transformations applied:
- Strip whitespace from string columns
- Normalize revenue: remove currency symbols, convert to float
- Parse date columns to datetime
- Drop rows where customer_id is null
- Clip revenue to [0, 1_000_000] range (outlier handling)
"""
if df.empty:
return df
result = df.copy()
# Strip whitespace from object columns
str_cols = result.select_dtypes(include="object").columns
result[str_cols] = result[str_cols].apply(lambda col: col.str.strip())
# Normalize revenue
if "revenue" in result.columns:
result["revenue"] = (
result["revenue"]
.astype(str)
.str.replace(r"[$,€£]", "", regex=True)
.str.strip()
.replace("", np.nan)
.astype(float)
.clip(lower=0, upper=1_000_000)
)
# Parse sale_date
if "sale_date" in result.columns:
result["sale_date"] = pd.to_datetime(result["sale_date"], errors="coerce")
# Drop rows with null customer_id
if "customer_id" in result.columns:
result = result.dropna(subset=["customer_id"])
return result.reset_index(drop=True)
Now the tests:
# tests/unit/test_transformers.py
import pytest
import pandas as pd
import numpy as np
from pandas.testing import assert_frame_equal
from src.transformers import normalize_sales_data
@pytest.mark.unit
class TestNormalizeSalesData:
"""Tests for the normalize_sales_data transformation."""
def test_returns_empty_dataframe_unchanged(self):
empty = pd.DataFrame()
result = normalize_sales_data(empty)
assert result.empty
def test_strips_whitespace_from_string_columns(self):
df = pd.DataFrame({
"customer_id": ["C001", " C002 ", "C003"],
"region": [" NORTH", "SOUTH ", " EAST "],
})
result = normalize_sales_data(df)
assert result["customer_id"].tolist() == ["C001", "C002", "C003"]
assert result["region"].tolist() == ["NORTH", "SOUTH", "EAST"]
def test_normalizes_revenue_with_dollar_sign(self):
df = pd.DataFrame({
"customer_id": ["C001"],
"revenue": ["$1,234.56"],
})
result = normalize_sales_data(df)
assert result["revenue"].iloc[0] == pytest.approx(1234.56)
def test_normalizes_revenue_with_euro_sign(self):
df = pd.DataFrame({
"customer_id": ["C001"],
"revenue": ["€500"],
})
result = normalize_sales_data(df)
assert result["revenue"].iloc[0] == pytest.approx(500.0)
def test_clips_revenue_at_upper_bound(self):
df = pd.DataFrame({
"customer_id": ["C001", "C002"],
"revenue": ["2000000", "500000"],
})
result = normalize_sales_data(df)
assert result.loc[result["customer_id"] == "C001", "revenue"].iloc[0] == 1_000_000
assert result.loc[result["customer_id"] == "C002", "revenue"].iloc[0] == 500_000
def test_clips_negative_revenue_to_zero(self):
df = pd.DataFrame({
"customer_id": ["C001"],
"revenue": ["-500"],
})
result = normalize_sales_data(df)
assert result["revenue"].iloc[0] == 0.0
def test_drops_rows_with_null_customer_id(self):
df = pd.DataFrame({
"customer_id": ["C001", None, "C003"],
"revenue": ["100", "200", "300"],
})
result = normalize_sales_data(df)
assert len(result) == 2
assert "C001" in result["customer_id"].values
assert "C003" in result["customer_id"].values
def test_resets_index_after_dropping_rows(self):
df = pd.DataFrame({
"customer_id": [None, "C002", "C003"],
"revenue": ["100", "200", "300"],
})
result = normalize_sales_data(df)
assert list(result.index) == [0, 1]
def test_parses_sale_date_to_datetime(self):
df = pd.DataFrame({
"customer_id": ["C001"],
"sale_date": ["2026-01-15"],
})
result = normalize_sales_data(df)
assert pd.api.types.is_datetime64_any_dtype(result["sale_date"])
assert result["sale_date"].iloc[0] == pd.Timestamp("2026-01-15")
def test_coerces_invalid_dates_to_nat(self):
df = pd.DataFrame({
"customer_id": ["C001", "C002"],
"sale_date": ["2026-01-15", "not-a-date"],
})
result = normalize_sales_data(df)
assert pd.isna(result["sale_date"].iloc[1])
def test_does_not_modify_original_dataframe(self):
df = pd.DataFrame({
"customer_id": [" C001 "],
"revenue": ["$100"],
})
original_customer_id = df["customer_id"].iloc[0]
normalize_sales_data(df)
# Original should be unchanged
assert df["customer_id"].iloc[0] == original_customer_id
def test_handles_dataframe_without_optional_columns(self):
"""Transformation should not fail if revenue or sale_date are absent."""
df = pd.DataFrame({
"customer_id": ["C001", "C002"],
"name": ["Alice", "Bob"],
})
result = normalize_sales_data(df)
assert list(result.columns) == ["customer_id", "name"]
assert len(result) == 2
Real Example: Testing a Data Validation Function
# src/validators.py
from typing import List
import pandas as pd
class ValidationResult:
def __init__(self):
self.errors: List[str] = []
@property
def is_valid(self) -> bool:
return len(self.errors) == 0
def add_error(self, message: str) -> None:
self.errors.append(message)
def __repr__(self) -> str:
if self.is_valid:
return "ValidationResult(valid)"
return f"ValidationResult(errors={self.errors})"
def validate_pipeline_output(df: pd.DataFrame, required_columns: List[str]) -> ValidationResult:
"""Validate a pipeline output DataFrame against a schema."""
result = ValidationResult()
# Check required columns exist
missing = [col for col in required_columns if col not in df.columns]
if missing:
result.add_error(f"Missing required columns: {missing}")
# Check for completely empty DataFrame
if df.empty:
result.add_error("DataFrame is empty")
return result
# Check for duplicate primary key (assume first required column is PK)
if required_columns and required_columns[0] in df.columns:
pk_col = required_columns[0]
duplicates = df[pk_col].duplicated().sum()
if duplicates > 0:
result.add_error(f"Found {duplicates} duplicate values in '{pk_col}'")
# Check for all-null columns
for col in required_columns:
if col in df.columns and df[col].isnull().all():
result.add_error(f"Column '{col}' is entirely null")
return result
# tests/unit/test_validators.py
import pytest
import pandas as pd
from src.validators import validate_pipeline_output, ValidationResult
@pytest.mark.unit
def test_valid_dataframe_passes_validation():
df = pd.DataFrame({
"order_id": ["O1", "O2", "O3"],
"customer_id": ["C1", "C2", "C3"],
"amount": [100, 200, 300],
})
result = validate_pipeline_output(df, required_columns=["order_id", "customer_id", "amount"])
assert result.is_valid
assert result.errors == []
@pytest.mark.unit
def test_fails_on_missing_required_column():
df = pd.DataFrame({
"order_id": ["O1", "O2"],
"amount": [100, 200],
# customer_id is missing
})
result = validate_pipeline_output(df, required_columns=["order_id", "customer_id", "amount"])
assert not result.is_valid
assert any("customer_id" in e for e in result.errors)
@pytest.mark.unit
def test_fails_on_empty_dataframe():
df = pd.DataFrame(columns=["order_id", "customer_id"])
result = validate_pipeline_output(df, required_columns=["order_id", "customer_id"])
assert not result.is_valid
assert any("empty" in e.lower() for e in result.errors)
@pytest.mark.unit
def test_fails_on_duplicate_primary_key():
df = pd.DataFrame({
"order_id": ["O1", "O1", "O3"], # O1 duplicated
"amount": [100, 200, 300],
})
result = validate_pipeline_output(df, required_columns=["order_id", "amount"])
assert not result.is_valid
assert any("duplicate" in e.lower() for e in result.errors)
@pytest.mark.unit
def test_fails_on_all_null_required_column():
df = pd.DataFrame({
"order_id": ["O1", "O2"],
"customer_id": [None, None],
})
result = validate_pipeline_output(df, required_columns=["order_id", "customer_id"])
assert not result.is_valid
assert any("entirely null" in e for e in result.errors)
@pytest.mark.unit
def test_accumulates_multiple_errors():
df = pd.DataFrame({
"order_id": ["O1", "O1"], # duplicate
"customer_id": [None, None], # all null
# amount missing
})
result = validate_pipeline_output(df, required_columns=["order_id", "customer_id", "amount"])
assert not result.is_valid
assert len(result.errors) >= 2
Real Example: Testing an ETL Class
# src/etl.py
import logging
from dataclasses import dataclass, field
from typing import Dict, Any
import pandas as pd
logger = logging.getLogger(__name__)
@dataclass
class ETLMetrics:
rows_extracted: int = 0
rows_transformed: int = 0
rows_loaded: int = 0
rows_dropped: int = 0
errors: list = field(default_factory=list)
class SalesETL:
"""
Extract → Transform → Load pipeline for sales data.
Extract: reads raw CSV
Transform: normalizes, validates, enriches
Load: writes to Parquet
"""
def __init__(self, config: Dict[str, Any]):
self.config = config
self.metrics = ETLMetrics()
def extract(self, path: str) -> pd.DataFrame:
logger.info(f"Extracting from {path}")
df = pd.read_csv(path)
self.metrics.rows_extracted = len(df)
return df
def transform(self, df: pd.DataFrame) -> pd.DataFrame:
initial_count = len(df)
# Drop rows missing required fields
df = df.dropna(subset=["order_id", "customer_id"])
# Normalize revenue
df["revenue"] = pd.to_numeric(df["revenue"], errors="coerce").fillna(0.0)
# Add derived column
df["revenue_tier"] = pd.cut(
df["revenue"],
bins=[0, 100, 1000, float("inf")],
labels=["low", "medium", "high"],
include_lowest=True,
)
self.metrics.rows_transformed = len(df)
self.metrics.rows_dropped = initial_count - len(df)
return df
def load(self, df: pd.DataFrame, output_path: str) -> None:
logger.info(f"Loading {len(df)} rows to {output_path}")
df.to_parquet(output_path, index=False)
self.metrics.rows_loaded = len(df)
def run(self, input_path: str, output_path: str) -> ETLMetrics:
df = self.extract(input_path)
df = self.transform(df)
self.load(df, output_path)
return self.metrics
# tests/unit/test_etl.py
import pytest
import pandas as pd
import tempfile
import os
from pathlib import Path
from src.etl import SalesETL, ETLMetrics
@pytest.fixture
def etl():
"""Create a SalesETL instance with default config."""
return SalesETL(config={"env": "test"})
@pytest.fixture
def sample_sales_df():
"""Sample sales DataFrame for transformation tests."""
return pd.DataFrame({
"order_id": ["O1", "O2", "O3", "O4"],
"customer_id": ["C1", "C2", None, "C4"],
"revenue": [50.0, 500.0, 200.0, None],
"region": ["NORTH", "SOUTH", "EAST", "WEST"],
})
@pytest.mark.unit
def test_transform_drops_null_customer_id(etl, sample_sales_df):
result = etl.transform(sample_sales_df)
assert len(result) == 3 # Row with customer_id=None dropped
assert result["customer_id"].isnull().sum() == 0
@pytest.mark.unit
def test_transform_fills_null_revenue_with_zero(etl, sample_sales_df):
result = etl.transform(sample_sales_df)
# Row O4 had None revenue but survived (has customer_id)
assert result["revenue"].isnull().sum() == 0
o4_revenue = result.loc[result["order_id"] == "O4", "revenue"].iloc[0]
assert o4_revenue == 0.0
@pytest.mark.unit
def test_transform_adds_revenue_tier_column(etl, sample_sales_df):
result = etl.transform(sample_sales_df)
assert "revenue_tier" in result.columns
assert set(result["revenue_tier"].dropna().unique()).issubset({"low", "medium", "high"})
@pytest.mark.unit
def test_transform_classifies_revenue_tiers_correctly(etl):
df = pd.DataFrame({
"order_id": ["O1", "O2", "O3"],
"customer_id": ["C1", "C2", "C3"],
"revenue": [50.0, 500.0, 5000.0],
})
result = etl.transform(df)
tiers = result.set_index("order_id")["revenue_tier"].astype(str)
assert tiers["O1"] == "low"
assert tiers["O2"] == "medium"
assert tiers["O3"] == "high"
@pytest.mark.unit
def test_transform_updates_metrics(etl, sample_sales_df):
etl.transform(sample_sales_df)
assert etl.metrics.rows_transformed == 3
assert etl.metrics.rows_dropped == 1
@pytest.mark.unit
def test_load_writes_parquet_file(etl):
df = pd.DataFrame({
"order_id": ["O1", "O2"],
"customer_id": ["C1", "C2"],
"revenue": [100.0, 200.0],
"revenue_tier": ["low", "medium"],
})
with tempfile.TemporaryDirectory() as tmpdir:
output_path = os.path.join(tmpdir, "output.parquet")
etl.load(df, output_path)
assert os.path.exists(output_path)
loaded = pd.read_parquet(output_path)
pd.testing.assert_frame_equal(loaded, df)
@pytest.mark.unit
def test_load_updates_rows_loaded_metric(etl):
df = pd.DataFrame({"order_id": ["O1", "O2"], "revenue": [1.0, 2.0]})
with tempfile.TemporaryDirectory() as tmpdir:
etl.load(df, os.path.join(tmpdir, "out.parquet"))
assert etl.metrics.rows_loaded == 2
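# Note: the two tests above manage temp dirs manually with tempfile.TemporaryDirectory;
# the integration test below uses pytest's built-in tmp_path fixture, which gives each
# test its own unique temporary directory as a pathlib.Path — no cleanup code needed.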
@pytest.mark.integration
def test_full_run_produces_valid_output(etl, tmp_path):
"""Full extract-transform-load cycle using real files."""
# Write sample CSV input
input_csv = tmp_path / "sales.csv"
pd.DataFrame({
"order_id": ["O1", "O2", "O3"],
"customer_id": ["C1", None, "C3"],
"revenue": [100, 200, 300],
"region": ["NORTH", "SOUTH", "EAST"],
}).to_csv(input_csv, index=False)
output_parquet = tmp_path / "output.parquet"
metrics = etl.run(str(input_csv), str(output_parquet))
assert metrics.rows_extracted == 3
assert metrics.rows_loaded == 2 # One dropped due to null customer_id
output = pd.read_parquet(output_parquet)
assert len(output) == 2
assert "revenue_tier" in output.columns
pytest.ini vs pyproject.toml: Decision Guide
| Scenario | Use |
|----------|-----|
| New project, Python 3.8+ | pyproject.toml |
| Monorepo with multiple packages | pyproject.toml per package |
| Legacy project, cannot touch pyproject.toml | pytest.ini |
| Want to share config with setuptools/mypy/black | pyproject.toml |
| CI environment with config injection | Either (the PYTEST_ADDOPTS env var appends extra flags on top of whichever config file is used) |
Environment Variable Override
# Override addopts for a specific CI job without changing config files
PYTEST_ADDOPTS="--tb=long -v" pytest -m unit
# Pass a custom env var (read by your own conftest.py, for example) and list tests without running them
PYTEST_CURRENT_ENV=ci pytest --co # --co: collect only, don't run
Quick Reference: Most Used Commands
# Development loop: fast, verbose, stop on first failure
pytest -x -vs -m "not slow and not integration"
# Pre-commit: full unit suite with coverage
pytest -m unit --cov=src --cov-report=term-missing
# CI: all tests, JUnit output
pytest --junit-xml=results.xml --cov=src --cov-report=xml
# Debug a specific failing test
pytest tests/unit/test_etl.py::test_transform_drops_null_customer_id -vs --tb=long
# List all tests without running them
pytest --collect-only
# List all tests matching a marker
pytest --collect-only -m integration
# Run last failed tests
pytest --lf
# Run previously failed tests first, then the rest
pytest --ff
Summary
- Use pyproject.toml for all pytest configuration in new projects
- Register all custom markers with --strict-markers to catch typos early
- Rely on pytest's assert rewriting — plain assert statements are expressive enough
- Use pd.testing.assert_frame_equal for DataFrame equality checks
- Separate unit tests (fast, no I/O) from integration tests (slow, real services) with markers
- Use tmp_path (a built-in pytest fixture) for any test that needs temporary files
- Run with --lf during development to iterate only on failing tests
The next lesson covers fixtures and parametrization — the tools that eliminate test duplication and make your test suite scale gracefully.