Python Lambda Functions: Production Patterns on AWS
Write production-grade Python Lambda functions — handler structure, cold starts, environment variables, error handling, structured logging, layers, and connection pooling for DynamoDB and RDS.
What Is AWS Lambda?
AWS Lambda runs your code in response to events — HTTP requests, queue messages, scheduled timers, S3 uploads — without you managing any servers. You pay only for the compute time consumed, billed in 1ms increments.
Every Lambda function follows the same lifecycle:
Event source → Lambda runtime → Your handler → Response

Handler Structure
A Python Lambda handler is a plain function that receives two arguments:
```python
import json

def handler(event, context):
    # event: the triggering payload (dict)
    # context: runtime metadata (request ID, timeout remaining, etc.)
    return {
        "statusCode": 200,
        "body": json.dumps({"message": "ok"})
    }
```

event is the raw payload from the event source. For API Gateway proxy integration it looks like:
```json
{
    "httpMethod": "GET",
    "path": "/appointments",
    "queryStringParameters": {"clinic_id": "CLN-001"},
    "headers": {"Authorization": "Bearer eyJ..."},
    "body": null
}
```

context gives you runtime info:
```python
context.aws_request_id                   # unique ID per invocation
context.function_name                    # function name
context.get_remaining_time_in_millis()   # ms until timeout
```

Cold Starts
Lambda keeps your execution environment "warm" for a few minutes after each invocation. A cold start happens when a new environment is initialized — the runtime downloads your code, imports your modules, and runs module-level code.
Cold start cost breakdown:
| Phase | Typical duration |
|-------|-----------------|
| Environment init | 100–300 ms |
| Python runtime start | 100–200 ms |
| Module imports | 50–500 ms (depends on packages) |
| Your __init__ code | Variable |
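You can observe this split directly: a module-level flag flips after the first invocation, so each request can report whether it ran on a cold or warm environment. A minimal sketch (the `_cold` name and the response shape are illustrative, not an AWS API):

```python
# Module scope: this assignment runs once, during cold start
_cold = True

def handler(event, context):
    global _cold
    was_cold, _cold = _cold, False  # True only for the first invocation
    # A real handler would emit this as a log field or metric dimension
    return {"cold_start": was_cold}
```

Logging this per request is a cheap way to see what fraction of your traffic actually pays the cold-start penalty before reaching for provisioned concurrency.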
Minimize cold starts:
```python
# BAD — imports inside the handler run on every invocation
def handler(event, context):
    import boto3
    import pandas  # heavy import!
    ...
```
```python
# GOOD — module-level imports run once per container lifetime
import boto3
import json
from decimal import Decimal

dynamodb = boto3.resource("dynamodb")  # also initialize clients here
table = dynamodb.Table("Appointments")

def handler(event, context):
    ...
```

Move all imports and AWS client initialization to module scope. They execute once during cold start, then are reused across warm invocations.
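There is a middle ground for a heavy dependency used only on rare code paths: lazy loading. Import at first use and cache the module, so warm invocations that never touch that path skip the cost entirely. A sketch, using the stdlib statistics module as a stand-in for a genuinely heavy package:

```python
_stats = None  # cached module object, loaded on first use

def _get_stats():
    global _stats
    if _stats is None:
        import statistics  # stand-in for a heavy import like pandas
        _stats = statistics
    return _stats

def handler(event, context):
    values = event.get("values", [])
    if not values:
        return {"mean": None}  # fast path: heavy module never imported
    return {"mean": _get_stats().mean(values)}
```

This keeps the cold start of the common path small while still paying the import cost only once per container for the rare path.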
Environment Variables
Never hardcode resource names, connection strings, or secrets. Use environment variables:
```python
import os

TABLE_NAME = os.environ["APPOINTMENTS_TABLE"]
REGION = os.environ.get("AWS_REGION", "us-east-1")
LOG_LEVEL = os.environ.get("LOG_LEVEL", "INFO")
```

Set them in your Terraform or SAM config:
```hcl
# Terraform
resource "aws_lambda_function" "appointments" {
  function_name = "appointments-api"
  ...

  environment {
    variables = {
      APPOINTMENTS_TABLE = aws_dynamodb_table.appointments.name
      LOG_LEVEL          = "INFO"
    }
  }
}
```

For secrets (API keys, DB passwords), use AWS Secrets Manager — never put them in environment variables in plain text:
```python
import boto3
import json

def get_secret(secret_name: str) -> dict:
    client = boto3.client("secretsmanager")
    response = client.get_secret_value(SecretId=secret_name)
    return json.loads(response["SecretString"])

# Cache at module level — only fetched once per cold start
_secrets = None

def get_db_credentials():
    global _secrets
    if _secrets is None:
        _secrets = get_secret("prod/myapp/db")
    return _secrets
```

Structured Logging
Lambda automatically ships stdout to CloudWatch. Use structured JSON logs — they're searchable and filterable:
```python
import json
import logging
import os

logger = logging.getLogger()
logger.setLevel(os.environ.get("LOG_LEVEL", "INFO"))

def log(level: str, message: str, **kwargs):
    # Pop exc_info so the traceback goes to the logging call,
    # not into the JSON payload as a bare boolean
    exc_info = kwargs.pop("exc_info", False)
    entry = {
        "level": level,
        "message": message,
        **kwargs
    }
    getattr(logger, level.lower())(json.dumps(entry), exc_info=exc_info)

def handler(event, context):
    log("info", "handler invoked",
        request_id=context.aws_request_id,
        path=event.get("path"))
    try:
        result = process(event)
        log("info", "request completed", status=200)
        return {"statusCode": 200, "body": json.dumps(result)}
    except ValueError as e:
        log("warning", "validation error", error=str(e))
        return {"statusCode": 400, "body": json.dumps({"error": str(e)})}
    except Exception as e:
        log("error", "unexpected error", error=str(e), exc_info=True)
        return {"statusCode": 500, "body": json.dumps({"error": "internal server error"})}
```

Error Handling Patterns
For API Gateway (synchronous):
Return structured HTTP responses — never let an exception bubble up raw:
```python
import json
import logging
from typing import Any

logger = logging.getLogger()

class ValidationError(Exception):
    def __init__(self, message):
        super().__init__(message)
        self.message = message

class NotFoundError(Exception):
    pass

def response(status: int, body: Any, headers: dict | None = None) -> dict:
    return {
        "statusCode": status,
        "headers": {
            "Content-Type": "application/json",
            "Access-Control-Allow-Origin": "*",
            **(headers or {})
        },
        "body": json.dumps(body, default=str)  # default=str handles Decimals, datetimes
    }

def handler(event, context):
    try:
        data = parse_input(event)
        result = business_logic(data)
        return response(200, result)
    except ValidationError as e:
        return response(400, {"error": e.message})
    except NotFoundError as e:
        return response(404, {"error": str(e)})
    except Exception:
        logger.exception("unhandled error")
        return response(500, {"error": "internal server error"})
```

For SQS / async invocations:
Raise exceptions to trigger automatic retries. Use a Dead Letter Queue (DLQ) for messages that fail repeatedly:
```python
import json
import logging

logger = logging.getLogger()

def handler(event, context):
    failed = []
    for record in event["Records"]:
        try:
            process_message(json.loads(record["body"]))
        except Exception as e:
            logger.error(f"failed to process {record['messageId']}: {e}")
            failed.append({"itemIdentifier": record["messageId"]})
    # Return partial batch failures — only failed messages go back to the queue
    return {"batchItemFailures": failed}
```

Partial batch responses require enabling ReportBatchItemFailures on the function's SQS event source mapping; otherwise the return value is ignored and the whole batch is retried.

Lambda Layers
Layers let you share code and large dependencies across functions without bundling them into every deployment package.
Common uses:
- Shared utilities / common libraries
- Heavy packages (pandas, numpy, Pillow)
- Internal SDKs
```
my-layer/
└── python/
    └── lib/
        └── python3.12/
            └── site-packages/
                └── shared_utils/
```

Reference in Terraform:
```hcl
resource "aws_lambda_layer_version" "shared" {
  layer_name          = "shared-utils"
  filename            = "layer.zip"
  compatible_runtimes = ["python3.12"]
}

resource "aws_lambda_function" "appointments" {
  ...
  layers = [aws_lambda_layer_version.shared.arn]
}
```

Connection Pooling for Databases
Lambda functions can connect to RDS (PostgreSQL). The challenge: each Lambda invocation might open a new connection, and you can exhaust the DB connection pool under load.
Solution — RDS Proxy:
```python
import os

import psycopg2

# Module-level — connection reused across warm invocations
_conn = None

def get_connection():
    global _conn
    if _conn is None or _conn.closed:
        _conn = psycopg2.connect(
            host=os.environ["RDS_PROXY_ENDPOINT"],  # RDS Proxy, not the direct DB
            database=os.environ["DB_NAME"],
            user=os.environ["DB_USER"],
            password=os.environ["DB_PASSWORD"],
            connect_timeout=5
        )
    return _conn

def handler(event, context):
    conn = get_connection()
    with conn.cursor() as cur:
        cur.execute("SELECT id, name FROM clinics WHERE active = true")
        rows = cur.fetchall()
    return response(200, [{"id": r[0], "name": r[1]} for r in rows])
```

RDS Proxy multiplexes Lambda connections to the database, preventing connection exhaustion even at high concurrency.
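Even behind RDS Proxy, a cached connection can go stale between warm invocations (idle timeout, proxy restart). A defensive pattern is to reset the cached connection and retry the query once. This sketch uses a generic exception and stub callables so it stands alone; a real version would catch psycopg2.OperationalError around the get_connection() helper above:

```python
class StaleConnectionError(Exception):
    """Stands in for psycopg2.OperationalError in this sketch."""

def run_with_reconnect(execute, reset_connection, retries=1):
    # Run the query; on a stale-connection error, reset the cached
    # connection and retry a limited number of times before giving up.
    for attempt in range(retries + 1):
        try:
            return execute()
        except StaleConnectionError:
            if attempt == retries:
                raise
            reset_connection()
```

In the handler above, reset_connection would simply set _conn back to None so the next get_connection() call reconnects through the proxy.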
Concurrency & Throttling
Lambda scales automatically — each concurrent request gets its own execution environment. Be aware of:
- Account concurrency limit: 1,000 by default (soft limit, can be increased)
- Reserved concurrency: guarantee a function's minimum capacity, also acts as a hard cap
- Provisioned concurrency: pre-warm environments to eliminate cold starts for latency-sensitive APIs
```hcl
resource "aws_lambda_provisioned_concurrency_config" "portal_api" {
  function_name                     = aws_lambda_function.portal_api.function_name
  qualifier                         = aws_lambda_alias.live.name
  provisioned_concurrent_executions = 5
}
```

Testing Lambda Functions Locally
Test handlers directly — they're just functions:
```python
# tests/test_handler.py
from unittest.mock import patch, MagicMock

from handlers.appointments import handler

def test_get_appointments_returns_200():
    event = {
        "httpMethod": "GET",
        "path": "/appointments",
        "queryStringParameters": {"clinic_id": "CLN-001"},
    }
    context = MagicMock()
    context.aws_request_id = "test-request-id"

    with patch("handlers.appointments.table") as mock_table:
        mock_table.query.return_value = {
            "Items": [{"id": "APT-1", "patient": "Jane Doe"}]
        }
        result = handler(event, context)

    assert result["statusCode"] == 200
```

Use the AWS SAM CLI to invoke functions locally with real Lambda emulation:
```shell
sam local invoke AppointmentsFunction --event events/get-appointments.json
sam local start-api   # spin up a local API Gateway
```

Key Takeaways
| Pattern | Rule |
|---------|------|
| Imports & client init | Module scope — runs once per cold start |
| Secrets | AWS Secrets Manager — never plain-text env vars |
| Logging | Structured JSON to stdout |
| Error handling | Always return proper HTTP responses for API GW |
| DB connections | RDS Proxy + module-level connection reuse |
| Concurrency | Reserved concurrency to protect downstream services |
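Putting the handler-level patterns together, a minimal end-to-end sketch might look like this. The data access is stubbed out, and names like APPOINTMENTS_TABLE follow the earlier examples rather than any required convention:

```python
import json
import logging
import os

logger = logging.getLogger()
logger.setLevel(os.environ.get("LOG_LEVEL", "INFO"))

# Module scope: config and clients initialized once per cold start
TABLE_NAME = os.environ.get("APPOINTMENTS_TABLE", "Appointments")

def response(status, body):
    return {
        "statusCode": status,
        "headers": {"Content-Type": "application/json"},
        "body": json.dumps(body, default=str),
    }

def fetch_appointments(clinic_id):
    # Stub: a real version would query a module-level DynamoDB table object
    return []

def handler(event, context):
    try:
        params = event.get("queryStringParameters") or {}
        clinic_id = params.get("clinic_id")
        if not clinic_id:
            raise ValueError("clinic_id is required")
        return response(200, {"clinic_id": clinic_id,
                              "appointments": fetch_appointments(clinic_id)})
    except ValueError as e:
        return response(400, {"error": str(e)})
    except Exception:
        logger.exception("unhandled error")
        return response(500, {"error": "internal server error"})
```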