Debugging ML Models in Production
Production ML debugging: monitoring prediction distributions, detecting silent failures, tracking performance over time, handling model degradation, and setting up alerts for data and concept drift.
Why Production Debugging Is Different
A model that works in development can silently fail in production. The reasons:
1. Data drift: Input distribution has shifted from training
2. Concept drift: The relationship between features and labels has changed
3. Label drift: How ground truth is defined has changed
4. Pipeline bugs: A preprocessing step is missing or different from training
5. Schema changes: A feature column was renamed, reordered, or removed
6. Version mismatch: Different versions of sklearn/numpy produce different outputs
The Production Monitoring Stack
from dataclasses import dataclass, field
from typing import Optional
from datetime import datetime
import numpy as np
from collections import deque
@dataclass
class PredictionLog:
    """Record of a single model prediction, kept for later review.

    Attributes:
        timestamp: Time associated with the prediction.
        prediction: The predicted probability.
        features: The input features the prediction was made from.
        ground_truth: The observed label; ``None`` until it is known.
    """

    timestamp: datetime
    prediction: float  # probability produced by the model
    features: dict  # inputs fed to the model
    ground_truth: Optional[int] = None  # back-filled once the outcome is known
class ProductionMonitor:
    """
    Tracks model health in production over a sliding window.

    Detects: prediction drift, feature drift, performance degradation.

    Every internal log is a ``deque`` capped at ``window_size`` entries, so
    the oldest entries are dropped automatically once the window is full.
    """

    def __init__(self, window_size: int = 1000):
        """Create an empty monitor that keeps the last ``window_size`` entries."""
        self.window_size = window_size
        self.predictions = deque(maxlen=window_size)
        self.ground_truth = deque(maxlen=window_size)
        self.features_log = deque(maxlen=window_size)
        # Scores of the predictions that arrived WITH a label. Appended in
        # lock-step with ``ground_truth`` so label i always belongs to score i.
        self._labeled_predictions = deque(maxlen=window_size)

    def log_prediction(self, prob: float, features: dict, ground_truth: Optional[int] = None):
        """Record one prediction, its input features and (optionally) its label.

        Args:
            prob: Predicted probability.
            features: Input features as a ``{name: value}`` mapping.
            ground_truth: Observed label, or ``None`` when not known yet.
        """
        self.predictions.append(prob)
        self.features_log.append(features)
        if ground_truth is not None:
            self.ground_truth.append(ground_truth)
            self._labeled_predictions.append(prob)

    def check_prediction_distribution(self, training_mean: float, training_std: float,
                                      z_threshold: float = 2.0) -> dict:
        """Alert if prediction scores are drifting.

        Args:
            training_mean: Mean prediction score observed at training time.
            training_std: Standard deviation of prediction scores at training time.
            z_threshold: Drift (in training standard deviations) above which
                an alert is raised.

        Returns:
            ``{"status": "insufficient_data"}`` with fewer than 100 logged
            predictions, otherwise a dict with ``current_mean``,
            ``training_mean``, ``drift_z_score``, ``alert`` and ``message``.
        """
        if len(self.predictions) < 100:
            return {"status": "insufficient_data"}

        # Cast to built-in float/bool so the result is JSON-serializable.
        current_mean = float(np.mean(self.predictions))
        # The small epsilon avoids division by zero for a zero training std.
        drift = float(abs(current_mean - training_mean) / (training_std + 1e-9))
        alert = bool(drift > z_threshold)
        return {
            "current_mean": current_mean,
            "training_mean": training_mean,
            "drift_z_score": drift,
            "alert": alert,
            "message": "Prediction distribution has shifted" if alert else "OK",
        }

    def check_feature_drift(self, feature_name: str, training_mean: float, training_std: float,
                            z_threshold: float = 3.0) -> dict:
        """Alert if a specific feature distribution has shifted.

        Args:
            feature_name: Key to look up in each logged feature dict.
            training_mean: Mean of the feature at training time.
            training_std: Standard deviation of the feature at training time.
            z_threshold: Drift (in training standard deviations) above which
                an alert is raised.

        Returns:
            ``{"status": "insufficient_data"}`` when fewer than 50 logged
            entries contain the feature, otherwise a dict with ``feature``,
            ``drift_z_score`` and ``alert``.
        """
        recent_values = [f[feature_name] for f in self.features_log if feature_name in f]
        if len(recent_values) < 50:
            return {"status": "insufficient_data"}

        current_mean = float(np.mean(recent_values))
        drift_z = float(abs(current_mean - training_mean) / (training_std + 1e-9))
        return {
            "feature": feature_name,
            "drift_z_score": drift_z,
            "alert": bool(drift_z > z_threshold),
        }

    def rolling_auc(self) -> Optional[float]:
        """Compute AUC on recent labeled predictions.

        Returns:
            The ROC AUC over the labeled predictions in the window, or
            ``None`` when there are fewer than 50 labels or the AUC is
            undefined (for example only one class is present).
        """
        if len(self.ground_truth) < 50:
            return None

        labels = list(self.ground_truth)
        scores = list(self._labeled_predictions)
        if len(scores) != len(labels):
            # ``ground_truth`` was modified outside ``log_prediction``; fall
            # back to pairing the labels with the most recent predictions.
            scores = list(self.predictions)[-len(labels):]

        # Imported lazily so the monitor is usable without scikit-learn
        # until an AUC is actually requested.
        from sklearn.metrics import roc_auc_score
        try:
            auc = roc_auc_score(labels, scores)
        except ValueError:
            # Raised for invalid input such as a single class in the labels.
            return None
        # NOTE(review): some scikit-learn versions may return NaN instead of
        # raising when the AUC is undefined — treated as "no value" here.
        return None if np.isnan(auc) else float(auc)

