BigQuery Python Pipelines: From Raw Data to Production
Build production-grade data pipelines with the BigQuery Python client: authentication, schema enforcement, cost estimation, Airflow operators, dbt adapter config, and a complete API-to-BigQuery ETL.
Why Python for BigQuery Pipelines?
The BigQuery console and bq CLI are great for exploration. But production pipelines need version-controlled, testable, parameterized code that integrates with orchestration systems. The google-cloud-bigquery Python client is the standard bridge between BigQuery and the rest of your stack.
Installation and Setup
# Core client
pip install google-cloud-bigquery
# DataFrame support (pandas + pyarrow)
pip install "google-cloud-bigquery[pandas]"
# BigQuery Storage API for fast reads (optional but recommended)
pip install google-cloud-bigquery-storage
# Everything at once (recommended for data engineering work)
pip install \
google-cloud-bigquery \
google-cloud-bigquery-storage \
"google-cloud-bigquery[pandas]" \
pyarrow \
db-dtypes
Authentication
BigQuery supports two authentication patterns.
Application Default Credentials (ADC): Recommended
ADC uses the environment's credential chain: Workload Identity (GKE), instance service account (Compute Engine), or ~/.config/gcloud/application_default_credentials.json (local dev).
from google.cloud import bigquery
# ADC: zero config needed in Cloud environments
client = bigquery.Client(project="myproject")
Local development setup:
gcloud auth application-default login
# Or, for CI/CD pipelines that don't use Workload Identity:
export GOOGLE_APPLICATION_CREDENTIALS="/path/to/service-account.json"
Explicit Service Account Key File
from google.cloud import bigquery
from google.oauth2 import service_account
credentials = service_account.Credentials.from_service_account_file(
"/path/to/service-account.json",
scopes=["https://www.googleapis.com/auth/bigquery"],
)
client = bigquery.Client(project="myproject", credentials=credentials)
In production on GCP, use Workload Identity or the default service account. Avoid key files: they are long-lived credentials that are easy to leak. Use key files only as a last resort for services outside GCP.
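If you must act as a dedicated pipeline service account without a key file, short-lived impersonated credentials are a middle ground. A minimal sketch, assuming the google-auth library and a hypothetical pipeline-etl service account that your identity is allowed to impersonate (Service Account Token Creator role):
import google.auth
from google.auth import impersonated_credentials
from google.cloud import bigquery

# Start from ADC (your user identity or the workload's identity)
source_credentials, _ = google.auth.default()

# Mint short-lived credentials for the pipeline's service account
target_credentials = impersonated_credentials.Credentials(
    source_credentials=source_credentials,
    target_principal="pipeline-etl@myproject.iam.gserviceaccount.com",  # hypothetical account
    target_scopes=["https://www.googleapis.com/auth/bigquery"],
    lifetime=3600,  # seconds; tokens expire instead of living forever
)

client = bigquery.Client(project="myproject", credentials=target_credentials)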
Running Queries
Basic Query
from google.cloud import bigquery
client = bigquery.Client(project="myproject")
query = """
SELECT
user_id,
COUNT(*) AS session_count,
SUM(revenue) AS total_revenue
FROM `myproject.analytics.sessions`
WHERE session_date = CURRENT_DATE()
GROUP BY user_id
ORDER BY total_revenue DESC
LIMIT 100
"""
# client.query() returns a QueryJob; calling .result() blocks until done
job = client.query(query)
rows = job.result()
for row in rows:
print(f"user={row.user_id} sessions={row.session_count} revenue={row.total_revenue}")QueryJobConfig
QueryJobConfig controls job-level settings: destination table, write disposition, location, timeout, and more.
from google.cloud import bigquery
client = bigquery.Client(project="myproject")
job_config = bigquery.QueryJobConfig(
# Write result to a specific table
destination="myproject.analytics.top_users_today",
write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
# Run in a specific location (must match dataset location)
location="EU",
# Job-level timeout
job_timeout_ms=60_000,
# Use a specific reservation slot (if on flat-rate pricing)
# reservation="projects/myproject/locations/eu/reservations/analytics-reservation",
)
job = client.query(query, job_config=job_config)
result = job.result()  # Wait for completion
# num_dml_affected_rows is only set for DML; for a SELECT, count the result rows
print(f"Job {job.job_id} wrote {result.total_rows} rows")
import pandas as pd
from google.cloud import bigquery
client = bigquery.Client(project="myproject")
query = """
SELECT
event_date,
country,
COUNT(*) AS events,
APPROX_COUNT_DISTINCT(user_id) AS unique_users
FROM `myproject.analytics.events`
WHERE event_date >= DATE_SUB(CURRENT_DATE(), INTERVAL 30 DAY)
GROUP BY event_date, country
"""
# to_dataframe() converts BigQuery types to pandas dtypes automatically
df = client.query(query).to_dataframe()
print(df.dtypes)
# event_date dbdate
# country object
# events int64
# unique_users int64
# Specify progress bar for large results
df = client.query(query).to_dataframe(progress_bar_type="tqdm")
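For result sets too large to hold in a single DataFrame, the row iterator can stream chunks instead. A sketch using to_dataframe_iterable(), which yields one DataFrame per result page on the REST path (the page size is an illustrative choice; with the Storage API, chunking follows the read streams):
for chunk in client.query(query).result(page_size=50_000).to_dataframe_iterable():
    # Each chunk is an ordinary pandas DataFrame
    print(f"processing {len(chunk):,} rows")
Loading a DataFrame to BigQuery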
import pandas as pd
from google.cloud import bigquery
client = bigquery.Client(project="myproject")
# Sample DataFrame
df = pd.DataFrame({
"user_id": ["u001", "u002", "u003"],
"signup_date": pd.to_datetime(["2026-05-01", "2026-05-02", "2026-05-03"]),
"plan": ["pro", "free", "pro"],
"mrr_usd": [99.0, 0.0, 99.0],
})
table_ref = "myproject.crm.new_signups"
job_config = bigquery.LoadJobConfig(
write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
# Let BigQuery infer schema from the DataFrame
autodetect=True,
)
job = client.load_table_from_dataframe(df, table_ref, job_config=job_config)
job.result() # Wait for load job to complete
print(f"Loaded {job.output_rows} rows into {table_ref}")Schema Definition with SchemaField
Always define schemas explicitly for production tables. Never rely on autodetect=True in pipelines where schema drift can cause silent data quality issues; a pre-load drift guard is sketched after the load example below.
from google.cloud import bigquery
# Explicit schema definition
schema = [
bigquery.SchemaField("order_id", "STRING", mode="REQUIRED"),
bigquery.SchemaField("customer_id", "STRING", mode="REQUIRED"),
bigquery.SchemaField("order_date", "DATE", mode="REQUIRED"),
bigquery.SchemaField("status", "STRING", mode="NULLABLE"),
bigquery.SchemaField("total_amount", "NUMERIC", mode="REQUIRED"),
bigquery.SchemaField("currency_code", "STRING", mode="REQUIRED"),
# RECORD = STRUCT
bigquery.SchemaField(
"shipping_address",
"RECORD",
mode="NULLABLE",
fields=[
bigquery.SchemaField("street", "STRING"),
bigquery.SchemaField("city", "STRING"),
bigquery.SchemaField("country_code", "STRING"),
bigquery.SchemaField("postal_code", "STRING"),
],
),
# REPEATED = ARRAY
bigquery.SchemaField("tags", "STRING", mode="REPEATED"),
]
job_config = bigquery.LoadJobConfig(
schema=schema,
write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
)
with open("orders.ndjson", "rb") as f:
job = client.load_table_from_file(
f,
"myproject.sales.orders",
job_config=job_config,
)
job.result()
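With an explicit schema in hand, you can also fail fast on drift before a load job ever runs. A hedged sketch (assert_no_schema_drift is a hypothetical helper, not part of the client library):
import pandas as pd
from google.cloud import bigquery

def assert_no_schema_drift(client: bigquery.Client, table_ref: str, df: pd.DataFrame) -> None:
    """Raise if the DataFrame's columns diverge from the live table schema."""
    table = client.get_table(table_ref)
    expected = {field.name for field in table.schema}
    actual = set(df.columns)
    if actual != expected:
        raise ValueError(
            f"Schema drift on {table_ref}: "
            f"missing={sorted(expected - actual)}, unexpected={sorted(actual - expected)}"
        )
Write Dispositions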
| Disposition | Constant | Behaviour |
|---|---|---|
| Truncate and replace | WRITE_TRUNCATE | Delete all existing rows, then load new data |
| Append | WRITE_APPEND | Add new rows to existing data |
| Fail if not empty | WRITE_EMPTY | Raise error if table has rows; safe for initial loads |
# Full refresh (idempotent daily load)
job_config = bigquery.LoadJobConfig(
write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
schema=schema,
)
# Incremental append (streaming new events)
job_config = bigquery.LoadJobConfig(
write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
schema=schema,
)
# Initial load guard (fail if table already has data)
job_config = bigquery.LoadJobConfig(
write_disposition=bigquery.WriteDisposition.WRITE_EMPTY,
schema=schema,
)
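In real pipelines the disposition is usually selected from a run mode rather than hard-coded. A small illustrative helper (make_load_config is hypothetical):
def make_load_config(schema: list, full_refresh: bool = False) -> bigquery.LoadJobConfig:
    """WRITE_TRUNCATE for full refreshes, WRITE_APPEND for incremental runs."""
    disposition = (
        bigquery.WriteDisposition.WRITE_TRUNCATE
        if full_refresh
        else bigquery.WriteDisposition.WRITE_APPEND
    )
    return bigquery.LoadJobConfig(schema=schema, write_disposition=disposition)
Creating Datasets and Tables Programmatically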
from google.cloud import bigquery
from google.api_core.exceptions import Conflict
client = bigquery.Client(project="myproject")
# Create dataset
dataset_id = "myproject.analytics_v2"
dataset = bigquery.Dataset(dataset_id)
dataset.location = "EU"
dataset.default_table_expiration_ms = None # no auto-expiry
dataset.description = "Analytics V2 (migrated 2026-05)"
try:
dataset = client.create_dataset(dataset, timeout=30)
print(f"Created dataset {dataset.dataset_id}")
except Conflict:
print("Dataset already exists ā skipping creation")
# Create table with partitioning and clustering
table_id = "myproject.analytics_v2.events"
schema = [
bigquery.SchemaField("event_id", "STRING", mode="REQUIRED"),
bigquery.SchemaField("event_date", "DATE", mode="REQUIRED"),
bigquery.SchemaField("user_id", "STRING"),
bigquery.SchemaField("event_type", "STRING"),
bigquery.SchemaField("country", "STRING"),
bigquery.SchemaField("properties", "JSON"),
]
table = bigquery.Table(table_id, schema=schema)
table.time_partitioning = bigquery.TimePartitioning(
type_=bigquery.TimePartitioningType.DAY,
field="event_date",
expiration_ms=365 * 24 * 60 * 60 * 1000, # 1 year
)
table.clustering_fields = ["country", "event_type"]
table.require_partition_filter = True
try:
table = client.create_table(table)
print(f"Created table {table.table_id}")
except Conflict:
print("Table already exists ā skipping creation")Cost Estimation with Dry Run
Before running an expensive query in production, estimate the bytes scanned using dry_run=True. The job is validated and costed but not executed.
from google.cloud import bigquery
client = bigquery.Client(project="myproject")
query = """
SELECT *
FROM `myproject.analytics.events`
WHERE event_date BETWEEN '2025-01-01' AND '2026-05-07'
"""
job_config = bigquery.QueryJobConfig(dry_run=True, use_query_cache=False)
dry_run_job = client.query(query, job_config=job_config)
bytes_processed = dry_run_job.total_bytes_processed
cost_usd = 6.25 * bytes_processed / (1024 ** 4)  # ~$6.25 per TiB on-demand (varies by region/edition)
print(f"Bytes processed: {bytes_processed:,}")
print(f"Data scanned: {bytes_processed / (1024**3):.2f} GB")
print(f"Estimated cost: ${cost_usd:.4f}")
# Guard: abort if cost is above threshold
MAX_COST_USD = 1.00
if cost_usd > MAX_COST_USD:
raise ValueError(f"Query would cost ~${cost_usd:.2f} ā exceeds ${MAX_COST_USD} limit")Query Parameters: Safe Parameterized SQL
Never use Python string formatting to inject user input into SQL. Use BigQuery's typed query parameters instead.
from google.cloud import bigquery
client = bigquery.Client(project="myproject")
# Named parameters (recommended for readability)
query = """
SELECT
user_id,
email,
signup_date
FROM `myproject.crm.users`
WHERE country_code = @country
AND signup_date BETWEEN @start_date AND @end_date
AND plan IN UNNEST(@plans)
ORDER BY signup_date DESC
LIMIT @row_limit
"""
job_config = bigquery.QueryJobConfig(
query_parameters=[
bigquery.ScalarQueryParameter("country", "STRING", "DE"),
bigquery.ScalarQueryParameter("start_date", "DATE", "2026-01-01"),
bigquery.ScalarQueryParameter("end_date", "DATE", "2026-05-07"),
bigquery.ArrayQueryParameter("plans", "STRING", ["pro", "enterprise"]),
bigquery.ScalarQueryParameter("row_limit", "INT64", 500),
]
)
df = client.query(query, job_config=job_config).to_dataframe()
print(f"Found {len(df)} users")Positional Parameters
query = "SELECT * FROM `myproject.crm.users` WHERE country_code = ? AND plan = ?"
job_config = bigquery.QueryJobConfig(
query_parameters=[
bigquery.ScalarQueryParameter(None, "STRING", "DE"),
bigquery.ScalarQueryParameter(None, "STRING", "pro"),
]
)
BigQuery Storage API for Fast Reads
Without the Storage API client installed, to_dataframe() falls back to the paginated REST API: fine for small results but slow for millions of rows. The BigQuery Storage API uses gRPC streaming and Apache Arrow, dramatically increasing throughput; recent client versions use it automatically when google-cloud-bigquery-storage is available.
# Requires: pip install google-cloud-bigquery-storage
from google.cloud import bigquery
from google.cloud.bigquery_storage import BigQueryReadClient
client = bigquery.Client(project="myproject")
query = """
SELECT user_id, event_date, event_type, country
FROM `myproject.analytics.events`
WHERE event_date = '2026-05-01'
"""
# Pass bqstorage_client to enable the Storage API path
bqstorage_client = BigQueryReadClient()
df = (
client.query(query)
.result()
.to_dataframe(bqstorage_client=bqstorage_client)
)
print(f"Loaded {len(df):,} rows via Storage API")For even larger reads, use read_gbq from pandas_gbq:
import pandas_gbq
df = pandas_gbq.read_gbq(
"SELECT * FROM `myproject.analytics.events` WHERE event_date = '2026-05-01'",
project_id="myproject",
use_bqstorage_api=True,
progress_bar_type="tqdm",
)
Airflow: BigQueryOperator and BigQueryHook
BigQueryInsertJobOperator (modern approach)
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
refresh_daily_summary = BigQueryInsertJobOperator(
task_id="refresh_daily_summary",
configuration={
"query": {
"query": """
DELETE FROM `myproject.analytics.daily_summary`
WHERE summary_date = DATE('{{ ds }}');
INSERT INTO `myproject.analytics.daily_summary`
SELECT
DATE('{{ ds }}') AS summary_date,
region,
COUNT(*) AS orders,
SUM(total_amount) AS revenue
FROM `myproject.sales.orders`
WHERE DATE(created_at) = DATE('{{ ds }}')
GROUP BY region;
""",
"useLegacySql": False,
}
},
gcp_conn_id="google_cloud_default",
location="EU",
)
BigQueryHook for Custom Python Tasks
from airflow.decorators import task
from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook
@task
def load_staging_to_production(ds: str) -> dict:
hook = BigQueryHook(gcp_conn_id="google_cloud_default", location="EU")
client = hook.get_client(project_id="myproject")
query = f"""
MERGE `myproject.sales.orders` T
USING `myproject.staging.orders_delta` S
ON T.order_id = S.order_id
WHEN MATCHED THEN
UPDATE SET T.status = S.status, T.updated_at = S.updated_at
WHEN NOT MATCHED THEN
INSERT ROW
"""
job = client.query(query)
job.result()
return {
"job_id": job.job_id,
"rows_affected": job.num_dml_affected_rows,
"bytes_processed": job.total_bytes_processed,
}
Full DAG Example
from datetime import datetime, timedelta
from airflow import DAG
from airflow.providers.google.cloud.operators.bigquery import (
BigQueryInsertJobOperator,
BigQueryCheckOperator,
)
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import (
GCSToBigQueryOperator,
)
default_args = {
"owner": "data-engineering",
"retries": 2,
"retry_delay": timedelta(minutes=5),
"on_failure_callback": lambda ctx: print(f"Task failed: {ctx['task_instance_key_str']}"),
}
with DAG(
dag_id="daily_orders_pipeline",
default_args=default_args,
schedule="0 3 * * *", # 3 AM UTC daily
start_date=datetime(2026, 1, 1),
catchup=False,
tags=["sales", "bigquery"],
) as dag:
# 1. Load raw CSV from GCS to staging table
load_raw = GCSToBigQueryOperator(
task_id="load_raw_orders",
bucket="my-data-bucket",
source_objects=["orders/{{ ds_nodash }}/*.csv"],
destination_project_dataset_table="myproject.staging.orders_raw",
source_format="CSV",
skip_leading_rows=1,
write_disposition="WRITE_TRUNCATE",
autodetect=True,
gcp_conn_id="google_cloud_default",
)
# 2. Data quality check
check_row_count = BigQueryCheckOperator(
task_id="check_row_count",
sql="""
SELECT COUNT(*) > 0
FROM `myproject.staging.orders_raw`
""",
use_legacy_sql=False,
gcp_conn_id="google_cloud_default",
)
# 3. Transform and load to production
transform_load = BigQueryInsertJobOperator(
task_id="transform_to_production",
configuration={
"query": {
"query": """
INSERT INTO `myproject.sales.orders`
SELECT
order_id,
TRIM(customer_id) AS customer_id,
DATE(order_date) AS order_date,
UPPER(TRIM(status)) AS status,
SAFE_CAST(total_amount AS NUMERIC) AS total_amount,
CURRENT_TIMESTAMP() AS loaded_at
FROM `myproject.staging.orders_raw`
WHERE SAFE_CAST(total_amount AS NUMERIC) IS NOT NULL
""",
"useLegacySql": False,
}
},
gcp_conn_id="google_cloud_default",
)
load_raw >> check_row_count >> transform_load
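The same dry-run cost gate from earlier fits naturally in front of expensive steps as a TaskFlow task. A hedged sketch (the task name and threshold are illustrative):
from airflow.decorators import task
from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook
from google.cloud import bigquery

@task
def guard_query_cost(sql: str, max_usd: float = 1.0) -> None:
    """Fail the DAG run before an over-budget query executes."""
    hook = BigQueryHook(gcp_conn_id="google_cloud_default", location="EU")
    client = hook.get_client(project_id="myproject")
    job_config = bigquery.QueryJobConfig(dry_run=True, use_query_cache=False)
    dry = client.query(sql, job_config=job_config)
    cost = 6.25 * dry.total_bytes_processed / (1024 ** 4)
    if cost > max_usd:
        raise ValueError(f"Query would cost ~${cost:.2f}, above the ${max_usd} limit")
Chained in front of the transform (for example, guard_query_cost(sql) >> transform_load inside the DAG body), this stops a runaway query before it spends anything.
dbt with BigQuery Adapter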
Installation and Profile Configuration
pip install dbt-bigquery
~/.dbt/profiles.yml:
myproject:
  target: dev
  outputs:
    dev:
      type: bigquery
      method: oauth  # Uses ADC (gcloud auth application-default login)
      project: myproject
      dataset: dbt_dev  # dbt writes models here in dev
      location: EU
      threads: 4
      timeout_seconds: 300
      priority: interactive
    prod:
      type: bigquery
      method: service-account
      project: myproject
      dataset: analytics  # production dataset
      keyfile: /secrets/sa-dbt.json
      location: EU
      threads: 8
      timeout_seconds: 600
      priority: batch
      # Optional job-level timeout tuning
      # job_creation_timeout_seconds: 10
      # job_retry_deadline_seconds: 600
dbt Model: Incremental (Partitioned Table)
models/marts/sales/fct_daily_orders.sql:
{{
config(
materialized = 'incremental',
incremental_strategy = 'insert_overwrite',
partition_by = {
"field": "order_date",
"data_type": "date",
"granularity": "day"
},
cluster_by = ["region", "channel"],
require_partition_filter = true,
tags = ["daily", "sales"]
)
}}
SELECT
DATE(o.created_at) AS order_date,
o.order_id,
o.customer_id,
c.region,
c.channel,
o.status,
o.total_amount,
o.currency_code,
CURRENT_TIMESTAMP() AS dbt_loaded_at
FROM {{ source('raw', 'orders') }} o
LEFT JOIN {{ ref('dim_customers') }} c USING (customer_id)
{% if is_incremental() %}
-- Only process new data during incremental runs
WHERE DATE(o.created_at) >= DATE_SUB(CURRENT_DATE(), INTERVAL 3 DAY)
{% endif %}
Running dbt
# Run all models
dbt run --target prod
# Run only the sales mart
dbt run --select marts/sales --target prod
# Run with full refresh (WRITE_TRUNCATE)
dbt run --full-refresh --select fct_daily_orders --target prod
# Test data quality
dbt test --select fct_daily_orders
# Generate and serve docs
dbt docs generate && dbt docs serve
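From orchestration code you can also invoke dbt programmatically instead of shelling out. A sketch assuming dbt-core 1.5+ and its dbtRunner API:
from dbt.cli.main import dbtRunner

# Invoke dbt exactly as the CLI would, but in-process
result = dbtRunner().invoke(["run", "--select", "marts.sales", "--target", "prod"])
if not result.success:
    raise RuntimeError("dbt run failed")
Complete ETL: API to BigQuery with Schema Enforcement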
This end-to-end example pulls exchange rate data from a public API, transforms it with pandas, enforces a schema, and loads it to BigQuery with idempotency.
"""
ETL: Exchange Rates API → pandas → BigQuery
Idempotent: safe to run multiple times for the same date.
"""
import logging
from datetime import date
from typing import Any
import pandas as pd
import requests
from google.cloud import bigquery
from google.api_core.exceptions import NotFound
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
log = logging.getLogger(__name__)
# ── Configuration ─────────────────────────────────────────────────────────────
PROJECT = "myproject"
DATASET = "finance"
TABLE = "exchange_rates"
TABLE_REF = f"{PROJECT}.{DATASET}.{TABLE}"
API_URL = "https://api.exchangerate-api.com/v4/latest/USD"
SCHEMA = [
bigquery.SchemaField("rate_date", "DATE", mode="REQUIRED"),
bigquery.SchemaField("base_currency", "STRING", mode="REQUIRED"),
bigquery.SchemaField("target_currency", "STRING", mode="REQUIRED"),
bigquery.SchemaField("rate", "FLOAT64", mode="REQUIRED"),
bigquery.SchemaField("loaded_at", "TIMESTAMP", mode="REQUIRED"),
]
# ── Extract ───────────────────────────────────────────────────────────────────
def extract_rates(base: str = "USD") -> dict[str, Any]:
"""Fetch exchange rates from the public API."""
log.info("Fetching exchange rates for base=%s", base)
response = requests.get(API_URL, timeout=10)
response.raise_for_status()
data = response.json()
log.info("Received %d rates for date %s", len(data["rates"]), data["date"])
return data
# ── Transform ─────────────────────────────────────────────────────────────────
def transform_rates(raw: dict[str, Any]) -> pd.DataFrame:
"""Flatten the API response into a tidy DataFrame."""
rate_date = date.fromisoformat(raw["date"])
base = raw["base"]
rows = [
{
"rate_date": rate_date,
"base_currency": base,
"target_currency": currency,
"rate": float(rate_value),
"loaded_at": pd.Timestamp.utcnow(),
}
for currency, rate_value in raw["rates"].items()
if isinstance(rate_value, (int, float)) # skip malformed values
]
df = pd.DataFrame(rows)
# Enforce types explicitly rather than trusting pandas inference
df["rate_date"] = pd.to_datetime(df["rate_date"]).dt.date
df["base_currency"] = df["base_currency"].str.upper().str.strip()
df["target_currency"] = df["target_currency"].str.upper().str.strip()
df["rate"] = pd.to_numeric(df["rate"], errors="coerce")
df["loaded_at"] = pd.to_datetime(df["loaded_at"], utc=True)
# Drop rows where rate could not be parsed
n_before = len(df)
df = df.dropna(subset=["rate"])
if len(df) < n_before:
log.warning("Dropped %d rows with null rates", n_before - len(df))
log.info("Transformed %d rate rows for %s", len(df), rate_date)
return df
# ── Load ──────────────────────────────────────────────────────────────────────
def ensure_table_exists(client: bigquery.Client) -> None:
"""Create the table if it does not exist (idempotent)."""
try:
client.get_table(TABLE_REF)
log.info("Table %s already exists", TABLE_REF)
except NotFound:
log.info("Creating table %s", TABLE_REF)
table = bigquery.Table(TABLE_REF, schema=SCHEMA)
table.time_partitioning = bigquery.TimePartitioning(
type_=bigquery.TimePartitioningType.DAY,
field="rate_date",
)
table.clustering_fields = ["base_currency", "target_currency"]
client.create_table(table)
log.info("Table created successfully")
def delete_existing_partition(client: bigquery.Client, rate_date: date) -> None:
"""Delete existing rows for the target date to ensure idempotency."""
delete_query = f"""
DELETE FROM `{TABLE_REF}`
WHERE rate_date = DATE('{rate_date.isoformat()}')
"""
job = client.query(delete_query)
job.result()
log.info("Deleted existing rows for %s (affected: %s)", rate_date, job.num_dml_affected_rows)
def load_to_bigquery(client: bigquery.Client, df: pd.DataFrame) -> None:
"""Load DataFrame to BigQuery with schema enforcement."""
job_config = bigquery.LoadJobConfig(
schema=SCHEMA,
write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
# The client stages DataFrames as Parquet; configure it for list/array fidelity
parquet_options=bigquery.ParquetOptions(enable_list_inference=True),
)
job = client.load_table_from_dataframe(df, TABLE_REF, job_config=job_config)
job.result()
log.info("Loaded %d rows into %s (job_id=%s)", job.output_rows, TABLE_REF, job.job_id)
# ── Dry-Run Cost Estimate ─────────────────────────────────────────────────────
def estimate_query_cost(client: bigquery.Client, rate_date: date) -> float:
"""Estimate the cost of reading back today's loaded data."""
query = f"""
SELECT *
FROM `{TABLE_REF}`
WHERE rate_date = '{rate_date.isoformat()}'
"""
job_config = bigquery.QueryJobConfig(dry_run=True, use_query_cache=False)
dry_job = client.query(query, job_config=job_config)
cost = 6.25 * dry_job.total_bytes_processed / (1024 ** 4)
log.info("Dry-run estimate for read-back query: $%.6f", cost)
return cost
# ── Pipeline Entry Point ──────────────────────────────────────────────────────
def run_pipeline() -> None:
client = bigquery.Client(project=PROJECT)
# Extract
raw = extract_rates(base="USD")
rate_date = date.fromisoformat(raw["date"])
# Transform
df = transform_rates(raw)
if df.empty:
log.error("No data after transformation ā aborting")
return
# Load
ensure_table_exists(client)
delete_existing_partition(client, rate_date) # idempotency
load_to_bigquery(client, df)
# Optional: estimate read cost for observability
estimate_query_cost(client, rate_date)
log.info("Pipeline complete for %s", rate_date)
if __name__ == "__main__":
run_pipeline()
Running the Pipeline
# Local dev with ADC
gcloud auth application-default login
python etl_exchange_rates.py
# In a Docker container (GKE / Cloud Run)
# Prefer Workload Identity; otherwise mount a key file via Secret Manager
docker run \
-e GOOGLE_APPLICATION_CREDENTIALS=/secrets/sa.json \
-v $(pwd)/secrets:/secrets \
my-etl-image python etl_exchange_rates.py
Testing the Pipeline
import pytest
import pandas as pd
from unittest.mock import MagicMock, patch
from etl_exchange_rates import transform_rates, extract_rates
def test_transform_rates_shape():
raw = {
"date": "2026-05-07",
"base": "USD",
"rates": {"EUR": 0.92, "GBP": 0.79, "JPY": 155.3, "BAD": "notanumber"},
}
df = transform_rates(raw)
# "BAD" row should be dropped (coerce fails ā NaN ā dropped)
assert len(df) == 3
assert set(df.columns) == {"rate_date", "base_currency", "target_currency", "rate", "loaded_at"}
@pytest.fixture
def sample_raw_response():
    # Minimal API payload for the type test (fixture was missing in the original)
    return {"date": "2026-05-07", "base": "USD", "rates": {"EUR": 0.92, "GBP": 0.79}}

def test_transform_rates_types(sample_raw_response):
df = transform_rates(sample_raw_response)
assert df["rate"].dtype == float
assert str(df["loaded_at"].dtype) == "datetime64[ns, UTC]"
@patch("etl_exchange_rates.requests.get")
def test_extract_rates_calls_api(mock_get):
mock_get.return_value = MagicMock(
status_code=200,
json=lambda: {"date": "2026-05-07", "base": "USD", "rates": {"EUR": 0.92}},
)
result = extract_rates("USD")
mock_get.assert_called_once()
assert result["base"] == "USD"Summary
Production BigQuery Python pipelines follow a consistent pattern:
- Authenticate with ADC or Workload Identity; avoid key files in Cloud environments.
- Always pass an explicit schema to LoadJobConfig; autodetect=True is a liability in production.
- Use dry_run=True before running large queries to gate on cost.
- Use named query parameter objects for any user-controlled SQL input.
- Enable the BigQuery Storage API (pass a bqstorage_client, or let the client create one) for reads exceeding a few million rows.
- In Airflow, prefer BigQueryInsertJobOperator over the deprecated BigQueryOperator.
- In dbt, use incremental_strategy = 'insert_overwrite' on partitioned tables for efficient incremental loads.
This completes the BigQuery series: architecture and fundamentals, SQL analytics and scripting, and Python pipeline engineering.