Back to blog
Data Engineering · Intermediate

Apache Airflow: Orchestrating Data Pipelines at Scale

Master Apache Airflow from scratch — DAGs, operators, sensors, XComs, task dependencies, scheduling, retries, and production deployment patterns used in real data engineering teams.

Learnixo · May 7, 2026 · 8 min read
Airflow · orchestration · DAGs · data engineering · scheduling · pipelines

Why Orchestration Matters

Running python pipeline.py on a cron job breaks the moment you need:

  • Task A to run only after Task B succeeds
  • Automatic retries on failure
  • Visibility into what ran, when, and why it failed
  • Backfilling historical data ranges
  • Parallel execution of independent tasks

Apache Airflow solves all of this. It is one of the most widely deployed pipeline orchestrators in data engineering.


1. Core Concepts

DAG — Directed Acyclic Graph

A DAG is a pipeline definition. It describes:

  • What tasks to run
  • The order/dependencies between them
  • When to run (schedule)
extract_orders ──► validate ──► transform ──► load_snowflake
                                          └──► send_report

The "acyclic" part means no circular dependencies — Task A cannot depend on itself (directly or indirectly).

Task

A single unit of work: run a Python function, execute a SQL query, call an API, wait for a file.

Operator

The type of task. Airflow and its provider packages ship hundreds of operators, including:

  • PythonOperator — run a Python function
  • BashOperator — run a shell command
  • SQLExecuteQueryOperator — execute SQL against any supported database
  • S3ToSnowflakeOperator — copy S3 data to Snowflake
  • HttpSensor — wait until an HTTP endpoint returns 200
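
A couple of classic operators in use, just to show the shape (the connection id and SQL are placeholders):

Python
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator

with DAG("operator_examples", schedule=None, start_date=datetime(2026, 1, 1), catchup=False):
    archive = BashOperator(
        task_id="archive_raw_files",
        bash_command="gzip /data/orders/*.json",
    )
    refresh = SQLExecuteQueryOperator(
        task_id="refresh_daily_summary",
        conn_id="analytics_db",   # placeholder connection id
        sql="DELETE FROM daily_summary WHERE summary_date = '{{ ds }}'",  # templates render in operator args
    )
    archive >> refresh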

Scheduler

Reads DAG files, determines which tasks are ready to run, and triggers them. Runs continuously.
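
A few CLI commands are handy for checking what the scheduler has picked up:

Bash
airflow dags list                                  # DAGs discovered in the dags folder
airflow tasks list orders_daily_pipeline           # tasks inside a DAG
airflow dags trigger orders_daily_pipeline         # queue a manual run
airflow dags list-runs -d orders_daily_pipeline    # past runs and their state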


2. Installing Airflow

Bash
pip install "apache-airflow[amazon,snowflake,postgres]"

# initialize the metadata database
airflow db migrate

# create admin user
airflow users create \
  --username admin \
  --password admin \
  --firstname Admin \
  --lastname User \
  --role Admin \
  --email admin@example.com

# start
airflow webserver --port 8080 &
airflow scheduler &

Or with Docker Compose (recommended for local dev):

Bash
curl -LfO 'https://airflow.apache.org/docs/apache-airflow/stable/docker-compose.yaml'
docker-compose up -d
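
The official compose quickstart also expects a few local folders and a one-time init step before the services come up (exact steps can differ slightly between Airflow versions):

Bash
mkdir -p ./dags ./logs ./plugins ./config
echo "AIRFLOW_UID=$(id -u)" > .env       # run containers as your host user
docker-compose up airflow-init           # migrate the metadata DB, create the default user
docker-compose up -d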

3. Your First DAG

Python
# dags/orders_pipeline.py
from __future__ import annotations

from datetime import datetime, timedelta

from airflow.decorators import dag, task

@dag(
    dag_id="orders_daily_pipeline",
    description="Ingest, transform and load daily orders",
    schedule="0 2 * * *",           # 2am every day (cron)
    start_date=datetime(2026, 5, 1),  # fixed date in the past (days_ago is deprecated)
    catchup=False,                   # don't backfill past runs
    default_args={
        "owner": "data-team",
        "retries": 2,
        "retry_delay": timedelta(minutes=5),
        "email_on_failure": True,
        "email": ["data-alerts@company.com"],
    },
    tags=["orders", "ingestion"],
)
def orders_pipeline():

    @task()
    def extract(**context) -> list[dict]:
        import os
        import requests

        # Jinja templates like {{ ds }} are not rendered inside @task functions,
        # so read the logical date (YYYY-MM-DD) from the task context instead.
        resp = requests.get(
            "https://api.example.com/orders",
            params={"date": context["ds"]},
            headers={"Authorization": f"Bearer {os.environ['API_TOKEN']}"},
            timeout=30,
        )
        resp.raise_for_status()
        return resp.json()["orders"]

    @task()
    def validate(orders: list[dict]) -> list[dict]:
        errors = [r for r in orders if not r.get("order_id") or not r.get("amount")]
        if errors:
            raise ValueError(f"Validation failed: {len(errors)} invalid rows")
        return orders

    @task()
    def transform(orders: list[dict]) -> list[dict]:
        import pandas as pd
        df = pd.DataFrame(orders)
        df["amount"] = pd.to_numeric(df["amount"], errors="coerce")
        df["created_at"] = pd.to_datetime(df["created_at"])
        df = df.dropna(subset=["amount"])
        return df.to_dict(orient="records")

    @task()
    def load(records: list[dict]) -> int:
        import snowflake.connector
        from snowflake.connector.pandas_tools import write_pandas
        import pandas as pd, os

        df = pd.DataFrame(records)
        with snowflake.connector.connect(
            account=os.environ["SNOWFLAKE_ACCOUNT"],
            user=os.environ["SNOWFLAKE_USER"],
            password=os.environ["SNOWFLAKE_PASSWORD"],
            database="ANALYTICS",
            schema="RAW",
        ) as conn:
            _, _, rows, _ = write_pandas(conn, df, "ORDERS")
            return rows

    # Wire up the pipeline
    raw = extract()
    validated = validate(raw)
    transformed = transform(validated)
    load(transformed)

dag_instance = orders_pipeline()
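
Before wiring this into the scheduler, you can exercise it from the CLI; dags test runs every task in-process for one logical date:

Bash
# run the whole DAG once, locally, for a given logical date
airflow dags test orders_daily_pipeline 2026-05-07

# or run a single task in isolation
airflow tasks test orders_daily_pipeline extract 2026-05-07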

4. Scheduling with Cron

Python
schedule="0 2 * * *"     # 2am daily
schedule="@daily"         # midnight daily
schedule="@hourly"        # top of every hour
schedule="0 6 * * 1"      # 6am every Monday
schedule="*/15 * * * *"   # every 15 minutes
schedule="0 0 1 * *"      # midnight on the 1st of every month
schedule=None             # manual trigger only

Dataset-based scheduling (Airflow 2.4+)

Trigger a DAG when another DAG produces an output:

Python
from airflow import Dataset

orders_dataset = Dataset("snowflake://analytics/raw/orders")

# Producer DAG marks the dataset as updated
@task(outlets=[orders_dataset])
def load(records):
    ...

# Consumer DAG runs when orders_dataset is updated
@dag(schedule=[orders_dataset])
def reporting_pipeline():
    ...

5. Task Dependencies

With @task decorator (TaskFlow API)

Python
@dag(...)
def pipeline():
    a = task_a()
    b = task_b()
    c = task_c()
    d = task_d()

    # a runs first, then b and c in parallel, then d
    a >> [b, c] >> d

With classic operators

Python
from airflow.operators.python import PythonOperator

extract_task = PythonOperator(task_id="extract", python_callable=extract, dag=dag)
validate_task = PythonOperator(task_id="validate", python_callable=validate, dag=dag)
load_task = PythonOperator(task_id="load", python_callable=load, dag=dag)

# dependencies
extract_task >> validate_task >> load_task

6. Sensors — Wait for Conditions

Sensors poll until a condition is true, then let the next task proceed.

Python
from airflow.sensors.filesystem import FileSensor
from airflow.providers.http.sensors.http import HttpSensor

# Wait for a file to appear
wait_for_file = FileSensor(
    task_id="wait_for_orders_file",
    filepath="/data/orders/{{ ds }}/orders.csv",
    poke_interval=60,     # check every 60 seconds
    timeout=3600,         # fail after 1 hour
    mode="reschedule",    # release worker slot while waiting (better for long waits)
    dag=dag,
)

# Wait for an API to return 200
wait_for_api = HttpSensor(
    task_id="wait_for_api",
    http_conn_id="orders_api",
    endpoint="/health",
    poke_interval=30,
    timeout=600,
    dag=dag,
)

7. XCom — Passing Data Between Tasks

XCom (cross-communication) lets tasks share small data.

Python
@task()
def extract() -> dict:
    return {"rows": 1500, "source": "orders_api"}   # automatically pushed to XCom

@task()
def report(stats: dict) -> None:
    print(f"Loaded {stats['rows']} rows from {stats['source']}")

# Wire it:
stats = extract()
report(stats)

XCom is for small metadata (row counts, file paths, status codes). Never push DataFrames or large datasets through XCom — write to storage and pass the path.
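
For anything bigger, a common pattern is to stage the data in object storage and pass only its path through XCom. A rough sketch (the bucket and the upload/read helpers are placeholders):

Python
@task()
def extract_to_s3(**context) -> str:
    path = f"s3://my-bucket/orders/{context['ds']}/orders.json"   # placeholder bucket
    upload_orders(path)            # hypothetical helper that writes the raw payload
    return path                    # only the small path string goes through XCom

@task()
def transform_from_s3(path: str) -> str:
    df = read_orders(path)         # hypothetical helper that reads the staged file
    write_clean(df, path)          # hypothetical helper that writes the cleaned version
    return path

transform_from_s3(extract_to_s3())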


8. Variables and Connections

Variables — store config in Airflow UI

Python
from airflow.models import Variable

api_url = Variable.get("orders_api_url")
batch_size = int(Variable.get("batch_size", default_var="1000"))

# JSON variable
config = Variable.get("pipeline_config", deserialize_json=True)

Set in UI: Admin → Variables, or via CLI:

Bash
airflow variables set orders_api_url "https://api.example.com"

Connections — store credentials

Python
from airflow.hooks.base import BaseHook

conn = BaseHook.get_connection("snowflake_prod")
print(conn.host, conn.login, conn.schema)   # conn.password is available too; never log it

Set in UI: Admin → Connections, or via environment variable:

Bash
export AIRFLOW_CONN_SNOWFLAKE_PROD='snowflake://user:pass@account.snowflakecomputing.com/analytics'
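
Once credentials live in a connection, the load step from section 3 could drop the raw environment variables and use the Snowflake provider's hook instead. A sketch, assuming apache-airflow-providers-snowflake is installed and a snowflake_prod connection exists:

Python
from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook

@task()
def load(records: list[dict]) -> int:
    hook = SnowflakeHook(snowflake_conn_id="snowflake_prod")
    # insert_rows comes from the shared DB-API hook base class
    hook.insert_rows(
        table="ANALYTICS.RAW.ORDERS",
        rows=[(r["order_id"], r["amount"]) for r in records],
        target_fields=["ORDER_ID", "AMOUNT"],
    )
    return len(records)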

9. Retries and Error Handling

Python
from datetime import timedelta

def alert_slack(context):
    from airflow.providers.slack.hooks.slack_webhook import SlackWebhookHook
    hook = SlackWebhookHook(slack_webhook_conn_id="slack_alerts")
    hook.send(
        text=f":red_circle: DAG `{context['dag'].dag_id}` failed\n"
             f"Task: `{context['task_instance'].task_id}`\n"
             f"Run: {context['logical_date']}"
    )

default_args = {
    "retries": 3,
    "retry_delay": timedelta(minutes=5),
    "retry_exponential_backoff": True,   # roughly 5 min, 10 min, 20 min between attempts
    "max_retry_delay": timedelta(hours=1),
    "on_failure_callback": alert_slack,  # called with the task context on final failure
    "on_retry_callback": log_retry,      # another callback, defined the same way
}

10. Branching — Conditional Execution

Python
@task.branch()
def choose_load_method(row_count: int) -> str:
    # must return the task_id of the branch to follow
    if row_count > 100_000:
        return "bulk_load"
    return "incremental_load"

count = get_row_count()            # upstream task producing the row count
branch = choose_load_method(count)
bulk = bulk_load()                 # @task function with task_id "bulk_load"
incremental = incremental_load()   # @task function with task_id "incremental_load"

branch >> [bulk, incremental]
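
One branch is always skipped, so any task that joins the two paths back together needs a relaxed trigger rule (the default all_success would leave it skipped as well). For example:

Python
@task(trigger_rule="none_failed_min_one_success")
def report() -> None:
    # runs after whichever branch actually executed
    print("load finished")

[bulk, incremental] >> report()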

11. Dynamic Task Mapping (Parallelism)

Process multiple items in parallel without writing a task per item:

Python
@task()
def get_markets() -> list[str]:
    return ["US", "UK", "DE", "FR", "NO"]

@task()
def process_market(market: str) -> int:
    # runs once per market, all in parallel
    rows = fetch_and_load(market)
    return rows

markets = get_markets()
# expands into 5 parallel tasks automatically
process_market.expand(market=markets)
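
Arguments that stay constant for every mapped copy go through .partial(), and only the mapped argument goes through .expand(). A sketch, where the extra target_table parameter is hypothetical:

Python
@task()
def process_market(market: str, target_table: str) -> int:
    return fetch_and_load(market, target_table)

process_market.partial(target_table="ORDERS").expand(market=get_markets())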

12. Production Best Practices

DAG design rules

  • Keep DAGs declarative — no complex logic in the DAG file itself
  • One pipeline = one DAG (don't cram everything into one)
  • Use tags for filtering: tags=["ingestion", "orders", "tier-1"]
  • Set catchup=False unless backfilling is needed
  • Test DAGs locally before deploying: airflow dags test my_dag 2026-05-07

Task design rules

  • Tasks should be idempotent — re-running produces the same result (see the sketch after this list)
  • Tasks should be atomic — either fully complete or fail cleanly
  • Pass metadata through XCom, not large data
  • Use mode="reschedule" on sensors to avoid holding worker slots

Infrastructure

YAML
# docker-compose production setup
services:
  webserver:
    image: apache/airflow:2.9.0
    environment:
      AIRFLOW__CORE__EXECUTOR: CeleryExecutor
      AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
      AIRFLOW__CELERY__BROKER_URL: redis://redis:6379/0
      AIRFLOW__CORE__DAGS_FOLDER: /opt/airflow/dags

Folder structure

airflow/
  dags/
    orders_pipeline.py
    customers_pipeline.py
    reporting_pipeline.py
  plugins/
    hooks/
      snowflake_hook.py
    operators/
      api_to_snowflake_operator.py
  include/
    sql/
      transform_orders.sql
    schemas/
      orders_schema.json
  tests/
    test_orders_pipeline.py

13. Testing DAGs

Python
# tests/test_orders_pipeline.py
import pytest
from unittest.mock import patch

from airflow.models import DagBag

def test_dag_loads_without_errors():
    dagbag = DagBag(dag_folder="dags/", include_examples=False)
    assert "orders_daily_pipeline" in dagbag.dags
    assert len(dagbag.import_errors) == 0

def test_dag_structure():
    dagbag = DagBag(dag_folder="dags/")
    dag = dagbag.dags["orders_daily_pipeline"]
    task_ids = [t.task_id for t in dag.tasks]
    assert "extract" in task_ids
    assert "validate" in task_ids
    assert "load" in task_ids

def test_extract_task():
    # unit test the callable directly, supplying the context values it reads
    from dags.orders_pipeline import extract
    with patch.dict("os.environ", {"API_TOKEN": "test-token"}), patch("requests.get") as mock_get:
        mock_get.return_value.json.return_value = {"orders": [{"order_id": "1"}]}
        result = extract.function(ds="2026-05-07")
        assert len(result) == 1

Summary

| Concept | Purpose |
|---------|---------|
| DAG | Pipeline definition — tasks + dependencies + schedule |
| Operator | Task type (Python, SQL, HTTP, Bash, etc.) |
| Sensor | Wait for a condition before proceeding |
| XCom | Pass small metadata between tasks |
| Variable | Store configuration in Airflow UI |
| Connection | Store credentials securely |
| Branching | Conditional task execution |
| Dynamic mapping | Fan-out tasks over a list in parallel |
| Idempotency | Re-running a task must be safe |
| catchup=False | Don't backfill historical runs unless needed |

Next: pipeline architecture patterns — medallion layers, data contracts, and ingestion strategies.

Enjoyed this article?

Explore the Data Engineering learning path for more.
