Back to blog
Data Engineeringintermediate

BigQuery Python Pipelines: From Raw Data to Production

Build production-grade data pipelines with the BigQuery Python client — authentication, schema enforcement, cost estimation, Airflow operators, dbt adapter config, and a complete API-to-BigQuery ETL.

LearnixoMay 7, 202613 min read
BigQueryPythonETLAirflowdbtPandasData EngineeringGCP
Share:š•

Why Python for BigQuery Pipelines?

The BigQuery console and bq CLI are great for exploration. But production pipelines need version-controlled, testable, parameterized code that integrates with orchestration systems. The google-cloud-bigquery Python client is the standard bridge between BigQuery and the rest of your stack.

Installation and Setup

Bash
# Core client
pip install google-cloud-bigquery

# DataFrame support (pandas + pyarrow)
pip install "google-cloud-bigquery[pandas]"

# BigQuery Storage API for fast reads (optional but recommended)
pip install google-cloud-bigquery-storage

# Everything at once (recommended for data engineering work)
pip install \
    google-cloud-bigquery \
    google-cloud-bigquery-storage \
    "google-cloud-bigquery[pandas]" \
    pyarrow \
    db-dtypes

Authentication

BigQuery supports two authentication patterns.

Application Default Credentials (ADC) — Recommended

ADC uses the environment's credential chain: Workload Identity (GKE), instance service account (Compute Engine), or ~/.config/gcloud/application_default_credentials.json (local dev).

Python
from google.cloud import bigquery

# ADC — zero config needed in Cloud environments
client = bigquery.Client(project="myproject")

Local development setup:

Bash
gcloud auth application-default login
# Or, for CI/CD pipelines that don't use Workload Identity:
export GOOGLE_APPLICATION_CREDENTIALS="/path/to/service-account.json"

Explicit Service Account Key File

Python
from google.cloud import bigquery
from google.oauth2 import service_account

credentials = service_account.Credentials.from_service_account_file(
    "/path/to/service-account.json",
    scopes=["https://www.googleapis.com/auth/bigquery"],
)

client = bigquery.Client(project="myproject", credentials=credentials)

In production on GCP, use Workload Identity or the default service account. Avoid key files — they are long-lived credentials that are easy to leak. Use key files only as a last resort for services outside GCP.

Running Queries

Basic Query

Python
from google.cloud import bigquery

client = bigquery.Client(project="myproject")

query = """
    SELECT
        user_id,
        COUNT(*) AS session_count,
        SUM(revenue) AS total_revenue
    FROM `myproject.analytics.sessions`
    WHERE session_date = CURRENT_DATE()
    GROUP BY user_id
    ORDER BY total_revenue DESC
    LIMIT 100
"""

# client.query() returns a QueryJob; calling .result() blocks until done
job = client.query(query)
rows = job.result()

for row in rows:
    print(f"user={row.user_id}  sessions={row.session_count}  revenue={row.total_revenue}")

QueryJobConfig

QueryJobConfig controls job-level settings: destination table, write disposition, location, timeout, and more.

Python
from google.cloud import bigquery

client = bigquery.Client(project="myproject")

job_config = bigquery.QueryJobConfig(
    # Write result to a specific table
    destination="myproject.analytics.top_users_today",
    write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
    # Run in a specific location (must match dataset location)
    location="EU",
    # Job-level timeout
    job_timeout_ms=60_000,
    # Use a specific reservation slot (if on flat-rate pricing)
    # reservation="projects/myproject/locations/eu/reservations/analytics-reservation",
)

job = client.query(query, job_config=job_config)
job.result()  # Wait for completion
print(f"Job {job.job_id} wrote {job.num_dml_affected_rows} rows")

Reading Results to DataFrame

Python
import pandas as pd
from google.cloud import bigquery

client = bigquery.Client(project="myproject")

query = """
    SELECT
        event_date,
        country,
        COUNT(*) AS events,
        APPROX_COUNT_DISTINCT(user_id) AS unique_users
    FROM `myproject.analytics.events`
    WHERE event_date >= DATE_SUB(CURRENT_DATE(), INTERVAL 30 DAY)
    GROUP BY event_date, country
"""