# Silent Failures: Cases the Model Never Sees
import numpy as np
def detect_out_of_distribution(X_new: np.ndarray, X_train: np.ndarray,
                               feature_names: list, z_threshold: float = 3.0) -> list:
    """
    Detect features in a new sample that are outside the training distribution.

    These inputs are outside the model's experience — predictions may be unreliable.

    Args:
        X_new: One sample, indexed by feature position.
        X_train: Training data; statistics are taken along axis 0.
        feature_names: Names for the features, in positional order.
        z_threshold: Absolute z-score above which a feature is reported.

    Returns:
        A list with one dict per out-of-distribution feature, holding
        ``feature``, ``new_value``, ``train_mean`` and ``z_score``. A feature
        that was constant in training is reported with ``z_score = inf``
        when the new value differs from that constant.
    """
    sample = np.asarray(X_new, dtype=float)
    train = np.asarray(X_train, dtype=float)
    train_means = train.mean(axis=0)
    train_stds = train.std(axis=0)

    ood_features = []
    for i, name in enumerate(feature_names):
        value = float(sample[i])
        mean = float(train_means[i])
        std = float(train_stds[i])

        if std < 1e-10:
            # No spread in training, so a z-score cannot be computed. A value
            # that differs from the training constant has never been seen,
            # so report it instead of skipping the feature silently.
            differs = not np.isnan(value) and not np.isclose(value, mean)
            z = float("inf") if differs else 0.0
        else:
            z = abs((value - mean) / std)

        if z > z_threshold:
            ood_features.append({
                "feature": name,
                "new_value": value,
                "train_mean": mean,
                "z_score": z,
            })
    return ood_features
# Example: a new patient whose creatinine is 12.5, while training only covered 0.5-4.0
ood = detect_out_of_distribution(new_patient_features, X_train, feature_names)
if ood:
    # Format one report line per flagged feature, then emit them together.
    detail_lines = [
        f" {feat['feature']}: value={feat['new_value']:.2f}, "
        f"train_mean={feat['train_mean']:.2f}, z={feat['z_score']:.1f}"
        for feat in ood
    ]
    print("Out-of-distribution features detected:")
    print("\n".join(detail_lines))
    print("Prediction confidence may be unreliable for this patient")

# The Pipeline Mismatch Bug
# One of the most common production failures:
# the preprocessing pipeline used in production doesn't match training.
# NOTE: this snippet is illustrative; X_train, y_train, X_prod, X_new, model
# and pipeline are not defined here.

# WRONG: re-creating the scaler from scratch in production
from sklearn.preprocessing import StandardScaler
import numpy as np

# Training:
scaler_train = StandardScaler()
X_train_scaled = scaler_train.fit_transform(X_train)
model.fit(X_train_scaled, y_train)
# scaler_train.mean_ = [60.2, 1.4, 8.3, ...] — these are discarded

# Production:
scaler_prod = StandardScaler()  # NEW scaler — different mean/std
# fit_transform re-fits the scaler on the production data, so the model is fed
# values scaled with different statistics than the ones it was trained on.
X_prod_scaled = scaler_prod.fit_transform(X_prod)  # wrong transform!

# CORRECT: save and load the fitted pipeline
import joblib

# Save after training
# (presumably `pipeline` is the fitted preprocessing + model object — confirm)
joblib.dump(pipeline, "readmission_pipeline.joblib")

# Load in production
# NOTE(review): joblib.load unpickles the file — only load trusted artifacts.
pipeline_prod = joblib.load("readmission_pipeline.joblib")
y_prob = pipeline_prod.predict_proba(X_new)[:, 1]  # uses training statistics

