Back to blog
Data Engineeringintermediate

Data Pipeline Architecture: Medallion, Ingestion Patterns, and Data Contracts

Design production data pipelines with the medallion architecture, understand ingestion strategies (full load, incremental, CDC), define data contracts, and apply patterns used in real analytics engineering teams.

LearnixoMay 7, 20269 min read
data engineeringmedallionarchitectureCDCingestiondata contractsELT
Share:𝕏

Architecture Is Strategy

The biggest mistakes in data engineering aren't coding mistakes — they're architecture mistakes. Choosing the wrong ingestion strategy costs months of re-engineering. This lesson gives you the patterns used by mature data teams.


1. Medallion Architecture Deep Dive

The medallion (or lakehouse) architecture organises data into three quality layers:

Sources ──► BRONZE ──► SILVER ──► GOLD
           (raw)      (clean)   (business)

Bronze Layer — Raw, Immutable

Purpose: Preserve everything exactly as it arrived. Never transform, never delete.

SQL
-- Snowflake: Bronze table design
CREATE TABLE bronze.orders_raw (
    -- Metadata added by pipeline
    _file_name       VARCHAR,
    _loaded_at       TIMESTAMP_TZ DEFAULT CURRENT_TIMESTAMP,
    _source          VARCHAR DEFAULT 'orders_api',
    _run_id          VARCHAR,

    -- Source columns as-is (VARCHAR to avoid parse failures)
    order_id         VARCHAR,
    customer_id      VARCHAR,
    product_id       VARCHAR,
    quantity         VARCHAR,      -- intentionally string
    amount           VARCHAR,      -- intentionally string
    currency         VARCHAR,
    status           VARCHAR,
    created_at       VARCHAR,      -- intentionally string
    raw_payload      VARIANT       -- full original JSON
);

Rules:

  • Schema is append-only — add columns, never remove or rename
  • Always store the original payload (VARIANT in Snowflake, JSON column)
  • Partition by _loaded_at for efficient pruning
  • Retention: 90 days minimum, often 1-7 years

Silver Layer — Cleaned, Typed, Deduplicated

Purpose: One clean record per entity. The single source of truth for analysts.

SQL
-- Silver: orders clean
CREATE TABLE silver.orders (
    order_id         VARCHAR         NOT NULL,
    customer_id      VARCHAR         NOT NULL,
    product_id       VARCHAR         NOT NULL,
    quantity         INTEGER         NOT NULL,
    amount_usd       DECIMAL(12, 2)  NOT NULL,
    currency         CHAR(3)         NOT NULL,
    status           VARCHAR         NOT NULL,
    created_at       TIMESTAMP_TZ    NOT NULL,
    updated_at       TIMESTAMP_TZ    NOT NULL,

    -- Lineage
    _source          VARCHAR,
    _loaded_at       TIMESTAMP_TZ DEFAULT CURRENT_TIMESTAMP,

    PRIMARY KEY (order_id)
);

dbt model for Bronze → Silver:

SQL
-- models/silver/orders.sql
WITH source AS (
    SELECT *
    FROM {{ source('bronze', 'orders_raw') }}
    WHERE _loaded_at >= DATEADD('day', -1, CURRENT_TIMESTAMP)   -- incremental window
),

parsed AS (
    SELECT
        order_id,
        customer_id,
        product_id,
        TRY_CAST(quantity AS INTEGER)         AS quantity,
        TRY_CAST(amount AS DECIMAL(12, 2))    AS amount_usd,
        UPPER(TRIM(currency))                 AS currency,
        LOWER(TRIM(status))                   AS status,
        TRY_TO_TIMESTAMP_TZ(created_at)       AS created_at,
        CURRENT_TIMESTAMP                     AS updated_at,
        _source,
        _loaded_at
    FROM source
    WHERE order_id IS NOT NULL
),

deduplicated AS (
    SELECT *,
           ROW_NUMBER() OVER (
               PARTITION BY order_id
               ORDER BY _loaded_at DESC
           ) AS _rn
    FROM parsed
    WHERE quantity IS NOT NULL AND amount_usd IS NOT NULL
)

SELECT * EXCLUDE (_rn)
FROM deduplicated
WHERE _rn = 1

Gold Layer — Business-Ready Aggregations

Purpose: Pre-aggregated, domain-specific models for dashboards, APIs, and ML features.

SQL
-- Gold: daily revenue by market
CREATE TABLE gold.daily_revenue (
    date             DATE            NOT NULL,
    market           VARCHAR         NOT NULL,
    product_category VARCHAR,
    total_revenue    DECIMAL(15, 2),
    order_count      INTEGER,
    avg_order_value  DECIMAL(10, 2),
    new_customers    INTEGER,
    returning_customers INTEGER,

    PRIMARY KEY (date, market)
);
SQL
-- models/gold/daily_revenue.sql
SELECT
    DATE_TRUNC('day', o.created_at)      AS date,
    c.market,
    p.category                           AS product_category,
    SUM(o.amount_usd)                    AS total_revenue,
    COUNT(DISTINCT o.order_id)           AS order_count,
    AVG(o.amount_usd)                    AS avg_order_value,
    COUNT(DISTINCT CASE
        WHEN o.created_at = c.first_order_at THEN o.customer_id
    END)                                 AS new_customers,
    COUNT(DISTINCT CASE
        WHEN o.created_at > c.first_order_at THEN o.customer_id
    END)                                 AS returning_customers