# to_dataframe() converts BigQuery types to pandas dtypes automatically
df = client.query(query).to_dataframe()
print(df.dtypes)
# event_date       dbdate
# country           object
# events             int64
# unique_users       int64

# Specify progress bar for large results
df = client.query(query).to_dataframe(progress_bar_type="tqdm")

Loading a DataFrame to BigQuery

Python
import pandas as pd
from google.cloud import bigquery

client = bigquery.Client(project="myproject")

# Sample DataFrame
df = pd.DataFrame({
    "user_id": ["u001", "u002", "u003"],
    "signup_date": pd.to_datetime(["2026-05-01", "2026-05-02", "2026-05-03"]),
    "plan": ["pro", "free", "pro"],
    "mrr_usd": [99.0, 0.0, 99.0],
})

table_ref = "myproject.crm.new_signups"

job_config = bigquery.LoadJobConfig(
    write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
    # Let BigQuery infer schema from the DataFrame
    autodetect=True,
)

job = client.load_table_from_dataframe(df, table_ref, job_config=job_config)
job.result()  # Wait for load job to complete

print(f"Loaded {job.output_rows} rows into {table_ref}")

Schema Definition with SchemaField

Always define schemas explicitly for production tables. Never rely on autodetect=True in pipelines where schema drift can cause silent data quality issues.

Python
from google.cloud import bigquery

