Secure Event-Driven Queue Architecture: Producers, Brokers, Workers, Prioritization and Retries
How to design a production-grade queue architecture with clear separation between producers, brokers, and workers: security at the queue level, message prioritization, retry strategies with exponential backoff, dead letter queues, idempotency, and real-world examples from healthcare, fintech, and e-commerce.
The Architecture at a Glance
A well-designed queue system has three distinct layers. Each layer has one job. Nothing bleeds across the boundary.
┌─────────────────────────────────────────────────────────────────┐
│                           PRODUCERS                             │
│         (API handlers, Lambda functions, scheduled jobs)        │
│                Job: publish a message and forget                │
│             Knows: what happened, who triggered it              │
│    Does NOT know: who processes it, how many consumers exist    │
└───────────────────────────┬─────────────────────────────────────┘
                            │ publish (authenticated, encrypted)
                            ▼
┌─────────────────────────────────────────────────────────────────┐
│                             BROKER                              │
│      (RabbitMQ / Azure Service Bus / AWS SQS+SNS / Kafka)       │
│        Job: accept, route, persist, and deliver messages        │
│    Knows: routing rules, queue topology, delivery guarantees    │
│                  Does NOT know: business logic                  │
└───────────────────────────┬─────────────────────────────────────┘
                            │ deliver (rate-limited, prioritized)
                            ▼
┌─────────────────────────────────────────────────────────────────┐
│                       WORKERS (CONSUMERS)                       │
│      (background services, Lambda functions, Celery tasks)      │
│         Job: process one message, acknowledge or reject         │
│          Knows: business logic for this specific task           │
│    Does NOT know: where the message came from, other workers    │
└─────────────────────────────────────────────────────────────────┘

This separation is not just a design preference. It is what enables:
- Scaling workers independently of producers
- Replacing the broker without touching application code
- Adding new consumers without changing producers
- Securing each layer independently with least-privilege access
Layer 1: Producers - What They Do and What They Must Not Do
A producer has one job: publish an event describing what happened and return immediately.
The Wrong Producer
# WRONG: producer doing the workers' job
def place_order(order_data):
    order = save_order(order_data)
    # Producer is now doing the work of every consumer
    send_confirmation_email(order)  # what if the email service is down?
    reserve_inventory(order)        # what if the inventory service is slow?
    notify_warehouse(order)         # producer now coupled to warehouse
    update_analytics(order)         # producer now coupled to analytics
    return order                    # user waited for ALL of this

If the email service is down, the order placement fails. If inventory is slow, the user waits. Adding a new consumer (e.g. a fraud check) requires changing this function.
The Right Producer
# CORRECT: producer publishes and returns
def place_order(order_data):
    order = save_order(order_data)
    publish_event('order.placed', {
        'order_id': order.id,
        'customer_id': order.customer_id,
        'items': order.items,
        'total': order.total,
        'placed_at': order.created_at.isoformat()
    })
    return order  # returns immediately; everything else is async

The producer knows what happened. It does not know or care what happens next.
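publish_event is left abstract above. A minimal RabbitMQ-backed sketch, assuming a durable topic exchange named events and a long-lived channel (both names are illustrative):

import json
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('rabbitmq.internal'))
channel = connection.channel()
channel.exchange_declare(exchange='events', exchange_type='topic', durable=True)

def publish_event(event_type: str, payload: dict):
    # Routing key mirrors the event name, e.g. 'order.placed'
    channel.basic_publish(
        exchange='events',
        routing_key=event_type,
        body=json.dumps(payload),
        properties=pika.BasicProperties(
            delivery_mode=2,  # persistent: survives a broker restart
            content_type='application/json'
        )
    )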
Producer Security: Least Privilege at Publish Time
Each producer should have credentials that allow it to publish to specific exchanges/topics only, and nothing else.
RabbitMQ: a per-service user with write-only permission:
# RabbitMQ management API: create a user for the order service.
# This user can ONLY publish to the 'orders' exchange.
# It cannot read queues, cannot declare exchanges, cannot manage anything.
import requests

# Create user
requests.put('http://rabbitmq:15672/api/users/order-service',
             auth=('admin', 'admin_password'),
             json={'password': 'order_service_secret', 'tags': ''})

# Grant permission: configure nothing, write to the 'orders' exchange only, read nothing
requests.put('http://rabbitmq:15672/api/permissions/%2F/order-service',
             auth=('admin', 'admin_password'),
             json={
                 'configure': '',      # cannot declare/delete exchanges or queues
                 'write': '^orders$',  # can only publish to the 'orders' exchange
                 'read': ''            # cannot consume from any queue
             })

AWS SQS: IAM policy for the producer:
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "OrderServiceProducerOnly",
      "Effect": "Allow",
      "Action": [
        "sqs:SendMessage",
        "sqs:GetQueueUrl"
      ],
      "Resource": "arn:aws:sqs:eu-west-1:123456789:order-events",
      "Condition": {
        "StringEquals": {
          "aws:RequestedRegion": "eu-west-1"
        }
      }
    }
  ]
}

The order service Lambda has this IAM policy attached. It can send messages to the order-events queue only. It cannot read, purge, delete, or touch any other queue.
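On the producer side, with that policy attached, publishing is a plain sqs:SendMessage call. A sketch (the queue URL is illustrative):

import json
import boto3

sqs = boto3.client('sqs')

sqs.send_message(
    QueueUrl='https://sqs.eu-west-1.amazonaws.com/123456789/order-events',
    MessageBody=json.dumps({'event_type': 'order.placed', 'order_id': 'ord_123'})
)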
Azure Service Bus - Shared Access Policy:
// Producer uses a SAS policy with Send-only rights
// Created in the Azure Portal or Terraform:
resource "azurerm_servicebus_namespace_authorization_rule" "order_producer" {
  name         = "order-service-producer"
  namespace_id = azurerm_servicebus_namespace.main.id
  listen       = false // cannot read
  send         = true  // can publish
  manage       = false // cannot create/delete queues
}

// Producer connection string uses this restricted policy
var client = new ServiceBusClient(sendOnlyConnectionString);

Layer 2: The Broker - Routing, Security, and Guarantees
Message Encryption at Rest and in Transit
In transit - TLS everywhere:
# RabbitMQ: TLS connection
import ssl
import pika

ssl_context = ssl.create_default_context(cafile='/etc/ssl/certs/ca.pem')
ssl_context.load_cert_chain('/etc/ssl/certs/client.pem', '/etc/ssl/private/client.key')

connection = pika.BlockingConnection(
    pika.ConnectionParameters(
        host='rabbitmq.internal',
        port=5671,  # TLS port (not 5672)
        ssl_options=pika.SSLOptions(ssl_context)
    )
)

# AWS SQS: enforce TLS via queue policy
resource "aws_sqs_queue_policy" "enforce_tls" {
queue_url = aws_sqs_queue.order_events.url
policy = jsonencode({
Statement = [{
Effect = "Deny"
Principal = "*"
Action = "sqs:*"
Resource = aws_sqs_queue.order_events.arn
Condition = {
Bool = {
"aws:SecureTransport" = "false" # deny any non-TLS request
}
}
}]
})
}At rest ā encrypted queues:
# AWS SQS: KMS encryption at rest
resource "aws_sqs_queue" "order_events" {
  name                              = "order-events"
  kms_master_key_id                 = aws_kms_key.queue_key.arn
  kms_data_key_reuse_period_seconds = 300
}

resource "aws_kms_key" "queue_key" {
  description             = "SQS encryption key for order events"
  deletion_window_in_days = 30
  enable_key_rotation     = true
}

# RabbitMQ: encrypt the message payload before publishing
# (RabbitMQ itself does not encrypt payloads, so do it at the application level)
import json
import os
from cryptography.fernet import Fernet

fernet = Fernet(os.environ['MESSAGE_ENCRYPTION_KEY'])

def publish_secure(exchange, routing_key, payload: dict):
    encrypted = fernet.encrypt(json.dumps(payload).encode())
    channel.basic_publish(
        exchange=exchange,
        routing_key=routing_key,
        body=encrypted,
        properties=pika.BasicProperties(
            content_type='application/octet-stream',
            headers={'encrypted': True, 'version': '1'}
        )
    )

Message Signing - Verify the Producer is Who They Say They Are
import hashlib
import hmac
import json
import os
import time

class SecurityError(Exception):
    pass

SECRET_KEY = os.environ['MESSAGE_SIGNING_KEY']

def sign_message(payload: dict) -> dict:
    timestamp = str(int(time.time()))
    body = json.dumps(payload, sort_keys=True)
    signature = hmac.new(
        SECRET_KEY.encode(),
        f"{timestamp}.{body}".encode(),
        hashlib.sha256
    ).hexdigest()
    return {
        'payload': payload,
        'timestamp': timestamp,
        'signature': signature
    }

def verify_message(signed_message: dict, max_age_seconds=300) -> dict:
    timestamp = signed_message['timestamp']
    payload = signed_message['payload']
    received_sig = signed_message['signature']
    # Reject replayed messages (older than 5 minutes)
    if abs(time.time() - int(timestamp)) > max_age_seconds:
        raise SecurityError("Message timestamp too old - possible replay attack")
    body = json.dumps(payload, sort_keys=True)
    expected_sig = hmac.new(
        SECRET_KEY.encode(),
        f"{timestamp}.{body}".encode(),
        hashlib.sha256
    ).hexdigest()
    if not hmac.compare_digest(received_sig, expected_sig):
        raise SecurityError("Message signature invalid - possible tampering")
    return payload

This is the same pattern Stripe uses for webhook signatures: timestamp plus HMAC prevents both tampering and replay attacks.
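For context, a hypothetical wiring of these helpers into the publish path and the consumer (publish_event carries over from the earlier sketch; process is an illustrative business-logic handler):

# Producer side: sign, then publish the signed envelope
signed = sign_message({'event_type': 'order.placed', 'order_id': 'ord_123'})
publish_event('order.placed', signed)

# Consumer side: verify before any business logic runs
def handle_raw_message(raw_body: bytes):
    envelope = json.loads(raw_body)
    payload = verify_message(envelope)  # raises SecurityError on tamper/replay
    process(payload)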
Priority Queues - High-Value Work First
Not all messages are equal. A payment failure alert should not wait behind 10,000 marketing emails.
Priority in RabbitMQ
RabbitMQ supports per-message priority natively (0-255, practically 0-10):
# Declare queue with a max priority level
channel.queue_declare(
    queue='notifications',
    durable=True,
    arguments={'x-max-priority': 10}  # supports priority 0-10
)

# High priority: appointment cancellation (patient impacted)
channel.basic_publish(
    exchange='',
    routing_key='notifications',
    body=json.dumps(cancellation_message),
    properties=pika.BasicProperties(
        delivery_mode=pika.DeliveryMode.Persistent,
        priority=9  # high: process before lower-priority messages
    )
)

# Low priority: marketing newsletter
channel.basic_publish(
    exchange='',
    routing_key='notifications',
    body=json.dumps(newsletter_message),
    properties=pika.BasicProperties(
        delivery_mode=pika.DeliveryMode.Persistent,
        priority=1  # low: can wait
    )
)

RabbitMQ delivers priority 9 messages before priority 1 messages in the same queue, even if the priority 1 messages arrived first.
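One caveat worth noting: priorities only reorder messages that are still waiting in the queue. A consumer with a large prefetch buffer pulls messages before higher-priority ones arrive, so keep prefetch small on priority queues. A sketch (handle_message is an illustrative callback):

# Keep the consumer buffer small so the broker, not the client, orders the work
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='notifications', on_message_callback=handle_message)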
Priority via Separate Queues (More Common in Production)
Using message priority numbers is simple but brittle. The production pattern most teams use is separate queues per priority tier, with workers that drain high-priority queues first.
                        ┌──────────────────────────┐
                        │ critical-notifications   │ ← SLA: 10 seconds
                        │ (appointment cancelled,  │
                        │  payment failed,         │
Notification            │  emergency alerts)       │
Exchange ──────────────▶├──────────────────────────┤
(topic)                 │ standard-notifications   │ ← SLA: 2 minutes
                        │ (confirmations,          │
                        │  reminders, receipts)    │
                        ├──────────────────────────┤
                        │ bulk-notifications       │ ← SLA: 30 minutes
                        │ (newsletters,            │
                        │  marketing, reports)     │
                        └──────────────────────────┘

# RabbitMQ topic exchange routing by priority
channel.exchange_declare(exchange='notifications', exchange_type='topic')

# Queues
channel.queue_declare(queue='critical-notifications', durable=True)
channel.queue_declare(queue='standard-notifications', durable=True)
channel.queue_declare(queue='bulk-notifications', durable=True)

# Bindings
channel.queue_bind(exchange='notifications', queue='critical-notifications',
                   routing_key='notification.critical.#')
channel.queue_bind(exchange='notifications', queue='standard-notifications',
                   routing_key='notification.standard.#')
channel.queue_bind(exchange='notifications', queue='bulk-notifications',
                   routing_key='notification.bulk.#')

# Publisher sets the priority tier in the routing key
def publish_notification(notification_type: str, priority: str, payload: dict):
    routing_key = f'notification.{priority}.{notification_type}'
    channel.basic_publish(
        exchange='notifications',
        routing_key=routing_key,
        body=json.dumps(payload),
        properties=pika.BasicProperties(delivery_mode=pika.DeliveryMode.Persistent)
    )

# Publish appointment cancellation: critical
publish_notification('appointment_cancelled', 'critical', {
    'patient_id': 'pat_123',
    'appointment_id': 'apt_456',
    'cancellation_reason': 'Provider unavailable'
})

# Publish newsletter: bulk
publish_notification('newsletter', 'bulk', {
    'campaign_id': 'camp_789',
    'batch_id': 'batch_001'
})

A worker that prioritises: it checks critical first, then standard, then bulk:
def run_priority_worker():
    queues_by_priority = [
        'critical-notifications',  # checked first every cycle
        'standard-notifications',  # checked if critical is empty
        'bulk-notifications'       # checked if both above are empty
    ]
    while True:
        processed = False
        for queue in queues_by_priority:
            method, properties, body = channel.basic_get(queue=queue, auto_ack=False)
            if method:  # message available in this queue
                try:
                    process_notification(json.loads(body))
                    channel.basic_ack(delivery_tag=method.delivery_tag)
                except Exception as e:
                    channel.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
                processed = True
                break  # go back to checking the critical queue first
        if not processed:
            time.sleep(0.1)  # nothing in any queue, brief pause

Priority in AWS SQS
Standard SQS does not support message priority natively. The pattern is always separate queues:
# Terraform: three priority queues
resource "aws_sqs_queue" "critical" {
  name                       = "notifications-critical"
  message_retention_seconds  = 3600 # 1 hour: stale critical alerts are useless
  visibility_timeout_seconds = 30
}

resource "aws_sqs_queue" "standard" {
  name                       = "notifications-standard"
  message_retention_seconds  = 86400 # 24 hours
  visibility_timeout_seconds = 60
}

resource "aws_sqs_queue" "bulk" {
  name                       = "notifications-bulk"
  message_retention_seconds  = 604800 # 7 days
  visibility_timeout_seconds = 300
}

# Lambda triggered by the critical queue: reserved concurrency 50 (always available)
resource "aws_lambda_event_source_mapping" "critical_trigger" {
  event_source_arn = aws_sqs_queue.critical.arn
  function_name    = aws_lambda_function.notification_handler.arn
  batch_size       = 1 # process one at a time for speed
}

# Lambda triggered by the bulk queue: reserved concurrency 5 (low priority)
resource "aws_lambda_event_source_mapping" "bulk_trigger" {
  event_source_arn = aws_sqs_queue.bulk.arn
  function_name    = aws_lambda_function.notification_handler.arn
  batch_size       = 10 # batch for efficiency
}

Real example - Uber Eats:
When a restaurant marks an order as "ready for pickup", that message goes to the driver-dispatch-critical queue (driver must be notified immediately). When Uber Eats sends a promotional email, it goes to marketing-bulk. Critical dispatch workers have 100x the concurrency of bulk marketing workers.
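The event-source-mapping comments above mention reserved concurrency without configuring it. One way to pin it, sketched with boto3 (function names are illustrative):

import boto3

lambda_client = boto3.client('lambda')

# Critical consumers always have capacity reserved for them
lambda_client.put_function_concurrency(
    FunctionName='critical-notification-handler',
    ReservedConcurrentExecutions=50
)

# Bulk consumers are capped so they can never starve anything else
lambda_client.put_function_concurrency(
    FunctionName='bulk-notification-handler',
    ReservedConcurrentExecutions=5
)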
Retry Strategies - How to Handle Failure Without Losing Messages
The Naive Retry (What Not to Do)
# WRONG: immediate retry hammers a struggling service
def process(message):
    for attempt in range(3):
        try:
            call_external_api(message)
            return
        except Exception:
            continue  # immediately retry: this makes the problem worse
    raise Exception("Failed after 3 retries")

If the external API is slow because it is overloaded, immediately retrying adds more load. You turn a slowdown into a complete outage.
Exponential Backoff with Jitter
import random
import time

def process_with_backoff(message, max_retries=5):
    attempt = message.get('_retry_count', 0)
    try:
        process_business_logic(message)
    except TransientError as e:
        if attempt >= max_retries:
            send_to_dead_letter_queue(message, str(e))
            return
        # Exponential backoff: 1s, 2s, 4s, 8s, 16s
        # Jitter: random 0-1s added to prevent a thundering herd
        delay = (2 ** attempt) + random.uniform(0, 1)
        message['_retry_count'] = attempt + 1
        message['_last_error'] = str(e)
        message['_next_attempt'] = time.time() + delay
        # Requeue with delay
        schedule_retry(message, delay_seconds=delay)
    except PermanentError as e:
        # No retry: straight to the DLQ with context
        send_to_dead_letter_queue(message, str(e), permanent=True)

The thundering herd without jitter:
Service goes down at T=0
1000 consumers all fail at T=1
All 1000 retry at T=3 (2 second backoff)
Service comes back up at T=2.5
All 1000 hit it simultaneously at T=3: overloaded again, goes back down

With jitter:
All 1000 retry between T=2.5 and T=3.5
Service recovers, handles gradual ramp-up
No spike
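The backoff code above adds at most one second of jitter. At larger fleet sizes, "full jitter" (randomize across the whole backoff window, as popularized by AWS's backoff-and-jitter analysis) spreads retries even further. A sketch:

import random

def backoff_delay(attempt: int, base: float = 1.0, cap: float = 60.0) -> float:
    # Full jitter: pick anywhere in [0, min(cap, base * 2^attempt)],
    # so 1000 clients on attempt 3 land uniformly across 0-8s, not all at 8s
    return random.uniform(0, min(cap, base * (2 ** attempt)))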
Delay Queues - Retry with Scheduled Re-delivery
The pattern for retries in RabbitMQ: route failed messages to a "wait" queue with a per-queue TTL (time-to-live). When the TTL expires, messages dead-letter back into the main queue for retry.
# Setup: a dead-letter chain creates the delay queues
def setup_retry_queues(channel, base_queue: str):
    delays = [10, 60, 300, 1800]  # 10s, 1m, 5m, 30m
    for delay_ms in [d * 1000 for d in delays]:
        wait_queue = f'{base_queue}.wait.{delay_ms}ms'
        channel.queue_declare(
            queue=wait_queue,
            durable=True,
            arguments={
                'x-dead-letter-exchange': '',             # default exchange
                'x-dead-letter-routing-key': base_queue,  # return to the main queue
                'x-message-ttl': delay_ms,                # wait this long, then redeliver
                'x-expires': delay_ms * 2                 # delete the queue if unused
            }
        )
    # DLQ: messages that exhausted all retries
    channel.queue_declare(queue=f'{base_queue}.dead', durable=True)

def retry_message(channel, message_body: bytes, retry_count: int, base_queue: str):
    delays = [10000, 60000, 300000, 1800000]  # ms
    if retry_count >= len(delays):
        # Exhausted retries: send to the DLQ
        channel.basic_publish(
            exchange='',
            routing_key=f'{base_queue}.dead',
            body=message_body,
            properties=pika.BasicProperties(
                headers={'retry_count': retry_count, 'failed_at': time.time()}
            )
        )
        return
    wait_queue = f'{base_queue}.wait.{delays[retry_count]}ms'
    channel.basic_publish(
        exchange='',
        routing_key=wait_queue,
        body=message_body,
        properties=pika.BasicProperties(
            headers={'retry_count': retry_count + 1}
        )
    )

AWS SQS - delay queues built in:
resource "aws_sqs_queue" "payment_retry" {
name = "payment-processing-retry"
delay_seconds = 30 # all messages delayed 30s before visible
visibility_timeout_seconds = 60
redrive_policy = jsonencode({
deadLetterTargetArn = aws_sqs_queue.payment_dlq.arn
maxReceiveCount = 5 # after 5 failures ā DLQ
})
}MassTransit (.NET) ā built-in retry with policy:
cfg.ReceiveEndpoint("payment-processing", e =>
{
    e.ConfigureConsumer<PaymentConsumer>(context);

    // Immediate retries for transient errors (network blip)
    e.UseMessageRetry(r => r
        .Immediate(2)                         // retry twice immediately
        .Intervals(TimeSpan.FromSeconds(10),  // then 10s
                   TimeSpan.FromSeconds(60),  // then 60s
                   TimeSpan.FromMinutes(5),   // then 5m
                   TimeSpan.FromMinutes(30))  // then 30m
        .Ignore<ValidationException>()        // don't retry validation errors
        .Ignore<DuplicatePaymentException>()  // don't retry known duplicates
    );

    // Circuit breaker: stop hammering a failing service
    e.UseCircuitBreaker(cb =>
    {
        cb.TrackingPeriod = TimeSpan.FromMinutes(1);
        cb.TripThreshold = 15;                      // trip if 15% of requests fail
        cb.ActiveThreshold = 10;                    // need at least 10 requests to evaluate
        cb.ResetInterval = TimeSpan.FromMinutes(5); // try again after 5m
    });
});

Dead Letter Queues - What Happens When Everything Fails
A Dead Letter Queue (DLQ) is where messages go after exhausting all retries. It is not a bin; it is a holding area for investigation and manual re-processing.
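For triage it helps to inspect DLQ contents without consuming them. A minimal sketch with boto3, assuming a standard SQS DLQ (VisibilityTimeout=0 leaves the messages visible to other readers):

import json
import boto3

sqs = boto3.client('sqs')

def peek_dlq(dlq_url: str, max_messages: int = 10):
    """Look at DLQ contents without removing or hiding them."""
    response = sqs.receive_message(
        QueueUrl=dlq_url,
        MaxNumberOfMessages=max_messages,
        VisibilityTimeout=0,  # do not hide messages from other readers
        WaitTimeSeconds=2
    )
    for msg in response.get('Messages', []):
        print(json.dumps(json.loads(msg['Body']), indent=2))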
DLQ Setup and Monitoring
# AWS: SQS with a DLQ and a CloudWatch alarm
resource "aws_sqs_queue" "orders" {
  name                       = "order-processing"
  visibility_timeout_seconds = 60
  message_retention_seconds  = 86400
  redrive_policy = jsonencode({
    deadLetterTargetArn = aws_sqs_queue.orders_dlq.arn
    maxReceiveCount     = 3
  })
}

resource "aws_sqs_queue" "orders_dlq" {
  name                      = "order-processing-dlq"
  message_retention_seconds = 1209600 # 14 days to investigate
}

# Alert if anything lands in the DLQ: this should never happen silently
resource "aws_cloudwatch_metric_alarm" "orders_dlq_depth" {
  alarm_name          = "orders-dlq-messages"
  alarm_description   = "Orders landing in DLQ - investigate immediately"
  namespace           = "AWS/SQS"
  metric_name         = "ApproximateNumberOfMessagesVisible"
  dimensions          = { QueueName = aws_sqs_queue.orders_dlq.name }
  statistic           = "Sum"
  period              = 60
  evaluation_periods  = 1
  threshold           = 1 # alert on the first message
  comparison_operator = "GreaterThanOrEqualToThreshold"
  alarm_actions       = [aws_sns_topic.alerts.arn]
}

Enriched DLQ Messages - Context for Debugging
When a message lands in the DLQ, add everything needed to diagnose and replay it:
def send_to_dlq(original_message: dict, error: Exception, retry_count: int):
    dlq_message = {
        'original_message': original_message,
        'failure_context': {
            'error_type': type(error).__name__,
            'error_message': str(error),
            'stack_trace': traceback.format_exc(),
            'retry_count': retry_count,
            'first_attempt_at': original_message.get('_first_attempt_at'),
            'final_failure_at': datetime.utcnow().isoformat(),
            'worker_id': os.environ.get('HOSTNAME', 'unknown'),
            'queue_name': original_message.get('_source_queue')
        },
        'replay_instructions': {
            'target_queue': original_message.get('_source_queue'),
            'command': 'aws sqs send-message --queue-url <url> --message-body <body>'
        }
    }
    sqs.send_message(
        QueueUrl=DLQ_URL,
        MessageBody=json.dumps(dlq_message)
    )

DLQ Re-drive - Replaying Failed Messages
After fixing the underlying bug, replay the DLQ:
def redrive_dlq(dlq_url: str, target_queue_url: str, batch_size: int = 10):
    """Replay all messages from the DLQ back to the original queue."""
    redriven = 0
    errors = 0
    while True:
        response = sqs.receive_message(
            QueueUrl=dlq_url,
            MaxNumberOfMessages=batch_size,
            WaitTimeSeconds=5
        )
        messages = response.get('Messages', [])
        if not messages:
            break
        for msg in messages:
            try:
                dlq_envelope = json.loads(msg['Body'])
                original = dlq_envelope['original_message']
                # Remove failure metadata before replaying
                original.pop('_retry_count', None)
                original.pop('_last_error', None)
                sqs.send_message(
                    QueueUrl=target_queue_url,
                    MessageBody=json.dumps(original)
                )
                sqs.delete_message(
                    QueueUrl=dlq_url,
                    ReceiptHandle=msg['ReceiptHandle']
                )
                redriven += 1
            except Exception as e:
                log.error(f"Failed to redrive message: {e}")
                errors += 1
    return {'redriven': redriven, 'errors': errors}

AWS built-in redrive (2023 feature):
# AWS now has native DLQ redrive: no custom code needed
response = sqs.start_message_move_task(
    SourceArn=dlq_arn,
    DestinationArn=source_queue_arn,
    MaxNumberOfMessagesPerSecond=10  # controlled rate
)
task_handle = response['TaskHandle']

# Monitor progress
status = sqs.list_message_move_tasks(SourceArn=dlq_arn)

Idempotency - Handling Duplicate Messages
Practically every distributed message system delivers at-least-once. This means the same message can arrive twice: once from the original delivery, and once from a retry after a network timeout where the broker never received the acknowledgement.
The problem without idempotency:
T=0: Worker receives "charge_card" message, calls Stripe API
T=1: Stripe charges card successfully
T=2: Worker crashes before acknowledging the message
T=3: Message reappears in queue (visibility timeout expired)
T=4: Another worker receives same message, calls Stripe again
T=5: Customer charged twice

Idempotency with a Deduplication Key
import time
from datetime import datetime

import boto3
from boto3.dynamodb.conditions import Attr

dynamodb = boto3.resource('dynamodb')
processed_table = dynamodb.Table('processed-message-ids')

def process_idempotently(message: dict):
    message_id = message['message_id']  # unique ID from the producer
    try:
        # Atomic: create the record only if it doesn't exist
        processed_table.put_item(
            Item={
                'message_id': message_id,
                'processed_at': datetime.utcnow().isoformat(),
                'ttl': int(time.time()) + 86400  # expire after 24 hours
            },
            ConditionExpression=Attr('message_id').not_exists()
        )
    except dynamodb.meta.client.exceptions.ConditionalCheckFailedException:
        # Already processed: this is a duplicate, skip safely
        log.info(f"Duplicate message {message_id} - skipping")
        return
    # Safe to process: guaranteed first time
    do_actual_work(message)

The producer must generate stable idempotency keys:
def publish_payment_event(payment_id: str, amount: float):
    # The key is deterministic from the business ID: the same payment always gets the same key
    idempotency_key = f"payment-charged-{payment_id}"
    publish_event('payment.charged', {
        'message_id': idempotency_key,  # stable, not random
        'payment_id': payment_id,
        'amount': amount,
        'timestamp': datetime.utcnow().isoformat()
    })

SQS FIFO - deduplication built in:
sqs.send_message(
    QueueUrl='https://sqs.eu-west-1.amazonaws.com/123/payments.fifo',
    MessageBody=json.dumps(payment_event),
    MessageGroupId=f'customer-{customer_id}',
    MessageDeduplicationId=f'payment-{payment_id}'  # SQS deduplicates for 5 minutes
)

Layer 3: Workers - Processing with Safety
The Perfect Worker
import signal
import sys

class GracefulWorker:
    """
    Worker that:
    - Processes one message at a time
    - Acks only on success
    - Nacks permanently broken messages to the DLQ
    - Retries transient failures with backoff
    - Shuts down gracefully (finishes the current message before stopping)
    """
    def __init__(self, channel, queue_name: str):
        self.channel = channel
        self.queue_name = queue_name
        self.running = True
        self.processing = False
        # Handle SIGTERM gracefully: finish the current message before exiting
        signal.signal(signal.SIGTERM, self._handle_shutdown)
        signal.signal(signal.SIGINT, self._handle_shutdown)

    def _handle_shutdown(self, signum, frame):
        self.running = False
        if not self.processing:
            sys.exit(0)
        # If processing, the flag is set: the worker exits after the current message

    def run(self):
        self.channel.basic_qos(prefetch_count=1)
        self.channel.basic_consume(
            queue=self.queue_name,
            on_message_callback=self._handle_message
        )
        while self.running:
            self.channel.connection.process_data_events(time_limit=1)
        sys.exit(0)

    def _handle_message(self, ch, method, properties, body):
        self.processing = True
        retry_count = (properties.headers or {}).get('retry_count', 0)
        try:
            message = json.loads(body)
            self._process(message)
            ch.basic_ack(delivery_tag=method.delivery_tag)
        except TransientError as e:
            log.warning(f"Transient failure (attempt {retry_count + 1}): {e}")
            ch.basic_ack(delivery_tag=method.delivery_tag)  # remove from queue
            retry_message(ch, body, retry_count, self.queue_name)
        except PermanentError as e:
            log.error(f"Permanent failure - sending to DLQ: {e}")
            ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
        except Exception as e:
            log.exception("Unexpected error processing message")
            ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
        finally:
            self.processing = False
            if not self.running:
                sys.exit(0)

    def _process(self, message: dict):
        raise NotImplementedError("Subclass must implement _process")

Worker Security - Least Privilege at Consume Time
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "NotificationWorkerOnly",
      "Effect": "Allow",
      "Action": [
        "sqs:ReceiveMessage",
        "sqs:DeleteMessage",
        "sqs:ChangeMessageVisibility",
        "sqs:GetQueueAttributes"
      ],
      "Resource": [
        "arn:aws:sqs:eu-west-1:123456789:notifications-critical",
        "arn:aws:sqs:eu-west-1:123456789:notifications-standard"
      ]
    },
    {
      "Sid": "DLQSendOnly",
      "Effect": "Allow",
      "Action": ["sqs:SendMessage"],
      "Resource": "arn:aws:sqs:eu-west-1:123456789:notifications-dlq"
    }
  ]
}

The notification worker:
- Can read from its two queues
- Can delete messages (to acknowledge)
- Can send to the DLQ (to reject permanently broken messages)
- Cannot send to any other queue
- Cannot touch payment queues or order queues
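Tying the worker pieces together, a hypothetical subclass of the GracefulWorker class above, run under those restricted credentials (NotificationWorker, the connection details, and send_notification are all illustrative):

import pika

class NotificationWorker(GracefulWorker):
    def _process(self, message: dict):
        send_notification(message)  # business logic lives here, nothing else

connection = pika.BlockingConnection(pika.ConnectionParameters('rabbitmq.internal'))
worker = NotificationWorker(connection.channel(), 'standard-notifications')
worker.run()  # consumes until SIGTERM, finishing the in-flight message first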
Complete Real-World Example: MyBCAT Appointment System
End-to-end flow combining all the patterns above.
Patient books appointment via web app
        │
        ▼
API Gateway → Booking Lambda (Producer)
        │
        │ publishes: appointment.confirmed (signed + encrypted)
        ▼
SNS Topic: appointment-events
        │
    ┌───┴──────────────────┬──────────────────────┐
    ▼                      ▼                      ▼
SQS: notifications     SQS: calendar-sync     SQS: analytics
(critical tier)        (standard tier)        (bulk tier)
    │                      │                      │
    ▼                      ▼                      ▼
Notification Worker    CalSync Worker         Analytics Worker
(Twilio SMS + Email)   (Google Calendar)      (DynamoDB write)
    │
    ├─ Success: ack, delete message
    ├─ Transient (Twilio 503): retry with backoff (10s → 60s → 300s)
    └─ Permanent (invalid phone): DLQ + CloudWatch alarm

# Booking Lambda (Producer)
def handler(event, context):
    body = json.loads(event['body'])

    # 1. Save to the database (with an idempotency key)
    appointment = booking_service.create_appointment(
        slot_id=body['slot_id'],
        patient_id=body['patient_id'],
        idempotency_key=body['idempotency_key']
    )

    # 2. Sign and publish the event
    message = sign_message({
        'message_id': f'apt-confirmed-{appointment.id}',  # stable idempotency key
        'event_type': 'appointment.confirmed',
        'appointment_id': appointment.id,
        'patient_id': appointment.patient_id,
        'provider_id': appointment.provider_id,
        'date': appointment.date.isoformat(),
        'time': appointment.time,
        'practice_id': appointment.practice_id
    })
    sns.publish(
        TopicArn=APPOINTMENT_EVENTS_TOPIC,
        Message=json.dumps(message),
        MessageAttributes={
            'event_type': {'DataType': 'String', 'StringValue': 'appointment.confirmed'},
            'priority': {'DataType': 'String', 'StringValue': 'critical'}
        }
    )
    return {'statusCode': 201, 'body': json.dumps({'appointment_id': appointment.id})}
# Notification Worker
def process_notification(message: dict):
    # 1. Verify the signature (reject tampered messages)
    payload = verify_message(message)

    # 2. Idempotency check
    if already_processed(payload['message_id']):
        return  # duplicate delivery, skip safely

    # 3. Load patient details
    patient = patient_service.get(payload['patient_id'])
    appointment = appointment_service.get(payload['appointment_id'])

    # 4. Send SMS (Twilio)
    try:
        twilio_client.messages.create(
            to=patient.phone,
            from_=TWILIO_FROM,
            body=f"Confirmed: {appointment.provider} on {appointment.date} at {appointment.time}. Ref: {appointment.confirmation_number}"
        )
    except TwilioRestException as e:
        if e.status == 400:  # invalid number: permanent failure
            raise PermanentError(f"Invalid phone number for patient {patient.id}: {e}")
        raise TransientError(f"Twilio error {e.status}: {e}")  # will retry

    # 5. Send email
    ses.send_email(...)

    # 6. Mark as processed (idempotency record)
    mark_as_processed(payload['message_id'])

Production Checklist
Before going live with any queue-based system:
Security
✅ Producers use send-only credentials (cannot read)
✅ Workers use receive-only credentials (cannot send to non-DLQ queues)
✅ All connections over TLS
✅ Queues encrypted at rest (KMS for AWS; application-level encryption for RabbitMQ)
✅ Sensitive payloads encrypted at application level
✅ Messages signed with HMAC (tampering and replay attack prevention)
Reliability
✅ All messages durable/persistent (survive broker restart)
✅ Manual acknowledgement (not auto-ack)
✅ Dead letter queue configured for every consumer queue
✅ DLQ monitored with a CloudWatch/Prometheus alarm
✅ Retry strategy: exponential backoff with jitter, no infinite loops
✅ Idempotency on all consumers (at-least-once delivery handled)
Prioritization
✅ Critical messages on separate high-priority queues
✅ Workers check high-priority queues first
✅ Reserved concurrency for critical queues (AWS Lambda)
✅ Message TTL on critical queues (stale critical messages are useless)
Observability
✅ Queue depth metrics (alert if a queue grows unexpectedly)
✅ Consumer lag monitoring
✅ DLQ depth alerts (every DLQ message triggers a notification)
✅ Message processing time tracked
✅ Correlation ID threaded through all messages for tracing (see the sketch after this checklist)
Operations
✅ DLQ redrive procedure documented in a runbook
✅ Graceful shutdown on workers (finish the current message before exit)
✅ prefetch_count set to 1 (don't over-fetch)
✅ Visibility timeout > max processing time
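On that last observability point, a minimal sketch of threading a correlation ID through publish and consume (the helper names are illustrative; pika's correlation_id property carries the ID between services):

import json
import logging
import uuid

import pika

log = logging.getLogger('worker')

def publish_with_correlation(channel, exchange, routing_key, payload: dict,
                             correlation_id=None):
    # Reuse the inbound ID when re-publishing from a handler; mint one at the edge
    correlation_id = correlation_id or str(uuid.uuid4())
    channel.basic_publish(
        exchange=exchange,
        routing_key=routing_key,
        body=json.dumps(payload),
        properties=pika.BasicProperties(correlation_id=correlation_id)
    )

def handle_message(ch, method, properties, body):
    # Every log line carries the ID, so one request can be traced across services
    log.info("processing message", extra={'correlation_id': properties.correlation_id})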