FROM silver.orders          o
JOIN silver.customers       c  ON o.customer_id  = c.customer_id
JOIN silver.products        p  ON o.product_id   = p.product_id
WHERE o.status = 'completed'
GROUP BY 1, 2, 3

2. Ingestion Strategies

Choosing the right ingestion strategy is one of the most important decisions in pipeline design.

Full Load

Copy the entire source table every run. Simple, but expensive at scale.

Python
def full_load(source_table: str, target_table: str) -> int:
    df = pd.read_sql(f"SELECT * FROM {source_table}", source_engine)
    df.to_sql(target_table, target_engine, if_exists="replace", index=False)
    return len(df)

When to use:

  • Small tables (< 100k rows)
  • Source doesn't have an updated_at column
  • Data can change arbitrarily (no reliable change tracking)
  • Lookup/reference tables

Incremental Load

Only extract rows added or modified since the last run.

Python
from datetime import datetime, timedelta

def incremental_load(last_run: datetime) -> int:
    df = pd.read_sql(
        """
        SELECT * FROM orders
        WHERE updated_at > %(last_run)s
        ORDER BY updated_at
        """,
        source_engine,
        params={"last_run": last_run},
    )
    # UPSERT into target
    df.to_sql("orders_staging", target_engine, if_exists="replace", index=False)
    target_engine.execute("""
        MERGE INTO orders_silver t
        USING orders_staging s ON t.order_id = s.order_id
        WHEN MATCHED THEN UPDATE SET ...
        WHEN NOT MATCHED THEN INSERT ...
    """)
    return len(df)

Requirements:

  • Source must have a reliable updated_at timestamp
  • Updated_at must be set on UPDATE, not just INSERT
  • You need to store the last successful run timestamp

When to use:

  • Large tables where full load is too slow
  • Tables with a reliable updated_at column

Change Data Capture (CDC)

Capture every INSERT, UPDATE, and DELETE from the source database's transaction log — without changing the source.

Source DB ──► Binlog/WAL ──► CDC Connector ──► Kafka ──► Target
             (transaction log)  (Debezium)

Tools:

  • Debezium — open source, reads MySQL binlog, PostgreSQL WAL, Oracle redo log
  • AWS DMS — managed CDC from RDS, Aurora to S3/Redshift
  • Fivetran / Airbyte — managed connectors with built-in CDC

CDC message format (Debezium):

JSON
{
  "op": "u",           // c=create, u=update, d=delete, r=read(snapshot)
  "before": {"order_id": "123", "status": "pending"},
  "after":  {"order_id": "123", "status": "shipped"},
  "source": {"table": "orders", "ts_ms": 1715000000000}
}

When to use:

  • You need to capture DELETEs (incremental misses these)
  • Low-latency ingestion (near real-time)
  • Can't query the source database (too slow, restricted access)
  • SCD Type 2 (track all historical changes)

Snapshot Load

Take a daily snapshot of the source table, preserving history.

SQL
-- Append a dated snapshot each day
INSERT INTO bronze.orders_snapshot
SELECT *, CURRENT_DATE AS _snapshot_date
FROM source.orders;

When to use:

  • You need point-in-time queries ("what did orders look like on March 1st?")
  • Source doesn't have updated_at
  • Audit requirements

3. Data Contracts

A data contract is a formal agreement between the producer (source team) and consumer (data engineering team) about what data will look like.

What a contract defines

YAML
# contracts/orders.yaml
name: orders
version: "1.2.0"
owner: "orders-team"
description: "All customer orders from the orders service"

schema:
  - name: order_id
    type: string
    required: true
    description: "Unique order identifier, format ORD-{uuid}"

  - name: customer_id
    type: string
    required: true

  - name: amount
    type: decimal(12,2)
    required: true
    constraints:
      minimum: 0.01

  - name: status
    type: string
    required: true
    enum: [pending, processing, shipped, delivered, cancelled, refunded]

  - name: created_at
    type: timestamp_tz
    required: true

sla:
  freshness: 5 minutes     # data must be no older than 5 min
  availability: 99.9%

delivery:
  method: kafka
  topic: orders.events
  format: JSON
  schema_registry: true

Validating against a contract in Python

Python
import jsonschema
from pathlib import Path
import yaml, json

