Statistical Data Quality Checks for Pipelines
Build production-grade statistical quality checks into your data pipelines: outlier detection, distribution drift, null rate monitoring, and a complete Python DataQualityChecker class.
Why Statistical Checks Beat Schema Checks
Most pipeline validation stops at schema validation: column names, data types, not-null constraints. That catches about 30% of real data quality issues.
The remaining 70% are statistical: your amount column is technically a float, but it's suddenly producing values 10x larger than usual. Your user_id column has no nulls, but the cardinality dropped from 500k to 12k distinct values. The pipeline ran, and it produced garbage.
Statistical checks catch what schema checks miss.
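A tiny illustration (toy numbers, invented for this sketch, not from any real pipeline): two batches that are indistinguishable to a schema check yet obviously different statistically.

```python
import pandas as pd

yesterday = pd.DataFrame({"amount": [12.0, 15.5, 9.9, 14.2], "user_id": [1, 2, 3, 4]})
today = pd.DataFrame({"amount": [120.0, 155.0, 99.0, 142.0], "user_id": [1, 1, 1, 1]})

# Schema view: identical dtypes, no nulls -- both batches "pass".
assert list(today.dtypes) == list(yesterday.dtypes)
assert today.notna().all().all()

# Statistical view: the mean jumped 10x and user_id cardinality collapsed.
mean_ratio = today["amount"].mean() / yesterday["amount"].mean()
cardinality_ratio = today["user_id"].nunique() / yesterday["user_id"].nunique()
print(mean_ratio, cardinality_ratio)
```

Both red flags are invisible to dtype and not-null constraints; they only show up when you compare statistics across batches.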
The Complete DataQualityChecker Class
We'll build this incrementally. By the end you'll have a reusable class you can drop into any Python pipeline.
import numpy as np
import pandas as pd
from scipy import stats
from dataclasses import dataclass, field
from typing import Optional
import warnings
warnings.filterwarnings('ignore')
@dataclass
class QualityCheck:
    check_name: str
    column: str
    passed: bool
    observed_value: float
    threshold: float
    message: str
@dataclass
class DataQualityReport:
    run_id: str
    table_name: str
    row_count: int
    checks: list[QualityCheck] = field(default_factory=list)

    @property
    def passed(self) -> bool:
        return all(c.passed for c in self.checks)

    @property
    def failed_checks(self) -> list[QualityCheck]:
        return [c for c in self.checks if not c.passed]

    def summary(self) -> str:
        total = len(self.checks)
        passed = sum(1 for c in self.checks if c.passed)
        status = "PASSED" if self.passed else "FAILED"
        lines = [f"[{status}] {self.table_name} | Run: {self.run_id}",
                 f"  Checks: {passed}/{total} passed | Rows: {self.row_count:,}"]
        for fc in self.failed_checks:
            lines.append(f"  FAIL: {fc.check_name} on '{fc.column}' -> {fc.message}")
        return "\n".join(lines)
Z-Score Outlier Detection
Z-score measures how many standard deviations a value is from the mean. Values beyond ±3 sigma occur in only 0.27% of a normal distribution, which makes them a useful anomaly signal.
from scipy.stats import zscore

class DataQualityChecker:
    def __init__(self, df: pd.DataFrame, table_name: str, run_id: str):
        self.df = df
        self.table_name = table_name
        self.run_id = run_id
        self._report = DataQualityReport(run_id, table_name, len(df))

    # ── Z-Score Outlier Detection ────────────────────────────────────────────
    def check_zscore_outliers(
        self,
        column: str,
        threshold: float = 3.0,
        max_outlier_pct: float = 0.01,
    ) -> "DataQualityChecker":
        """Fail if more than max_outlier_pct of rows have |z-score| > threshold."""
        series = self.df[column].dropna()
        z_scores = np.abs(zscore(series))
        outlier_pct = (z_scores > threshold).sum() / len(series)
        check = QualityCheck(
            check_name="zscore_outliers",
            column=column,
            passed=outlier_pct <= max_outlier_pct,
            observed_value=round(outlier_pct * 100, 3),
            threshold=max_outlier_pct * 100,
            message=(
                f"{outlier_pct*100:.2f}% of values exceed z={threshold} "
                f"(allowed: {max_outlier_pct*100:.1f}%)"
            ),
        )
        self._report.checks.append(check)
        return self

    # ── IQR-Based Outlier Detection ──────────────────────────────────────────
    def check_iqr_outliers(
        self,
        column: str,
        multiplier: float = 1.5,
        max_outlier_pct: float = 0.02,
    ) -> "DataQualityChecker":
        """
        Tukey fences: outlier if value < Q1 - k*IQR or > Q3 + k*IQR.
        More robust than z-score for skewed distributions (e.g., cost data).
        """
        series = self.df[column].dropna()
        q1, q3 = np.percentile(series, [25, 75])
        iqr = q3 - q1
        lower = q1 - multiplier * iqr
        upper = q3 + multiplier * iqr
        outlier_pct = ((series < lower) | (series > upper)).sum() / len(series)
        check = QualityCheck(
            check_name="iqr_outliers",
            column=column,
            passed=outlier_pct <= max_outlier_pct,
            observed_value=round(outlier_pct * 100, 3),
            threshold=max_outlier_pct * 100,
            message=(
                f"{outlier_pct*100:.2f}% outside [{lower:.2f}, {upper:.2f}] "
                f"(IQR*{multiplier})"
            ),
        )
        self._report.checks.append(check)
        return self
When to use which:
- Z-score: data is roughly normal (latencies, temperatures, balanced row counts)
- IQR: data is skewed (costs, file sizes, revenue: anything with a long right tail)
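The difference is easy to see on synthetic skewed data. This sketch (illustrative parameters, synthetic lognormal "costs") shows the two detectors disagreeing on the same column: the long right tail inflates the mean and standard deviation, so z-score flags fewer points than the quartile-based Tukey fences do.

```python
import numpy as np
from scipy.stats import zscore

rng = np.random.default_rng(0)
costs = rng.lognormal(mean=3.0, sigma=1.0, size=10_000)  # heavily right-skewed

# Z-score: mean and std are both inflated by the tail, masking outliers.
z_outlier_pct = (np.abs(zscore(costs)) > 3).mean()

# IQR: quartiles ignore the tail's magnitude, so the fences stay tight.
q1, q3 = np.percentile(costs, [25, 75])
iqr = q3 - q1
iqr_outlier_pct = ((costs < q1 - 1.5 * iqr) | (costs > q3 + 1.5 * iqr)).mean()

print(f"z-score flags {z_outlier_pct:.1%}, IQR flags {iqr_outlier_pct:.1%}")
```

Neither answer is "right" in isolation; the point is that the thresholds you tune for one detector do not transfer to the other when the distribution is skewed.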
Distribution Drift Detection with Kolmogorov-Smirnov
The KS test answers: "Are these two samples drawn from the same distribution?" It's the gold standard for detecting when your pipeline's data has fundamentally changed character.
from scipy.stats import ks_2samp

# ── KS Distribution Drift (add to DataQualityChecker) ────────────────────────
def check_ks_drift(
    self,
    column: str,
    reference_series: pd.Series,
    significance: float = 0.05,
) -> "DataQualityChecker":
    """
    Kolmogorov-Smirnov test.
    H0: current and reference data come from the same distribution.
    Small p-value -> the distribution has drifted.
    """
    current = self.df[column].dropna().values
    reference = reference_series.dropna().values
    ks_stat, p_value = ks_2samp(reference, current)
    drifted = p_value < significance
    check = QualityCheck(
        check_name="ks_distribution_drift",
        column=column,
        passed=not drifted,
        observed_value=round(p_value, 6),
        threshold=significance,
        message=(
            f"KS statistic={ks_stat:.4f}, p={p_value:.6f} "
            f"({'DRIFT DETECTED' if drifted else 'stable'})"
        ),
    )
    self._report.checks.append(check)
    return self
Real example: You run this check comparing this week's order_amount distribution against last week's baseline. A p-value < 0.05 means the distributions are statistically different; a pricing change, a data issue, or a new customer segment could all explain it.
Chi-Squared Test for Categorical Column Drift
When your column is categorical (status codes, country codes, event types), KS doesn't apply. Use chi-squared instead.
from scipy.stats import chi2_contingency

# ── Chi-Squared Categorical Drift (add to DataQualityChecker) ────────────────
def check_chi2_categorical_drift(
    self,
    column: str,
    reference_series: pd.Series,
    significance: float = 0.05,
) -> "DataQualityChecker":
    """
    Chi-squared test on value counts.
    Detects shifts in category proportions between runs.
    """
    current_counts = self.df[column].value_counts()
    reference_counts = reference_series.value_counts()
    # Align on the same categories, filling missing ones with 0
    all_cats = sorted(set(current_counts.index) | set(reference_counts.index))
    current_vec = np.array([current_counts.get(c, 0) for c in all_cats])
    reference_vec = np.array([reference_counts.get(c, 0) for c in all_cats])
    # Build a 2 x k contingency table: rows are (current, reference)
    contingency = np.array([current_vec, reference_vec])
    chi2, p_value, dof, _ = chi2_contingency(contingency)
    drifted = p_value < significance
    check = QualityCheck(
        check_name="chi2_categorical_drift",
        column=column,
        passed=not drifted,
        observed_value=round(p_value, 6),
        threshold=significance,
        message=(
            f"chi2={chi2:.2f}, dof={dof}, p={p_value:.6f} "
            f"({'CATEGORY SHIFT' if drifted else 'stable'})"
        ),
    )
    self._report.checks.append(check)
    return self
Example use case: Your payment_method column suddenly has crypto appearing at 8% when it was historically under 0.1%. Chi-squared detects this before it affects downstream revenue calculations.
Null Rate Drift Monitoring
Null rate drift is one of the most common and most damaging quality issues. An upstream system starts sending nulls silently.
# ── Null Rate Check (add to DataQualityChecker) ──────────────────────────────
def check_null_rate(
    self,
    column: str,
    max_null_pct: float = 0.05,
) -> "DataQualityChecker":
    """Hard threshold: null rate must stay below max_null_pct."""
    null_pct = self.df[column].isna().mean()
    check = QualityCheck(
        check_name="null_rate",
        column=column,
        passed=null_pct <= max_null_pct,
        observed_value=round(null_pct * 100, 3),
        threshold=max_null_pct * 100,
        message=f"Null rate: {null_pct*100:.2f}% (max: {max_null_pct*100:.1f}%)",
    )
    self._report.checks.append(check)
    return self

# ── Null Rate Drift vs. Baseline (add to DataQualityChecker) ─────────────────
def check_null_rate_drift(
    self,
    column: str,
    baseline_null_pct: float,
    tolerance_pct: float = 0.03,
) -> "DataQualityChecker":
    """
    Drift check: current null rate must not exceed baseline by more than tolerance.
    Catches gradual degradation that absolute thresholds miss.
    """
    current_null_pct = self.df[column].isna().mean()
    drift = current_null_pct - baseline_null_pct
    check = QualityCheck(
        check_name="null_rate_drift",
        column=column,
        passed=drift <= tolerance_pct,
        observed_value=round(drift * 100, 3),
        threshold=tolerance_pct * 100,
        message=(
            f"Null drift: +{drift*100:.2f}pp from baseline {baseline_null_pct*100:.1f}% "
            f"(tolerance: {tolerance_pct*100:.1f}pp)"
        ),
    )
    self._report.checks.append(check)
    return self
Row Count Anomaly Detection with Rolling Statistics
# ── Row Count Anomaly (add to DataQualityChecker) ────────────────────────────
@staticmethod
def check_row_count_anomaly(
    current_count: int,
    historical_counts: list[int],
    z_threshold: float = 2.0,
) -> QualityCheck:
    """
    Rolling mean ± z_threshold * std.
    Uses the last N historical run counts to establish the expected range.
    """
    arr = np.array(historical_counts)
    mean = np.mean(arr)
    std = np.std(arr)
    z = abs(current_count - mean) / std if std > 0 else 0.0
    lower = mean - z_threshold * std
    upper = mean + z_threshold * std
    passed = lower <= current_count <= upper
    return QualityCheck(
        check_name="row_count_anomaly",
        column="__row_count__",
        passed=passed,
        observed_value=current_count,
        threshold=z_threshold,
        message=(
            f"Count {current_count:,} | expected [{lower:,.0f}, {upper:,.0f}] "
            f"(mean={mean:,.0f}, z={z:.2f})"
        ),
    )
Pattern: Store the last 30 run counts in a metadata table. Before each run completes, check whether today's count is within 2 sigma of the rolling mean. This catches both "no data loaded" (a drop to 0) and "data duplicated" (a spike to 2x normal).
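Wired up, the pattern might look like this sketch. The run_counts table, its columns, and sqlite3 as the metadata store are all illustrative stand-ins; in practice the history would live in your warehouse's metadata schema.

```python
import sqlite3
import numpy as np

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE run_counts (run_date TEXT, table_name TEXT, row_count INTEGER)")

# Simulate 30 prior runs already logged by earlier pipeline executions.
rng = np.random.default_rng(1)
for i, n in enumerate(rng.normal(100_000, 3_000, size=30).astype(int)):
    conn.execute("INSERT INTO run_counts VALUES (?, ?, ?)",
                 (f"2026-04-{i+1:02d}", "orders", int(n)))

# Before finishing today's run: pull the last 30 counts and apply the 2-sigma rule.
history = [r[0] for r in conn.execute(
    "SELECT row_count FROM run_counts WHERE table_name = ? "
    "ORDER BY run_date DESC LIMIT 30",
    ("orders",),
)]
mean, std = np.mean(history), np.std(history)
todays_count = 51_000  # e.g. an upstream extract silently delivered half the volume
anomalous = abs(todays_count - mean) > 2 * std
print(anomalous)
```

After the check, the run appends its own count to the table so the window keeps rolling forward.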
Spark Partition Skew Detection
In Spark, data skew causes a few tasks to take 10x longer than others, destroying parallelism.
def detect_spark_partition_skew(
    spark_df,
    partition_col: str,
    skew_threshold: float = 3.0,
) -> dict:
    """
    Detect data skew in Spark by analysing partition key cardinality.
    Returns skewed keys and their row counts.
    Run in a Spark session before heavy transforms.
    """
    counts_df = (
        spark_df
        .groupBy(partition_col)
        .count()
        .toPandas()
        .rename(columns={"count": "row_count"})
    )
    mean_count = counts_df["row_count"].mean()
    std_count = counts_df["row_count"].std()
    threshold = mean_count + skew_threshold * std_count
    skewed = counts_df[counts_df["row_count"] > threshold].copy()
    skewed["z_score"] = (skewed["row_count"] - mean_count) / std_count
    skewed = skewed.sort_values("z_score", ascending=False)
    return {
        "partition_col": partition_col,
        "total_partitions": len(counts_df),
        "mean_rows": int(mean_count),
        "threshold": int(threshold),
        "skewed_partitions": skewed.to_dict("records"),
        "has_skew": len(skewed) > 0,
    }

# Usage example
# result = detect_spark_partition_skew(orders_df, partition_col="customer_id")
# if result["has_skew"]:
#     for sk in result["skewed_partitions"]:
#         print(f"  Skewed key: {sk['customer_id']} -> {sk['row_count']:,} rows (z={sk['z_score']:.1f})")
Great Expectations Integration
Great Expectations (GX) provides a framework for statistical expectations with automatic documentation.
import great_expectations as gx

def build_statistical_expectation_suite(
    context: gx.DataContext,
    suite_name: str,
    mean_lower: float,
    mean_upper: float,
    stdev_upper: float,
    median_lower: float,
    median_upper: float,
    max_null_pct: float,
) -> gx.core.ExpectationSuite:
    """
    Build a GX suite with statistical expectations for a numeric column.
    Suitable for: order amounts, latencies, row counts.
    """
    suite = context.add_or_update_expectation_suite(suite_name)
    # Mean within historical range
    suite.add_expectation(gx.expectations.ExpectColumnMeanToBeBetween(
        column="amount",
        min_value=mean_lower,
        max_value=mean_upper,
    ))
    # Standard deviation bounded
    suite.add_expectation(gx.expectations.ExpectColumnStdevToBeBetween(
        column="amount",
        min_value=0,
        max_value=stdev_upper,
    ))
    # Median bounded (GX expresses this via the 0.5 quantile)
    suite.add_expectation(gx.expectations.ExpectColumnQuantileValuesToBeBetween(
        column="amount",
        quantile_ranges={
            "quantiles": [0.5],
            "value_ranges": [[median_lower, median_upper]],
        },
    ))
    # Null rate
    suite.add_expectation(gx.expectations.ExpectColumnValuesToNotBeNull(
        column="amount",
        mostly=1.0 - max_null_pct,
    ))
    # Row count in expected range (set from historical data)
    suite.add_expectation(gx.expectations.ExpectTableRowCountToBeBetween(
        min_value=50_000,
        max_value=200_000,
    ))
    return suite
GX in production pipelines: Run GX validation as a pipeline step. If validation fails, write results to an audit.quality_failures table and send a Slack/PagerDuty alert rather than propagating bad data downstream.
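The same fail-closed step can be sketched in plain Python, independent of GX. The quality_gate function, the quality_failures table, and the alert hook below are hypothetical names invented for this sketch; in the article's setup the failure records would come from report.failed_checks.

```python
import sqlite3

def quality_gate(failed_checks: list[dict], run_id: str, conn: sqlite3.Connection) -> None:
    """Persist failures to an audit table, then raise to block downstream steps."""
    conn.execute(
        "CREATE TABLE IF NOT EXISTS quality_failures "
        "(run_id TEXT, check_name TEXT, column_name TEXT, message TEXT)"
    )
    for fc in failed_checks:
        conn.execute(
            "INSERT INTO quality_failures VALUES (?, ?, ?, ?)",
            (run_id, fc["check_name"], fc["column"], fc["message"]),
        )
    conn.commit()
    if failed_checks:
        # send_slack_alert(failed_checks)  # hypothetical alert hook goes here
        raise RuntimeError(f"{len(failed_checks)} quality check(s) failed; blocking downstream")

conn = sqlite3.connect(":memory:")
failures = [{"check_name": "ks_distribution_drift", "column": "amount", "message": "p=0.000012"}]
blocked = False
try:
    quality_gate(failures, "run_2026-05-07", conn)
except RuntimeError:
    blocked = True  # the orchestrator would mark the task failed here
```

Raising inside the gate is the key design choice: a failed check stops the DAG rather than letting bad data flow on with only a log line to show for it.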
Putting It All Together: Pipeline Quality Gate
import pandas as pd
import numpy as np
from datetime import date

# ── Load current and reference data ──────────────────────────────────────────
np.random.seed(42)

# Simulate: reference data from yesterday
reference_df = pd.DataFrame({
    "order_id": range(1, 100_001),
    "amount": np.random.lognormal(mean=4.5, sigma=0.8, size=100_000),
    "status": np.random.choice(["paid", "pending", "refunded"], size=100_000,
                               p=[0.75, 0.20, 0.05]),
    "customer_id": np.random.randint(1, 50_001, size=100_000),
})

# Simulate: today's data with subtle drift (amount distribution shifted up)
current_df = pd.DataFrame({
    "order_id": range(100_001, 196_001),  # 96,000 ids to match the other columns
    "amount": np.random.lognormal(mean=5.0, sigma=0.9, size=96_000),  # shifted!
    "status": np.random.choice(["paid", "pending", "refunded"], size=96_000,
                               p=[0.68, 0.25, 0.07]),  # shifted!
    "customer_id": np.random.randint(1, 50_001, size=96_000),
})

# ── Historical row counts (last 30 days) ─────────────────────────────────────
historical_counts = list(np.random.normal(loc=100_000, scale=3_000, size=30).astype(int))

# ── Run the quality checks ───────────────────────────────────────────────────
checker = DataQualityChecker(current_df, table_name="orders", run_id=f"run_{date.today()}")

report = (
    checker
    .check_null_rate("amount", max_null_pct=0.01)
    .check_null_rate_drift("amount", baseline_null_pct=0.002, tolerance_pct=0.02)
    .check_zscore_outliers("amount", threshold=3.5, max_outlier_pct=0.005)
    .check_iqr_outliers("amount", multiplier=2.0, max_outlier_pct=0.03)
    .check_ks_drift("amount", reference_df["amount"])
    .check_chi2_categorical_drift("status", reference_df["status"])
    ._report
)

# Add the row count anomaly check
report.checks.append(
    DataQualityChecker.check_row_count_anomaly(
        current_count=len(current_df),
        historical_counts=historical_counts,
        z_threshold=2.0,
    )
)

print(report.summary())
Expected output:
[FAILED] orders | Run: run_2026-05-07
  Checks: 5/7 passed | Rows: 96,000
  FAIL: ks_distribution_drift on 'amount' -> KS statistic=0.0821, p=0.000012 (DRIFT DETECTED)
  FAIL: chi2_categorical_drift on 'status' -> chi2=187.42, dof=2, p=0.000000 (CATEGORY SHIFT)
Exactly what you'd want to see: the statistical tests caught the drift that schema validation would have missed entirely.
Production Recommendations
| Check | Cadence | Action on Failure |
|---|---|---|
| Row count anomaly | Every run | Alert + block downstream |
| Null rate | Every run | Alert; block if > 5% delta |
| IQR outliers | Every run | Log; alert if > 1% |
| KS drift | Daily | Alert; escalate if 3+ consecutive days |
| Chi-squared categorical drift | Daily | Alert |
| Spark partition skew | Before heavy joins | Auto-repartition or alert |
Store all check results in a pipeline_quality_log table. Track trends over time: a slowly rising outlier rate is a warning sign even if each individual run passes.
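Trend tracking over that log can be as simple as a rolling mean. The numbers below are made up to show a run history where every individual run stays under a 1% outlier threshold while the rate steadily climbs:

```python
import pandas as pd

# Illustrative history pulled from a quality log: each run individually "passes".
log = pd.DataFrame({
    "run": range(10),
    "outlier_pct": [0.20, 0.22, 0.25, 0.31, 0.38, 0.45, 0.55, 0.66, 0.78, 0.92],
})

# Compare the latest 5-run average against the earliest 5-run average.
rolling = log["outlier_pct"].rolling(window=5).mean()
trend_ratio = rolling.iloc[-1] / rolling.dropna().iloc[0]
trending_up = trend_ratio > 1.5  # flag a sustained climb, not a single noisy run
print(trending_up)
```

A gate like this fires on the trajectory rather than the level, which is exactly the failure mode per-run thresholds miss.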