Data Pipeline Monitoring: Quality, Alerting, and Observability
Build observability into every data pipeline — data quality checks, row count validation, freshness monitoring, alerting, Great Expectations, and the operational patterns that keep pipelines reliable in production.
Pipelines Fail Silently Without Monitoring
A pipeline can run successfully (exit code 0) and still produce wrong data. A join on the wrong column. A type coercion that silently turned amounts to NULL. A vendor API that started returning empty arrays instead of an error.
Monitoring means you know before your stakeholders that something is wrong.
1. The Three Layers of Pipeline Monitoring
Layer 1: Infrastructure — Did the pipeline run? Did it crash?
Layer 2: Data Quality — Is the data correct? Complete? Fresh?
Layer 3: Business Logic — Does the data make business sense?
Most teams have Layer 1. Few have Layer 2. Almost none have Layer 3. You need all three. A Layer 3 check can be as small as the sketch below.
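For example, a Layer 3 check might compare yesterday's revenue against recent history. A minimal sketch, assuming both numbers are already at hand:
def check_revenue_sanity(daily_revenue: float, recent_daily_revenue: list[float]) -> None:
    """Layer 3: does yesterday's revenue make business sense?"""
    typical = sum(recent_daily_revenue) / len(recent_daily_revenue)
    # A 50% swing in either direction is suspicious even if every row passed validation
    if not (0.5 * typical <= daily_revenue <= 1.5 * typical):
        raise ValueError(
            f"Daily revenue {daily_revenue:,.0f} is far from the recent average {typical:,.0f}"
        )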
2. Logging at Every Stage
Every pipeline should emit structured logs at each stage:
import logging
import time
from contextlib import contextmanager
from typing import Generator
import structlog
log = structlog.get_logger()
@contextmanager
def pipeline_step(name: str, **metadata) -> Generator:
    step_log = log.bind(step=name, **metadata)
    step_log.info("step_started")
    start = time.perf_counter()
    try:
        yield step_log
        elapsed = time.perf_counter() - start
        step_log.info("step_completed", duration_s=round(elapsed, 3))
    except Exception as e:
        elapsed = time.perf_counter() - start
        step_log.error("step_failed", duration_s=round(elapsed, 3), error=str(e))
        raise
# Usage
with pipeline_step("ingest", source="orders_api", date="2026-05-07") as step_log:
orders = fetch_orders()
step_log.info("records_fetched", count=len(orders))
with pipeline_step("validate") as step_log:
result = validate(orders)
step_log.info("validation_complete",
passed=result.passed,
rejected=result.rejected_count)
with pipeline_step("load", target="silver.orders") as step_log:
rows = load_to_snowflake(orders)
step_log.info("rows_loaded", count=rows)3. Row Count Validation
3. Row Count Validation
The simplest and most effective check:
from dataclasses import dataclass
@dataclass
class CountCheck:
    step: str
    count: int
    expected_min: int
    expected_max: int | None = None

    @property
    def passed(self) -> bool:
        if self.count < self.expected_min:
            return False
        if self.expected_max and self.count > self.expected_max:
            return False
        return True

    def assert_valid(self) -> None:
        if not self.passed:
            raise ValueError(
                f"[{self.step}] Row count {self.count} outside expected range "
                f"[{self.expected_min}, {self.expected_max}]"
            )
class PipelineValidator:
    def __init__(self, baseline: int | None = None):
        self.baseline = baseline
        self.checks: list[CountCheck] = []

    def check_count(self, step: str, count: int) -> None:
        min_rows = int(self.baseline * 0.8) if self.baseline else 1
        check = CountCheck(step=step, count=count, expected_min=min_rows)
        self.checks.append(check)
        check.assert_valid()
        log.info("count_check_passed", step=step, count=count)

    def check_no_nulls(self, df, columns: list[str], step: str) -> None:
        for col in columns:
            null_count = df[col].isna().sum()
            if null_count > 0:
                raise ValueError(f"[{step}] Column '{col}' has {null_count} nulls")

    def check_uniqueness(self, df, key_columns: list[str], step: str) -> None:
        dup_count = df.duplicated(subset=key_columns).sum()
        if dup_count > 0:
raise ValueError(f"[{step}] {dup_count} duplicate keys on {key_columns}")4. Schema Validation
4. Schema Validation
Catch schema drift before it corrupts your warehouse:
from typing import Any
EXPECTED_SCHEMA = {
    "order_id": str,
    "customer_id": str,
    "amount": (int, float),
    "status": str,
    "created_at": str,
}
VALID_STATUSES = {"pending", "processing", "shipped", "delivered", "cancelled"}
def validate_schema(records: list[dict]) -> tuple[list[dict], list[str]]:
    valid = []
    errors = []
    for i, record in enumerate(records):
        row_errors = []
        # required fields
        for field, expected_type in EXPECTED_SCHEMA.items():
            if field not in record:
                row_errors.append(f"missing field: {field}")
                continue
            val = record[field]
            if val is None:
                row_errors.append(f"null value: {field}")
                continue
            if not isinstance(val, expected_type):
                row_errors.append(f"wrong type for {field}: expected {expected_type}, got {type(val)}")
        # business rules
        if record.get("amount", 0) <= 0:
            row_errors.append(f"invalid amount: {record.get('amount')}")
        if record.get("status") not in VALID_STATUSES:
            row_errors.append(f"invalid status: {record.get('status')}")
        if row_errors:
            errors.append(f"Row {i}: {', '.join(row_errors)}")
        else:
            valid.append(record)
    return valid, errors
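In a pipeline step you might load the valid rows and fail the run only when the error rate crosses a threshold. A sketch, with raw_records being the ingested batch and an illustrative 1% cut-off:
valid, errors = validate_schema(raw_records)
error_rate = len(errors) / max(len(raw_records), 1)
for err in errors[:20]:  # log a sample, not thousands of lines
    log.warning("schema_violation", detail=err)
if error_rate > 0.01:  # illustrative threshold
    raise ValueError(f"{error_rate:.1%} of rows failed schema validation")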
5. Great Expectations
Great Expectations (GX) is the most widely used Python data quality library.
pip install great-expectations
Quickstart with a DataFrame
import great_expectations as gx
import pandas as pd

df = pd.DataFrame([
    {"order_id": "ORD-001", "amount": 99.99, "status": "completed"},
    {"order_id": "ORD-002", "amount": -5.00, "status": "invalid"},
    {"order_id": None, "amount": 50.00, "status": "completed"},
])

context = gx.get_context()

# Create a datasource
datasource = context.sources.add_or_update_pandas(name="orders")
asset = datasource.add_dataframe_asset(name="orders_df")
batch_request = asset.build_batch_request(dataframe=df)

# Define expectations
validator = context.get_validator(batch_request=batch_request)
validator.expect_column_values_to_not_be_null("order_id")
validator.expect_column_values_to_not_be_null("amount")
validator.expect_column_values_to_be_between("amount", min_value=0.01)
validator.expect_column_values_to_be_in_set(
    "status", ["pending", "processing", "completed", "cancelled"]
)
validator.expect_column_values_to_match_regex(
    "order_id", r"^ORD-\d+$"
)
validator.expect_column_to_exist("customer_id")
validator.expect_table_row_count_to_be_between(min_value=1)

# Run the validation
results = validator.validate()
print(results.success)  # False — several expectations fail
for result in results.results:
    if not result.success:
        print(f"FAILED: {result.expectation_config.expectation_type}")
print(f" {result.result}")Expectations Catalog
Expectations Catalog
# Null checks
validator.expect_column_values_to_not_be_null("order_id")
# Uniqueness
validator.expect_column_values_to_be_unique("order_id")
# Value ranges
validator.expect_column_values_to_be_between("amount", min_value=0, max_value=1_000_000)
# Set membership
validator.expect_column_values_to_be_in_set("currency", ["USD", "EUR", "GBP", "NOK"])
# Regex
validator.expect_column_values_to_match_regex("email", r"^[^@]+@[^@]+\.[^@]+$")
# Row counts
validator.expect_table_row_count_to_be_between(min_value=100, max_value=100_000)
# Schema
validator.expect_column_to_exist("order_id")
validator.expect_column_values_to_be_of_type("amount", "float")
# Statistical (anomaly detection)
validator.expect_column_mean_to_be_between("amount", min_value=50, max_value=500)
validator.expect_column_stdev_to_be_between("amount", max_value=1000)
6. dbt Tests for Data Quality
Run quality checks inside your warehouse after transformation:
# models/silver/schema.yml
models:
  - name: orders
    description: "Cleaned, deduplicated orders from all sources"
    columns:
      - name: order_id
        tests:
          - not_null
          - unique
      - name: customer_id
        tests:
          - not_null
          - relationships:
              to: ref('customers')
              field: customer_id
      - name: amount_usd
        tests:
          - not_null
          - dbt_utils.expression_is_true:
              expression: ">= 0"
      - name: status
        tests:
          - accepted_values:
              values: [pending, processing, shipped, delivered, cancelled]
    tests:
      - dbt_utils.recency:
          datepart: hour
          field: created_at
          interval: 6
          config:
            severity: warn
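These run with the dbt CLI as part of every deployment or scheduled job; a failing test fails the run:
dbt test --select orders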
7. Freshness Monitoring
import snowflake.connector
from datetime import datetime, timezone
from typing import Optional

def check_table_freshness(
    table: str,
    timestamp_column: str,
    max_lag_minutes: int,
) -> dict:
    # run_query() is assumed to be a thin helper around snowflake.connector
    # that executes the SQL and returns rows as dicts
    result = run_query(f"""
        SELECT
            MAX({timestamp_column}) AS latest_record,
            DATEDIFF('minute', MAX({timestamp_column}), CURRENT_TIMESTAMP) AS lag_minutes,
            COUNT(*) AS row_count
        FROM {table}
        WHERE {timestamp_column} > DATEADD('day', -1, CURRENT_TIMESTAMP)
    """)
    row = result[0]
    is_fresh = row["lag_minutes"] <= max_lag_minutes
    return {
        "table": table,
        "latest_record": row["latest_record"],
        "lag_minutes": row["lag_minutes"],
        "row_count": row["row_count"],
        "is_fresh": is_fresh,
        "status": "OK" if is_fresh else f"STALE ({row['lag_minutes']} min lag)",
    }

# Check multiple tables
tables_to_monitor = [
    ("silver.orders", "created_at", 60),
    ("silver.customers", "updated_at", 120),
    ("gold.daily_revenue", "date", 360),
]
for table, col, max_lag in tables_to_monitor:
    result = check_table_freshness(table, col, max_lag)
    if not result["is_fresh"]:
send_slack_alert(f":warning: {result['table']} is {result['status']}")8. Anomaly Detection — Statistical Checks
8. Anomaly Detection — Statistical Checks
Catch sudden drops or spikes in data:
import statistics

def check_volume_anomaly(
    current_count: int,
    historical_counts: list[int],
    threshold_sigma: float = 2.5,
) -> dict:
    if len(historical_counts) < 7:
        return {"status": "insufficient_history"}
    mean = statistics.mean(historical_counts)
    stdev = statistics.stdev(historical_counts)
    if stdev == 0:
        return {"status": "ok", "z_score": 0}
    z_score = (current_count - mean) / stdev
    is_anomaly = abs(z_score) > threshold_sigma
    return {
        "status": "anomaly" if is_anomaly else "ok",
        "current": current_count,
        "mean": round(mean),
        "z_score": round(z_score, 2),
        "direction": "drop" if z_score < 0 else "spike",
    }
# Example: the last 10 days averaged ~1,155 orders (± ~37); today only 400 arrived
historical = [1120, 1180, 1090, 1210, 1150, 1170, 1130, 1200, 1160, 1140]
result = check_volume_anomaly(current_count=400, historical_counts=historical)
# {"status": "anomaly", "current": 400, "mean": 1155, "z_score": -20.46, "direction": "drop"}
9. Alerting Patterns
from enum import Enum
from dataclasses import dataclass, field

class AlertSeverity(str, Enum):
    INFO = "info"
    WARNING = "warning"
    ERROR = "error"
    CRITICAL = "critical"

@dataclass
class Alert:
    pipeline: str
    message: str
    severity: AlertSeverity
    details: dict = field(default_factory=dict)

def send_slack_alert(alert: Alert) -> None:
    import requests, os
    emoji = {"info": ":information_source:", "warning": ":warning:",
             "error": ":red_circle:", "critical": ":rotating_light:"}[alert.severity]
    requests.post(
        os.environ["SLACK_WEBHOOK_URL"],
        json={
            "text": f"{emoji} *{alert.pipeline}* — {alert.message}",
            "attachments": [{
                "color": "danger" if alert.severity in ("error", "critical") else "warning",
                "fields": [
                    {"title": k, "value": str(v), "short": True}
                    for k, v in alert.details.items()
                ],
            }],
        },
    )

def send_pagerduty_alert(alert: Alert) -> None:
    import requests, os
    requests.post(
        "https://events.pagerduty.com/v2/enqueue",
        json={
            "routing_key": os.environ["PAGERDUTY_KEY"],
            "event_action": "trigger",
            "payload": {
                "summary": f"[{alert.pipeline}] {alert.message}",
                "severity": alert.severity,
                "custom_details": alert.details,
            },
        },
    )

def handle_pipeline_failure(pipeline: str, error: Exception, context: dict) -> None:
    alert = Alert(
        pipeline=pipeline,
        message=str(error),
        severity=AlertSeverity.ERROR,
        details=context,
    )
    send_slack_alert(alert)
    if "silver" in pipeline or "gold" in pipeline:
        send_pagerduty_alert(alert)  # escalate for production layers
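Hooked into the entry point of a run, failure handling might look like this (a sketch; run_orders_pipeline is a hypothetical entry point):
try:
    run_orders_pipeline()  # hypothetical entry point for the whole job
except Exception as exc:
    handle_pipeline_failure(
        pipeline="silver.orders",
        error=exc,
        context={"run_date": "2026-05-07", "step": "unknown"},
    )
    raise  # re-raise so the scheduler still marks the task as failed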
10. Monitoring Dashboard Queries
-- Pipeline run history
SELECT
    pipeline_name,
    run_date,
    status,
    rows_processed,
    duration_seconds,
    error_message
FROM pipeline_runs
WHERE run_date >= DATEADD('day', -7, CURRENT_DATE)
ORDER BY run_date DESC, pipeline_name;
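The query above assumes each run appends one row to a small audit table. A minimal Snowflake sketch (schema name and types are assumptions):
CREATE TABLE IF NOT EXISTS ops.pipeline_runs (
    pipeline_name    VARCHAR,
    run_date         TIMESTAMP_NTZ,
    status           VARCHAR,          -- 'success' / 'failed'
    rows_processed   NUMBER,
    duration_seconds NUMBER,
    error_message    VARCHAR
);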
-- Data freshness overview
-- LAST_ALTERED is a metadata timestamp: a cheap proxy for when data last landed
SELECT
    table_name,
    DATEDIFF('minute', LAST_ALTERED, CURRENT_TIMESTAMP) AS lag_minutes,
    CASE
        WHEN DATEDIFF('minute', LAST_ALTERED, CURRENT_TIMESTAMP) < 60 THEN 'fresh'
        WHEN DATEDIFF('minute', LAST_ALTERED, CURRENT_TIMESTAMP) < 120 THEN 'warning'
        ELSE 'stale'
    END AS freshness
FROM information_schema.tables
WHERE table_schema = 'SILVER';
-- Null rate by column (anomaly detection)
SELECT
    'orders' AS table_name,
    SUM(CASE WHEN order_id IS NULL THEN 1 ELSE 0 END) * 100.0 / COUNT(*) AS order_id_null_pct,
    SUM(CASE WHEN customer_id IS NULL THEN 1 ELSE 0 END) * 100.0 / COUNT(*) AS customer_id_null_pct,
    SUM(CASE WHEN amount_usd IS NULL THEN 1 ELSE 0 END) * 100.0 / COUNT(*) AS amount_null_pct
FROM silver.orders
WHERE _loaded_at >= DATEADD('hour', -24, CURRENT_TIMESTAMP);
Summary
| Layer | Tool / Pattern |
|-------|---------------|
| Infrastructure | Airflow task status, exit codes |
| Row counts | Before/after each stage, anomaly detection |
| Schema | validate_schema(), Great Expectations, dbt tests |
| Freshness | DATEDIFF queries, dbt source freshness |
| Business rules | GX expectations, dbt custom tests |
| Alerting | Slack webhooks, PagerDuty for critical |
| Logging | structlog, one log per stage with metrics |
| Dashboards | SQL over pipeline_runs table |
Next: dbt fundamentals — building the transformation layer with SQL models.