def validate_against_contract(data: list[dict], contract_path: str) -> bool:
    contract = yaml.safe_load(Path(contract_path).read_text())

    # Build JSON Schema from contract
    properties = {}
    required = []
    for field in contract["schema"]:
        properties[field["name"]] = {"type": "string"}  # simplified
        if field.get("required"):
            required.append(field["name"])

    json_schema = {
        "type": "object",
        "properties": properties,
        "required": required,
    }

    errors = []
    for i, record in enumerate(data):
        try:
            jsonschema.validate(record, json_schema)
        except jsonschema.ValidationError as e:
            errors.append(f"Record {i}: {e.message}")

    if errors:
        raise ValueError(f"Contract violation: {errors[:5]}")
    return True

4. Slowly Changing Dimensions in Pipelines

When source data changes, you need a strategy for what to do in the warehouse.

SQL
-- SCD Type 2: track all changes with effective dates
-- (covered fully in the Data Modelling lesson)
MERGE INTO dim_customers t
USING (
    SELECT
        customer_id,
        email,
        CURRENT_TIMESTAMP AS valid_from,
        '9999-12-31'::TIMESTAMP AS valid_to,
        TRUE AS is_current
    FROM staging_customers
) s
ON t.customer_id = s.customer_id AND t.is_current = TRUE

WHEN MATCHED AND (t.email <> s.email) THEN
    UPDATE SET t.valid_to = CURRENT_TIMESTAMP, t.is_current = FALSE

WHEN NOT MATCHED THEN
    INSERT (customer_id, email, valid_from, valid_to, is_current)
    VALUES (s.customer_id, s.email, s.valid_from, s.valid_to, s.is_current);

-- Then insert new version for changed records
INSERT INTO dim_customers
SELECT ...
FROM staging_customers s
WHERE EXISTS (
    SELECT 1 FROM dim_customers t
    WHERE t.customer_id = s.customer_id
    AND t.email <> s.email
    AND t.is_current = FALSE
    AND t.valid_to >= CURRENT_TIMESTAMP - INTERVAL '1 minute'
);

5. Idempotency — The Most Important Pipeline Property

A pipeline is idempotent if running it multiple times produces the same result as running it once.

Why it matters

  • Retries are safe after failures
  • Backfilling historical data is safe
  • You can re-run without fear of duplicates

How to achieve it

Use MERGE instead of INSERT:

SQL
MERGE INTO silver.orders t
USING staging_orders s ON t.order_id = s.order_id
WHEN MATCHED THEN UPDATE SET
    t.status = s.status,
    t.updated_at = s.updated_at
WHEN NOT MATCHED THEN INSERT (order_id, customer_id, amount, status, created_at, updated_at)
VALUES (s.order_id, s.customer_id, s.amount, s.status, s.created_at, s.updated_at);

Delete-insert with date partitions:

SQL
-- Delete the partition first, then insert fresh
DELETE FROM gold.daily_revenue WHERE date = '{{ ds }}';

INSERT INTO gold.daily_revenue
SELECT ...
WHERE DATE_TRUNC('day', created_at) = '{{ ds }}';

Deduplication with window functions:

SQL
SELECT * EXCLUDE (_rn)
FROM (
    SELECT *, ROW_NUMBER() OVER (
        PARTITION BY order_id ORDER BY _loaded_at DESC
    ) AS _rn
    FROM bronze.orders_raw
)
WHERE _rn = 1

6. Freshness Monitoring

Know when your data went stale:

SQL
-- Snowflake: freshness check
SELECT
    table_name,
    MAX(_loaded_at)                                          AS latest_load,
    DATEDIFF('minute', MAX(_loaded_at), CURRENT_TIMESTAMP)  AS minutes_since_load,
    CASE
        WHEN DATEDIFF('minute', MAX(_loaded_at), CURRENT_TIMESTAMP) > 60
        THEN 'STALE'
        ELSE 'FRESH'
    END AS freshness_status
FROM bronze.orders_raw
GROUP BY table_name;
Python
# Python: alert if stale
def check_freshness(table: str, max_minutes: int = 60) -> None:
    result = snowflake_query(f"""
        SELECT DATEDIFF('minute', MAX(_loaded_at), CURRENT_TIMESTAMP) AS lag_minutes
        FROM {table}
    """)
    lag = result[0]["lag_minutes"]
    if lag > max_minutes:
        send_alert(f"{table} is {lag} minutes stale (threshold: {max_minutes})")

Summary

| Pattern | Use When | |---------|---------| | Medallion (bronze/silver/gold) | Default for all modern data warehouses | | Full load | Small tables, no updated_at, reference data | | Incremental load | Large tables with reliable updated_at | | CDC | Need DELETEs, near real-time, no query access to source | | Snapshot | Point-in-time history, no change tracking in source | | Data contracts | Formalise agreements between teams | | Idempotency | Always — use MERGE and delete-insert patterns | | Freshness monitoring | Every pipeline in production |

Next: transformations in dbt — models, tests, and the analytics engineering workflow.

Enjoyed this article?

Explore the Data Engineering learning path for more.

Found this helpful?

Share:𝕏

Leave a comment

Have a question, correction, or just found this helpful? Leave a note below.