# Schema Change Detection
import pandas as pd
from typing import Optional
def validate_schema(X_new: pd.DataFrame, training_schema: dict) -> list[str]:
    """
    Check incoming data against the schema seen at training time.

    training_schema: {feature_name: dtype}, with each dtype given as the
    string form of the expected pandas dtype.

    Returns a list of human-readable problems (empty when nothing is wrong),
    in this order: missing columns, unexpected columns, dtype mismatches,
    columns that are more than 50% null.
    """
    expected_cols = set(training_schema.keys())
    incoming_cols = set(X_new.columns)
    problems = []

    # Columns the model expects but did not receive
    absent = expected_cols - incoming_cols
    if absent:
        problems.append(f"Missing columns: {absent}")

    # Columns the model never saw (not necessarily fatal, but worth flagging)
    surplus = incoming_cols - expected_cols
    if surplus:
        problems.append(f"Unexpected extra columns: {surplus}")

    # Only columns present on both sides can be inspected further
    shared = [name for name in training_schema if name in X_new.columns]

    # Dtype mismatches
    for name in shared:
        wanted = training_schema[name]
        found = X_new[name].dtype
        if str(found) != wanted:
            problems.append(f"Column '{name}': expected {wanted}, got {found}")

    # Null-rate spikes
    for name in shared:
        null_fraction = X_new[name].isnull().mean()
        if null_fraction > 0.5:
            problems.append(
                f"Column '{name}': {null_fraction:.0%} null values (possible upstream issue)"
            )

    return problems
# Usage in production endpoint
def predict_readmission(patient_data: dict, risk_threshold: float = 0.4) -> dict:
    """Score one patient, refusing to predict when the input fails schema checks.

    Args:
        patient_data: Feature values for a single patient, keyed by feature name.
        risk_threshold: Probability at or above which the patient is
            flagged as high risk.

    Returns:
        ``{"error": [...], "prediction": None}`` when schema validation
        reports problems, otherwise ``{"prediction": float, "high_risk": bool}``.
    """
    X_new = pd.DataFrame([patient_data])

    # Validate first so schema problems surface as explicit errors rather
    # than as silently wrong predictions.
    errors = validate_schema(X_new, training_schema)
    if errors:
        return {"error": errors, "prediction": None}

    # Convert to a built-in float up front so that both the probability and
    # the comparison below are plain Python values (JSON-serializable),
    # not NumPy scalars.
    prob = float(pipeline.predict_proba(X_new)[0, 1])
    return {"prediction": prob, "high_risk": prob >= risk_threshold}

# Performance Degradation Over Time
# Track rolling performance as ground truth arrives
# (Ground truth often has a delay — readmission data is only available 30 days after discharge)
def rolling_performance_check(monitor: "ProductionMonitor",
                              training_auc: float,
                              alert_threshold: float = 0.05) -> None:
    """
    Alert when rolling AUC drops significantly below training AUC.

    Args:
        monitor: Object whose ``rolling_auc()`` returns the current AUC, or
            ``None`` when it cannot be computed.
        training_auc: AUC measured at training time; the baseline.
        alert_threshold: Largest tolerated drop (training minus rolling AUC)
            before an alert is printed.

    The report is printed to stdout; nothing is returned.
    """
    rolling_auc = monitor.rolling_auc()
    if rolling_auc is None:
        print("Insufficient labeled data for AUC calculation (need 50+ labeled predictions)")
        return

    # Positive drop means production is doing worse than training.
    drop = training_auc - rolling_auc
    print(f"Training AUC: {training_auc:.3f}")
    print(f"Rolling AUC: {rolling_auc:.3f}")
    print(f"Drop: {drop:.3f}")

    if drop > alert_threshold:
        print(f"ALERT: AUC dropped by {drop:.3f} — investigate:")
        print(" 1. Check feature distributions for drift")
        print(" 2. Check label distribution shift (readmission rate change)")
        print(" 3. Check for pipeline or data schema changes")
        print(" 4. Consider retraining with recent data")
    else:
        print("Performance within acceptable range ✓")

# Interview Answer Template
Q: How do you debug a model that was working in development but is underperforming in production?
Production ML failures are almost always about data, not the model. The first question is: is the model actually making predictions, or is there a silent error? I'd check the prediction distribution — are scores clustered near 0, near 1, or at a fixed value? A distribution shift from development is a red flag. Then I'd validate the preprocessing pipeline: the most common production bug is a re-created scaler (different mean/std from training) or a missing feature transformation step. I'd save the fitted sklearn Pipeline object and load it in production rather than recreating it. Next, I'd look for feature drift — comparing mean and std of key features in production vs training. If ground truth is available (even delayed), I'd compute rolling AUC and alert when it drops. If it's a schema issue, I'd add a schema validation step at the prediction endpoint that catches missing or renamed columns before they cause silent errors.
Found this helpful?
Leave a comment
Have a question, correction, or just found this helpful? Leave a note below.