Python Logging & Observability: Structured Logs for Production
Set up the Python logging module correctly, add structured JSON logging, track errors with Sentry, and build the observability layer every production pipeline needs.
Why Logging Is Not Optional
print() statements tell you what happened locally. Logs tell you what happened in production, at 3am, while you were asleep. Every pipeline, API, and automation script needs proper logging before it touches real data.
1. The logging Module
Python's logging module has five built-in levels:
| Level | Value | When to use |
|-------|-------|-------------|
| DEBUG | 10 | Detailed diagnostic info, dev only |
| INFO | 20 | Normal operation, key events |
| WARNING | 30 | Something unexpected, but continuing |
| ERROR | 40 | A function failed, handled gracefully |
| CRITICAL | 50 | Fatal error, process will stop |
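Levels are just integers, and a logger or handler only emits records at or above its threshold. A minimal sketch (the "demo" logger name is arbitrary):
import logging
logging.basicConfig(level=logging.WARNING)
logger = logging.getLogger("demo")
logger.info("loaded 10 rows")        # dropped: INFO (20) is below WARNING (30)
logger.warning("3 rows were empty")  # emitted: WARNING (30) meets the threshold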
2. Quick Setup (Don't Do This in Libraries)
import logging
# configure root logger — only in scripts / main entry point
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s | %(levelname)-8s | %(name)s | %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
logger = logging.getLogger(__name__)
logger.debug("detailed info: %s", some_var)
logger.info("user %s logged in", user_id)
logger.warning("rate limit at %d%%", pct)
logger.error("failed to process order %s", order_id)
logger.critical("database unreachable — aborting")Always get a logger with __name__ — it creates a logger named after your module, giving you fine-grained control.
Never call logging.basicConfig in library code — it's the caller's job to configure logging. Libraries only get a logger and call methods on it.
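The conventional library pattern is to attach a NullHandler, so nothing is printed and no "no handler found" warning appears unless the application configures logging. A minimal sketch, assuming a hypothetical mylib package:
# mylib/__init__.py
import logging
logger = logging.getLogger(__name__)
logger.addHandler(logging.NullHandler())  # stay silent until the app configures logging
def fetch_rows(table: str) -> list[dict]:
    logger.debug("fetching rows from %s", table)
    return []  # placeholder implementation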
3. Logger Hierarchy
Python loggers form a tree: root → myapp → myapp.pipeline → myapp.pipeline.csv.
# These are separate loggers that inherit from their parents
root_logger = logging.getLogger() # root
app_logger = logging.getLogger("myapp")
pipe_logger = logging.getLogger("myapp.pipeline")
csv_logger = logging.getLogger("myapp.pipeline.csv")
# Setting level on parent affects children
logging.getLogger("myapp").setLevel(logging.WARNING)
# myapp.pipeline and myapp.pipeline.csv are now also WARNING
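Child loggers default to NOTSET and inherit their effective level from the nearest ancestor that has one; records also propagate up to ancestor handlers unless you turn that off:
csv_logger = logging.getLogger("myapp.pipeline.csv")
print(csv_logger.level)                # 0 (NOTSET: nothing set on the child itself)
print(csv_logger.getEffectiveLevel())  # 30 (WARNING, inherited from "myapp")
csv_logger.propagate = False           # stop records bubbling up to root handlers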
4. Handlers and Formatters
Handlers determine where logs go. Formatters control how they look.
import logging
import logging.handlers
from pathlib import Path
def setup_logging(log_dir: Path, level: int = logging.INFO) -> None:
log_dir.mkdir(parents=True, exist_ok=True)
formatter = logging.Formatter(
fmt="%(asctime)s | %(levelname)-8s | %(name)s:%(lineno)d | %(message)s",
datefmt="%Y-%m-%dT%H:%M:%S",
)
# console handler
console = logging.StreamHandler()
console.setLevel(level)
console.setFormatter(formatter)
    # rotating file handler — keeps app.log plus up to 5 rotated backups of 10MB each
file_handler = logging.handlers.RotatingFileHandler(
log_dir / "app.log",
maxBytes=10 * 1024 * 1024, # 10MB
backupCount=5,
encoding="utf-8",
)
file_handler.setLevel(logging.DEBUG)
file_handler.setFormatter(formatter)
# error-only file handler
error_handler = logging.handlers.RotatingFileHandler(
log_dir / "errors.log",
maxBytes=5 * 1024 * 1024,
backupCount=3,
)
error_handler.setLevel(logging.ERROR)
error_handler.setFormatter(formatter)
root = logging.getLogger()
root.setLevel(logging.DEBUG)
root.addHandler(console)
root.addHandler(file_handler)
    root.addHandler(error_handler)
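Calling it once from the entry point is enough; after that every module just asks for its own logger (the logs/ directory name is an arbitrary choice):
# main.py
import logging
from pathlib import Path
setup_logging(Path("logs"), level=logging.INFO)
logger = logging.getLogger(__name__)
logger.info("pipeline starting")        # console + app.log
logger.error("something went wrong")    # console + app.log + errors.log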
5. Logging Exceptions
logger = logging.getLogger(__name__)
try:
result = int(user_input)
except ValueError:
logger.exception("Failed to parse user input %r", user_input)
# .exception() logs at ERROR level AND includes the full traceback
# or manually attach exception info
try:
process_order(order)
except OrderError as e:
logger.error("Order processing failed for %s: %s", order.id, e, exc_info=True)6. LoggerAdapter — Attach Context to Every Log
import logging
from typing import Any
class PipelineLogger(logging.LoggerAdapter):
def process(self, msg: str, kwargs: Any) -> tuple[str, Any]:
run_id = self.extra.get("run_id", "unknown")
step = self.extra.get("step", "unknown")
return f"[run={run_id}] [step={step}] {msg}", kwargs
base_logger = logging.getLogger("pipeline")
logger = PipelineLogger(base_logger, {"run_id": "abc123", "step": "ingest"})
logger.info("Processing %d records", 5000)
# [run=abc123] [step=ingest] Processing 5000 records
7. Structured JSON Logging with structlog
For production systems, emit JSON logs so that tools like Datadog, CloudWatch, and Grafana Loki can parse, index, and query them.
pip install structlog
Setup
import logging
import structlog
def configure_structlog(json: bool = True) -> None:
shared_processors = [
structlog.contextvars.merge_contextvars,
structlog.processors.add_log_level,
structlog.processors.TimeStamper(fmt="iso"),
        # structlog.stdlib.add_logger_name is omitted here: it needs a stdlib
        # logger with a .name, and this setup uses PrintLoggerFactory instead
]
if json:
processors = shared_processors + [
structlog.processors.dict_tracebacks,
structlog.processors.JSONRenderer(),
]
else:
processors = shared_processors + [
structlog.dev.ConsoleRenderer(colors=True),
]
structlog.configure(
processors=processors,
wrapper_class=structlog.make_filtering_bound_logger(logging.INFO),
context_class=dict,
logger_factory=structlog.PrintLoggerFactory(),
    )
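Call it once at startup, before the first log line; a short sketch continuing from the function above (the service name is a placeholder):
configure_structlog(json=True)
log = structlog.get_logger().bind(service="orders-api")
log.info("structlog configured", json_output=True)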
Using structlog
import structlog
logger = structlog.get_logger()
# bind context that appears in every log from this logger
log = logger.bind(component="ingestion", pipeline_id="run-42")
log.info("started", source="s3://bucket/data.csv")
log.info("processed", records=1250, duration_ms=430)
log.error("failed", error="connection timeout", retries=3)Output (JSON):
{"timestamp": "2026-05-07T10:23:45Z", "level": "info", "event": "processed", "component": "ingestion", "pipeline_id": "run-42", "records": 1250, "duration_ms": 430}Bind request context (FastAPI / web apps)
import structlog
from fastapi import Request
async def log_requests_middleware(request: Request, call_next):
structlog.contextvars.clear_contextvars()
structlog.contextvars.bind_contextvars(
request_id=request.headers.get("x-request-id", "none"),
path=request.url.path,
method=request.method,
)
response = await call_next(request)
    return response
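Registering it is one extra line; a sketch assuming a FastAPI app object:
from fastapi import FastAPI
app = FastAPI()
app.middleware("http")(log_requests_middleware)  # run for every request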
8. Performance Logging
Log timing around expensive operations:
import time
import logging
from contextlib import contextmanager
from typing import Generator
logger = logging.getLogger(__name__)
@contextmanager
def log_duration(operation: str, **context: object) -> Generator[None, None, None]:
start = time.perf_counter()
logger.debug("Starting %s", operation, extra=context)
try:
yield
finally:
elapsed = time.perf_counter() - start
logger.info(
"Completed %s in %.3fs",
operation,
elapsed,
extra={"duration_ms": round(elapsed * 1000)},
)
# usage
with log_duration("database_query", table="orders", filters=3):
    results = db.query(...)
9. Level Control from Environment
import logging
import os
LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO").upper()
logging.basicConfig(level=getattr(logging, LOG_LEVEL, logging.INFO))
LOG_LEVEL=DEBUG python pipeline.py # verbose
LOG_LEVEL=WARNING python pipeline.py # quiet
10. Silence Noisy Third-Party Loggers
# after basicConfig, quiet down noisy libraries
logging.getLogger("urllib3").setLevel(logging.WARNING)
logging.getLogger("httpx").setLevel(logging.WARNING)
logging.getLogger("sqlalchemy.engine").setLevel(logging.WARNING)11. Complete Production Logging Setup
# config/logging.py
from __future__ import annotations
import logging
import logging.handlers
import os
from pathlib import Path
def setup(log_dir: Path | None = None, json_output: bool = False) -> None:
level_name = os.getenv("LOG_LEVEL", "INFO").upper()
level = getattr(logging, level_name, logging.INFO)
if json_output:
try:
import structlog
_setup_structlog(level)
return
except ImportError:
pass
fmt = "%(asctime)s | %(levelname)-8s | %(name)s:%(lineno)d | %(message)s"
handlers: list[logging.Handler] = [logging.StreamHandler()]
if log_dir:
log_dir.mkdir(parents=True, exist_ok=True)
handlers.append(
logging.handlers.RotatingFileHandler(
log_dir / "app.log",
maxBytes=10_485_760,
backupCount=5,
)
)
logging.basicConfig(level=level, format=fmt, handlers=handlers)
# silence noisy libs
for noisy in ("urllib3", "httpx", "boto3", "botocore"):
logging.getLogger(noisy).setLevel(logging.WARNING)
def _setup_structlog(level: int) -> None:
import structlog
structlog.configure(
processors=[
structlog.contextvars.merge_contextvars,
structlog.processors.add_log_level,
structlog.processors.TimeStamper(fmt="iso"),
structlog.processors.JSONRenderer(),
],
wrapper_class=structlog.make_filtering_bound_logger(level),
logger_factory=structlog.PrintLoggerFactory(),
    )
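A typical entry point then wires it together like this (module layout and directory name are illustrative):
# pipeline.py
import logging
from pathlib import Path
from config.logging import setup
setup(log_dir=Path("logs"), json_output=False)
logger = logging.getLogger(__name__)
logger.info("pipeline starting")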
Exercises
Exercise 1: Wrap your CSV pipeline (from the Pandas lesson) with proper logging: log start/end, row counts before and after cleaning, and any skipped rows with reasons.
Exercise 2: Write a decorator @log_errors(logger) that catches any exception from the decorated function, logs it with logger.exception(), and re-raises it.
Exercise 3: Set up structlog in JSON mode with a bound context containing app_name and env (read from env vars), so every log line includes these fields automatically.
Summary
| Concern | Solution |
|---------|---------|
| Basic setup in scripts | logging.basicConfig(level=..., format=...) |
| Module loggers | logging.getLogger(__name__) |
| Log rotation | RotatingFileHandler |
| Exception tracebacks | logger.exception() or exc_info=True |
| Shared context per logger | LoggerAdapter |
| Structured JSON logs | structlog |
| Performance tracking | log_duration context manager |
| Silence noisy libs | logging.getLogger("lib").setLevel(WARNING) |
Next: async Python — asyncio, async/await, and concurrent HTTP with httpx.