Data Pipeline Architecture: Medallion, Ingestion Patterns, and Data Contracts
Design production data pipelines with the medallion architecture, understand ingestion strategies (full load, incremental, CDC), define data contracts, and apply patterns used in real analytics engineering teams.
Architecture Is Strategy
The biggest mistakes in data engineering aren't coding mistakes — they're architecture mistakes. Choosing the wrong ingestion strategy costs months of re-engineering. This lesson gives you the patterns used by mature data teams.
1. Medallion Architecture Deep Dive
The medallion architecture (the standard layering in lakehouse platforms) organises data into three quality layers:
Sources ──► BRONZE ──► SILVER ──► GOLD
            (raw)      (clean)    (business)

Bronze Layer — Raw, Immutable
Purpose: Preserve everything exactly as it arrived. Never transform, never delete.
-- Snowflake: Bronze table design
CREATE TABLE bronze.orders_raw (
-- Metadata added by pipeline
_file_name VARCHAR,
_loaded_at TIMESTAMP_TZ DEFAULT CURRENT_TIMESTAMP,
_source VARCHAR DEFAULT 'orders_api',
_run_id VARCHAR,
-- Source columns as-is (VARCHAR to avoid parse failures)
order_id VARCHAR,
customer_id VARCHAR,
product_id VARCHAR,
quantity VARCHAR, -- intentionally string
amount VARCHAR, -- intentionally string
currency VARCHAR,
status VARCHAR,
created_at VARCHAR, -- intentionally string
raw_payload VARIANT -- full original JSON
);

Rules:
- Schema is append-only — add columns, never remove or rename
- Always store the original payload (VARIANT in Snowflake, a JSON column elsewhere)
- Partition by _loaded_at for efficient pruning
- Retention: 90 days minimum, often 1-7 years
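As a rough sketch of how those metadata columns and the raw payload get populated on the way in (the engine, source name and helper below are assumptions for illustration, not a specific tool's API):

import json
import uuid
from datetime import datetime, timezone

import pandas as pd
# target_engine is assumed to be a pre-configured SQLAlchemy engine

def load_to_bronze(records: list[dict], file_name: str) -> int:
    df = pd.DataFrame(records)                              # source columns as-is
    df["raw_payload"] = [json.dumps(r) for r in records]    # original record, untouched
    # stored as a JSON string in this sketch; a Snowflake VARIANT column is
    # typically loaded via COPY INTO with PARSE_JSON instead
    df["_file_name"] = file_name
    df["_loaded_at"] = datetime.now(timezone.utc)
    df["_source"] = "orders_api"
    df["_run_id"] = str(uuid.uuid4())
    # Append-only: bronze is never replaced or updated in place
    df.to_sql("orders_raw", target_engine, schema="bronze",
              if_exists="append", index=False)
    return len(df)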
Silver Layer — Cleaned, Typed, Deduplicated
Purpose: One clean record per entity. The single source of truth for analysts.
-- Silver: orders clean
CREATE TABLE silver.orders (
order_id VARCHAR NOT NULL,
customer_id VARCHAR NOT NULL,
product_id VARCHAR NOT NULL,
quantity INTEGER NOT NULL,
amount_usd DECIMAL(12, 2) NOT NULL,
currency CHAR(3) NOT NULL,
status VARCHAR NOT NULL,
created_at TIMESTAMP_TZ NOT NULL,
updated_at TIMESTAMP_TZ NOT NULL,
-- Lineage
_source VARCHAR,
_loaded_at TIMESTAMP_TZ DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (order_id)
);

dbt model for Bronze → Silver:
-- models/silver/orders.sql
WITH source AS (
SELECT *
FROM {{ source('bronze', 'orders_raw') }}
WHERE _loaded_at >= DATEADD('day', -1, CURRENT_TIMESTAMP) -- incremental window
),
parsed AS (
SELECT
order_id,
customer_id,
product_id,
TRY_CAST(quantity AS INTEGER) AS quantity,
TRY_CAST(amount AS DECIMAL(12, 2)) AS amount_usd,
UPPER(TRIM(currency)) AS currency,
LOWER(TRIM(status)) AS status,
TRY_TO_TIMESTAMP_TZ(created_at) AS created_at,
CURRENT_TIMESTAMP AS updated_at,
_source,
_loaded_at
FROM source
WHERE order_id IS NOT NULL
),
deduplicated AS (
SELECT *,
ROW_NUMBER() OVER (
PARTITION BY order_id
ORDER BY _loaded_at DESC
) AS _rn
FROM parsed
WHERE quantity IS NOT NULL AND amount_usd IS NOT NULL
)
SELECT * EXCLUDE (_rn)
FROM deduplicated
WHERE _rn = 1

Gold Layer — Business-Ready Aggregations
Purpose: Pre-aggregated, domain-specific models for dashboards, APIs, and ML features.
-- Gold: daily revenue by market
CREATE TABLE gold.daily_revenue (
date DATE NOT NULL,
market VARCHAR NOT NULL,
product_category VARCHAR,
total_revenue DECIMAL(15, 2),
order_count INTEGER,
avg_order_value DECIMAL(10, 2),
new_customers INTEGER,
returning_customers INTEGER,
PRIMARY KEY (date, market)
);

-- models/gold/daily_revenue.sql
SELECT
DATE_TRUNC('day', o.created_at) AS date,
c.market,
p.category AS product_category,
SUM(o.amount_usd) AS total_revenue,
COUNT(DISTINCT o.order_id) AS order_count,
AVG(o.amount_usd) AS avg_order_value,
COUNT(DISTINCT CASE
WHEN o.created_at = c.first_order_at THEN o.customer_id
END) AS new_customers,
COUNT(DISTINCT CASE
WHEN o.created_at > c.first_order_at THEN o.customer_id
END) AS returning_customers
FROM silver.orders o
JOIN silver.customers c ON o.customer_id = c.customer_id
JOIN silver.products p ON o.product_id = p.product_id
WHERE o.status = 'completed'
GROUP BY 1, 2, 3

2. Ingestion Strategies
Choosing the right ingestion strategy is one of the most important decisions in pipeline design.
Full Load
Copy the entire source table every run. Simple, but expensive at scale.
import pandas as pd
# source_engine / target_engine are assumed to be pre-configured SQLAlchemy engines

def full_load(source_table: str, target_table: str) -> int:
    df = pd.read_sql(f"SELECT * FROM {source_table}", source_engine)
    df.to_sql(target_table, target_engine, if_exists="replace", index=False)
    return len(df)

When to use:
- Small tables (< 100k rows)
- Source doesn't have an updated_at column
- Data can change arbitrarily (no reliable change tracking)
- Lookup/reference tables
Incremental Load
Only extract rows added or modified since the last run.
from datetime import datetime

def incremental_load(last_run: datetime) -> int:
    df = pd.read_sql(
        """
        SELECT * FROM orders
        WHERE updated_at > %(last_run)s
        ORDER BY updated_at
        """,
        source_engine,
        params={"last_run": last_run},
    )
    # UPSERT into target
    df.to_sql("orders_staging", target_engine, if_exists="replace", index=False)
    target_engine.execute("""
        MERGE INTO orders_silver t
        USING orders_staging s ON t.order_id = s.order_id
        WHEN MATCHED THEN UPDATE SET ...
        WHEN NOT MATCHED THEN INSERT ...
    """)
    return len(df)

Requirements:
- Source must have a reliable updated_at timestamp
- updated_at must be set on UPDATE, not just INSERT
- You need to store the last successful run timestamp (see the watermark sketch below)
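A minimal sketch of that watermark bookkeeping, assuming a small state table (hypothetically named etl_watermarks) in the target warehouse and the same SQLAlchemy-style engine as above; any durable store your orchestrator can reach works just as well.

from datetime import datetime, timezone

def get_last_run(pipeline: str) -> datetime:
    # Read the high-water mark; fall back to the epoch on the very first run
    rows = pd.read_sql(
        "SELECT last_run_at FROM etl_watermarks WHERE pipeline = %(p)s",
        target_engine,
        params={"p": pipeline},
    )
    if rows.empty:
        return datetime(1970, 1, 1, tzinfo=timezone.utc)
    return rows["last_run_at"].iloc[0]

def set_last_run(pipeline: str, run_started_at: datetime) -> None:
    # Advance the watermark only after the load committed successfully,
    # using the time the extraction started so late-arriving rows aren't skipped
    target_engine.execute(
        "UPDATE etl_watermarks SET last_run_at = %(t)s WHERE pipeline = %(p)s",
        {"t": run_started_at, "p": pipeline},
    )

In the incremental job: read the watermark, extract with it, and move it forward only once the MERGE has succeeded, so a failed run is simply retried from the same point.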
When to use:
- Large tables where full load is too slow
- Tables with a reliable updated_at column
Change Data Capture (CDC)
Capture every INSERT, UPDATE, and DELETE from the source database's transaction log — without changing the source.
Source DB ──► Binlog/WAL ──► CDC Connector ──► Kafka ──► Target
              (transaction log)   (Debezium)

Tools:
- Debezium — open source, reads MySQL binlog, PostgreSQL WAL, Oracle redo log
- AWS DMS — managed CDC from RDS, Aurora to S3/Redshift
- Fivetran / Airbyte — managed connectors with built-in CDC
CDC message format (Debezium):
{
"op": "u", // c=create, u=update, d=delete, r=read(snapshot)
"before": {"order_id": "123", "status": "pending"},
"after": {"order_id": "123", "status": "shipped"},
"source": {"table": "orders", "ts_ms": 1715000000000}
}

When to use:
- You need to capture DELETEs (incremental misses these)
- Low-latency ingestion (near real-time)
- Can't query the source database (too slow, restricted access)
- SCD Type 2 (track all historical changes)
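To make the op codes concrete, here is a hedged sketch of what a consumer downstream of Kafka might do with each Debezium-style event. apply_cdc_event, upsert_order and delete_order are illustrative names rather than part of Debezium or any connector, and real payloads may arrive wrapped in an envelope depending on converter settings.

import json

def apply_cdc_event(message: bytes) -> None:
    event = json.loads(message)
    op = event["op"]  # c=create, u=update, d=delete, r=read (snapshot)
    if op in ("c", "u", "r"):
        # Creates, updates and snapshot reads all carry the new row state
        upsert_order(event["after"])          # hypothetical MERGE-based helper
    elif op == "d":
        # Deletes only carry the old state; incremental loads would never see these
        delete_order(event["before"]["order_id"])   # hypothetical helper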
Snapshot Load
Take a daily snapshot of the source table, preserving history.
-- Append a dated snapshot each day
INSERT INTO bronze.orders_snapshot
SELECT *, CURRENT_DATE AS _snapshot_date
FROM source.orders;

When to use:
- You need point-in-time queries ("what did orders look like on March 1st?"); see the query sketch after this list
- Source doesn't have updated_at
- Audit requirements
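A small sketch of the point-in-time query this enables, reusing the pandas/engine setup assumed in the earlier examples:

import pandas as pd

def orders_as_of(snapshot_date: str) -> pd.DataFrame:
    # "What did orders look like on <date>?" is just a filter on the snapshot date
    return pd.read_sql(
        """
        SELECT *
        FROM bronze.orders_snapshot
        WHERE _snapshot_date = %(d)s
        """,
        target_engine,
        params={"d": snapshot_date},
    )

# e.g. orders_as_of("2024-03-01") for the state of orders on March 1st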
3. Data Contracts
A data contract is a formal agreement between the producer (source team) and consumer (data engineering team) about what data will look like.
What a contract defines
# contracts/orders.yaml
name: orders
version: "1.2.0"
owner: "orders-team"
description: "All customer orders from the orders service"
schema:
  - name: order_id
    type: string
    required: true
    description: "Unique order identifier, format ORD-{uuid}"
  - name: customer_id
    type: string
    required: true
  - name: amount
    type: decimal(12,2)
    required: true
    constraints:
      minimum: 0.01
  - name: status
    type: string
    required: true
    enum: [pending, processing, shipped, delivered, cancelled, refunded]
  - name: created_at
    type: timestamp_tz
    required: true
sla:
  freshness: 5 minutes   # data must be no older than 5 min
  availability: 99.9%
delivery:
  method: kafka
  topic: orders.events
  format: JSON
  schema_registry: true

Validating against a contract in Python
import jsonschema
from pathlib import Path
import yaml

def validate_against_contract(data: list[dict], contract_path: str) -> bool:
    contract = yaml.safe_load(Path(contract_path).read_text())
    # Build JSON Schema from contract
    properties = {}
    required = []
    for field in contract["schema"]:
        # simplified: a real mapping would translate decimal/timestamp types and enums
        properties[field["name"]] = {"type": "string"}
        if field.get("required"):
            required.append(field["name"])
    json_schema = {
        "type": "object",
        "properties": properties,
        "required": required,
    }
    errors = []
    for i, record in enumerate(data):
        try:
            jsonschema.validate(record, json_schema)
        except jsonschema.ValidationError as e:
            errors.append(f"Record {i}: {e.message}")
    if errors:
        raise ValueError(f"Contract violation: {errors[:5]}")
    return True

4. Slowly Changing Dimensions in Pipelines
When source data changes, you need a strategy for what to do in the warehouse.
-- SCD Type 2: track all changes with effective dates
-- (covered fully in the Data Modelling lesson)
MERGE INTO dim_customers t
USING (
SELECT
customer_id,
email,
CURRENT_TIMESTAMP AS valid_from,
'9999-12-31'::TIMESTAMP AS valid_to,
TRUE AS is_current
FROM staging_customers
) s
ON t.customer_id = s.customer_id AND t.is_current = TRUE
WHEN MATCHED AND (t.email <> s.email) THEN
UPDATE SET t.valid_to = CURRENT_TIMESTAMP, t.is_current = FALSE
WHEN NOT MATCHED THEN
INSERT (customer_id, email, valid_from, valid_to, is_current)
VALUES (s.customer_id, s.email, s.valid_from, s.valid_to, s.is_current);
-- Then insert new version for changed records
INSERT INTO dim_customers
SELECT ...
FROM staging_customers s
WHERE EXISTS (
SELECT 1 FROM dim_customers t
WHERE t.customer_id = s.customer_id
AND t.email <> s.email
AND t.is_current = FALSE
AND t.valid_to >= CURRENT_TIMESTAMP - INTERVAL '1 minute'
);

5. Idempotency — The Most Important Pipeline Property
A pipeline is idempotent if running it multiple times produces the same result as running it once.
Why it matters
- Retries are safe after failures
- Backfilling historical data is safe
- You can re-run without fear of duplicates
How to achieve it
Use MERGE instead of INSERT:
MERGE INTO silver.orders t
USING staging_orders s ON t.order_id = s.order_id
WHEN MATCHED THEN UPDATE SET
t.status = s.status,
t.updated_at = s.updated_at
WHEN NOT MATCHED THEN INSERT (order_id, customer_id, amount, status, created_at, updated_at)
VALUES (s.order_id, s.customer_id, s.amount, s.status, s.created_at, s.updated_at);

Delete-insert with date partitions:
-- Delete the partition first, then insert fresh
DELETE FROM gold.daily_revenue WHERE date = '{{ ds }}';
INSERT INTO gold.daily_revenue
SELECT ...
WHERE DATE_TRUNC('day', created_at) = '{{ ds }}';

Deduplication with window functions:
SELECT * EXCLUDE (_rn)
FROM (
SELECT *, ROW_NUMBER() OVER (
PARTITION BY order_id ORDER BY _loaded_at DESC
) AS _rn
FROM bronze.orders_raw
)
WHERE _rn = 1

6. Freshness Monitoring
Know when your data went stale:
-- Snowflake: freshness check
SELECT
'bronze.orders_raw' AS table_name,
MAX(_loaded_at) AS latest_load,
DATEDIFF('minute', MAX(_loaded_at), CURRENT_TIMESTAMP) AS minutes_since_load,
CASE
WHEN DATEDIFF('minute', MAX(_loaded_at), CURRENT_TIMESTAMP) > 60
THEN 'STALE'
ELSE 'FRESH'
END AS freshness_status
FROM bronze.orders_raw;

# Python: alert if stale
def check_freshness(table: str, max_minutes: int = 60) -> None:
    # snowflake_query and send_alert are assumed to be existing pipeline helpers
    result = snowflake_query(f"""
        SELECT DATEDIFF('minute', MAX(_loaded_at), CURRENT_TIMESTAMP) AS lag_minutes
        FROM {table}
    """)
    lag = result[0]["lag_minutes"]
    if lag > max_minutes:
        send_alert(f"{table} is {lag} minutes stale (threshold: {max_minutes})")

Summary
| Pattern | Use When |
|---------|----------|
| Medallion (bronze/silver/gold) | Default for all modern data warehouses |
| Full load | Small tables, no updated_at, reference data |
| Incremental load | Large tables with reliable updated_at |
| CDC | Need DELETEs, near real-time, no query access to source |
| Snapshot | Point-in-time history, no change tracking in source |
| Data contracts | Formalise agreements between teams |
| Idempotency | Always — use MERGE and delete-insert patterns |
| Freshness monitoring | Every pipeline in production |
Next: transformations in dbt — models, tests, and the analytics engineering workflow.