Back to blog
Data Engineering · Intermediate

Data Pipeline Monitoring: Quality, Alerting, and Observability

Build observability into every data pipeline — data quality checks, row count validation, freshness monitoring, alerting, Great Expectations, and the operational patterns that keep pipelines reliable in production.

Learnixo · May 7, 2026 · 8 min read
data engineering · monitoring · data quality · Great Expectations · observability · alerting

Pipelines Fail Silently Without Monitoring

A pipeline can run successfully (exit code 0) and still produce wrong data. A join on the wrong column. A type coercion that silently turned amounts to NULL. A vendor API that started returning empty arrays instead of an error.

Monitoring means you know before your stakeholders that something is wrong.


1. The Three Layers of Pipeline Monitoring

Layer 1: Infrastructure    — Did the pipeline run? Did it crash?
Layer 2: Data Quality      — Is the data correct? Complete? Fresh?
Layer 3: Business Logic    — Does the data make business sense?

Most teams have Layer 1. Few have Layer 2. Almost none have Layer 3. You need all three.
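
As a quick sketch, the three layers can be thought of as three independent checks on a single run (the function names here are illustrative, not a real framework):

```python
def infrastructure_ok(exit_code: int) -> bool:
    # Layer 1: did the process finish cleanly?
    return exit_code == 0

def data_quality_ok(rows: list[dict]) -> bool:
    # Layer 2: is the batch non-empty and complete?
    return bool(rows) and all(r.get("order_id") for r in rows)

def business_logic_ok(rows: list[dict]) -> bool:
    # Layer 3: does the data make business sense? (no non-positive amounts)
    return all(r.get("amount", 0) > 0 for r in rows)

def monitor_run(exit_code: int, rows: list[dict]) -> dict[str, bool]:
    return {
        "infrastructure": infrastructure_ok(exit_code),
        "data_quality": data_quality_ok(rows),
        "business_logic": business_logic_ok(rows),
    }
```

A run is healthy only when all three are true — a zero exit code alone proves nothing about the data.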


2. Logging at Every Stage

Every pipeline should emit structured logs at each stage:

Python
import logging
import time
from contextlib import contextmanager
from typing import Generator
import structlog

log = structlog.get_logger()

@contextmanager
def pipeline_step(name: str, **metadata) -> Generator:
    step_log = log.bind(step=name, **metadata)
    step_log.info("step_started")
    start = time.perf_counter()
    try:
        yield step_log
        elapsed = time.perf_counter() - start
        step_log.info("step_completed", duration_s=round(elapsed, 3))
    except Exception as e:
        elapsed = time.perf_counter() - start
        step_log.error("step_failed", duration_s=round(elapsed, 3), error=str(e))
        raise

# Usage
with pipeline_step("ingest", source="orders_api", date="2026-05-07") as step_log:
    orders = fetch_orders()
    step_log.info("records_fetched", count=len(orders))

with pipeline_step("validate") as step_log:
    result = validate(orders)
    step_log.info("validation_complete",
                  passed=result.passed,
                  rejected=result.rejected_count)

with pipeline_step("load", target="silver.orders") as step_log:
    rows = load_to_snowflake(orders)
    step_log.info("rows_loaded", count=rows)

3. Row Count Validation

The simplest and most effective check:

Python
from dataclasses import dataclass

@dataclass
class CountCheck:
    step: str
    count: int
    expected_min: int
    expected_max: int | None = None

    @property
    def passed(self) -> bool:
        if self.count < self.expected_min:
            return False
        if self.expected_max is not None and self.count > self.expected_max:
            return False
        return True

    def assert_valid(self) -> None:
        if not self.passed:
            raise ValueError(
                f"[{self.step}] Row count {self.count} outside expected range "
                f"[{self.expected_min}, {self.expected_max}]"
            )

class PipelineValidator:
    def __init__(self, baseline: int | None = None):
        self.baseline = baseline
        self.checks: list[CountCheck] = []

    def check_count(self, step: str, count: int) -> None:
        min_rows = int(self.baseline * 0.8) if self.baseline else 1
        check = CountCheck(step=step, count=count, expected_min=min_rows)
        self.checks.append(check)
        check.assert_valid()
        log.info("count_check_passed", step=step, count=count)

    def check_no_nulls(self, df, columns: list[str], step: str) -> None:
        for col in columns:
            null_count = df[col].isna().sum()
            if null_count > 0:
                raise ValueError(f"[{step}] Column '{col}' has {null_count} nulls")

    def check_uniqueness(self, df, key_columns: list[str], step: str) -> None:
        dup_count = df.duplicated(subset=key_columns).sum()
        if dup_count > 0:
            raise ValueError(f"[{step}] {dup_count} duplicate keys on {key_columns}")
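
The pandas checks above boil down to two counting passes. The same logic, sketched dependency-free over plain dicts, is handy when you want to unit-test validation logic without a DataFrame:

```python
records = [
    {"order_id": "ORD-1", "amount": 10.0},
    {"order_id": "ORD-2", "amount": None},
    {"order_id": "ORD-2", "amount": 25.0},
]

# Null check on a single column
null_count = sum(1 for r in records if r["amount"] is None)

# Duplicate-key check
seen: set[str] = set()
dup_count = 0
for r in records:
    if r["order_id"] in seen:
        dup_count += 1
    seen.add(r["order_id"])
```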

4. Schema Validation

Catch schema drift before it corrupts your warehouse:

Python
from typing import Any

EXPECTED_SCHEMA = {
    "order_id":    str,
    "customer_id": str,
    "amount":      (int, float),
    "status":      str,
    "created_at":  str,
}

VALID_STATUSES = {"pending", "processing", "shipped", "delivered", "cancelled"}

def validate_schema(records: list[dict]) -> tuple[list[dict], list[str]]:
    valid = []
    errors = []

    for i, record in enumerate(records):
        row_errors = []

        # required fields
        for field, expected_type in EXPECTED_SCHEMA.items():
            if field not in record:
                row_errors.append(f"missing field: {field}")
                continue
            val = record[field]
            if val is None:
                row_errors.append(f"null value: {field}")
                continue
            if not isinstance(val, expected_type):
                row_errors.append(f"wrong type for {field}: expected {expected_type}, got {type(val)}")

        # business rules
        if record.get("amount", 0) <= 0:
            row_errors.append(f"invalid amount: {record.get('amount')}")
        if record.get("status") not in VALID_STATUSES:
            row_errors.append(f"invalid status: {record.get('status')}")

        if row_errors:
            errors.append(f"Row {i}: {', '.join(row_errors)}")
        else:
            valid.append(record)

    return valid, errors
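
Once validate_schema splits a batch into valid rows and errors, you still have to decide what to do with the rejects. A common pattern is to quarantine them rather than fail the whole run, but abort if the error rate suggests a systemic problem upstream. A minimal sketch (the 5% threshold is illustrative):

```python
def quarantine(valid: list[dict], errors: list[str],
               max_error_rate: float = 0.05) -> list[dict]:
    total = len(valid) + len(errors)
    error_rate = len(errors) / total if total else 0.0
    if error_rate > max_error_rate:
        # Too many rejects: likely schema drift upstream, not a few bad rows
        raise ValueError(
            f"error rate {error_rate:.1%} exceeds {max_error_rate:.1%} — aborting load"
        )
    # Load the good rows; the errors go to a dead-letter table for review
    return valid
```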

5. Great Expectations

Great Expectations (GX) is one of the most widely used Python data quality libraries. The snippets below use the pre-1.0 fluent API (0.16/0.17-era releases); GX 1.0 reorganized parts of it.

Bash
pip install great-expectations

Quickstart with a DataFrame

Python
import great_expectations as gx
import pandas as pd

df = pd.DataFrame([
    {"order_id": "ORD-001", "amount": 99.99, "status": "completed"},
    {"order_id": "ORD-002", "amount": -5.00, "status": "invalid"},
    {"order_id": None,      "amount": 50.00, "status": "completed"},
])

context = gx.get_context()

# Create a datasource
datasource = context.sources.add_or_update_pandas(name="orders")
asset = datasource.add_dataframe_asset(name="orders_df")
batch_request = asset.build_batch_request(dataframe=df)

# Define expectations
validator = context.get_validator(batch_request=batch_request)

validator.expect_column_values_to_not_be_null("order_id")
validator.expect_column_values_to_not_be_null("amount")
validator.expect_column_values_to_be_between("amount", min_value=0.01)
validator.expect_column_values_to_be_in_set(
    "status", ["pending", "processing", "completed", "cancelled"]
)
validator.expect_column_values_to_match_regex(
    "order_id", r"^ORD-\d+$"
)
validator.expect_column_to_exist("customer_id")
validator.expect_table_row_count_to_be_between(min_value=1)

# Run the validations
results = validator.validate()
print(results.success)   # False — multiple expectations fail
                         # (null order_id, negative amount, invalid status,
                         #  missing customer_id column)

for result in results.results:
    if not result.success:
        print(f"FAILED: {result.expectation_config.expectation_type}")
        print(f"  {result.result}")

Expectations Catalog

Python
# Null checks
validator.expect_column_values_to_not_be_null("order_id")

# Uniqueness
validator.expect_column_values_to_be_unique("order_id")

# Value ranges
validator.expect_column_values_to_be_between("amount", min_value=0, max_value=1_000_000)

# Set membership
validator.expect_column_values_to_be_in_set("currency", ["USD", "EUR", "GBP", "NOK"])

# Regex
validator.expect_column_values_to_match_regex("email", r"^[^@]+@[^@]+\.[^@]+$")

# Row counts
validator.expect_table_row_count_to_be_between(min_value=100, max_value=100_000)

# Schema
validator.expect_column_to_exist("order_id")
validator.expect_column_values_to_be_of_type("amount", "float64")  # pandas dtype name

# Statistical (anomaly detection)
validator.expect_column_mean_to_be_between("amount", min_value=50, max_value=500)
validator.expect_column_stdev_to_be_between("amount", max_value=1000)

6. dbt Tests for Data Quality

Run quality checks inside your warehouse after transformation:

YAML
# models/silver/schema.yml
models:
  - name: orders
    description: "Cleaned, deduplicated orders from all sources"
    columns:
      - name: order_id
        tests:
          - not_null
          - unique

      - name: customer_id
        tests:
          - not_null
          - relationships:
              to: ref('customers')
              field: customer_id

      - name: amount_usd
        tests:
          - not_null
          - dbt_utils.expression_is_true:
              expression: ">= 0"

      - name: status
        tests:
          - accepted_values:
              values: [pending, processing, shipped, delivered, cancelled]

    tests:
      - dbt_utils.recency:
          datepart: hour
          field: created_at
          interval: 6
          config:
            severity: warn

7. Freshness Monitoring

Python
# run_query is assumed to be a thin wrapper around your warehouse client
# (e.g. snowflake.connector) that returns rows as dicts

def check_table_freshness(
    table: str,
    timestamp_column: str,
    max_lag_minutes: int,
) -> dict:
    result = run_query(f"""
        SELECT
            MAX({timestamp_column})                                     AS latest_record,
            DATEDIFF('minute', MAX({timestamp_column}), CURRENT_TIMESTAMP) AS lag_minutes,
            COUNT(*)                                                     AS row_count
        FROM {table}
        WHERE {timestamp_column} > DATEADD('day', -1, CURRENT_TIMESTAMP)
    """)

    row = result[0]
    # lag_minutes is NULL when no rows landed in the last day — treat as stale
    is_fresh = row["lag_minutes"] is not None and row["lag_minutes"] <= max_lag_minutes

    return {
        "table": table,
        "latest_record": row["latest_record"],
        "lag_minutes": row["lag_minutes"],
        "row_count": row["row_count"],
        "is_fresh": is_fresh,
        "status": "OK" if is_fresh else f"STALE ({row['lag_minutes']} min lag)",
    }

# Check multiple tables
tables_to_monitor = [
    ("silver.orders",    "created_at",   60),
    ("silver.customers", "updated_at",   120),
    ("gold.daily_revenue", "date",       360),
]

for table, col, max_lag in tables_to_monitor:
    result = check_table_freshness(table, col, max_lag)
    if not result["is_fresh"]:
        send_slack_alert(f":warning: {result['table']} is {result['status']}")

8. Anomaly Detection — Statistical Checks

Catch sudden drops or spikes in data:

Python
import statistics

def check_volume_anomaly(
    current_count: int,
    historical_counts: list[int],
    threshold_sigma: float = 2.5,
) -> dict:
    if len(historical_counts) < 7:
        return {"status": "insufficient_history"}

    mean = statistics.mean(historical_counts)
    stdev = statistics.stdev(historical_counts)

    if stdev == 0:
        return {"status": "ok", "z_score": 0}

    z_score = (current_count - mean) / stdev
    is_anomaly = abs(z_score) > threshold_sigma

    return {
        "status": "anomaly" if is_anomaly else "ok",
        "current": current_count,
        "mean": round(mean),
        "z_score": round(z_score, 2),
        "direction": "drop" if z_score < 0 else "spike",
    }

# Example: the last 10 days averaged ~1,155 orders; today only 400 arrived
historical = [1120, 1180, 1090, 1210, 1150, 1170, 1130, 1200, 1160, 1140]
result = check_volume_anomaly(current_count=400, historical_counts=historical)
# {"status": "anomaly", "current": 400, "mean": 1155, "z_score": -20.46, "direction": "drop"}
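
The z-score itself is sensitive to outliers in the history: one freak day inflates the stdev and can mask real anomalies. A variant not shown above, based on the median and the median absolute deviation (MAD), is more robust; the 0.6745 factor rescales MAD so the threshold is comparable to a sigma threshold under normality:

```python
import statistics

def is_volume_anomaly_robust(current: int, history: list[int],
                             threshold: float = 3.5) -> bool:
    median = statistics.median(history)
    mad = statistics.median(abs(x - median) for x in history)
    if mad == 0:
        # Perfectly flat history: any deviation counts as an anomaly
        return current != median
    modified_z = 0.6745 * (current - median) / mad
    return abs(modified_z) > threshold
```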

9. Alerting Patterns

Python
from enum import Enum
from dataclasses import dataclass, field

class AlertSeverity(str, Enum):
    INFO    = "info"
    WARNING = "warning"
    ERROR   = "error"
    CRITICAL = "critical"

@dataclass
class Alert:
    pipeline: str
    message: str
    severity: AlertSeverity
    details: dict = field(default_factory=dict)

def send_slack_alert(alert: Alert) -> None:
    import requests, os

    emoji = {"info": ":information_source:", "warning": ":warning:",
             "error": ":red_circle:", "critical": ":rotating_light:"}[alert.severity]

    requests.post(
        os.environ["SLACK_WEBHOOK_URL"],
        json={
            "text": f"{emoji} *{alert.pipeline}* — {alert.message}",
            "attachments": [{
                "color": "danger" if alert.severity in ("error", "critical") else "warning",
                "fields": [
                    {"title": k, "value": str(v), "short": True}
                    for k, v in alert.details.items()
                ],
            }],
        },
    )

def send_pagerduty_alert(alert: Alert) -> None:
    import requests, os
    requests.post(
        "https://events.pagerduty.com/v2/enqueue",
        json={
            "routing_key": os.environ["PAGERDUTY_KEY"],
            "event_action": "trigger",
            "payload": {
                "summary": f"[{alert.pipeline}] {alert.message}",
                "severity": alert.severity,
                "custom_details": alert.details,
            },
        },
    )

def handle_pipeline_failure(pipeline: str, error: Exception, context: dict) -> None:
    alert = Alert(
        pipeline=pipeline,
        message=str(error),
        severity=AlertSeverity.ERROR,
        details=context,
    )
    send_slack_alert(alert)
    if "silver" in pipeline or "gold" in pipeline:
        send_pagerduty_alert(alert)   # escalate for production layers

10. Monitoring Dashboard Queries

SQL
-- Pipeline run history
SELECT
    pipeline_name,
    run_date,
    status,
    rows_processed,
    duration_seconds,
    error_message
FROM pipeline_runs
WHERE run_date >= DATEADD('day', -7, CURRENT_DATE)
ORDER BY run_date DESC, pipeline_name;

-- Data freshness overview
-- (Snowflake's LAST_ALTERED reflects the most recent DML/DDL on each table)
SELECT
    table_name,
    DATEDIFF('minute', last_altered, CURRENT_TIMESTAMP) AS lag_minutes,
    CASE
        WHEN DATEDIFF('minute', last_altered, CURRENT_TIMESTAMP) < 60  THEN 'fresh'
        WHEN DATEDIFF('minute', last_altered, CURRENT_TIMESTAMP) < 120 THEN 'warning'
        ELSE 'stale'
    END AS freshness
FROM information_schema.tables
WHERE table_schema = 'SILVER';

-- Null rate by column (anomaly detection)
SELECT
    'orders' AS table_name,
    SUM(CASE WHEN order_id   IS NULL THEN 1 ELSE 0 END) * 100.0 / COUNT(*) AS order_id_null_pct,
    SUM(CASE WHEN customer_id IS NULL THEN 1 ELSE 0 END) * 100.0 / COUNT(*) AS customer_id_null_pct,
    SUM(CASE WHEN amount_usd  IS NULL THEN 1 ELSE 0 END) * 100.0 / COUNT(*) AS amount_null_pct
FROM silver.orders
WHERE _loaded_at >= DATEADD('hour', -24, CURRENT_TIMESTAMP);

Summary

| Layer | Tool / Pattern |
|-------|----------------|
| Infrastructure | Airflow task status, exit codes |
| Row counts | Before/after each stage, anomaly detection |
| Schema | validate_schema(), Great Expectations, dbt tests |
| Freshness | DATEDIFF queries, dbt source freshness |
| Business rules | GX expectations, dbt custom tests |
| Alerting | Slack webhooks, PagerDuty for critical |
| Logging | structlog, one log per stage with metrics |
| Dashboards | SQL over pipeline_runs table |

Next: dbt fundamentals — building the transformation layer with SQL models.

Enjoyed this article?

Explore the Data Engineering learning path for more.
