Data Engineering · Intermediate

dbt in Production: CI/CD, Airflow Integration, and Multi-Environment Deployment

Deploy dbt at scale — multi-environment strategy, slim CI with state:modified, GitHub Actions pipelines, Airflow + dbt integration, job scheduling, and production operational patterns.

Learnixo · May 7, 2026 · 7 min read
dbt · CI/CD · production · Airflow · GitHub Actions · deployment · analytics engineering

dbt in Production Is an Engineering Problem

A local dbt run is the easy part. Production means automated CI/CD, multiple environments, incremental runs that don't rebuild the whole warehouse, Airflow integration, and alerting with rollback capability. This lesson covers the full operations picture.


1. Multi-Environment Strategy

developer → dev schema (personal)
         → CI (PR check, isolated schema)
         → staging (pre-prod, full data)
         → production (real data, real users)

profiles.yml per environment

YAML
# ~/.dbt/profiles.yml
analytics:
  target: dev
  outputs:

    dev:
      type: snowflake
      account: "{{ env_var('SNOWFLAKE_ACCOUNT') }}"
      user:    "{{ env_var('SNOWFLAKE_USER') }}"
      password: "{{ env_var('SNOWFLAKE_PASSWORD') }}"
      role:     TRANSFORMER
      database: ANALYTICS
      warehouse: TRANSFORM_WH_XS
      schema:   "DEV_{{ env_var('DBT_USER', 'local') }}"  # personal dev schema
      threads:  4

    ci:
      type: snowflake
      account: "{{ env_var('SNOWFLAKE_ACCOUNT') }}"
      user:    "{{ env_var('SNOWFLAKE_USER') }}"
      password: "{{ env_var('SNOWFLAKE_PASSWORD') }}"
      role:     CI_TRANSFORMER
      database: ANALYTICS
      warehouse: TRANSFORM_WH_XS
      schema:   "CI_PR_{{ env_var('PR_NUMBER', 'local') }}"  # isolated per PR
      threads:  8

    staging:
      type: snowflake
      # account / user / password / role / database / warehouse as in dev
      schema: STAGING
      threads: 8

    prod:
      type: snowflake
      # account / user / password / role / database / warehouse as in dev,
      # typically with a dedicated prod role and a larger warehouse
      schema: MARTS
      threads: 16
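
Every target reads its credentials through env_var(), so whichever process invokes dbt (a developer shell, the CI runner, or an orchestrator) only needs to export those variables and pick a --target. A minimal sketch of such a wrapper; the DBT_TARGET default is an assumption for illustration:

Python
# Invoke dbt against a chosen target; profiles.yml resolves credentials
# from the SNOWFLAKE_* environment variables exported by the caller.
import os
import subprocess

def run_dbt(target: str, select: str | None = None) -> None:
    cmd = ["dbt", "build", "--target", target]
    if select:
        cmd += ["--select", select]
    # check=True makes a failed dbt invocation raise, so CI and orchestrators
    # see a non-zero exit instead of silently continuing.
    subprocess.run(cmd, check=True, env=os.environ.copy())

if __name__ == "__main__":
    run_dbt(target=os.environ.get("DBT_TARGET", "dev"))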

2. Slim CI — Only Build What Changed

The most important production optimization: don't rebuild 500 models on every PR.

Bash
# In CI, build only modified models + their downstream dependents
# (--defer resolves unchanged upstream refs against the production manifest)
dbt build \
  --target ci \
  --select state:modified+ \
  --defer \
  --state /path/to/prod/manifest

This requires the production manifest:

YAML
# GitHub Actions: download prod manifest before CI run
- name: Download production manifest
  run: |
    aws s3 cp s3://my-dbt-artifacts/prod/manifest.json ./prod_manifest/manifest.json
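
If the job that needs the manifest is Python-based rather than shell-based, the same download can be done with boto3. A sketch, assuming the bucket layout used above:

Python
# boto3 alternative to the aws CLI step: fetch the latest prod manifest
# so the CI run can use --state / --defer against it.
from pathlib import Path
import boto3

def download_prod_manifest(dest_dir: str = "prod_manifest") -> Path:
    dest = Path(dest_dir) / "manifest.json"
    dest.parent.mkdir(parents=True, exist_ok=True)
    s3 = boto3.client("s3")  # credentials come from the environment or an IAM role
    s3.download_file("my-dbt-artifacts", "prod/manifest.json", str(dest))
    return dest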

3. GitHub Actions CI/CD Pipeline

YAML
# .github/workflows/dbt_ci.yml
name: dbt CI

on:
  pull_request:
    paths:
      - 'dbt/**'

jobs:
  dbt-ci:
    runs-on: ubuntu-latest
    defaults:
      run:
        working-directory: dbt/

    steps:
      - uses: actions/checkout@v4

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: "3.12"

      - name: Install dbt
        run: pip install dbt-snowflake

      - name: Install dbt packages
        run: dbt deps

      - name: Download production manifest (for slim CI)
        env:
          AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
          AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
        run: |
          aws s3 cp s3://my-dbt-artifacts/prod/manifest.json ./prod_manifest/manifest.json

      - name: dbt build (slim CI)
        env:
          SNOWFLAKE_ACCOUNT: ${{ secrets.SNOWFLAKE_ACCOUNT }}
          SNOWFLAKE_USER:    ${{ secrets.SNOWFLAKE_CI_USER }}
          SNOWFLAKE_PASSWORD: ${{ secrets.SNOWFLAKE_CI_PASSWORD }}
          PR_NUMBER: ${{ github.event.pull_request.number }}
        run: |
          dbt build \
            --target ci \
            --select state:modified+ \
            --defer \
            --state ./prod_manifest

      - name: Upload test results
        if: always()
        uses: actions/upload-artifact@v4
        with:
          name: dbt-test-results
          path: dbt/target/run_results.json

      - name: Cleanup CI schema
        if: always()
        env:
          SNOWFLAKE_ACCOUNT:  ${{ secrets.SNOWFLAKE_ACCOUNT }}
          SNOWFLAKE_USER:     ${{ secrets.SNOWFLAKE_CI_USER }}
          SNOWFLAKE_PASSWORD: ${{ secrets.SNOWFLAKE_CI_PASSWORD }}
          PR_NUMBER: ${{ github.event.pull_request.number }}
        run: |
          # assumes a drop_schema macro defined in the project
          dbt run-operation drop_schema --args "{schema: CI_PR_$PR_NUMBER}" --target ci
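
A common follow-up step is to comment the build outcome on the PR itself. A hypothetical helper you could run as an extra workflow step from the dbt project directory; GITHUB_REPOSITORY is a default Actions variable, while GITHUB_TOKEN and PR_NUMBER would be passed in through the step's env:

Python
# Post a dbt CI summary as a PR comment via the GitHub REST API.
# Expects to run after `dbt build`, from the directory containing target/.
import json
import os
from pathlib import Path

import requests

results = json.loads(Path("target/run_results.json").read_text())
failed = [r["unique_id"] for r in results["results"] if r["status"] in ("error", "fail")]
body = (
    ":white_check_mark: dbt CI passed"
    if not failed
    else ":red_circle: dbt CI failures:\n" + "\n".join(f"- `{uid}`" for uid in failed)
)

resp = requests.post(
    f"https://api.github.com/repos/{os.environ['GITHUB_REPOSITORY']}"
    f"/issues/{os.environ['PR_NUMBER']}/comments",
    headers={"Authorization": f"Bearer {os.environ['GITHUB_TOKEN']}"},
    json={"body": body},
    timeout=30,
)
resp.raise_for_status()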
YAML
# .github/workflows/dbt_prod.yml
name: dbt Production Deploy

on:
  push:
    branches: [main]
    paths:
      - 'dbt/**'

jobs:
  dbt-prod:
    runs-on: ubuntu-latest
    defaults:
      run:
        working-directory: dbt/

    steps:
      - uses: actions/checkout@v4

      - name: Install dbt
        run: pip install dbt-snowflake

      - name: dbt deps
        run: dbt deps

      - name: dbt build (production)
        env:
          SNOWFLAKE_ACCOUNT:  ${{ secrets.SNOWFLAKE_ACCOUNT }}
          SNOWFLAKE_USER:     ${{ secrets.SNOWFLAKE_PROD_USER }}
          SNOWFLAKE_PASSWORD: ${{ secrets.SNOWFLAKE_PROD_PASSWORD }}
        run: dbt build --target prod

      - name: Upload manifest to S3 (for next CI slim run)
        env:
          AWS_ACCESS_KEY_ID:     ${{ secrets.AWS_ACCESS_KEY_ID }}
          AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
        run: |
          aws s3 cp target/manifest.json s3://my-dbt-artifacts/prod/manifest.json

      - name: Notify Slack on failure
        if: failure()
        uses: slackapi/slack-github-action@v1
        env:
          SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK }}
          SLACK_WEBHOOK_TYPE: INCOMING_WEBHOOK
        with:
          payload: '{"text": ":red_circle: dbt production build failed on main"}'

4. Airflow + dbt Integration

Two patterns for running dbt from Airflow:

Pattern 1: BashOperator (simple, universal)

Python
from airflow.operators.bash import BashOperator

dbt_run = BashOperator(
    task_id="dbt_run_marts",
    bash_command="""
        cd /opt/dbt/my_project &&
        dbt run \
          --target prod \
          --select marts.* \
          --profiles-dir /opt/dbt/profiles
    """,
    env={
        "SNOWFLAKE_ACCOUNT":  "{{ var.value.snowflake_account }}",
        "SNOWFLAKE_USER":     "{{ var.value.snowflake_user }}",
        "SNOWFLAKE_PASSWORD": "{{ var.value.snowflake_password }}",
    },
    dag=dag,
)

dbt_test = BashOperator(
    task_id="dbt_test_marts",
    bash_command="cd /opt/dbt/my_project && dbt test --select marts.*",
    dag=dag,
)

dbt_run >> dbt_test

Pattern 2: cosmos (Astronomer, recommended)

Astronomer Cosmos turns each dbt model into an Airflow task, giving full visibility:

Python
from datetime import datetime

from cosmos import DbtDag, ProjectConfig, ProfileConfig, ExecutionConfig
from cosmos.profiles import SnowflakeUserPasswordProfileMapping

profile_config = ProfileConfig(
    profile_name="analytics",
    target_name="prod",
    profile_mapping=SnowflakeUserPasswordProfileMapping(
        conn_id="snowflake_prod",
        profile_args={"database": "ANALYTICS", "schema": "MARTS"},
    ),
)

dbt_dag = DbtDag(
    project_config=ProjectConfig("/opt/dbt/my_project"),
    profile_config=profile_config,
    execution_config=ExecutionConfig(dbt_executable_path="/usr/local/bin/dbt"),
    schedule="0 3 * * *",   # 3am daily
    start_date=datetime(2026, 1, 1),
    catchup=False,
    dag_id="dbt_marts",
    tags=["dbt", "marts"],
)

This creates one Airflow task per dbt model, with full dependency graph, retries, and logs per model.
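
If you would rather embed the dbt models inside an existing DAG (for example, directly after ingestion tasks) instead of generating a standalone DAG, Cosmos also provides a task-group variant. A sketch reusing the profile_config from above; the DAG id and schedule are placeholders:

Python
from datetime import datetime

from airflow import DAG
from cosmos import DbtTaskGroup, ProjectConfig

with DAG(
    dag_id="ingest_then_dbt",
    schedule="0 3 * * *",
    start_date=datetime(2026, 1, 1),
    catchup=False,
) as dag:
    # ... ingestion tasks defined here ...

    dbt_marts = DbtTaskGroup(
        group_id="dbt_marts",
        project_config=ProjectConfig("/opt/dbt/my_project"),
        profile_config=profile_config,  # same ProfileConfig as in the DbtDag example
    )
    # ingestion_task >> dbt_marts      # wire dbt after ingestion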


5. dbt Cloud (Managed Option)

If you use dbt Cloud instead of self-hosted:

YAML
# dbt Cloud job configuration (conceptual; jobs are defined in the dbt Cloud UI or API)
jobs:
  - name: "Production Run"
    schedule: "0 3 * * *"     # 3am
    commands:
      - dbt source freshness
      - dbt build --target prod

  - name: "CI Slim Run"       # triggered by PR
    commands:
      - dbt build --select state:modified+ --defer --state <prod_artifact>

dbt Cloud provides:

  • IDE (dbt Cloud IDE)
  • Job scheduling with retries
  • Artifact storage (manifest.json for slim CI)
  • Explorer (lineage + documentation)
  • Semantic layer
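
If an external orchestrator (an Airflow DAG, for instance) needs to kick off one of these jobs, dbt Cloud also exposes an API for triggering runs. A minimal sketch; the account ID, job ID, and token environment variable are placeholders:

Python
# Trigger a dbt Cloud job run via the v2 API and capture the run id.
import os

import requests

ACCOUNT_ID = 12345   # placeholder
JOB_ID = 67890       # placeholder

resp = requests.post(
    f"https://cloud.getdbt.com/api/v2/accounts/{ACCOUNT_ID}/jobs/{JOB_ID}/run/",
    headers={"Authorization": f"Token {os.environ['DBT_CLOUD_API_TOKEN']}"},
    json={"cause": "Triggered by orchestrator"},
    timeout=30,
)
resp.raise_for_status()
print("Started dbt Cloud run", resp.json()["data"]["id"])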

6. Orchestration Patterns

Trigger dbt after ingestion completes

Python
# Airflow DAG: ingest first, then dbt transforms
ingest_orders = PythonOperator(task_id="ingest_orders", ...)
ingest_customers = PythonOperator(task_id="ingest_customers", ...)

dbt_staging = BashOperator(
    task_id="dbt_staging",
    bash_command="dbt run --select staging.*",
)

dbt_marts = BashOperator(
    task_id="dbt_marts",
    bash_command="dbt run --select marts.*",
)

dbt_test = BashOperator(
    task_id="dbt_test",
    bash_command="dbt test --select marts.*",
)

# Fan-in: wait for all ingestion, then transform
[ingest_orders, ingest_customers] >> dbt_staging >> dbt_marts >> dbt_test

Backfilling

Bash
# Rebuild a specific date partition
dbt run \
  --select fct_orders \
  --vars '{"start_date": "2026-01-01", "end_date": "2026-01-31"}' \
  --full-refresh   # optional: for incremental models, drops and rebuilds from scratch
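
For long backfills, a common alternative to one giant run is a small driver that replays the window month by month, which keeps each run small and cheap to retry. A sketch, assuming fct_orders filters on the start_date / end_date vars shown above:

Python
# backfill.py: run fct_orders one month at a time by passing date vars to dbt.
import json
import subprocess
from datetime import date, timedelta

def month_ranges(start: date, end: date):
    """Yield (first_day, last_day) pairs for each month between start and end."""
    current = start.replace(day=1)
    while current <= end:
        # jump safely into the next month, then snap back to its first day
        next_first = (current + timedelta(days=32)).replace(day=1)
        yield current, min(next_first - timedelta(days=1), end)
        current = next_first

for chunk_start, chunk_end in month_ranges(date(2025, 1, 1), date(2025, 12, 31)):
    dbt_vars = {"start_date": str(chunk_start), "end_date": str(chunk_end)}
    subprocess.run(
        ["dbt", "run", "--select", "fct_orders", "--vars", json.dumps(dbt_vars)],
        check=True,
    )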

7. Breaking Changes and Rollback

Bash
# Before breaking change: audit helper comparison
dbt run-operation audit_helper.compare_relations --args '{
    "a_relation": "analytics.marts.fct_orders",
    "b_relation": "analytics.ci_pr_42.fct_orders"
}'
# Reports column differences, row count differences, value mismatches

SQL
-- Rollback: Snowflake Time Travel clone of the table as it was before the deploy
CREATE OR REPLACE TABLE analytics.marts.fct_orders
CLONE analytics.marts.fct_orders AT (TIMESTAMP => '2026-05-06 10:00:00'::TIMESTAMP_TZ);

8. Monitoring dbt Runs

Python
# Parse dbt run results in Python
import json
from pathlib import Path

def parse_dbt_results(results_path: str = "target/run_results.json") -> dict:
    results = json.loads(Path(results_path).read_text())

    summary = {
        "total": len(results["results"]),
        "passed": sum(1 for r in results["results"] if r["status"] in ("success", "pass")),
        "failed": [r["unique_id"] for r in results["results"] if r["status"] == "error"],
        "warnings": [r["unique_id"] for r in results["results"] if r["status"] == "warn"],
        "elapsed_time": results["elapsed_time"],
    }

    if summary["failed"]:
        send_alert(f"dbt failures: {summary['failed']}")

    return summary
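
send_alert above is deliberately left undefined; a minimal placeholder that posts to a Slack incoming webhook could look like this (the webhook URL is assumed to live in an environment variable):

Python
import os

import requests

def send_alert(message: str) -> None:
    # Placeholder alerting hook: post the message to a Slack incoming webhook.
    webhook_url = os.environ["SLACK_WEBHOOK_URL"]
    requests.post(webhook_url, json={"text": message}, timeout=10).raise_for_status()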
SQL
-- Query dbt run history (if using dbt Cloud or custom logging)
SELECT
    run_id,
    started_at,
    finished_at,
    DATEDIFF('second', started_at, finished_at)  AS duration_s,
    models_run,
    models_errored,
    tests_run,
    tests_failed
FROM dbt_runs
ORDER BY started_at DESC
LIMIT 20;

9. Project Governance at Scale

dbt project structure for a large team:

my_project/
├── models/
│   ├── staging/              # owned by data engineering
│   │   ├── _sources.yml
│   │   ├── stg_orders.sql
│   │   └── stg_customers.sql
│   ├── intermediate/         # optional join layer
│   ├── marts/
│   │   ├── finance/          # owned by finance analytics
│   │   │   ├── fct_orders.sql
│   │   │   ├── dim_customers.sql
│   │   │   └── schema.yml
│   │   ├── marketing/        # owned by marketing analytics
│   │   └── operations/
└── ...

Tags for ownership:
- tag:finance — finance team models
- tag:tier-1 — SLA: must pass in < 30 min
- tag:pii — contains personal data (restricted access)
YAML
# Enforce ownership via meta
models:
  - name: fct_orders
    meta:
      owner: finance-team
      tier: 1
      sla_minutes: 30
      contains_pii: false
      slack_channel: "#data-finance"
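
Metadata like this is only useful if something enforces it. One light-touch option is a CI check that parses the compiled manifest and fails when a mart model has no owner; a sketch, using the same meta key as above:

Python
# Fail CI when a mart model is missing meta.owner.
# Run after `dbt compile`, which writes target/manifest.json.
import json
import sys
from pathlib import Path

manifest = json.loads(Path("target/manifest.json").read_text())

missing = []
for node in manifest["nodes"].values():
    if node["resource_type"] != "model" or not node["path"].startswith("marts/"):
        continue
    meta = node.get("meta") or node.get("config", {}).get("meta") or {}
    if not meta.get("owner"):
        missing.append(node["name"])

if missing:
    print("Models missing an owner in meta:", ", ".join(sorted(missing)))
    sys.exit(1)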

Summary

| Concern | Solution |
|---------|----------|
| Multiple envs | dev / CI / staging / prod profiles |
| Fast CI | state:modified+ with --defer |
| Automated deploys | GitHub Actions dbt build --target prod |
| Airflow integration | BashOperator or Astronomer Cosmos |
| Backfilling | --full-refresh + date vars |
| Rollback | Snowflake Time Travel clone |
| Breaking change detection | audit_helper.compare_relations |
| Team ownership | Tags + meta in schema.yml |

You now have a complete dbt skillset from fundamentals through production operations. Next: data modelling — dimensional models, star schemas, and SCD patterns.

Enjoyed this article?

Explore the Data Engineering learning path for more.