# Explicit schema definition
schema = [
    bigquery.SchemaField("order_id", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("customer_id", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("order_date", "DATE", mode="REQUIRED"),
    bigquery.SchemaField("status", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("total_amount", "NUMERIC", mode="REQUIRED"),
    bigquery.SchemaField("currency_code", "STRING", mode="REQUIRED"),
    # RECORD = STRUCT
    bigquery.SchemaField(
        "shipping_address",
        "RECORD",
        mode="NULLABLE",
        fields=[
            bigquery.SchemaField("street", "STRING"),
            bigquery.SchemaField("city", "STRING"),
            bigquery.SchemaField("country_code", "STRING"),
            bigquery.SchemaField("postal_code", "STRING"),
        ],
    ),
    # REPEATED = ARRAY
    bigquery.SchemaField("tags", "STRING", mode="REPEATED"),
]

job_config = bigquery.LoadJobConfig(
    schema=schema,
    write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
    source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
)

with open("orders.ndjson", "rb") as f:
    job = client.load_table_from_file(
        f,
        "myproject.sales.orders",
        job_config=job_config,
    )
job.result()

Write Dispositions

| Disposition | Constant | Behaviour | |---|---|---| | Truncate and replace | WRITE_TRUNCATE | Delete all existing rows, then load new data | | Append | WRITE_APPEND | Add new rows to existing data | | Fail if not empty | WRITE_EMPTY | Raise error if table has rows — safe for initial loads |

Python
# Full refresh (idempotent daily load)
job_config = bigquery.LoadJobConfig(
    write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
    schema=schema,
)

# Incremental append (streaming new events)
job_config = bigquery.LoadJobConfig(
    write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
    schema=schema,
)

# Initial load guard (fail if table already has data)
job_config = bigquery.LoadJobConfig(
    write_disposition=bigquery.WriteDisposition.WRITE_EMPTY,
    schema=schema,
)

Creating Datasets and Tables Programmatically

Python
from google.cloud import bigquery
from google.api_core.exceptions import Conflict

client = bigquery.Client(project="myproject")

# Create dataset
dataset_id = "myproject.analytics_v2"
dataset = bigquery.Dataset(dataset_id)
dataset.location = "EU"
dataset.default_table_expiration_ms = None  # no auto-expiry
dataset.description = "Analytics V2 — migrated 2026-05"

try:
    dataset = client.create_dataset(dataset, timeout=30)
    print(f"Created dataset {dataset.dataset_id}")
except Conflict:
    print("Dataset already exists — skipping creation")

# Create table with partitioning and clustering
table_id = "myproject.analytics_v2.events"
schema = [
    bigquery.SchemaField("event_id", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("event_date", "DATE", mode="REQUIRED"),
    bigquery.SchemaField("user_id", "STRING"),
    bigquery.SchemaField("event_type", "STRING"),
    bigquery.SchemaField("country", "STRING"),
    bigquery.SchemaField("properties", "JSON"),
]

table = bigquery.Table(table_id, schema=schema)

table.time_partitioning = bigquery.TimePartitioning(
    type_=bigquery.TimePartitioningType.DAY,
    field="event_date",
    expiration_ms=365 * 24 * 60 * 60 * 1000,  # 1 year
)

table.clustering_fields = ["country", "event_type"]

table.require_partition_filter = True

try:
    table = client.create_table(table)
    print(f"Created table {table.table_id}")
except Conflict:
    print("Table already exists — skipping creation")

Cost Estimation with Dry Run

Before running an expensive query in production, estimate the bytes scanned using dry_run=True. The job is validated and costed but not executed.

Python
from google.cloud import bigquery

client = bigquery.Client(project="myproject")

query = """
    SELECT *
    FROM `myproject.analytics.events`
    WHERE event_date BETWEEN '2025-01-01' AND '2026-05-07'
"""

job_config = bigquery.QueryJobConfig(dry_run=True, use_query_cache=False)

dry_run_job = client.query(query, job_config=job_config)

bytes_processed = dry_run_job.total_bytes_processed
cost_usd = 6.25 * bytes_processed / (1024 ** 4)  # $6.25 per TB on-demand

print(f"Bytes processed:  {bytes_processed:,}")
print(f"Data scanned:     {bytes_processed / (1024**3):.2f} GB")
print(f"Estimated cost:   ${cost_usd:.4f}")

# Guard: abort if cost is above threshold
MAX_COST_USD = 1.00
if cost_usd > MAX_COST_USD:
    raise ValueError(f"Query would cost ~${cost_usd:.2f} — exceeds ${MAX_COST_USD} limit")

Query Parameters: Safe Parameterized SQL

Never use Python string formatting to inject user input into SQL. Use BigQuery's typed query parameters instead.

Python
from google.cloud import bigquery

client = bigquery.Client(project="myproject")

# Named parameters (recommended for readability)
query = """
    SELECT
        user_id,
        email,
        signup_date
    FROM `myproject.crm.users`
    WHERE country_code = @country
      AND signup_date BETWEEN @start_date AND @end_date
      AND plan IN UNNEST(@plans)
    ORDER BY signup_date DESC
    LIMIT @row_limit
"""

job_config = bigquery.QueryJobConfig(
    query_parameters=[
        bigquery.ScalarQueryParameter("country", "STRING", "DE"),
        bigquery.ScalarQueryParameter("start_date", "DATE", "2026-01-01"),
        bigquery.ScalarQueryParameter("end_date", "DATE", "2026-05-07"),
        bigquery.ArrayQueryParameter("plans", "STRING", ["pro", "enterprise"]),
        bigquery.ScalarQueryParameter("row_limit", "INT64", 500),
    ]
)

df = client.query(query, job_config=job_config).to_dataframe()
print(f"Found {len(df)} users")

Positional Parameters

Python
query = "SELECT * FROM `myproject.crm.users` WHERE country_code = ? AND plan = ?"

job_config = bigquery.QueryJobConfig(
    query_parameters=[
        bigquery.ScalarQueryParameter(None, "STRING", "DE"),
        bigquery.ScalarQueryParameter(None, "STRING", "pro"),
    ]
)

BigQuery Storage API for Fast Reads

The default to_dataframe() uses the REST API — fine for small results but slow for millions of rows. The Storage API uses gRPC streaming and Apache Arrow, dramatically increasing throughput.

Python
# Requires: pip install google-cloud-bigquery-storage
from google.cloud import bigquery
from google.cloud.bigquery_storage import BigQueryReadClient

client = bigquery.Client(project="myproject")

query = """
    SELECT user_id, event_date, event_type, country
    FROM `myproject.analytics.events`
    WHERE event_date = '2026-05-01'
"""

# Pass bqstorage_client to enable the Storage API path
bqstorage_client = BigQueryReadClient()

df = (
    client.query(query)
    .result()
    .to_dataframe(bqstorage_api=True, bqstorage_client=bqstorage_client)
)

print(f"Loaded {len(df):,} rows via Storage API")

For even larger reads, use read_gbq from pandas_gbq:

Python
import pandas_gbq

df = pandas_gbq.read_gbq(
    "SELECT * FROM `myproject.analytics.events` WHERE event_date = '2026-05-01'",
    project_id="myproject",
    use_bqstorage_api=True,
    progress_bar_type="tqdm",
)

Airflow: BigQueryOperator and BigQueryHook

BigQueryInsertJobOperator (modern approach)

Python
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator

refresh_daily_summary = BigQueryInsertJobOperator(
    task_id="refresh_daily_summary",
    configuration={
        "query": {
            "query": """
                DELETE FROM `myproject.analytics.daily_summary`
                WHERE summary_date = DATE('{{ ds }}');

                INSERT INTO `myproject.analytics.daily_summary`
                SELECT
                    DATE('{{ ds }}') AS summary_date,
                    region,
                    COUNT(*) AS orders,
                    SUM(total_amount) AS revenue
                FROM `myproject.sales.orders`
                WHERE DATE(created_at) = DATE('{{ ds }}')
                GROUP BY region;
            """,
            "useLegacySql": False,
        }
    },
    gcp_conn_id="google_cloud_default",
    location="EU",
)

BigQueryHook for Custom Python Tasks

Python
from airflow.decorators import task
from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook

@task
def load_staging_to_production(ds: str) -> dict:
    hook = BigQueryHook(gcp_conn_id="google_cloud_default", location="EU")
    client = hook.get_client(project_id="myproject")

    query = f"""
        MERGE `myproject.sales.orders` T
        USING `myproject.staging.orders_delta` S
        ON T.order_id = S.order_id
        WHEN MATCHED THEN
            UPDATE SET T.status = S.status, T.updated_at = S.updated_at
        WHEN NOT MATCHED THEN
            INSERT ROW
    """

    job = client.query(query)
    job.result()

    return {
        "job_id": job.job_id,
        "rows_affected": job.num_dml_affected_rows,
        "bytes_processed": job.total_bytes_processed,
    }

Full DAG Example

Python
from datetime import datetime, timedelta
from airflow import DAG
from airflow.providers.google.cloud.operators.bigquery import (
    BigQueryInsertJobOperator,
    BigQueryCheckOperator,
)
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import (
    GCSToBigQueryOperator,
)

default_args = {
    "owner": "data-engineering",
    "retries": 2,
    "retry_delay": timedelta(minutes=5),
    "on_failure_callback": lambda ctx: print(f"Task failed: {ctx['task_instance_key_str']}"),
}

with DAG(
    dag_id="daily_orders_pipeline",
    default_args=default_args,
    schedule="0 3 * * *",  # 3 AM UTC daily
    start_date=datetime(2026, 1, 1),
    catchup=False,
    tags=["sales", "bigquery"],
) as dag:

    # 1. Load raw CSV from GCS to staging table
    load_raw = GCSToBigQueryOperator(
        task_id="load_raw_orders",
        bucket="my-data-bucket",
        source_objects=["orders/{{ ds_nodash }}/*.csv"],
        destination_project_dataset_table="myproject.staging.orders_raw",
        source_format="CSV",
        skip_leading_rows=1,
        write_disposition="WRITE_TRUNCATE",
        autodetect=True,
        gcp_conn_id="google_cloud_default",
    )

    # 2. Data quality check
    check_row_count = BigQueryCheckOperator(
        task_id="check_row_count",
        sql="""
            SELECT COUNT(*) > 0
            FROM `myproject.staging.orders_raw`
        """,
        use_legacy_sql=False,
        gcp_conn_id="google_cloud_default",
    )

    # 3. Transform and load to production
    transform_load = BigQueryInsertJobOperator(
        task_id="transform_to_production",
        configuration={
            "query": {
                "query": """
                    INSERT INTO `myproject.sales.orders`
                    SELECT
                        order_id,
                        TRIM(customer_id) AS customer_id,
                        DATE(order_date) AS order_date,
                        UPPER(TRIM(status)) AS status,
                        SAFE_CAST(total_amount AS NUMERIC) AS total_amount,
                        CURRENT_TIMESTAMP() AS loaded_at
                    FROM `myproject.staging.orders_raw`
                    WHERE SAFE_CAST(total_amount AS NUMERIC) IS NOT NULL
                """,
                "useLegacySql": False,
            }
        },
        gcp_conn_id="google_cloud_default",
    )

    load_raw >> check_row_count >> transform_load

dbt with BigQuery Adapter

Installation and Profile Configuration

Bash
pip install dbt-bigquery

~/.dbt/profiles.yml:

YAML
myproject:
  target: dev
  outputs:
    dev:
      type: bigquery
      method: oauth               # Uses ADC (gcloud auth application-default login)
      project: myproject
      dataset: dbt_dev            # dbt writes models here in dev
      location: EU
      threads: 4
      timeout_seconds: 300
      priority: interactive

    prod:
      type: bigquery
      method: service-account
      project: myproject
      dataset: analytics          # production dataset
      keyfile: /secrets/sa-dbt.json
      location: EU
      threads: 8
      timeout_seconds: 600
      priority: batch
      # Optional: flat-rate billing via reservations
      # job_creation_timeout_seconds: 10
      # job_retry_deadline_seconds: 600

dbt Model: Incremental (Partitioned Table)

models/marts/sales/fct_daily_orders.sql:

SQL
{{
    config(
        materialized = 'incremental',
        incremental_strategy = 'insert_overwrite',
        partition_by = {
            "field": "order_date",
            "data_type": "date",
            "granularity": "day"
        },
        cluster_by = ["region", "channel"],
        require_partition_filter = true,
        tags = ["daily", "sales"]
    )
}}

SELECT
    DATE(o.created_at) AS order_date,
    o.order_id,
    o.customer_id,
    c.region,
    c.channel,
    o.status,
    o.total_amount,
    o.currency_code,
    CURRENT_TIMESTAMP() AS dbt_loaded_at
FROM {{ source('raw', 'orders') }} o
LEFT JOIN {{ ref('dim_customers') }} c USING (customer_id)

{% if is_incremental() %}
    -- Only process new data during incremental runs
    WHERE DATE(o.created_at) >= DATE_SUB(CURRENT_DATE(), INTERVAL 3 DAY)
{% endif %}

Running dbt

Bash
# Run all models
dbt run --target prod

# Run only the sales mart
dbt run --select marts/sales --target prod

# Run with full refresh (WRITE_TRUNCATE)
dbt run --full-refresh --select fct_daily_orders --target prod

# Test data quality
dbt test --select fct_daily_orders

# Generate and serve docs
dbt docs generate && dbt docs serve

Complete ETL: API to BigQuery with Schema Enforcement

This end-to-end example pulls exchange rate data from a public API, transforms it with pandas, enforces a schema, and loads it to BigQuery with idempotency.

Python
"""
ETL: Exchange Rates API → pandas → BigQuery
Idempotent: safe to run multiple times for the same date.
"""

import logging
from datetime import date
from typing import Any

import pandas as pd
import requests
from google.cloud import bigquery
from google.api_core.exceptions import NotFound

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
log = logging.getLogger(__name__)

# ── Configuration ─────────────────────────────────────────────────────────────

PROJECT = "myproject"
DATASET = "finance"
TABLE = "exchange_rates"
TABLE_REF = f"{PROJECT}.{DATASET}.{TABLE}"

API_URL = "https://api.exchangerate-api.com/v4/latest/USD"

SCHEMA = [
    bigquery.SchemaField("rate_date", "DATE", mode="REQUIRED"),
    bigquery.SchemaField("base_currency", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("target_currency", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("rate", "FLOAT64", mode="REQUIRED"),
    bigquery.SchemaField("loaded_at", "TIMESTAMP", mode="REQUIRED"),
]


# ── Extract ───────────────────────────────────────────────────────────────────

def extract_rates(base: str = "USD") -> dict[str, Any]:
    """Fetch exchange rates from the public API."""
    log.info("Fetching exchange rates for base=%s", base)
    response = requests.get(API_URL, timeout=10)
    response.raise_for_status()
    data = response.json()
    log.info("Received %d rates for date %s", len(data["rates"]), data["date"])
    return data


# ── Transform ─────────────────────────────────────────────────────────────────

def transform_rates(raw: dict[str, Any]) -> pd.DataFrame:
    """Flatten the API response into a tidy DataFrame."""
    rate_date = date.fromisoformat(raw["date"])
    base = raw["base"]

    rows = [
        {
            "rate_date": rate_date,
            "base_currency": base,
            "target_currency": currency,
            "rate": float(rate_value),
            "loaded_at": pd.Timestamp.utcnow(),
        }
        for currency, rate_value in raw["rates"].items()
        if isinstance(rate_value, (int, float))  # skip malformed values
    ]

    df = pd.DataFrame(rows)

    # Enforce types explicitly — never trust to_dataframe to infer correctly
    df["rate_date"] = pd.to_datetime(df["rate_date"]).dt.date
    df["base_currency"] = df["base_currency"].str.upper().str.strip()
    df["target_currency"] = df["target_currency"].str.upper().str.strip()
    df["rate"] = pd.to_numeric(df["rate"], errors="coerce")
    df["loaded_at"] = pd.to_datetime(df["loaded_at"], utc=True)

    # Drop rows where rate could not be parsed
    n_before = len(df)
    df = df.dropna(subset=["rate"])
    if len(df) < n_before:
        log.warning("Dropped %d rows with null rates", n_before - len(df))

    log.info("Transformed %d rate rows for %s", len(df), rate_date)
    return df


# ── Load ──────────────────────────────────────────────────────────────────────

def ensure_table_exists(client: bigquery.Client) -> None:
    """Create the table if it does not exist (idempotent)."""
    try:
        client.get_table(TABLE_REF)
        log.info("Table %s already exists", TABLE_REF)
    except NotFound:
        log.info("Creating table %s", TABLE_REF)
        table = bigquery.Table(TABLE_REF, schema=SCHEMA)
        table.time_partitioning = bigquery.TimePartitioning(
            type_=bigquery.TimePartitioningType.DAY,
            field="rate_date",
        )
        table.clustering_fields = ["base_currency", "target_currency"]
        client.create_table(table)
        log.info("Table created successfully")


def delete_existing_partition(client: bigquery.Client, rate_date: date) -> None:
    """Delete existing rows for the target date to ensure idempotency."""
    delete_query = f"""
        DELETE FROM `{TABLE_REF}`
        WHERE rate_date = DATE('{rate_date.isoformat()}')
    """
    job = client.query(delete_query)
    job.result()
    log.info("Deleted existing rows for %s (affected: %s)", rate_date, job.num_dml_affected_rows)


def load_to_bigquery(client: bigquery.Client, df: pd.DataFrame) -> None:
    """Load DataFrame to BigQuery with schema enforcement."""
    job_config = bigquery.LoadJobConfig(
        schema=SCHEMA,
        write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
        # Explicitly set parquet as intermediate format for type fidelity
        parquet_options=bigquery.ParquetOptions(enable_list_inference=True),
    )

    job = client.load_table_from_dataframe(df, TABLE_REF, job_config=job_config)
    job.result()
    log.info("Loaded %d rows into %s (job_id=%s)", job.output_rows, TABLE_REF, job.job_id)


# ── Dry-Run Cost Estimate ─────────────────────────────────────────────────────

def estimate_query_cost(client: bigquery.Client, rate_date: date) -> float:
    """Estimate the cost of reading back today's loaded data."""
    query = f"""
        SELECT *
        FROM `{TABLE_REF}`
        WHERE rate_date = '{rate_date.isoformat()}'
    """
    job_config = bigquery.QueryJobConfig(dry_run=True, use_query_cache=False)
    dry_job = client.query(query, job_config=job_config)
    cost = 6.25 * dry_job.total_bytes_processed / (1024 ** 4)
    log.info("Dry-run estimate for read-back query: $%.6f", cost)
    return cost


# ── Pipeline Entry Point ──────────────────────────────────────────────────────

def run_pipeline() -> None:
    client = bigquery.Client(project=PROJECT)

    # Extract
    raw = extract_rates(base="USD")
    rate_date = date.fromisoformat(raw["date"])

    # Transform
    df = transform_rates(raw)

    if df.empty:
        log.error("No data after transformation — aborting")
        return

    # Load
    ensure_table_exists(client)
    delete_existing_partition(client, rate_date)  # idempotency
    load_to_bigquery(client, df)

    # Optional: estimate read cost for observability
    estimate_query_cost(client, rate_date)

    log.info("Pipeline complete for %s", rate_date)


if __name__ == "__main__":
    run_pipeline()

Running the Pipeline

Bash
# Local dev with ADC
gcloud auth application-default login
python etl_exchange_rates.py

# In a Docker container (GKE / Cloud Run)
# Mount the service account key via Secret Manager or Workload Identity
docker run \
  -e GOOGLE_APPLICATION_CREDENTIALS=/secrets/sa.json \
  -v $(pwd)/secrets:/secrets \
  my-etl-image python etl_exchange_rates.py

Testing the Pipeline

Python
import pytest
import pandas as pd
from unittest.mock import MagicMock, patch
from etl_exchange_rates import transform_rates, extract_rates

def test_transform_rates_shape():
    raw = {
        "date": "2026-05-07",
        "base": "USD",
        "rates": {"EUR": 0.92, "GBP": 0.79, "JPY": 155.3, "BAD": "notanumber"},
    }
    df = transform_rates(raw)
    # "BAD" row should be dropped (coerce fails → NaN → dropped)
    assert len(df) == 3
    assert set(df.columns) == {"rate_date", "base_currency", "target_currency", "rate", "loaded_at"}

def test_transform_rates_types(sample_raw_response):
    df = transform_rates(sample_raw_response)
    assert df["rate"].dtype == float
    assert str(df["loaded_at"].dtype) == "datetime64[ns, UTC]"

@patch("etl_exchange_rates.requests.get")
def test_extract_rates_calls_api(mock_get):
    mock_get.return_value = MagicMock(
        status_code=200,
        json=lambda: {"date": "2026-05-07", "base": "USD", "rates": {"EUR": 0.92}},
    )
    result = extract_rates("USD")
    mock_get.assert_called_once()
    assert result["base"] == "USD"

Summary

Production BigQuery Python pipelines follow a consistent pattern:

  • Authenticate with ADC or Workload Identity — avoid key files in Cloud environments.
  • Always pass an explicit schema to LoadJobConfig — autodetect=True is a liability in production.
  • Use dry_run=True before running large queries to gate on cost.
  • Use named QueryParameter objects for any user-controlled SQL input.
  • Enable the BigQuery Storage API (bqstorage_api=True) for reads exceeding a few million rows.
  • In Airflow, prefer BigQueryInsertJobOperator over the deprecated BigQueryOperator.
  • In dbt, use incremental_strategy = 'insert_overwrite' on partitioned tables for efficient incremental loads.

This completes the BigQuery series: architecture and fundamentals, SQL analytics and scripting, and Python pipeline engineering.

Enjoyed this article?

Explore the Data Engineering learning path for more.

Found this helpful?

Share:š•

Leave a comment

Have a question, correction, or just found this helpful? Leave a note below.