Data Engineering · Advanced

Statistical Data Quality Checks for Pipelines

Build production-grade statistical quality checks into your data pipelines — outlier detection, distribution drift, null rate monitoring, and a complete Python DataQualityChecker class.

Learnixo · May 7, 2026 · 10 min read
Data Quality · Python · SciPy · Great Expectations · Statistics · Pipeline Monitoring
Share:š•

Why Statistical Checks Beat Schema Checks

Most pipeline validation stops at the schema: column names, data types, not-null constraints. That catches about 30% of real data quality issues.

The remaining 70% are statistical: your amount column is technically a float, but it's suddenly producing values 10x larger than usual. Your user_id column has no nulls, but the cardinality dropped from 500k to 12k distinct values. The pipeline ran — and produced garbage.

Statistical checks catch what schema checks miss.


The Complete DataQualityChecker Class

We'll build this incrementally. By the end you'll have a reusable class you can drop into any Python pipeline.

Python
import numpy as np
import pandas as pd
from scipy import stats
from dataclasses import dataclass, field
from typing import Optional
import warnings
warnings.filterwarnings('ignore')  # silences SciPy runtime warnings; scope this more narrowly in production


@dataclass
class QualityCheck:
    check_name: str
    column: str
    passed: bool
    observed_value: float
    threshold: float
    message: str


@dataclass
class DataQualityReport:
    run_id: str
    table_name: str
    row_count: int
    checks: list[QualityCheck] = field(default_factory=list)

    @property
    def passed(self) -> bool:
        return all(c.passed for c in self.checks)

    @property
    def failed_checks(self) -> list[QualityCheck]:
        return [c for c in self.checks if not c.passed]

    def summary(self) -> str:
        total  = len(self.checks)
        passed = sum(1 for c in self.checks if c.passed)
        status = "PASSED" if self.passed else "FAILED"
        lines  = [f"[{status}] {self.table_name} | Run: {self.run_id}",
                  f"  Checks: {passed}/{total} passed | Rows: {self.row_count:,}"]
        for fc in self.failed_checks:
            lines.append(f"  FAIL: {fc.check_name} on '{fc.column}' — {fc.message}")
        return "\n".join(lines)

Outlier Detection: Z-Score and IQR

Z-score measures how many standard deviations a value is from the mean. Values beyond ±3 sigma occur in only 0.27% of a normal distribution — a useful signal.
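That 0.27% figure is easy to verify yourself (a one-line sanity check with SciPy's normal survival function):

Python
from scipy.stats import norm

# Two-sided tail mass beyond |z| = 3 under a standard normal
print(f"P(|z| > 3) = {2 * norm.sf(3.0):.4%}")   # -> 0.2700%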

Python
from scipy.stats import zscore


class DataQualityChecker:
    def __init__(self, df: pd.DataFrame, table_name: str, run_id: str):
        self.df          = df
        self.table_name  = table_name
        self.run_id      = run_id
        self._report     = DataQualityReport(run_id, table_name, len(df))

    @property
    def report(self) -> DataQualityReport:
        """Public accessor for the accumulated report."""
        return self._report

    # ── Z-Score Outlier Detection ────────────────────────────────────────────
    def check_zscore_outliers(
        self,
        column: str,
        threshold: float = 3.0,
        max_outlier_pct: float = 0.01,
    ) -> "DataQualityChecker":
        """Fail if more than max_outlier_pct of rows have |z-score| > threshold."""
        series     = self.df[column].dropna()
        z_scores   = np.abs(zscore(series))
        outlier_pct = (z_scores > threshold).sum() / len(series)

        check = QualityCheck(
            check_name      = "zscore_outliers",
            column          = column,
            passed          = outlier_pct <= max_outlier_pct,
            observed_value  = round(outlier_pct * 100, 3),
            threshold       = max_outlier_pct * 100,
            message         = (
                f"{outlier_pct*100:.2f}% of values exceed z={threshold} "
                f"(allowed: {max_outlier_pct*100:.1f}%)"
            ),
        )
        self._report.checks.append(check)
        return self

    # ── IQR-Based Outlier Detection ──────────────────────────────────────────
    def check_iqr_outliers(
        self,
        column: str,
        multiplier: float = 1.5,
        max_outlier_pct: float = 0.02,
    ) -> "DataQualityChecker":
        """
        Tukey fences: outlier if value < Q1 - k*IQR or > Q3 + k*IQR.
        More robust than z-score for skewed distributions (e.g., cost data).
        """
        series = self.df[column].dropna()
        q1, q3 = np.percentile(series, [25, 75])
        iqr    = q3 - q1
        lower  = q1 - multiplier * iqr
        upper  = q3 + multiplier * iqr

        outlier_pct = ((series < lower) | (series > upper)).sum() / len(series)

        check = QualityCheck(
            check_name      = "iqr_outliers",
            column          = column,
            passed          = outlier_pct <= max_outlier_pct,
            observed_value  = round(outlier_pct * 100, 3),
            threshold       = max_outlier_pct * 100,
            message         = (
                f"{outlier_pct*100:.2f}% outside [{lower:.2f}, {upper:.2f}] "
                f"(IQR*{multiplier})"
            ),
        )
        self._report.checks.append(check)
        return self

When to use which (the sketch after the list makes the difference concrete):

  • Z-score: data is roughly normal (latencies, temperatures, balanced row counts)
  • IQR: data is skewed (costs, file sizes, revenue — anything with a long right tail)
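
Both fences are calibrated against well-behaved data; on a long-tailed distribution each flags far more than its nominal rate. A quick comparison on simulated lognormal data:

Python
import numpy as np
from scipy.stats import zscore

rng    = np.random.default_rng(0)
skewed = rng.lognormal(mean=0.0, sigma=1.0, size=100_000)

# Z-score fences: the long right tail inflates both the mean and the std
z_pct = (np.abs(zscore(skewed)) > 3).mean() * 100

# Tukey fences: quartiles ignore the tail, so the fences sit much lower
q1, q3  = np.percentile(skewed, [25, 75])
iqr     = q3 - q1
iqr_pct = ((skewed < q1 - 1.5 * iqr) | (skewed > q3 + 1.5 * iqr)).mean() * 100

# Roughly 1.8% and 7.8% here; both far above their normal-data baselines
print(f"z-score flags {z_pct:.1f}%, IQR flags {iqr_pct:.1f}%")

On data like this you either raise the IQR multiplier or budget a larger max_outlier_pct, which is exactly what the quality-gate example at the end does.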

Distribution Drift Detection with Kolmogorov-Smirnov

The KS test answers: "Are these two samples drawn from the same distribution?" It's the gold standard for detecting when your pipeline's data has fundamentally changed character.

Python
from scipy.stats import ks_2samp


    # ── KS Distribution Drift (continues the DataQualityChecker class) ──────
    def check_ks_drift(
        self,
        column: str,
        reference_series: pd.Series,
        significance: float = 0.05,
    ) -> "DataQualityChecker":
        """
        Kolmogorov-Smirnov test.
        H0: current and reference data come from the same distribution.
        Small p-value → distribution has drifted.
        """
        current   = self.df[column].dropna().values
        reference = reference_series.dropna().values

        ks_stat, p_value = ks_2samp(reference, current)
        drifted = p_value < significance

        check = QualityCheck(
            check_name      = "ks_distribution_drift",
            column          = column,
            passed          = not drifted,
            observed_value  = round(p_value, 6),
            threshold       = significance,
            message         = (
                f"KS statistic={ks_stat:.4f}, p={p_value:.6f} "
                f"({'DRIFT DETECTED' if drifted else 'stable'})"
            ),
        )
        self._report.checks.append(check)
        return self

Real example: you run this check comparing this week's order_amount distribution against last week's baseline. A p-value < 0.05 means the distributions are statistically different — a pricing change, a data issue, or a new customer segment could all explain it. One caveat: at pipeline sample sizes the KS test flags even tiny shifts, so treat the KS statistic as the effect size and read it alongside the p-value, as the sketch below shows.
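
Here's that large-sample sensitivity in action (a simulated sketch; a 1% mean shift across half a million rows):

Python
import numpy as np
from scipy.stats import ks_2samp

rng = np.random.default_rng(1)
last_week = rng.normal(100, 10, 500_000)
this_week = rng.normal(101, 10, 500_000)   # a mere 1% mean shift

stat, p = ks_2samp(last_week, this_week)
print(f"D={stat:.4f}, p={p:.3g}")   # p is essentially 0, yet D ~ 0.04 says the shift is small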


Chi-Squared Test for Categorical Column Drift

When your column is categorical (status codes, country codes, event types), KS doesn't apply. Use chi-squared instead.

Python
from scipy.stats import chi2_contingency


    # ── Chi-Squared Categorical Drift (continues DataQualityChecker) ────────
    def check_chi2_categorical_drift(
        self,
        column: str,
        reference_series: pd.Series,
        significance: float = 0.05,
    ) -> "DataQualityChecker":
        """
        Chi-squared test on value counts.
        Detects shifts in category proportions between runs.
        """
        current_counts   = self.df[column].value_counts()
        reference_counts = reference_series.value_counts()

        # Align on same categories, fill missing with 0
        all_cats      = set(current_counts.index) | set(reference_counts.index)
        current_vec   = np.array([current_counts.get(c, 0) for c in all_cats])
        reference_vec = np.array([reference_counts.get(c, 0) for c in all_cats])

        # Build contingency table
        contingency = np.array([current_vec, reference_vec])

        chi2, p_value, dof, _ = chi2_contingency(contingency)
        drifted = p_value < significance

        check = QualityCheck(
            check_name      = "chi2_categorical_drift",
            column          = column,
            passed          = not drifted,
            observed_value  = round(p_value, 6),
            threshold       = significance,
            message         = (
                f"chi2={chi2:.2f}, dof={dof}, p={p_value:.6f} "
                f"({'CATEGORY SHIFT' if drifted else 'stable'})"
            ),
        )
        self._report.checks.append(check)
        return self

Example use case: Your payment_method column suddenly has crypto appearing at 8% when it was historically under 0.1%. Chi-squared detects this before it affects downstream revenue calculations.
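
To see the raw test at work on that scenario, here's a direct sketch on invented value counts (all numbers hypothetical):

Python
from scipy.stats import chi2_contingency

# Hypothetical weekly value counts for payment_method
#                   card    paypal  crypto
reference_counts = [72_000, 27_900,    100]   # crypto ~0.1%
current_counts   = [66_000, 26_000,  8_000]   # crypto ~8%

chi2, p_value, dof, _ = chi2_contingency([reference_counts, current_counts])
print(f"chi2={chi2:.0f}, dof={dof}, p={p_value:.3g}")   # p is ~0: category shift detected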


Null Rate Drift Monitoring

Null rate drift is one of the most common and most damaging quality issues. An upstream system starts sending nulls silently.

Python
    # ── Null Rate Check (continues DataQualityChecker) ───────────────────────
    def check_null_rate(
        self,
        column: str,
        max_null_pct: float = 0.05,
    ) -> "DataQualityChecker":
        """Hard threshold: null rate must stay below max_null_pct."""
        null_pct = self.df[column].isna().mean()

        check = QualityCheck(
            check_name      = "null_rate",
            column          = column,
            passed          = null_pct <= max_null_pct,
            observed_value  = round(null_pct * 100, 3),
            threshold       = max_null_pct * 100,
            message         = f"Null rate: {null_pct*100:.2f}% (max: {max_null_pct*100:.1f}%)",
        )
        self._report.checks.append(check)
        return self

    # ── Null Rate Drift vs. Baseline ─────────────────────────────────────────
    def check_null_rate_drift(
        self,
        column: str,
        baseline_null_pct: float,
        tolerance_pct: float = 0.03,
    ) -> "DataQualityChecker":
        """
        Drift check: current null rate must not exceed baseline by more than tolerance.
        Catches gradual degradation that absolute thresholds miss.
        """
        current_null_pct = self.df[column].isna().mean()
        drift = current_null_pct - baseline_null_pct

        check = QualityCheck(
            check_name      = "null_rate_drift",
            column          = column,
            passed          = drift <= tolerance_pct,
            observed_value  = round(drift * 100, 3),
            threshold       = tolerance_pct * 100,
            message         = (
                f"Null drift: +{drift*100:.2f}pp from baseline {baseline_null_pct*100:.1f}% "
                f"(tolerance: {tolerance_pct*100:.1f}pp)"
            ),
        )
        self._report.checks.append(check)
        return self

Row Count Anomaly Detection with Rolling Statistics

Python
    # ── Row Count Anomaly (static method on DataQualityChecker) ─────────────
    @staticmethod
    def check_row_count_anomaly(
        current_count: int,
        historical_counts: list[int],
        z_threshold: float = 2.0,
    ) -> QualityCheck:
        """
        Rolling mean ± z_threshold * std.
        Uses last N historical run counts to establish the expected range.
        """
        arr  = np.array(historical_counts)
        mean = np.mean(arr)
        std  = np.std(arr)
        z    = abs(current_count - mean) / std if std > 0 else 0.0

        lower = mean - z_threshold * std
        upper = mean + z_threshold * std
        passed = lower <= current_count <= upper

        return QualityCheck(
            check_name      = "row_count_anomaly",
            column          = "__row_count__",
            passed          = passed,
            observed_value  = current_count,
            threshold       = z_threshold,
            message         = (
                f"Count {current_count:,} | expected [{lower:,.0f}, {upper:,.0f}] "
                f"(mean={mean:,.0f}, z={z:.2f})"
            ),
        )

Pattern: Store the last 30 run counts in a metadata table. Before each run completes, check whether today's count is within 2 sigma of the rolling mean. This catches both "no data loaded" (drop to 0) and "data duplicated" (spike to 2x normal).
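
Here's a minimal sketch of that pattern, with sqlite3 standing in for your warehouse's metadata store (the pipeline_run_counts table and file name are made up):

Python
import sqlite3

conn = sqlite3.connect("pipeline_metadata.db")   # stand-in for your warehouse
conn.execute("""
    CREATE TABLE IF NOT EXISTS pipeline_run_counts (
        table_name TEXT, run_date TEXT, row_count INTEGER
    )
""")

def record_count(table_name: str, run_date: str, row_count: int) -> None:
    conn.execute("INSERT INTO pipeline_run_counts VALUES (?, ?, ?)",
                 (table_name, run_date, row_count))
    conn.commit()

def fetch_recent_counts(table_name: str, n: int = 30) -> list[int]:
    rows = conn.execute(
        "SELECT row_count FROM pipeline_run_counts "
        "WHERE table_name = ? ORDER BY run_date DESC LIMIT ?",
        (table_name, n),
    ).fetchall()
    return [r[0] for r in rows]

# Feed the history straight into the anomaly check:
# check = DataQualityChecker.check_row_count_anomaly(
#     current_count=len(df),
#     historical_counts=fetch_recent_counts("orders"),
# )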


Spark Partition Skew Detection

In Spark, data skew causes a few tasks to take 10x longer than others, destroying parallelism.

Python
def detect_spark_partition_skew(
    spark_df,
    partition_col: str,
    skew_threshold: float = 3.0,
) -> dict:
    """
    Detect data skew in Spark by analysing partition key cardinality.
    Returns skewed keys and their row counts.
    Run in a Spark session before heavy transforms.
    """
    from pyspark.sql import functions as F

    counts_df = (
        spark_df
        .groupBy(partition_col)
        .count()
        .toPandas()   # fine for modest key cardinality; aggregate or sample first if keys number in the millions
        .rename(columns={"count": "row_count"})
    )

    mean_count = counts_df["row_count"].mean()
    std_count  = counts_df["row_count"].std()
    threshold  = mean_count + skew_threshold * std_count

    skewed = counts_df[counts_df["row_count"] > threshold].copy()
    skewed["z_score"] = (skewed["row_count"] - mean_count) / std_count
    skewed = skewed.sort_values("z_score", ascending=False)

    return {
        "partition_col":   partition_col,
        "total_partitions": len(counts_df),
        "mean_rows":       int(mean_count),
        "threshold":       int(threshold),
        "skewed_partitions": skewed.to_dict("records"),
        "has_skew":        len(skewed) > 0,
    }


# Usage example
# result = detect_spark_partition_skew(orders_df, partition_col="customer_id")
# if result["has_skew"]:
#     for sk in result["skewed_partitions"]:
#         print(f"  Skewed key: {sk['customer_id']} — {sk['row_count']:,} rows (z={sk['z_score']:.1f})")

Great Expectations Integration

Great Expectations (GX) provides a framework for statistical expectations with automatic documentation.

Python
import great_expectations as gx


def build_statistical_expectation_suite(
    context,                      # result of gx.get_context()
    suite_name: str,
    column: str,
    mean_lower: float,
    mean_upper: float,
    stdev_upper: float,
    median_lower: float,
    median_upper: float,
    max_null_pct: float,
) -> gx.ExpectationSuite:
    """
    Build a GX suite with statistical expectations for a numeric column.
    Written against the GX 1.x API (gx.ExpectationSuite, gx.expectations.*);
    0.x versions use a different suite workflow.
    Suitable for: order amounts, latencies, row counts.
    """
    suite = context.suites.add(gx.ExpectationSuite(name=suite_name))

    # Mean within historical range
    suite.add_expectation(gx.expectations.ExpectColumnMeanToBeBetween(
        column=column,
        min_value=mean_lower,
        max_value=mean_upper,
    ))

    # Standard deviation bounded
    suite.add_expectation(gx.expectations.ExpectColumnStdevToBeBetween(
        column=column,
        min_value=0,
        max_value=stdev_upper,
    ))

    # Median bounded (GX expresses this as a quantile expectation)
    suite.add_expectation(gx.expectations.ExpectColumnQuantileValuesToBeBetween(
        column=column,
        quantile_ranges={
            "quantiles":    [0.5],
            "value_ranges": [[median_lower, median_upper]],
        },
    ))

    # Null rate: "mostly" is the minimum fraction of non-null values
    suite.add_expectation(gx.expectations.ExpectColumnValuesToNotBeNull(
        column=column,
        mostly=1.0 - max_null_pct,
    ))

    # Row count in expected range (set these bounds from historical data)
    suite.add_expectation(gx.expectations.ExpectTableRowCountToBeBetween(
        min_value=50_000,
        max_value=200_000,
    ))

    return suite

GX in production pipelines: Run GX validation as a pipeline step. If validation fails, write results to an audit.quality_failures table and send a Slack/PagerDuty alert rather than propagating bad data downstream.
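
The same gate pattern works with the DataQualityReport built earlier. A minimal sketch, assuming a SQLAlchemy engine and your own notifier (the DSN, the audit.quality_failures table, and send_alert are placeholders):

Python
import pandas as pd
from dataclasses import asdict
from sqlalchemy import create_engine

engine = create_engine("postgresql://etl@warehouse/analytics")   # placeholder DSN

def quality_gate(report: DataQualityReport, send_alert=print) -> None:
    """Persist failures, alert, and halt the pipeline on a failed report."""
    if report.passed:
        return
    failures = pd.DataFrame([asdict(c) for c in report.failed_checks])
    failures["run_id"]     = report.run_id
    failures["table_name"] = report.table_name
    failures.to_sql("quality_failures", engine, schema="audit",
                    if_exists="append", index=False)
    send_alert(report.summary())   # swap in a Slack webhook / PagerDuty event
    raise RuntimeError(f"Quality gate failed: {len(report.failed_checks)} "
                       f"checks failed on {report.table_name}")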


Putting It All Together: Pipeline Quality Gate

Python
import pandas as pd
import numpy as np
from datetime import date

# ── Load current and reference data ─────────────────────────────────────────
np.random.seed(42)

# Simulate: reference data from yesterday
reference_df = pd.DataFrame({
    "order_id":      range(1, 100_001),
    "amount":        np.random.lognormal(mean=4.5, sigma=0.8, size=100_000),
    "status":        np.random.choice(["paid", "pending", "refunded"], size=100_000,
                                       p=[0.75, 0.20, 0.05]),
    "customer_id":   np.random.randint(1, 50_001, size=100_000),
})

# Simulate: today's data with subtle drift (amount distribution shifted up)
current_df = pd.DataFrame({
    "order_id":      range(100_001, 197_001),
    "amount":        np.random.lognormal(mean=5.0, sigma=0.9, size=96_000),  # shifted!
    "status":        np.random.choice(["paid", "pending", "refunded"], size=96_000,
                                       p=[0.68, 0.25, 0.07]),                # shifted!
    "customer_id":   np.random.randint(1, 50_001, size=96_000),
})

# ── Historical row counts (last 30 days) ─────────────────────────────────────
historical_counts = list(np.random.normal(loc=100_000, scale=3_000, size=30).astype(int))

# ── Run the quality checks ────────────────────────────────────────────────────
checker = DataQualityChecker(current_df, table_name="orders", run_id=f"run_{date.today()}")

report = (
    checker
    .check_null_rate("amount", max_null_pct=0.01)
    .check_null_rate_drift("amount", baseline_null_pct=0.002, tolerance_pct=0.02)
    .check_zscore_outliers("amount", threshold=3.5, max_outlier_pct=0.02)  # heavy-tailed lognormal data:
    .check_iqr_outliers("amount", multiplier=3.0, max_outlier_pct=0.05)    # loosen the outlier budgets
    .check_ks_drift("amount", reference_df["amount"])
    .check_chi2_categorical_drift("status", reference_df["status"])
    .report
)

# Add row count anomaly check
report.checks.append(
    DataQualityChecker.check_row_count_anomaly(
        current_count=len(current_df),
        historical_counts=historical_counts,
        z_threshold=2.0,
    )
)

print(report.summary())

Expected output (representative; exact statistics will vary):

[FAILED] orders | Run: run_2026-05-07
  Checks: 5/7 passed | Rows: 96,000
  FAIL: ks_distribution_drift on 'amount' — KS statistic=0.2357, p=0.000000 (DRIFT DETECTED)
  FAIL: chi2_categorical_drift on 'status' — chi2=1203.47, dof=2, p=0.000000 (CATEGORY SHIFT)

Exactly what you'd want to see — the statistical tests caught the drift that schema validation would have missed entirely.


Production Recommendations

| Check | Cadence | Action on Failure |
|---|---|---|
| Row count anomaly | Every run | Alert + block downstream |
| Null rate | Every run | Alert; block if > 5% delta |
| IQR outliers | Every run | Log; alert if > 1% |
| KS drift | Daily | Alert; escalate if 3+ consecutive days |
| Chi-squared categorical drift | Daily | Alert |
| Spark partition skew | Before heavy joins | Auto-repartition or alert |

Store all check results in a pipeline_quality_log table. Track trends over time — a slowly rising outlier rate is a warning sign even if each individual run passes.
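
To make the trend tracking concrete, here's a sketch that flags a creeping outlier rate (it assumes pipeline_quality_log has run_date, check_name, column_name, and observed_value columns, and reuses the engine from the gate sketch above):

Python
import pandas as pd

log = pd.read_sql(
    "SELECT run_date, check_name, column_name, observed_value "
    "FROM pipeline_quality_log ORDER BY run_date",
    engine,
)

iqr_log = log[log["check_name"] == "iqr_outliers"]
for col, grp in iqr_log.groupby("column_name"):
    # Smooth run-to-run noise with a 7-run rolling mean, then compare ends
    smooth = grp["observed_value"].rolling(7, min_periods=7).mean().dropna()
    if len(smooth) >= 2 and smooth.iloc[-1] > 1.5 * smooth.iloc[0]:
        print(f"Rising outlier rate on '{col}': {smooth.iloc[0]:.2f}% -> "
              f"{smooth.iloc[-1]:.2f}% (7-run rolling mean)")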
