Machine Learning Foundations · Lesson 67 of 70

Why Good Training Accuracy ≠ Good Production Performance

Why Production Debugging Is Different

A model that works in development can silently fail in production. The reasons:

1. Data drift:       Input distribution has shifted from training
2. Concept drift:    The relationship between features and labels has changed
3. Label drift:      The label distribution, or how ground truth is defined, has changed
4. Pipeline bugs:    A preprocessing step is missing or different from training
5. Schema changes:   A feature column was renamed, reordered, or removed
6. Version mismatch: Different versions of sklearn/numpy can produce different outputs
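
Version mismatch is the one cause in this list that can be ruled out before any data arrives. A minimal sketch, assuming the training job writes a small version manifest next to the model artifact; the function names, the manifest format, and the version strings are illustrative, not part of any library:

```python
import sys
from importlib import metadata


def capture_versions(packages):
    """Record the interpreter and package versions of the current environment."""
    versions = {"python": f"{sys.version_info.major}.{sys.version_info.minor}"}
    for name in packages:
        try:
            versions[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            versions[name] = None  # package not installed in this environment
    return versions


def diff_versions(training_versions, serving_versions):
    """Return {name: (training, serving)} for every entry that differs."""
    names = set(training_versions) | set(serving_versions)
    return {
        name: (training_versions.get(name), serving_versions.get(name))
        for name in sorted(names)
        if training_versions.get(name) != serving_versions.get(name)
    }


# Hypothetical manifests: one saved at training time, one captured at serving time
training_manifest = {"python": "3.11", "scikit-learn": "1.4.2", "numpy": "1.26.4"}
serving_manifest = {"python": "3.11", "scikit-learn": "1.5.0", "numpy": "1.26.4"}

mismatches = diff_versions(training_manifest, serving_manifest)
for name, (trained, served) in mismatches.items():
    print(f"{name}: trained with {trained}, serving with {served}")
```

In a real service, `capture_versions(["scikit-learn", "numpy"])` would be called once at training time and once at startup, and any non-empty diff logged before the first prediction is served.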

The Production Monitoring Stack

Python
from dataclasses import dataclass, field
from typing import Optional
from datetime import datetime
import numpy as np
from collections import deque

@dataclass
class PredictionLog:
    """Log for tracking model predictions over time."""
    timestamp:    datetime
    prediction:   float       # predicted probability
    features:     dict        # input features
    ground_truth: Optional[int] = None   # filled in later when known

class ProductionMonitor:
    """
    Tracks model health in production.
    Detects: prediction drift, feature drift, performance degradation.
    """
    def __init__(self, window_size: int = 1000):
        self.predictions = deque(maxlen=window_size)
        self.ground_truth = deque(maxlen=window_size)
        self.features_log  = deque(maxlen=window_size)
        self.window_size   = window_size

    def log_prediction(self, prob: float, features: dict, ground_truth: Optional[int] = None):
        self.predictions.append(prob)
        self.features_log.append(features)
        if ground_truth is not None:
            # Store each label together with its own score. Only some predictions
            # ever receive a label, so pairing labels with "the last N predictions"
            # would compare labels against the wrong scores.
            self.ground_truth.append((prob, ground_truth))

    def check_prediction_distribution(self, training_mean: float, training_std: float) -> dict:
        """Alert if prediction scores are drifting."""
        if len(self.predictions) < 100:
            return {"status": "insufficient_data"}

        current_mean = float(np.mean(self.predictions))
        current_std  = float(np.std(self.predictions))
        # Shift of the mean, measured in units of the training standard deviation
        drift = abs(current_mean - training_mean) / (training_std + 1e-9)

        return {
            "current_mean":   current_mean,
            "current_std":    current_std,
            "training_mean":  training_mean,
            "drift_z_score":  drift,
            "alert":          drift > 2.0,
            "message":        "Prediction distribution has shifted" if drift > 2.0 else "OK",
        }

    def check_feature_drift(self, feature_name: str, training_mean: float, training_std: float) -> dict:
        """Alert if a specific feature distribution has shifted."""
        recent_values = [f[feature_name] for f in self.features_log if feature_name in f]
        if len(recent_values) < 50:
            return {"status": "insufficient_data"}

        current_mean = float(np.mean(recent_values))
        drift_z = abs(current_mean - training_mean) / (training_std + 1e-9)

        return {
            "feature":       feature_name,
            "drift_z_score": drift_z,
            "alert":         drift_z > 3.0,
        }

    def rolling_auc(self) -> Optional[float]:
        """Compute AUC on recent labeled predictions."""
        if len(self.ground_truth) < 50:
            return None
        from sklearn.metrics import roc_auc_score
        scores, labels = zip(*self.ground_truth)
        if len(set(labels)) < 2:
            # AUC is undefined when the window contains only one class
            return None
        return float(roc_auc_score(list(labels), list(scores)))
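
The drift checks above compare only the mean, so a feature whose spread changes while its centre stays put would pass. A hedged sketch of a standalone check that also compares standard deviations (`mean_std_drift` and its thresholds are illustrative assumptions, not tuned values):

```python
import numpy as np


def mean_std_drift(train_values, recent_values, z_threshold=3.0, ratio_threshold=1.5):
    """Compare the mean and spread of recent values against training values."""
    train = np.asarray(train_values, dtype=float)
    recent = np.asarray(recent_values, dtype=float)

    train_std = train.std()
    # Shift of the mean, in units of the training standard deviation
    mean_z = abs(recent.mean() - train.mean()) / (train_std + 1e-9)
    # Ratio > 1: production is more spread out than training; < 1: narrower
    std_ratio = (recent.std() + 1e-9) / (train_std + 1e-9)

    spread_alert = std_ratio > ratio_threshold or std_ratio < 1.0 / ratio_threshold
    return {
        "mean_z": float(mean_z),
        "std_ratio": float(std_ratio),
        "alert": bool(mean_z > z_threshold or spread_alert),
    }


# Synthetic data: same centre, but production values are three times as spread out
rng = np.random.default_rng(0)
train = rng.normal(loc=60.0, scale=5.0, size=5000)
recent = rng.normal(loc=60.0, scale=15.0, size=1000)

report = mean_std_drift(train, recent)
print(report)
```

Here the mean barely moves, so a mean-only z-score stays quiet, while the standard-deviation ratio of roughly 3 raises the alert.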

Silent Failures: Cases the Model Never Sees

Python
import numpy as np

def detect_out_of_distribution(X_new: np.ndarray, X_train: np.ndarray,
                                feature_names: list, z_threshold: float = 3.0) -> list:
    """
    Detect features in a new sample that are outside the training distribution.
    These inputs are outside the model's experience — predictions may be unreliable.
    """
    train_means = X_train.mean(axis=0)
    train_stds  = X_train.std(axis=0)

    ood_features = []
    for i, name in enumerate(feature_names):
        if train_stds[i] < 1e-10:
            continue
        z = abs((X_new[i] - train_means[i]) / train_stds[i])
        if z > z_threshold:
            ood_features.append({
                "feature":     name,
                "new_value":   X_new[i],
                "train_mean":  train_means[i],
                "z_score":     z,
            })

    return ood_features

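# Example: new patient with creatinine = 12.5 (training range was 0.5-4.0)
# Assumes new_patient_features is a 1-D array for one patient, and that
# X_train and feature_names are the array and column names used in training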
ood = detect_out_of_distribution(new_patient_features, X_train, feature_names)
if ood:
    print("Out-of-distribution features detected:")
    for feat in ood:
        print(f"  {feat['feature']}: value={feat['new_value']:.2f}, "
              f"train_mean={feat['train_mean']:.2f}, z={feat['z_score']:.1f}")
    print("Prediction confidence may be unreliable for this patient")
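
A z-score is less informative for skewed features such as lab values. A simpler complement is to compare each value against the range seen in training, which is what the creatinine example describes. A minimal sketch with made-up numbers (`outside_training_range` is a hypothetical helper):

```python
import numpy as np


def outside_training_range(x_new, X_train, feature_names):
    """List features whose value falls outside the [min, max] seen in training."""
    train_min = X_train.min(axis=0)
    train_max = X_train.max(axis=0)

    flagged = []
    for i, name in enumerate(feature_names):
        if x_new[i] < train_min[i] or x_new[i] > train_max[i]:
            flagged.append({
                "feature": name,
                "new_value": float(x_new[i]),
                "train_min": float(train_min[i]),
                "train_max": float(train_max[i]),
            })
    return flagged


# Made-up training data: columns are age and creatinine
feature_names = ["age", "creatinine"]
X_train = np.array([
    [54.0, 0.5],
    [61.0, 1.1],
    [70.0, 2.3],
    [83.0, 4.0],
])
new_patient = np.array([66.0, 12.5])  # creatinine far above anything in training

flagged = outside_training_range(new_patient, X_train, feature_names)
for item in flagged:
    print(f"{item['feature']}: {item['new_value']} outside "
          f"[{item['train_min']}, {item['train_max']}]")
```

The range check needs no distributional assumption, but a single extreme training value widens the range, so it works best alongside the z-score check rather than instead of it.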

The Pipeline Mismatch Bug

Python
# One of the most common production failures:
# The preprocessing pipeline used in production doesn't match training

# WRONG: re-creating the scaler from scratch in production
from sklearn.preprocessing import StandardScaler
import numpy as np

# Training:
scaler_train = StandardScaler()
X_train_scaled = scaler_train.fit_transform(X_train)
model.fit(X_train_scaled, y_train)
# scaler_train.mean_ = [60.2, 1.4, 8.3, ...]  ← lost unless the fitted scaler is saved

# Production:
scaler_prod = StandardScaler()   # NEW scaler → different mean/std
X_prod_scaled = scaler_prod.fit_transform(X_prod)   # wrong transform!

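# CORRECT: fit scaler and model together as one Pipeline, then save and load it
import joblib
from sklearn.pipeline import Pipeline

# Training: the scaler's statistics are learned once and stored inside the pipeline
pipeline = Pipeline([("scaler", StandardScaler()), ("model", model)])
pipeline.fit(X_train, y_train)

# Save after training
joblib.dump(pipeline, "readmission_pipeline.joblib")

# Load in production
pipeline_prod = joblib.load("readmission_pipeline.joblib")
y_prob = pipeline_prod.predict_proba(X_new)[:, 1]   # uses training statistics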
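
To see why refitting is wrong rather than merely wasteful, here is a numpy-only sketch of what standard scaling does (subtract the fitted mean, divide by the fitted standard deviation) applied to a production batch that has drifted upward. The data is synthetic:

```python
import numpy as np

rng = np.random.default_rng(42)
X_train = rng.normal(loc=60.0, scale=10.0, size=(1000, 1))
X_prod = rng.normal(loc=75.0, scale=10.0, size=(200, 1))   # population has shifted

# Statistics learned at training time (what a fitted scaler stores)
train_mean = X_train.mean(axis=0)
train_std = X_train.std(axis=0)

# Correct: reuse training statistics, so the shift stays visible to the model
X_prod_correct = (X_prod - train_mean) / train_std

# Wrong: refit on the production batch, which re-centres it at zero
X_prod_refit = (X_prod - X_prod.mean(axis=0)) / X_prod.std(axis=0)

print(f"Reused statistics: mean of scaled batch = {X_prod_correct.mean():.2f}")
print(f"Refit statistics:  mean of scaled batch = {X_prod_refit.mean():.2f}")
```

With the training statistics, the scaled batch sits roughly 1.5 standard deviations above zero, which is the signal the model needs. The refit version is centred at zero by construction, so a shifted population looks perfectly average to the model.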

Schema Change Detection

Python
import pandas as pd
from typing import Optional

def validate_schema(X_new: pd.DataFrame, training_schema: dict) -> list[str]:
    """
    Validates that incoming data matches the expected schema.
    training_schema: {feature_name: dtype}
    """
    errors = []

    # Missing columns
    missing = set(training_schema.keys()) - set(X_new.columns)
    if missing:
        errors.append(f"Missing columns: {missing}")

    # Extra columns (not necessarily an error, but flag it)
    extra = set(X_new.columns) - set(training_schema.keys())
    if extra:
        errors.append(f"Unexpected extra columns: {extra}")

    # Type mismatches
    for col, expected_dtype in training_schema.items():
        if col in X_new.columns:
            actual = X_new[col].dtype
            if str(actual) != expected_dtype:
                errors.append(f"Column '{col}': expected {expected_dtype}, got {actual}")

    # Null rate spike
    for col in training_schema:
        if col in X_new.columns:
            null_rate = X_new[col].isnull().mean()
            if null_rate > 0.5:
                errors.append(f"Column '{col}': {null_rate:.0%} null values (possible upstream issue)")

    return errors

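# Usage in production endpoint
# Assumes `training_schema` and the fitted `pipeline` were loaded at startup
def predict_readmission(patient_data: dict) -> dict:
    X_new = pd.DataFrame([patient_data])
    errors = validate_schema(X_new, training_schema)

    if errors:
        return {"error": errors, "prediction": None}

    # Put columns in training order; the key order of the payload may not match
    X_new = X_new[list(training_schema)]
    prob = pipeline.predict_proba(X_new)[0, 1]
    # Cast numpy scalars to built-in types so the response is JSON-serializable
    return {"prediction": float(prob), "high_risk": bool(prob >= 0.4)}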
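
`validate_schema` expects `training_schema` to exist already. One way to produce it, sketched under the assumption that the training features live in a pandas DataFrame, is to record the dtype strings at training time and store them as JSON next to the model. The column names below are made up:

```python
import json

import pandas as pd


def build_training_schema(X_train: pd.DataFrame) -> dict:
    """Map each column name to its dtype string, in training column order."""
    return {col: str(dtype) for col, dtype in X_train.dtypes.items()}


# Made-up training frame with explicit dtypes
X_train = pd.DataFrame({
    "age": pd.Series([64, 71, 58], dtype="int64"),
    "creatinine": pd.Series([1.2, 0.9, 2.4], dtype="float64"),
    "prior_admissions": pd.Series([0, 2, 1], dtype="int64"),
})

training_schema = build_training_schema(X_train)

# Serialize at training time (write this string to a file next to the model)...
schema_json = json.dumps(training_schema, indent=2)

# ...and parse it again at serving time
loaded_schema = json.loads(schema_json)
print(loaded_schema)
```

One caveat: a single-row DataFrame built from a request payload can infer different dtypes than the training frame (a `creatinine` value sent as the integer `2` becomes `int64`, for example), so an exact string comparison may need a cast step in front of it.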

Performance Degradation Over Time

Python
# Track rolling performance as ground truth arrives
# (Ground truth often arrives with a delay: readmission data is available 30 days after discharge)

def rolling_performance_check(monitor: ProductionMonitor,
                              training_auc: float,
                              alert_threshold: float = 0.05) -> None:
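    """
    Alert when rolling AUC drops more than alert_threshold below the reference AUC.
    Pass the held-out validation or test AUC as training_auc; AUC measured on the
    training set itself is optimistic and would trigger false alerts.
    """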
    rolling_auc = monitor.rolling_auc()

    if rolling_auc is None:
        print("Insufficient labeled data for AUC calculation (need 50+ labeled predictions)")
        return

    drop = training_auc - rolling_auc

    print(f"Training AUC:    {training_auc:.3f}")
    print(f"Rolling AUC:     {rolling_auc:.3f}")
    print(f"Drop:            {drop:.3f}")

    if drop > alert_threshold:
        print(f"ALERT: AUC dropped by {drop:.3f} — investigate:")
        print("  1. Check feature distributions for drift")
        print("  2. Check label distribution shift (readmission rate change)")
        print("  3. Check for pipeline or data schema changes")
        print("  4. Consider retraining with recent data")
    else:
        print("Performance within acceptable range ✓")
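
Because labels arrive weeks after the prediction, each outcome has to be matched back to the score it belongs to before any AUC is computed. A minimal sketch of that bookkeeping, assuming every prediction carries an ID (`DelayedLabelStore` and `pairwise_auc` are illustrative names; in practice `sklearn.metrics.roc_auc_score` would replace the hand-written AUC):

```python
class DelayedLabelStore:
    """Keeps scores by prediction ID until the outcome is known."""

    def __init__(self):
        self._scores = {}    # prediction_id -> predicted probability
        self._labels = {}    # prediction_id -> observed outcome (0 or 1)

    def record_prediction(self, prediction_id, prob):
        self._scores[prediction_id] = prob

    def record_outcome(self, prediction_id, label):
        # Ignore outcomes for predictions this store never saw
        if prediction_id in self._scores:
            self._labels[prediction_id] = label

    def labeled_pairs(self):
        """Return (labels, scores) for predictions whose outcome has arrived."""
        ids = list(self._labels)
        return [self._labels[i] for i in ids], [self._scores[i] for i in ids]


def pairwise_auc(labels, scores):
    """AUC as the fraction of (positive, negative) pairs ranked correctly; ties count half."""
    pos = [s for y, s in zip(labels, scores) if y == 1]
    neg = [s for y, s in zip(labels, scores) if y == 0]
    if not pos or not neg:
        return None  # AUC is undefined with only one class
    wins = sum((p > n) + 0.5 * (p == n) for p in pos for n in neg)
    return wins / (len(pos) * len(neg))


store = DelayedLabelStore()
for pid, prob in [("a", 0.9), ("b", 0.2), ("c", 0.6), ("d", 0.4), ("e", 0.7)]:
    store.record_prediction(pid, prob)

# Outcomes arrive later, out of order, and not for every prediction
store.record_outcome("d", 1)
store.record_outcome("a", 1)
store.record_outcome("b", 0)
store.record_outcome("c", 0)

labels, scores = store.labeled_pairs()
auc = pairwise_auc(labels, scores)
print(auc)  # 0.75: three of the four positive/negative pairs are ranked correctly
```

Prediction "e" never receives an outcome and is simply left out of the metric, which is the behaviour a positional pairing of labels and scores cannot guarantee.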

Interview Answer Template

Q: How do you debug a model that was working in development but is underperforming in production?

Production ML failures are almost always about data, not the model. The first question is: is the model actually making predictions, or is there a silent error? I'd check the prediction distribution — are scores clustered near 0, near 1, or at a fixed value? A distribution shift from development is a red flag. Then I'd validate the preprocessing pipeline: the most common production bug is a re-created scaler (different mean/std from training) or a missing feature transformation step. I'd save the fitted sklearn Pipeline object and load it in production rather than recreating it. Next, I'd look for feature drift — comparing mean and std of key features in production vs training. If ground truth is available (even delayed), I'd compute rolling AUC and alert when it drops. If it's a schema issue, I'd add a schema validation step at the prediction endpoint that catches missing or renamed columns before they cause silent errors.
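
The first check in that answer, whether scores are piled up near 0, near 1, or stuck at one value, takes only a few lines. A sketch with arbitrary cut-offs (0.05 and 0.95 are assumptions, not recommendations):

```python
import numpy as np


def summarize_scores(scores, low=0.05, high=0.95):
    """Summarize where predicted probabilities concentrate."""
    s = np.asarray(scores, dtype=float)
    return {
        "n": int(s.size),
        "frac_near_zero": float((s <= low).mean()),
        "frac_near_one": float((s >= high).mean()),
        "is_constant": bool(s.max() - s.min() < 1e-12),
    }


# A model stuck on one value, e.g. because every input was imputed to the same default
stuck = summarize_scores([0.31] * 200)

# A more varied set of synthetic scores for comparison
rng = np.random.default_rng(1)
spread = summarize_scores(rng.beta(2, 5, size=200))

print(stuck)
print(spread)
```

A constant score or a large fraction at either extreme does not prove a bug, but it is a cheap signal that the inputs reaching the model deserve a closer look before any retraining is considered.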