Apache Airflow: Orchestrating Data Pipelines at Scale
Master Apache Airflow from scratch — DAGs, operators, sensors, XComs, task dependencies, scheduling, retries, and production deployment patterns used in real data engineering teams.
Why Orchestration Matters
Running python pipeline.py on a cron job breaks the moment you need:
- Task A to run only after Task B succeeds
- Automatic retries on failure
- Visibility into what ran, when, and why it failed
- Backfilling historical data ranges
- Parallel execution of independent tasks
Apache Airflow solves all of this. It's one of the most widely deployed pipeline orchestrators in data engineering.
1. Core Concepts
DAG — Directed Acyclic Graph
A DAG is a pipeline definition. It describes:
- What tasks to run
- The order/dependencies between them
- When to run (schedule)
extract_orders ──► validate ──► transform ──► load_snowflake
                                        └──► send_report

The "acyclic" part means no circular dependencies — Task A cannot depend on itself (directly or indirectly).
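In code, that shape is just dependency operators between tasks — a quick sketch, assuming the five tasks above are already defined and that send_report branches off transform:

extract_orders >> validate >> transform >> [load_snowflake, send_report]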
Task
A single unit of work: run a Python function, execute a SQL query, call an API, wait for a file.
Operator
The type of task. Airflow and its provider packages ship hundreds of operators:

- PythonOperator — run a Python function
- BashOperator — run a shell command
- SQLExecuteQueryOperator — execute SQL
- S3ToSnowflakeOperator — copy S3 data to Snowflake
- HttpSensor — wait until an HTTP endpoint returns 200
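For instance, a shell step and a Python step expressed with concrete operators (the task ids, command, and callable here are illustrative):

from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

archive_raw = BashOperator(task_id="archive_raw", bash_command="gzip /data/orders/{{ ds }}.csv")
notify_team = PythonOperator(task_id="notify_team", python_callable=lambda: print("orders loaded"))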
Scheduler
Reads DAG files, determines which tasks are ready to run, and triggers them. Runs continuously.
2. Installing Airflow
pip install "apache-airflow[amazon,snowflake,postgres]"
# initialize the metadata database
airflow db migrate
# create admin user
airflow users create \
--username admin \
--password admin \
--firstname Admin \
--lastname User \
--role Admin \
--email admin@example.com
# start
airflow webserver --port 8080 &
airflow scheduler &

Or with Docker Compose (recommended for local dev):
curl -LfO 'https://airflow.apache.org/docs/apache-airflow/stable/docker-compose.yaml'
docker-compose up -d

3. Your First DAG
# dags/orders_pipeline.py
from __future__ import annotations
from datetime import datetime, timedelta
from airflow.decorators import dag, task

@dag(
    dag_id="orders_daily_pipeline",
    description="Ingest, transform and load daily orders",
    schedule="0 2 * * *",               # 2am every day (cron)
    start_date=datetime(2024, 1, 1),    # any fixed date in the past
    catchup=False,                      # don't backfill past runs
    default_args={
        "owner": "data-team",
        "retries": 2,
        "retry_delay": timedelta(minutes=5),
        "email_on_failure": True,
        "email": ["data-alerts@company.com"],
    },
    tags=["orders", "ingestion"],
)
def orders_pipeline():
    @task()
    def extract(ds: str | None = None) -> list[dict]:
        # ds = the logical date (YYYY-MM-DD), injected from the task context;
        # Jinja templates like {{ ds }} are not rendered inside Python task code
        import requests, os
        resp = requests.get(
            "https://api.example.com/orders",
            params={"date": ds},
            headers={"Authorization": f"Bearer {os.environ['API_TOKEN']}"},
            timeout=30,
        )
        resp.raise_for_status()
        return resp.json()["orders"]

    @task()
    def validate(orders: list[dict]) -> list[dict]:
        errors = [r for r in orders if not r.get("order_id") or not r.get("amount")]
        if errors:
            raise ValueError(f"Validation failed: {len(errors)} invalid rows")
        return orders

    @task()
    def transform(orders: list[dict]) -> list[dict]:
        import pandas as pd
        df = pd.DataFrame(orders)
        df["amount"] = pd.to_numeric(df["amount"], errors="coerce")
        df["created_at"] = pd.to_datetime(df["created_at"])
        df = df.dropna(subset=["amount"])
        return df.to_dict(orient="records")

    @task()
    def load(records: list[dict]) -> int:
        import snowflake.connector
        from snowflake.connector.pandas_tools import write_pandas
        import pandas as pd, os
        df = pd.DataFrame(records)
        with snowflake.connector.connect(
            account=os.environ["SNOWFLAKE_ACCOUNT"],
            user=os.environ["SNOWFLAKE_USER"],
            password=os.environ["SNOWFLAKE_PASSWORD"],
            database="ANALYTICS",
            schema="RAW",
        ) as conn:
            _, _, rows, _ = write_pandas(conn, df, "ORDERS")
        return rows

    # Wire up the pipeline
    raw = extract()
    validated = validate(raw)
    transformed = transform(validated)
    load(transformed)

dag_instance = orders_pipeline()

4. Scheduling with Cron
schedule="0 2 * * *" # 2am daily
schedule="@daily" # midnight daily
schedule="@hourly" # top of every hour
schedule="0 6 * * 1" # 6am every Monday
schedule="*/15 * * * *" # every 15 minutes
schedule="0 0 1 * *" # midnight on the 1st of every month
schedule=None            # manual trigger only

Dataset-based scheduling (Airflow 2.4+)
Trigger a DAG when another DAG produces an output:
from airflow import Dataset

orders_dataset = Dataset("snowflake://analytics/raw/orders")

# Producer DAG marks the dataset as updated
@task(outlets=[orders_dataset])
def load(records):
    ...

# Consumer DAG runs when orders_dataset is updated
@dag(schedule=[orders_dataset])
def reporting_pipeline():
    ...

5. Task Dependencies
With @task decorator (TaskFlow API)
@dag(...)
def pipeline():
    a = task_a()
    b = task_b()
    c = task_c()
    d = task_d()

    # a runs first, then b and c in parallel, then d
    a >> [b, c] >> d

With classic operators
from airflow.operators.python import PythonOperator
extract_task = PythonOperator(task_id="extract", python_callable=extract, dag=dag)
validate_task = PythonOperator(task_id="validate", python_callable=validate, dag=dag)
load_task = PythonOperator(task_id="load", python_callable=load, dag=dag)
# dependencies
extract_task >> validate_task >> load_task

6. Sensors — Wait for Conditions
Sensors poll until a condition is true, then let the next task proceed.
from airflow.sensors.filesystem import FileSensor
from airflow.providers.http.sensors.http import HttpSensor
from airflow.sensors.time_delta import TimeDeltaSensor
from datetime import timedelta
# Wait for a file to appear
wait_for_file = FileSensor(
    task_id="wait_for_orders_file",
    filepath="/data/orders/{{ ds }}/orders.csv",
    poke_interval=60,     # check every 60 seconds
    timeout=3600,         # fail after 1 hour
    mode="reschedule",    # release worker slot while waiting (better for long waits)
    dag=dag,
)

# Wait for an API to return 200
wait_for_api = HttpSensor(
    task_id="wait_for_api",
    http_conn_id="orders_api",
    endpoint="/health",
    poke_interval=30,
    timeout=600,
    dag=dag,
)
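Waiting for an object to land in S3 is just as common — a sketch using the Amazon provider's S3KeySensor, with an illustrative bucket name and the default AWS connection:

from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor

wait_for_s3_file = S3KeySensor(
    task_id="wait_for_s3_orders",
    bucket_name="company-data-lake",           # illustrative bucket
    bucket_key="orders/{{ ds }}/orders.csv",   # templated key, one file per day
    aws_conn_id="aws_default",
    poke_interval=120,
    timeout=3600,
    mode="reschedule",
    dag=dag,
)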
7. XCom — Passing Data Between Tasks

XCom (cross-communication) lets tasks share small data.
@task()
def extract() -> dict:
    return {"rows": 1500, "source": "orders_api"}  # automatically pushed to XCom

@task()
def report(stats: dict) -> None:
    print(f"Loaded {stats['rows']} rows from {stats['source']}")

# Wire it:
stats = extract()
report(stats)

XCom is for small metadata (row counts, file paths, status codes). Never push DataFrames or large datasets through XCom — write to storage and pass the path.
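A common pattern is to write the heavy output to storage inside the task and return only its path — a minimal sketch using a local Parquet file (in production this would typically be S3 or GCS):

@task()
def transform_orders(ds: str | None = None) -> str:
    import pandas as pd
    df = pd.DataFrame({"order_id": [1, 2], "amount": [10.0, 20.0]})  # stand-in for a large result
    path = f"/tmp/orders_{ds}.parquet"  # illustrative location; use object storage in production
    df.to_parquet(path)
    return path  # only the path travels through XCom

@task()
def load_orders(path: str) -> None:
    import pandas as pd
    df = pd.read_parquet(path)
    print(f"Loading {len(df)} rows from {path}")

load_orders(transform_orders())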
8. Variables and Connections
Variables — store config in Airflow UI
from airflow.models import Variable
api_url = Variable.get("orders_api_url")
batch_size = int(Variable.get("batch_size", default_var="1000"))
# JSON variable
config = Variable.get("pipeline_config", deserialize_json=True)

Set in UI: Admin → Variables, or via CLI:

airflow variables set orders_api_url "https://api.example.com"

Connections — store credentials
from airflow.hooks.base import BaseHook
conn = BaseHook.get_connection("snowflake_prod")
print(conn.host, conn.login, conn.password, conn.schema)

Set in UI: Admin → Connections, or via environment variable:
AIRFLOW_CONN_SNOWFLAKE_PROD='snowflake://user:pass@account.snowflakecomputing.com/analytics'
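Provider hooks read these connections for you, so task code never has to touch raw credentials — a sketch assuming a connection named snowflake_prod and the Snowflake provider installed:

from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook

hook = SnowflakeHook(snowflake_conn_id="snowflake_prod")
row_count = hook.get_first("SELECT COUNT(*) FROM ANALYTICS.RAW.ORDERS")[0]  # table name is illustrative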
9. Retries and Error Handling

def alert_slack(context):
    from airflow.providers.slack.hooks.slack_webhook import SlackWebhookHook
    hook = SlackWebhookHook(slack_webhook_conn_id="slack_alerts")
    hook.send(
        text=f":red_circle: DAG `{context['dag'].dag_id}` failed\n"
             f"Task: `{context['task_instance'].task_id}`\n"
             f"Time: {context['logical_date']}"
    )

default_args = {
    "retries": 3,
    "retry_delay": timedelta(minutes=5),
    "retry_exponential_backoff": True,    # roughly 5min, 10min, 20min, ...
    "max_retry_delay": timedelta(hours=1),
    "on_failure_callback": alert_slack,   # defined above so this reference resolves
    "on_retry_callback": log_retry,       # another callback (not shown)
}

10. Branching — Conditional Execution
from airflow.operators.python import BranchPythonOperator  # classic alternative to @task.branch

@task.branch()
def choose_load_method(row_count: int) -> str:
    if row_count > 100_000:
        return "bulk_load"
    else:
        return "incremental_load"

branch = choose_load_method(count)
bulk = bulk_load_task()
incremental = incremental_load_task()
branch >> [bulk, incremental]
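One caveat: tasks downstream of both branches are skipped by default, because the un-chosen branch propagates its skipped state. A common fix is a join task with a relaxed trigger rule — a sketch using EmptyOperator:

from airflow.operators.empty import EmptyOperator
from airflow.utils.trigger_rule import TriggerRule

join = EmptyOperator(task_id="join", trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS)
[bulk, incremental] >> join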
11. Dynamic Task Mapping (Parallelism)

Process multiple items in parallel without writing a task per item:
@task()
def get_markets() -> list[str]:
    return ["US", "UK", "DE", "FR", "NO"]

@task()
def process_market(market: str) -> int:
    # runs once per market, all in parallel
    rows = fetch_and_load(market)
    return rows

markets = get_markets()

# expands into 5 parallel tasks automatically
process_market.expand(market=markets)
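If the mapped task also needs fixed arguments, partial() supplies them once while expand() fans out over the list — a sketch with a hypothetical load_market task that also takes a table name:

@task()
def load_market(market: str, table: str) -> None:
    print(f"loading {market} into {table}")  # illustrative body

load_market.partial(table="orders").expand(market=markets)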
12. Production Best Practices

DAG design rules
- Keep DAGs declarative — no complex logic in the DAG file itself
- One pipeline = one DAG (don't cram everything into one)
- Use tags for filtering: tags=["ingestion", "orders", "tier-1"]
- Set catchup=False unless backfilling is needed
- Test DAGs locally before deploying: airflow dags test my_dag 2026-05-07
Task design rules
- Tasks should be idempotent — re-running produces the same result (see the sketch after this list)
- Tasks should be atomic — either fully complete or fail cleanly
- Pass metadata through XCom, not large data
- Use mode="reschedule" on sensors to avoid holding worker slots
Infrastructure
# docker-compose production setup
services:
  webserver:
    image: apache/airflow:2.9.0
    environment:
      AIRFLOW__CORE__EXECUTOR: CeleryExecutor
      AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
      AIRFLOW__CELERY__BROKER_URL: redis://redis:6379/0
      AIRFLOW__CORE__DAGS_FOLDER: /opt/airflow/dags

Folder structure
airflow/
  dags/
    orders_pipeline.py
    customers_pipeline.py
    reporting_pipeline.py
  plugins/
    hooks/
      snowflake_hook.py
    operators/
      api_to_snowflake_operator.py
  include/
    sql/
      transform_orders.sql
    schemas/
      orders_schema.json
  tests/
    test_orders_pipeline.py

13. Testing DAGs
# tests/test_orders_pipeline.py
from unittest.mock import patch

import pytest
from airflow.models import DagBag

def test_dag_loads_without_errors():
    dagbag = DagBag(dag_folder="dags/", include_examples=False)
    assert "orders_daily_pipeline" in dagbag.dags
    assert len(dagbag.import_errors) == 0

def test_dag_structure():
    dagbag = DagBag(dag_folder="dags/")
    dag = dagbag.dags["orders_daily_pipeline"]
    task_ids = [t.task_id for t in dag.tasks]
    assert "extract" in task_ids
    assert "validate" in task_ids
    assert "load" in task_ids

def test_extract_task():
    # unit test the extract callable directly; the task is defined inside the
    # DAG function, so pull it from the parsed DAG rather than importing it
    dagbag = DagBag(dag_folder="dags/", include_examples=False)
    extract_op = dagbag.dags["orders_daily_pipeline"].get_task("extract")
    with patch.dict("os.environ", {"API_TOKEN": "test-token"}), patch("requests.get") as mock_get:
        mock_get.return_value.json.return_value = {"orders": [{"order_id": "1"}]}
        result = extract_op.python_callable(ds="2024-01-01")
        assert len(result) == 1

Summary
| Concept | Purpose |
|---------|---------|
| DAG | Pipeline definition — tasks + dependencies + schedule |
| Operator | Task type (Python, SQL, HTTP, Bash, etc.) |
| Sensor | Wait for a condition before proceeding |
| XCom | Pass small metadata between tasks |
| Variable | Store configuration in Airflow UI |
| Connection | Store credentials securely |
| Branching | Conditional task execution |
| Dynamic mapping | Fan-out tasks over a list in parallel |
| Idempotency | Re-running a task must be safe |
| catchup=False | Don't backfill historical runs unless needed |
Next: pipeline architecture patterns — medallion layers, data contracts, and ingestion strategies.
Enjoyed this article?
Explore the Data Engineering learning path for more.