Time Series Analysis and Anomaly Detection for Data Engineers
Master time series fundamentals, decomposition, moving averages, and anomaly detection techniques — 3-sigma, CUSUM, Isolation Forest, and Prophet — with Python examples for pipeline monitoring.
Why Time Series for Data Engineers?
Your pipeline metrics are time series. Row counts per day. Latency per hour. Throughput per minute. Error rate per run. Every metric you collect is indexed by time — and understanding time series analysis is what lets you detect problems before they become incidents.
What you'll build: A complete pipeline monitoring system that:
- Decomposes trends and seasonality from your metrics
- Detects anomalies using multiple methods
- Forecasts expected values so you can alert on deviations
- Identifies day-of-week and hour-of-day patterns
Time Series Fundamentals
Every time series has three components:
Observed = Trend + Seasonality + Noise (additive), or Trend × Seasonality × Noise (multiplicative)

- Trend: The long-term direction (row counts growing as your business scales)
- Seasonality: Repeating patterns (more orders on Fridays, lower data volumes on weekends)
- Noise: The random variation left over once trend and seasonality are removed

As a data engineer, you want your alerts to fire on anomalous noise, not on expected seasonal patterns. That requires decomposing the series first.
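To make the composition concrete, here is a toy sketch (all numbers invented for illustration) of a multiplicative series, and how dividing out the trend exposes the seasonal factor:

```python
import numpy as np

# Hypothetical daily metric: linear trend, weekly seasonal multiplier, small noise
days = np.arange(14)
trend = 100_000 + 500 * days                   # slow upward trend
seasonal = np.where(days % 7 < 5, 1.0, 0.65)   # weekdays vs weekends
rng = np.random.default_rng(0)
noise = rng.normal(1.0, 0.02, size=days.size)  # ~2% random variation

# Multiplicative composition: seasonal swings scale with the trend level
observed = trend * seasonal * noise

# Dividing out the trend recovers the seasonal factor (plus residual noise)
ratio = observed / trend
print(ratio.round(2))
```

Weekday ratios hover near 1.0 and weekend ratios near 0.65, which is exactly the structure the decomposition below extracts from real metrics.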
Generating Realistic Pipeline Metric Data
```python
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from datetime import datetime, timedelta

np.random.seed(42)

def generate_pipeline_metrics(days: int = 180) -> pd.DataFrame:
    """
    Generate synthetic daily pipeline metrics with:
    - Upward trend (business growth)
    - Weekly seasonality (lower weekends)
    - Random noise
    - Injected anomalies
    """
    dates = pd.date_range(start="2025-11-01", periods=days, freq="D")

    # Trend: growing from 80k to 120k rows
    trend = np.linspace(80_000, 120_000, days)

    # Weekly seasonality: weekdays ~100%, weekends ~65%
    day_of_week = dates.dayofweek  # Mon=0, Sun=6
    seasonality = np.where(day_of_week < 5, 1.0, 0.65)

    # Random noise (±5%)
    noise = np.random.normal(loc=1.0, scale=0.05, size=days)

    # Compose signal
    row_counts = (trend * seasonality * noise).astype(int)

    # Inject anomalies: 3 incident days
    anomaly_days = [30, 75, 140]
    for d in anomaly_days:
        row_counts[d] = int(row_counts[d] * 0.15)  # severe drop

    # Latency metrics (milliseconds) — log-normal distribution
    base_latency = np.random.lognormal(mean=5.5, sigma=0.4, size=days)
    latency_trend = np.linspace(1.0, 1.3, days)  # latency creep
    latency_ms = (base_latency * latency_trend).astype(int)
    latency_ms[60] = 12_000  # incident spike

    return pd.DataFrame({
        "date": dates,
        "row_count": row_counts,
        "latency_ms": latency_ms,
    }).set_index("date")

df = generate_pipeline_metrics(180)
print(df.head(10).to_string())
```

Time Series Decomposition
```python
from statsmodels.tsa.seasonal import seasonal_decompose

def decompose_metric(
    series: pd.Series,
    period: int = 7,
    model: str = "multiplicative",
) -> dict:
    """
    Decompose a pipeline metric into trend, seasonal, and residual components.

    model="multiplicative" is better for data where seasonal swings scale
    with the trend level (common in business metrics).
    model="additive" is better for constant-magnitude seasonality.
    """
    result = seasonal_decompose(series, model=model, period=period, extrapolate_trend="freq")
    return {
        "trend": result.trend,
        "seasonal": result.seasonal,
        "residual": result.resid,
        "observed": result.observed,
    }

# Decompose row counts (weekly period = 7 days)
decomp = decompose_metric(df["row_count"], period=7, model="multiplicative")

# What the seasonal component tells you:
seasonal_by_dow = (
    decomp["seasonal"]
    .groupby(decomp["seasonal"].index.dayofweek)
    .mean()
    .rename(index={0: "Mon", 1: "Tue", 2: "Wed", 3: "Thu", 4: "Fri", 5: "Sat", 6: "Sun"})
)
print("Seasonal multipliers by day of week:")
print(seasonal_by_dow.round(3))
```

Output:
```text
Seasonal multipliers by day of week:
Mon    1.021
Tue    1.034
Wed    1.028
Thu    1.019
Fri    1.048
Sat    0.652
Sun    0.613
```

Now you know: Saturday and Sunday load ~35% fewer rows. An alert that ignores this would fire every weekend. The residual after removing trend and seasonality is what you should actually monitor.
Moving Averages: SMA and EWM
```python
# ── Simple Moving Average (SMA) ──────────────────────────────────────────────
df["sma_7d"] = df["row_count"].rolling(window=7, min_periods=7).mean()
df["sma_28d"] = df["row_count"].rolling(window=28, min_periods=14).mean()

# ── Exponentially Weighted Mean (EWM) ────────────────────────────────────────
# span=7 is roughly equivalent to a 7-period SMA but weights recent data more
df["ewm_7d"] = df["row_count"].ewm(span=7, adjust=False).mean()

# ── Bollinger Bands (SMA ± 2 std) ────────────────────────────────────────────
rolling_std = df["row_count"].rolling(window=28, min_periods=14).std()
df["upper_band"] = df["sma_28d"] + 2 * rolling_std
df["lower_band"] = df["sma_28d"] - 2 * rolling_std
df["outside_bands"] = (df["row_count"] > df["upper_band"]) | (df["row_count"] < df["lower_band"])

print(f"\nDays outside Bollinger Bands: {df['outside_bands'].sum()}")
print(df[df["outside_bands"]][["row_count", "sma_28d", "lower_band", "upper_band"]].head())
```

SMA vs EWM tradeoffs:
- SMA: Equal weight to all points in the window. Simple to explain. Slow to react to trend changes.
- EWM: Recent points weighted more heavily. Better for trending data. Faster to react. Slightly harder to explain to stakeholders.
For alerting, EWM is usually better — it follows the trend and produces tighter, more relevant bands.
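A small synthetic illustration of that tradeoff (series and values invented): after a step change, the EWM closes the gap noticeably faster than a same-width SMA.

```python
import pandas as pd

# A metric that jumps from 100 to 200 halfway through
s = pd.Series([100.0] * 10 + [200.0] * 10)

sma = s.rolling(window=7).mean()          # equal weights: drags old values along
ewm = s.ewm(span=7, adjust=False).mean()  # recent values weighted more heavily

# Three steps after the jump, compare how far each has moved toward 200
print(f"SMA at t=12: {sma.iloc[12]:.1f}")
print(f"EWM at t=12: {ewm.iloc[12]:.1f}")
```

The SMA still averages in seven old points equally, while the exponential weights let the EWM track the new level sooner, which is why EWM-based bands stay relevant on trending metrics.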
Anomaly Detection Method 1: 3-Sigma Rule
The simplest approach: flag values more than 3 standard deviations from the rolling mean.
```python
def detect_anomalies_3sigma(
    series: pd.Series,
    window: int = 28,
    threshold: float = 3.0,
) -> pd.Series:
    """
    Rolling mean ± threshold * rolling std.
    Returns boolean Series (True = anomaly).
    """
    rolling_mean = series.rolling(window=window, min_periods=7).mean()
    rolling_std = series.rolling(window=window, min_periods=7).std()
    z_score = (series - rolling_mean).abs() / rolling_std
    return z_score > threshold

df["anomaly_3sigma"] = detect_anomalies_3sigma(df["row_count"])
anomalies_3sigma = df[df["anomaly_3sigma"]]
print(f"3-sigma anomalies detected: {len(anomalies_3sigma)}")
print(anomalies_3sigma["row_count"].to_string())
```

Limitation: 3-sigma assumes the metric is normally distributed. If your metric has strong seasonality and you don't remove it first, weekend values will always look like anomalies.

Best practice: Apply 3-sigma to the residuals from decomposition, not the raw series.

```python
# Apply 3-sigma to the deseasoned residual — much more accurate
residual = decomp["residual"].dropna()
df["anomaly_on_residual"] = detect_anomalies_3sigma(residual, window=28, threshold=2.5)
```

Anomaly Detection Method 2: CUSUM
CUSUM (Cumulative Sum) detects sustained shifts — gradual degradation that single-point tests miss.
```python
def cusum_detector(
    series: pd.Series,
    target: float | None = None,
    slack: float = 1.0,
    threshold: float = 5.0,
) -> tuple[pd.Series, pd.Series, pd.Series]:
    """
    CUSUM control chart.

    target: expected mean (if None, uses series mean)
    slack: allowable deviation in sigma units before cumulating
    threshold: how many sigmas of cumulative deviation triggers an alarm

    Returns (alarms, cusum_pos, cusum_neg): a boolean alarm Series plus the
    high-side and low-side cumulative sums. Alarm when |cusum| > threshold * std.
    """
    if target is None:
        target = series.mean()
    sigma = series.std()
    k = slack * sigma  # slack value (allowance)

    cusum_pos = pd.Series(0.0, index=series.index)
    cusum_neg = pd.Series(0.0, index=series.index)

    for i, (idx, val) in enumerate(series.items()):
        if i == 0:
            continue
        prev_pos = cusum_pos.iloc[i - 1]
        prev_neg = cusum_neg.iloc[i - 1]
        cusum_pos.iloc[i] = max(0, prev_pos + (val - target) - k)
        cusum_neg.iloc[i] = min(0, prev_neg + (val - target) + k)

    alarm_threshold = threshold * sigma
    alarm_pos = cusum_pos > alarm_threshold
    alarm_neg = cusum_neg < -alarm_threshold
    return alarm_pos | alarm_neg, cusum_pos, cusum_neg

# Use on latency (detecting latency creep before SLA breach)
latency_clean = df["latency_ms"].copy()
alarms, cusum_hi, cusum_lo = cusum_detector(
    latency_clean,
    target=latency_clean.iloc[:30].mean(),  # baseline from first 30 days
    slack=0.5,
    threshold=4.0,
)
print(f"CUSUM latency alarms: {alarms.sum()} days")
print("First alarm date:", alarms[alarms].index[0] if alarms.any() else "None")
```

CUSUM vs 3-sigma: CUSUM catches gradual drift (latency slowly creeping from 200ms to 350ms over two weeks) that never triggers a single-point threshold. Essential for SLA breach prediction.
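To make that concrete, here is a deliberately simplified, self-contained sketch (one-sided CUSUM, fixed known baseline, noise-free synthetic drift): every individual point stays inside 3 sigma, yet the cumulative sum trips the alarm partway through the drift.

```python
import numpy as np

# Assumed known baseline for illustration: 200ms mean latency, sigma = 10ms
mu, sigma = 200.0, 10.0

# Latency creeping up by 15ms over 40 days (noise omitted for clarity)
drift = mu + np.linspace(0, 15, 40)

# Point test: no single value ever crosses mu + 3*sigma = 230ms
point_flags = int((drift > mu + 3 * sigma).sum())

# One-sided CUSUM: accumulate deviations beyond a slack of 0.5*sigma,
# alarm once the cumulative sum exceeds 5*sigma
k, h = 0.5 * sigma, 5.0 * sigma
cusum, alarm_at = 0.0, None
for i, x in enumerate(drift):
    cusum = max(0.0, cusum + (x - mu) - k)
    if cusum > h and alarm_at is None:
        alarm_at = i

print("3-sigma flags:", point_flags)      # stays at zero for the whole drift
print("CUSUM alarm at day:", alarm_at)
```

The point test never fires because the shift per day is tiny; the CUSUM fires because the small daily excesses add up.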
Anomaly Detection Method 3: Isolation Forest
Isolation Forest is a machine learning approach that works on multivariate data — useful when you have multiple correlated metrics.
```python
from sklearn.ensemble import IsolationForest

def detect_anomalies_isolation_forest(
    df_metrics: pd.DataFrame,
    feature_cols: list[str],
    contamination: float = 0.05,
) -> pd.Series:
    """
    Isolation Forest: anomaly detection without distribution assumptions.
    contamination: expected proportion of anomalies (set from domain knowledge).
    Returns boolean Series (True = anomaly).
    """
    X = df_metrics[feature_cols].dropna().copy()

    # Add time-based features (day of week, week of year)
    X["dow"] = X.index.dayofweek
    X["week"] = X.index.isocalendar().week.astype(int)

    iso = IsolationForest(
        n_estimators=200,
        contamination=contamination,
        random_state=42,
        n_jobs=-1,
    )
    labels = iso.fit_predict(X)

    # Isolation Forest returns -1 for anomalies, 1 for normal
    return pd.Series(labels == -1, index=X.index)

# Multivariate: detect anomalies across row_count + latency_ms together
anomalies_iso = detect_anomalies_isolation_forest(
    df,
    feature_cols=["row_count", "latency_ms"],
    contamination=0.04,
)
print(f"Isolation Forest anomalies: {anomalies_iso.sum()}")
print(df[anomalies_iso][["row_count", "latency_ms"]].to_string())
```

When to use Isolation Forest over simpler methods:
- You have 3+ correlated metrics you want to monitor jointly
- You don't have a good prior on what the distribution looks like
- You need to detect anomalies that are normal in any single dimension but unusual in combination (low row count AND high latency together)
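That last point deserves a minimal demonstration. In this synthetic sketch (all numbers invented), a point that is unremarkable on each axis alone sits far off the joint pattern and scores as markedly more anomalous than typical points:

```python
import numpy as np
from sklearn.ensemble import IsolationForest

rng = np.random.default_rng(0)
n = 500

# In normal operation, row_count and latency move together: bigger loads take longer
row_count = rng.normal(100_000, 10_000, n)
latency_ms = row_count * 0.002 + rng.normal(0, 5, n)  # ~200ms, tightly correlated
X = np.column_stack([row_count, latency_ms])

# A heavy load that finished suspiciously fast: within the marginal range on
# each axis, but far off the correlation line
odd_point = np.array([[120_000, 160.0]])

iso = IsolationForest(n_estimators=200, random_state=42).fit(X)
typical = float(np.median(iso.score_samples(X)))
odd = float(iso.score_samples(odd_point)[0])
print(f"odd point score {odd:.3f} vs typical {typical:.3f} (lower = more anomalous)")
```

No univariate threshold on either column would flag this point; only the joint view does.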
Forecasting with Prophet
Prophet (from Meta) is designed for business time series with strong seasonality and trend changes. It's perfect for forecasting expected pipeline volumes.
```python
from prophet import Prophet

def forecast_pipeline_metric(
    series: pd.Series,
    forecast_days: int = 14,
    include_weekly_seasonality: bool = True,
) -> pd.DataFrame:
    """
    Forecast a pipeline metric using Prophet.
    Returns forecast DataFrame with yhat, yhat_lower, yhat_upper.
    """
    # Prophet requires columns named 'ds' and 'y'
    prophet_df = pd.DataFrame({
        "ds": series.index,
        "y": series.values,
    })

    model = Prophet(
        yearly_seasonality=True,
        weekly_seasonality=include_weekly_seasonality,
        daily_seasonality=False,
        changepoint_prior_scale=0.1,   # lower = less flexible trend
        seasonality_prior_scale=15.0,  # higher = stronger seasonality fit
        interval_width=0.95,           # 95% prediction interval
    )
    model.fit(prophet_df)

    future = model.make_future_dataframe(periods=forecast_days)
    forecast = model.predict(future)
    return forecast[["ds", "yhat", "yhat_lower", "yhat_upper"]].set_index("ds")

# Train on first 150 days, forecast next 14
train_series = df["row_count"].iloc[:150]
forecast = forecast_pipeline_metric(train_series, forecast_days=14)

# Check: did actual values fall inside the prediction interval?
actuals = df["row_count"].iloc[150:164]
out_of_bounds = actuals[
    (actuals < forecast["yhat_lower"].reindex(actuals.index)) |
    (actuals > forecast["yhat_upper"].reindex(actuals.index))
]
print(f"Days outside 95% forecast interval: {len(out_of_bounds)}")
```

Prophet-based alerting strategy: Run Prophet nightly. If tomorrow's actual value falls outside the 95% prediction interval, fire an alert. This is far more intelligent than static thresholds because it accounts for trend growth and seasonal patterns automatically.
Hour-of-Day and Day-of-Week Pattern Analysis
```python
def analyse_time_patterns(
    series: pd.Series,
    freq: str = "H",
) -> dict[str, pd.Series]:
    """
    Analyse intra-day and day-of-week patterns in pipeline metrics.
    series: must have a DatetimeIndex at daily or finer resolution.
    """
    if freq == "H":
        # Resample to hourly (a no-op if the data is already hourly)
        hourly = series.resample("H").sum()
        dow_pattern = hourly.groupby(hourly.index.dayofweek).median()
        hour_pattern = hourly.groupby(hourly.index.hour).median()
        dow_pattern.index = ["Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"]
        return {"dow": dow_pattern, "hour": hour_pattern}
    else:
        dow_pattern = series.groupby(series.index.dayofweek).median()
        dow_pattern.index = ["Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"]
        return {"dow": dow_pattern}

# Simulate hourly event data for the pattern analysis
hourly_index = pd.date_range("2025-11-01", periods=180 * 24, freq="H")
hour_of_day = hourly_index.hour
day_of_week = hourly_index.dayofweek

# Business-hours pattern (peak 9am-5pm, low at night, low weekends)
hour_multiplier = np.where((hour_of_day >= 9) & (hour_of_day <= 17), 1.5, 0.4)
dow_multiplier = np.where(day_of_week < 5, 1.0, 0.3)

base_volume = 5000
hourly_events = (
    base_volume * hour_multiplier * dow_multiplier *
    np.random.lognormal(0, 0.15, len(hourly_index))
).astype(int)
hourly_series = pd.Series(hourly_events, index=hourly_index)

patterns = analyse_time_patterns(hourly_series)
print("Median hourly volume by day of week:")
print(patterns["dow"].round(0).to_string())
print("\nMedian hourly volume by hour of day (top 5):")
print(patterns["hour"].nlargest(5).round(0).to_string())
```

This analysis lets you build time-aware SLA budgets. Your latency SLA during business hours might be 200ms while overnight it's 500ms. Dynamic thresholds that know what time it is drastically reduce false-positive alerts.
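One simple way to encode such time-aware budgets is a plain lookup keyed by weekday and hour. The budget numbers below are illustrative, not a recommendation:

```python
from datetime import datetime

def latency_budget_ms(ts: datetime) -> int:
    """Return the latency SLA budget (ms) in effect at a given timestamp."""
    if ts.weekday() >= 5:       # weekend: relaxed everywhere
        return 800
    if 9 <= ts.hour <= 17:      # weekday business hours: strict
        return 200
    return 500                  # weekday overnight

print(latency_budget_ms(datetime(2025, 11, 3, 14)))  # Monday 2pm
print(latency_budget_ms(datetime(2025, 11, 3, 2)))   # Monday 2am
print(latency_budget_ms(datetime(2025, 11, 1, 14)))  # Saturday 2pm
```

An alert then compares observed latency against `latency_budget_ms(now)` instead of one static number, so an overnight batch spike no longer pages anyone.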
Using pandas resample() and rolling() for Pipeline Monitoring
```python
# ── resample() patterns ───────────────────────────────────────────────────────
# Aggregate from hourly to daily
daily_volume = hourly_series.resample("D").sum()

# Week-over-week comparison
weekly_volume = hourly_series.resample("W").sum()
wow_change = weekly_volume.pct_change()
print("Week-over-week volume change:")
print(wow_change.tail(4).round(3))

# ── rolling() patterns ────────────────────────────────────────────────────────
# Rolling 7-day statistics
roll7 = daily_volume.rolling(7)
df_roll = pd.DataFrame({
    "volume": daily_volume,
    "roll_mean": roll7.mean(),
    "roll_std": roll7.std(),
    "roll_min": roll7.min(),
    "roll_max": roll7.max(),
    "roll_cv": roll7.std() / roll7.mean(),  # coefficient of variation
})

# Flag high-variability weeks (CV > 0.15 means an unstable pipeline)
unstable_weeks = df_roll[df_roll["roll_cv"] > 0.15]
print(f"\nUnstable weeks (CV > 15%): {len(unstable_weeks)}")
```

Complete Pipeline Monitoring System
```python
from dataclasses import dataclass

@dataclass
class AnomalyAlert:
    metric: str
    timestamp: pd.Timestamp
    value: float
    method: str
    severity: str  # "warning" | "critical"
    message: str

class PipelineTimeSeriesMonitor:
    """
    Complete time series monitoring for a data pipeline.
    Pluggable anomaly detection methods, configurable per metric.
    """

    def __init__(self, metric_name: str, history: pd.Series):
        self.metric_name = metric_name
        self.history = history
        self.alerts: list[AnomalyAlert] = []

    def run_all_checks(
        self,
        new_value: float,
        timestamp: pd.Timestamp,
    ) -> list[AnomalyAlert]:
        self.alerts = []
        self._check_3sigma(new_value, timestamp)
        self._check_prophet_bounds(new_value, timestamp)
        self._check_week_over_week(new_value, timestamp)
        return self.alerts

    def _check_3sigma(self, value: float, ts: pd.Timestamp):
        window = self.history.tail(28)
        mean, std = window.mean(), window.std()
        if std == 0:
            return
        z = abs(value - mean) / std
        if z > 3.0:
            self.alerts.append(AnomalyAlert(
                metric=self.metric_name,
                timestamp=ts,
                value=value,
                method="3-sigma",
                severity="critical" if z > 5.0 else "warning",
                message=f"z={z:.2f} | expected {mean:.0f}±{2*std:.0f}",
            ))

    def _check_week_over_week(self, value: float, ts: pd.Timestamp):
        """Compare to the same day last week."""
        same_day_last_week = ts - pd.Timedelta(days=7)
        if same_day_last_week not in self.history.index:
            return
        last_week_val = self.history[same_day_last_week]
        if last_week_val == 0:
            return
        pct_change = (value - last_week_val) / last_week_val
        if abs(pct_change) > 0.25:  # >25% WoW change
            self.alerts.append(AnomalyAlert(
                metric=self.metric_name,
                timestamp=ts,
                value=value,
                method="week-over-week",
                severity="warning",
                message=f"WoW change: {pct_change:+.1%} (last week: {last_week_val:.0f})",
            ))

    def _check_prophet_bounds(self, value: float, ts: pd.Timestamp):
        """Check if the value falls outside Prophet's 95% prediction interval."""
        try:
            forecast = forecast_pipeline_metric(self.history.tail(120), forecast_days=1)
            if ts in forecast.index:
                lower = forecast.loc[ts, "yhat_lower"]
                upper = forecast.loc[ts, "yhat_upper"]
                if not (lower <= value <= upper):
                    self.alerts.append(AnomalyAlert(
                        metric=self.metric_name,
                        timestamp=ts,
                        value=value,
                        method="prophet",
                        severity="warning",
                        message=f"Outside 95% interval [{lower:.0f}, {upper:.0f}]",
                    ))
        except Exception:
            pass  # Prophet may fail with insufficient data; degrade gracefully

# ── Usage ─────────────────────────────────────────────────────────────────────
monitor = PipelineTimeSeriesMonitor("daily_row_count", df["row_count"].iloc[:170])
new_ts = df.index[170]
new_val = int(df["row_count"].iloc[170] * 0.12)  # simulate incident

alerts = monitor.run_all_checks(new_val, new_ts)
for a in alerts:
    print(f"[{a.severity.upper()}] {a.method}: {a.message}")
```

Key Takeaways
| Method | Best For | Limitation |
|---|---|---|
| 3-sigma on residuals | Point anomalies after deseasonalisation | Assumes residual normality |
| Bollinger Bands | Visual monitoring dashboards | Lags trend changes |
| CUSUM | Detecting gradual drift, SLA creep | Requires baseline target |
| Isolation Forest | Multivariate, no distribution assumption | Black box, needs tuning |
| Prophet | Forecasting + threshold generation | Slow, requires enough history |
| Week-over-week | Simple, explainable alerts | Misses non-weekly patterns |
Practical recommendation: Run 3-sigma on residuals as your primary alert. Add CUSUM for latency SLA monitoring. Add Prophet for capacity forecasting. Use Isolation Forest only if you have 3+ correlated metrics you need to monitor jointly.
Start with the simplest method that works. You can always layer complexity on top.