Backend Systems · Advanced

Secure Event-Driven Queue Architecture: Producers, Brokers, Workers, Prioritization and Retries

How to design a production-grade queue architecture with clear separation between producers, brokers, and workers. Security at the queue level, message prioritization, retry strategies with exponential backoff, dead letter queues, idempotency, and real-time examples from healthcare, fintech, and e-commerce.

SystemForge · April 21, 2026 · 18 min read
Queue Architecture · Event-Driven · RabbitMQ · SQS · Security · Retry Strategy · Dead Letter Queue · Prioritization · Microservices · System Design · Real World

The Architecture at a Glance

A well-designed queue system has three distinct layers. Each layer has one job. Nothing bleeds across the boundary.

┌──────────────────────────────────────────────────────────────┐
│  PRODUCERS                                                   │
│  (API handlers, Lambda functions, scheduled jobs)            │
│  Job: publish a message and forget                           │
│  Knows: what happened, who triggered it                      │
│  Does NOT know: who processes it, how many consumers exist   │
└─────────────────────────────┬────────────────────────────────┘
                              │  publish (authenticated, encrypted)
                              ▼
┌──────────────────────────────────────────────────────────────┐
│  BROKER                                                      │
│  (RabbitMQ / Azure Service Bus / AWS SQS+SNS / Kafka)        │
│  Job: accept, route, persist, and deliver messages           │
│  Knows: routing rules, queue topology, delivery guarantees   │
│  Does NOT know: business logic                               │
└─────────────────────────────┬────────────────────────────────┘
                              │  deliver (rate-limited, prioritized)
                              ▼
┌──────────────────────────────────────────────────────────────┐
│  WORKERS (CONSUMERS)                                         │
│  (background services, Lambda functions, Celery tasks)       │
│  Job: process one message, acknowledge or reject             │
│  Knows: business logic for this specific task                │
│  Does NOT know: where the message came from, other workers   │
└──────────────────────────────────────────────────────────────┘

This separation is not just a design preference. It is what enables:

  • Scaling workers independently of producers
  • Replacing the broker without touching application code
  • Adding new consumers without changing producers
  • Securing each layer independently with least-privilege access

Layer 1: Producers — What They Do and What They Must Not Do

A producer has one job: publish an event describing what happened and return immediately.

The Wrong Producer

Python
# WRONG — producer doing worker's job
def place_order(order_data):
    order = save_order(order_data)
    
    # Producer is now doing the work of every consumer
    send_confirmation_email(order)      # what if email service is down?
    reserve_inventory(order)            # what if inventory service is slow?
    notify_warehouse(order)             # producer now coupled to warehouse
    update_analytics(order)             # producer now coupled to analytics
    
    return order  # user waited for ALL of this

If the email service is down, the order placement fails. If inventory is slow, the user waits. Adding a new consumer (e.g. fraud check) requires changing this function.

The Right Producer

Python
# CORRECT — producer publishes and returns
def place_order(order_data):
    order = save_order(order_data)
    
    publish_event('order.placed', {
        'order_id': order.id,
        'customer_id': order.customer_id,
        'items': order.items,
        'total': order.total,
        'placed_at': order.created_at.isoformat()
    })
    
    return order  # returns immediately — everything else is async

The producer knows what happened. It does not know or care what happens next.
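The article treats `publish_event` as a given. A minimal broker-agnostic sketch of what it might look like, with the envelope fields and the injected `send` callable being illustrative assumptions rather than part of the original:

```python
import json
import uuid
from datetime import datetime, timezone

def publish_event(event_type: str, payload: dict, send=None) -> dict:
    """Wrap the payload in a standard envelope and hand it to the transport.

    `send` is whatever actually talks to the broker (a pika channel publish,
    boto3 sqs.send_message, ...), injected so the producer stays decoupled.
    """
    envelope = {
        # Random here; use a deterministic key when consumers must deduplicate
        'message_id': str(uuid.uuid4()),
        'type': event_type,                       # e.g. 'order.placed'
        'occurred_at': datetime.now(timezone.utc).isoformat(),
        'payload': payload,
    }
    if send is not None:
        send(event_type, json.dumps(envelope))    # fire and forget
    return envelope
```

The envelope carries its own id and timestamp, which the signing and idempotency sections below rely on.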

Producer Security — Least Privilege at Publish Time

Each producer should have credentials that allow it to publish to specific exchanges/topics only — nothing else.

RabbitMQ — per-service user with write-only permission:

Python
# RabbitMQ management API — create user for order service
# This user can ONLY publish to the 'orders' exchange
# It cannot read queues, cannot declare exchanges, cannot manage anything

import requests

# Create user
requests.put('http://rabbitmq:15672/api/users/order-service', 
    auth=('admin', 'admin_password'),
    json={'password': 'order_service_secret', 'tags': ''})

# Grant permission: configure nothing, write to 'orders' exchange only, read nothing
requests.put('http://rabbitmq:15672/api/permissions/%2F/order-service',
    auth=('admin', 'admin_password'),
    json={
        'configure': '',                  # cannot declare/delete exchanges or queues
        'write': '^orders$',              # can only publish to 'orders' exchange
        'read': ''                        # cannot consume from any queue
    })

AWS SQS — IAM policy for producer:

JSON
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "OrderServiceProducerOnly",
      "Effect": "Allow",
      "Action": [
        "sqs:SendMessage",
        "sqs:GetQueueUrl"
      ],
      "Resource": "arn:aws:sqs:eu-west-1:123456789:order-events",
      "Condition": {
        "StringEquals": {
          "aws:RequestedRegion": "eu-west-1"
        }
      }
    }
  ]
}

The order service Lambda has this IAM policy attached. It can send messages to order-events queue only. It cannot read, purge, delete, or touch any other queue.
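With that policy attached, the producer code needs nothing beyond `sqs:SendMessage`. A sketch of the publish call (the queue URL and function name are illustrative; `sqs_client` is a boto3 SQS client):

```python
import json

def publish_order_event(sqs_client, queue_url: str, event: dict) -> str:
    """Send one order event; requires only sqs:SendMessage on this queue.

    Returns the broker-assigned MessageId.
    """
    response = sqs_client.send_message(
        QueueUrl=queue_url,
        MessageBody=json.dumps(event),
    )
    return response['MessageId']
```

Any attempt to call `receive_message` or `purge_queue` from this producer would be denied by IAM, which is exactly the point.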

Azure Service Bus — Shared Access Policy:

HCL
# Producer uses a SAS policy with Send-only rights
# Created in Azure Portal or Terraform:

resource "azurerm_servicebus_namespace_authorization_rule" "order_producer" {
  name         = "order-service-producer"
  namespace_id = azurerm_servicebus_namespace.main.id

  listen = false  # cannot read
  send   = true   # can publish
  manage = false  # cannot create/delete queues
}

C#
// Producer connection string uses this restricted policy
var client = new ServiceBusClient(sendOnlyConnectionString);

Layer 2: The Broker — Routing, Security, and Guarantees

Message Encryption at Rest and in Transit

In transit — TLS everywhere:

Python
# RabbitMQ — TLS connection
import ssl
import pika

ssl_context = ssl.create_default_context(cafile='/etc/ssl/certs/ca.pem')
ssl_context.load_cert_chain('/etc/ssl/certs/client.pem', '/etc/ssl/private/client.key')

connection = pika.BlockingConnection(
    pika.ConnectionParameters(
        host='rabbitmq.internal',
        port=5671,                          # TLS port (not 5672)
        ssl_options=pika.SSLOptions(ssl_context)
    )
)
HCL
# AWS SQS — enforce TLS via queue policy
resource "aws_sqs_queue_policy" "enforce_tls" {
  queue_url = aws_sqs_queue.order_events.url

  policy = jsonencode({
    Statement = [{
      Effect    = "Deny"
      Principal = "*"
      Action    = "sqs:*"
      Resource  = aws_sqs_queue.order_events.arn
      Condition = {
        Bool = {
          "aws:SecureTransport" = "false"  # deny any non-TLS request
        }
      }
    }]
  })
}

At rest — encrypted queues:

HCL
# AWS SQS — KMS encryption at rest
resource "aws_sqs_queue" "order_events" {
  name                              = "order-events"
  kms_master_key_id                 = aws_kms_key.queue_key.arn
  kms_data_key_reuse_period_seconds = 300
}

resource "aws_kms_key" "queue_key" {
  description             = "SQS encryption key for order events"
  deletion_window_in_days = 30
  enable_key_rotation     = true
}
Python
# RabbitMQ — encrypt message payload before publishing
# (RabbitMQ itself does not encrypt payloads — do it at application level)
import json
import os

import pika
from cryptography.fernet import Fernet

fernet = Fernet(os.environ['MESSAGE_ENCRYPTION_KEY'])

def publish_secure(exchange, routing_key, payload: dict):
    encrypted = fernet.encrypt(json.dumps(payload).encode())
    
    channel.basic_publish(
        exchange=exchange,
        routing_key=routing_key,
        body=encrypted,
        properties=pika.BasicProperties(
            content_type='application/octet-stream',
            headers={'encrypted': True, 'version': '1'}
        )
    )

Message Signing — Verify the Producer is Who They Say They Are

Python
import hashlib
import hmac
import json
import os
import time

class SecurityError(Exception):
    """Raised when a message fails signature or freshness checks."""

SECRET_KEY = os.environ['MESSAGE_SIGNING_KEY']

def sign_message(payload: dict) -> dict:
    timestamp = str(int(time.time()))
    body = json.dumps(payload, sort_keys=True)
    
    signature = hmac.new(
        SECRET_KEY.encode(),
        f"{timestamp}.{body}".encode(),
        hashlib.sha256
    ).hexdigest()
    
    return {
        'payload': payload,
        'timestamp': timestamp,
        'signature': signature
    }

def verify_message(signed_message: dict, max_age_seconds=300) -> dict:
    timestamp = signed_message['timestamp']
    payload = signed_message['payload']
    received_sig = signed_message['signature']
    
    # Reject replayed messages (older than 5 minutes)
    if abs(time.time() - int(timestamp)) > max_age_seconds:
        raise SecurityError("Message timestamp too old — possible replay attack")
    
    body = json.dumps(payload, sort_keys=True)
    expected_sig = hmac.new(
        SECRET_KEY.encode(),
        f"{timestamp}.{body}".encode(),
        hashlib.sha256
    ).hexdigest()
    
    if not hmac.compare_digest(received_sig, expected_sig):
        raise SecurityError("Message signature invalid — possible tampering")
    
    return payload

This is the same pattern Stripe uses for webhook signatures — timestamp + HMAC prevents both tampering and replay attacks.
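A self-contained round trip of the scheme, compressed into a few lines (the hard-coded demo key is illustrative only; in production the key comes from the environment as above):

```python
import hashlib
import hmac
import json
import time

DEMO_KEY = b'demo-signing-key'  # illustrative — never hard-code real keys

def _sig(timestamp: str, payload: dict) -> str:
    # Canonical JSON (sorted keys) so producer and consumer hash identical bytes
    body = json.dumps(payload, sort_keys=True)
    return hmac.new(DEMO_KEY, f"{timestamp}.{body}".encode(), hashlib.sha256).hexdigest()

def sign(payload: dict) -> dict:
    ts = str(int(time.time()))
    return {'payload': payload, 'timestamp': ts, 'signature': _sig(ts, payload)}

def verify(msg: dict, max_age_seconds: int = 300) -> bool:
    if abs(time.time() - int(msg['timestamp'])) > max_age_seconds:
        return False                                  # stale: possible replay
    expected = _sig(msg['timestamp'], msg['payload'])
    return hmac.compare_digest(msg['signature'], expected)
```

Tampering with the payload after signing, or replaying an old message, both make `verify` return False.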


Priority Queues — High-Value Work First

Not all messages are equal. A payment failure alert should not wait behind 10,000 marketing emails.

Priority in RabbitMQ

RabbitMQ supports per-message priority natively (0–255, practically 0–10):

Python
# Declare queue with max priority level
channel.queue_declare(
    queue='notifications',
    durable=True,
    arguments={'x-max-priority': 10}  # supports priority 0-10
)

# High priority — appointment cancellation (patient impacted)
channel.basic_publish(
    exchange='',
    routing_key='notifications',
    body=json.dumps(cancellation_message),
    properties=pika.BasicProperties(
        delivery_mode=pika.DeliveryMode.Persistent,
        priority=9  # high — process before lower-priority messages
    )
)

# Low priority — marketing newsletter
channel.basic_publish(
    exchange='',
    routing_key='notifications',
    body=json.dumps(newsletter_message),
    properties=pika.BasicProperties(
        delivery_mode=pika.DeliveryMode.Persistent,
        priority=1  # low — can wait
    )
)

RabbitMQ delivers priority 9 messages before priority 1 messages in the same queue, even if the priority 1 messages arrived first.

Priority via Separate Queues (More Common in Production)

Using message priority numbers is simple but brittle. The production pattern most teams use is separate queues per priority tier, with workers that drain high-priority queues first.

                       ┌──────────────────────────┐
                       │  critical-notifications  │ ← SLA: 10 seconds
                       │  (appointment cancelled, │
                       │   payment failed,        │
Notification           │   emergency alerts)      │
Exchange ─────────────►├──────────────────────────┤
(topic)                │  standard-notifications  │ ← SLA: 2 minutes
                       │  (confirmations,         │
                       │   reminders, receipts)   │
                       ├──────────────────────────┤
                       │  bulk-notifications      │ ← SLA: 30 minutes
                       │  (newsletters,           │
                       │   marketing, reports)    │
                       └──────────────────────────┘
Python
# RabbitMQ topic exchange routing by priority
channel.exchange_declare(exchange='notifications', exchange_type='topic')

# Queues
channel.queue_declare(queue='critical-notifications', durable=True)
channel.queue_declare(queue='standard-notifications', durable=True)
channel.queue_declare(queue='bulk-notifications', durable=True)

# Bindings
channel.queue_bind(exchange='notifications', queue='critical-notifications',
                   routing_key='notification.critical.#')
channel.queue_bind(exchange='notifications', queue='standard-notifications',
                   routing_key='notification.standard.#')
channel.queue_bind(exchange='notifications', queue='bulk-notifications',
                   routing_key='notification.bulk.#')

# Publisher sets the priority tier in the routing key
def publish_notification(notification_type: str, priority: str, payload: dict):
    routing_key = f'notification.{priority}.{notification_type}'
    
    channel.basic_publish(
        exchange='notifications',
        routing_key=routing_key,
        body=json.dumps(payload),
        properties=pika.BasicProperties(delivery_mode=pika.DeliveryMode.Persistent)
    )

# Publish appointment cancellation — critical
publish_notification('appointment_cancelled', 'critical', {
    'patient_id': 'pat_123',
    'appointment_id': 'apt_456',
    'cancellation_reason': 'Provider unavailable'
})

# Publish newsletter — bulk
publish_notification('newsletter', 'bulk', {
    'campaign_id': 'camp_789',
    'batch_id': 'batch_001'
})

Worker that prioritizes — checks critical first, then standard, then bulk:

Python
def run_priority_worker():
    queues_by_priority = [
        'critical-notifications',   # checked first every cycle
        'standard-notifications',   # checked if critical is empty
        'bulk-notifications'        # checked if both above are empty
    ]
    
    while True:
        processed = False
        
        for queue in queues_by_priority:
            method, properties, body = channel.basic_get(queue=queue, auto_ack=False)
            
            if method:  # message available in this queue
                try:
                    process_notification(json.loads(body))
                    channel.basic_ack(delivery_tag=method.delivery_tag)
                except Exception as e:
                    channel.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
                processed = True
                break  # go back to checking critical queue first
        
        if not processed:
            time.sleep(0.1)  # nothing in any queue, brief pause

Priority in AWS SQS

Standard SQS does not support message priority natively. The pattern is always separate queues:

HCL
# Terraform — three priority queues
resource "aws_sqs_queue" "critical" {
  name                       = "notifications-critical"
  message_retention_seconds  = 3600        # 1 hour — stale critical alerts are useless
  visibility_timeout_seconds = 30
}

resource "aws_sqs_queue" "standard" {
  name                       = "notifications-standard"
  message_retention_seconds  = 86400       # 24 hours
  visibility_timeout_seconds = 60
}

resource "aws_sqs_queue" "bulk" {
  name                       = "notifications-bulk"
  message_retention_seconds  = 604800      # 7 days
  visibility_timeout_seconds = 300
}

# Lambda triggered by critical queue — reserved concurrency 50 (always available)
resource "aws_lambda_event_source_mapping" "critical_trigger" {
  event_source_arn = aws_sqs_queue.critical.arn
  function_name    = aws_lambda_function.notification_handler.arn
  batch_size       = 1                     # process one at a time for speed
}

# Lambda triggered by bulk queue — reserved concurrency 5 (low priority)
resource "aws_lambda_event_source_mapping" "bulk_trigger" {
  event_source_arn = aws_sqs_queue.bulk.arn
  function_name    = aws_lambda_function.notification_handler.arn
  batch_size       = 10                    # batch for efficiency
}

Real example — Uber Eats: When a restaurant marks an order as "ready for pickup", that message goes to the driver-dispatch-critical queue (driver must be notified immediately). When Uber Eats sends a promotional email, it goes to marketing-bulk. Critical dispatch workers have 100x the concurrency of bulk marketing workers.


Retry Strategies — How to Handle Failure Without Losing Messages

The Naive Retry (What Not to Do)

Python
# WRONG — immediate retry hammers a struggling service
def process(message):
    for attempt in range(3):
        try:
            call_external_api(message)
            return
        except Exception:
            continue  # immediately retry — this makes the problem worse
    raise Exception("Failed after 3 retries")

If the external API is slow because it is overloaded, immediately retrying adds more load. You turn a slowdown into a complete outage.

Exponential Backoff with Jitter

Python
import time
import random

def process_with_backoff(message, max_retries=5):
    attempt = message.get('_retry_count', 0)
    
    try:
        process_business_logic(message)
        
    except TransientError as e:
        if attempt >= max_retries:
            send_to_dead_letter_queue(message, str(e))
            return
        
        # Exponential backoff: 1s, 2s, 4s, 8s, 16s
        # Jitter: random 0-1s added to prevent thundering herd
        delay = (2 ** attempt) + random.uniform(0, 1)
        
        message['_retry_count'] = attempt + 1
        message['_last_error'] = str(e)
        message['_next_attempt'] = time.time() + delay
        
        # Requeue with delay
        schedule_retry(message, delay_seconds=delay)
        
    except PermanentError as e:
        # No retry — straight to DLQ with context
        send_to_dead_letter_queue(message, str(e), permanent=True)

Thundering herd without jitter:

Service goes down at T=0
1000 consumers all fail at T=1
All 1000 retry at T=3 (2 second backoff)
Service comes back up at T=2.5
All 1000 hit it simultaneously at T=3 — overloaded again, goes back down

With jitter:

All 1000 retry between T=2.5 and T=3.5
Service recovers, handles gradual ramp-up
No spike
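The difference is easy to see numerically. A sketch that computes when a fleet of consumers would retry, with and without jitter (fleet size and backoff base are illustrative):

```python
import random

def retry_times(consumers: int, attempt: int, fail_at: float, jitter: bool) -> list:
    """When does each consumer retry after failing at `fail_at` seconds?"""
    base = 2 ** attempt                     # exponential backoff in seconds
    return [
        fail_at + base + (random.uniform(0, 1) if jitter else 0)
        for _ in range(consumers)
    ]

no_jitter = retry_times(1000, attempt=1, fail_at=1.0, jitter=False)
with_jitter = retry_times(1000, attempt=1, fail_at=1.0, jitter=True)

# Without jitter all 1000 consumers land on the same instant;
# with jitter they spread across a one-second window.
assert len(set(no_jitter)) == 1
assert max(with_jitter) - min(with_jitter) > 0.5
```

The spike becomes a ramp, and a recovering service gets a chance to stay up.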

Delay Queues — Retry with Scheduled Re-delivery

The pattern for retries in RabbitMQ: route failed messages to a "wait" exchange that has a TTL (time-to-live). When the TTL expires, messages dead-letter back into the main queue for retry.

Python
# Setup: dead-letter chain creates delay queues
def setup_retry_queues(channel, base_queue: str):
    delays = [10, 60, 300, 1800]  # 10s, 1m, 5m, 30m
    
    for delay_ms in [d * 1000 for d in delays]:
        wait_queue = f'{base_queue}.wait.{delay_ms}ms'
        
        channel.queue_declare(
            queue=wait_queue,
            durable=True,
            arguments={
                'x-dead-letter-exchange': '',         # default exchange
                'x-dead-letter-routing-key': base_queue,  # return to main queue
                'x-message-ttl': delay_ms,            # wait this long then redeliver
                'x-expires': delay_ms * 2             # delete queue if unused
            }
        )
    
    # DLQ — messages that exhausted all retries
    channel.queue_declare(queue=f'{base_queue}.dead', durable=True)

def retry_message(channel, message_body: bytes, retry_count: int, base_queue: str):
    delays = [10000, 60000, 300000, 1800000]  # ms
    
    if retry_count >= len(delays):
        # Exhausted retries — send to DLQ
        channel.basic_publish(
            exchange='',
            routing_key=f'{base_queue}.dead',
            body=message_body,
            properties=pika.BasicProperties(
                headers={'retry_count': retry_count, 'failed_at': time.time()}
            )
        )
        return
    
    wait_queue = f'{base_queue}.wait.{delays[retry_count]}ms'
    channel.basic_publish(
        exchange='',
        routing_key=wait_queue,
        body=message_body,
        properties=pika.BasicProperties(
            headers={'retry_count': retry_count + 1}
        )
    )

AWS SQS — delay queues built in:

HCL
resource "aws_sqs_queue" "payment_retry" {
  name                       = "payment-processing-retry"
  delay_seconds              = 30      # all messages delayed 30s before visible
  visibility_timeout_seconds = 60
  
  redrive_policy = jsonencode({
    deadLetterTargetArn = aws_sqs_queue.payment_dlq.arn
    maxReceiveCount     = 5            # after 5 failures → DLQ
  })
}
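`delay_seconds` above applies to every message in the queue. SQS also accepts a per-message `DelaySeconds` (capped at 900 seconds), which is enough for short backoff tiers. A sketch of re-sending a failed message with per-message exponential delay (`sqs_client` is a boto3 SQS client; the field names in the body are illustrative):

```python
import json

SQS_MAX_DELAY = 900  # SQS caps per-message DelaySeconds at 15 minutes

def requeue_with_backoff(sqs_client, queue_url: str, message: dict, attempt: int) -> int:
    """Re-send a failed message with exponential per-message delay.

    Delays longer than 900 seconds need the wait-queue pattern instead.
    Returns the delay that was applied.
    """
    delay = min(2 ** attempt, SQS_MAX_DELAY)
    sqs_client.send_message(
        QueueUrl=queue_url,
        MessageBody=json.dumps({**message, '_retry_count': attempt + 1}),
        DelaySeconds=delay,        # message stays invisible for `delay` seconds
    )
    return delay
```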

MassTransit (.NET) — built-in retry with policy:

C#
cfg.ReceiveEndpoint("payment-processing", e =>
{
    e.ConfigureConsumer<PaymentConsumer>(context);
    
    // Immediate retries for transient errors (network blip)
    e.UseMessageRetry(r => r
        .Immediate(2)                            // retry twice immediately
        .Intervals(TimeSpan.FromSeconds(10),     // then 10s
                   TimeSpan.FromSeconds(60),     // then 60s
                   TimeSpan.FromMinutes(5),      // then 5m
                   TimeSpan.FromMinutes(30))     // then 30m
        .Ignore<ValidationException>()           // don't retry validation errors
        .Ignore<DuplicatePaymentException>()     // don't retry known duplicates
    );
    
    // Circuit breaker — stop hammering a failing service
    e.UseCircuitBreaker(cb =>
    {
        cb.TrackingPeriod = TimeSpan.FromMinutes(1);
        cb.TripThreshold = 15;           // trip if 15% of requests fail
        cb.ActiveThreshold = 10;         // need at least 10 requests to evaluate
        cb.ResetInterval = TimeSpan.FromMinutes(5); // try again after 5m
    });
});

Dead Letter Queues — What Happens When Everything Fails

A Dead Letter Queue (DLQ) is where messages go after exhausting all retries. It is not a bin — it is a holding area for investigation and manual re-processing.

DLQ Setup and Monitoring

HCL
# AWS — SQS with DLQ and CloudWatch alarm
resource "aws_sqs_queue" "orders" {
  name                       = "order-processing"
  visibility_timeout_seconds = 60
  message_retention_seconds  = 86400
  
  redrive_policy = jsonencode({
    deadLetterTargetArn = aws_sqs_queue.orders_dlq.arn
    maxReceiveCount     = 3
  })
}

resource "aws_sqs_queue" "orders_dlq" {
  name                      = "order-processing-dlq"
  message_retention_seconds = 1209600  # 14 days to investigate
}

# Alert if anything lands in DLQ — this should never happen silently
resource "aws_cloudwatch_metric_alarm" "orders_dlq_depth" {
  alarm_name          = "orders-dlq-messages"
  alarm_description   = "Orders landing in DLQ — investigate immediately"
  namespace           = "AWS/SQS"
  metric_name         = "ApproximateNumberOfMessagesVisible"
  dimensions          = { QueueName = aws_sqs_queue.orders_dlq.name }
  statistic           = "Sum"
  period              = 60
  evaluation_periods  = 1
  threshold           = 1                 # alert on first message
  comparison_operator = "GreaterThanOrEqualToThreshold"
  alarm_actions       = [aws_sns_topic.alerts.arn]
}
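The same depth can also be polled from application code, for example in a health-check endpoint alongside the CloudWatch alarm. A sketch (`sqs_client` is a boto3 SQS client; note the GetQueueAttributes name differs from the CloudWatch metric name):

```python
def dlq_depth(sqs_client, dlq_url: str) -> int:
    """Approximate count of messages waiting in the DLQ.

    The GetQueueAttributes attribute is 'ApproximateNumberOfMessages';
    the equivalent CloudWatch metric is 'ApproximateNumberOfMessagesVisible'.
    """
    response = sqs_client.get_queue_attributes(
        QueueUrl=dlq_url,
        AttributeNames=['ApproximateNumberOfMessages'],
    )
    return int(response['Attributes']['ApproximateNumberOfMessages'])
```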

Enriched DLQ Messages — Context for Debugging

When a message lands in the DLQ, add everything needed to diagnose and replay it:

Python
def send_to_dlq(original_message: dict, error: Exception, retry_count: int):
    dlq_message = {
        'original_message': original_message,
        'failure_context': {
            'error_type': type(error).__name__,
            'error_message': str(error),
            'stack_trace': traceback.format_exc(),
            'retry_count': retry_count,
            'first_attempt_at': original_message.get('_first_attempt_at'),
            'final_failure_at': datetime.utcnow().isoformat(),
            'worker_id': os.environ.get('HOSTNAME', 'unknown'),
            'queue_name': original_message.get('_source_queue')
        },
        'replay_instructions': {
            'target_queue': original_message.get('_source_queue'),
            'command': f'aws sqs send-message --queue-url <url> --message-body <body>'
        }
    }
    
    sqs.send_message(
        QueueUrl=DLQ_URL,
        MessageBody=json.dumps(dlq_message)
    )

DLQ Re-drive — Replaying Failed Messages

After fixing the underlying bug, replay the DLQ:

Python
def redrive_dlq(dlq_url: str, target_queue_url: str, batch_size: int = 10):
    """Replay all messages from DLQ back to the original queue."""
    
    redriven = 0
    errors = 0
    
    while True:
        response = sqs.receive_message(
            QueueUrl=dlq_url,
            MaxNumberOfMessages=batch_size,
            WaitTimeSeconds=5
        )
        
        messages = response.get('Messages', [])
        if not messages:
            break
        
        for msg in messages:
            try:
                dlq_envelope = json.loads(msg['Body'])
                original = dlq_envelope['original_message']
                
                # Remove failure metadata before replaying
                original.pop('_retry_count', None)
                original.pop('_last_error', None)
                
                sqs.send_message(
                    QueueUrl=target_queue_url,
                    MessageBody=json.dumps(original)
                )
                
                sqs.delete_message(
                    QueueUrl=dlq_url,
                    ReceiptHandle=msg['ReceiptHandle']
                )
                redriven += 1
                
            except Exception as e:
                log.error(f"Failed to redrive message: {e}")
                errors += 1
    
    return {'redriven': redriven, 'errors': errors}

AWS built-in redrive (2023 feature):

Python
# AWS now has native DLQ redrive — no custom code needed
response = sqs.start_message_move_task(
    SourceArn=dlq_arn,
    DestinationArn=source_queue_arn,
    MaxNumberOfMessagesPerSecond=10  # controlled rate
)
task_handle = response['TaskHandle']

# Monitor progress
status = sqs.list_message_move_tasks(SourceArn=dlq_arn)

Idempotency — Handling Duplicate Messages

Practically every distributed message system delivers at-least-once by default. This means the same message can arrive twice: once from the original delivery, and once from a redelivery after a network timeout where the broker never received the acknowledgement.

The problem without idempotency:

T=0: Worker receives "charge_card" message, calls Stripe API
T=1: Stripe charges card successfully
T=2: Worker crashes before acknowledging the message
T=3: Message reappears in queue (visibility timeout expired)
T=4: Another worker receives same message, calls Stripe again
T=5: Customer charged twice

Idempotency with a Deduplication Key

Python
import logging
import time
from datetime import datetime

import boto3
from boto3.dynamodb.conditions import Attr

log = logging.getLogger(__name__)

dynamodb = boto3.resource('dynamodb')
processed_table = dynamodb.Table('processed-message-ids')

def process_idempotently(message: dict):
    message_id = message['message_id']  # unique ID from producer
    
    try:
        # Atomic: create record only if it doesn't exist
        processed_table.put_item(
            Item={
                'message_id': message_id,
                'processed_at': datetime.utcnow().isoformat(),
                'ttl': int(time.time()) + 86400  # expire after 24 hours
            },
            ConditionExpression=Attr('message_id').not_exists()
        )
    except dynamodb.meta.client.exceptions.ConditionalCheckFailedException:
        # Already processed — this is a duplicate, skip safely
        log.info(f"Duplicate message {message_id} — skipping")
        return
    
    # Safe to process — guaranteed first time
    do_actual_work(message)

Producer must generate stable idempotency keys:

Python
def publish_payment_event(payment_id: str, amount: float):
    # Key is deterministic from business ID — same payment always same key
    idempotency_key = f"payment-charged-{payment_id}"
    
    publish_event('payment.charged', {
        'message_id': idempotency_key,   # stable, not random
        'payment_id': payment_id,
        'amount': amount,
        'timestamp': datetime.utcnow().isoformat()
    })

SQS FIFO — deduplication built in:

Python
sqs.send_message(
    QueueUrl='https://sqs.eu-west-1.amazonaws.com/123/payments.fifo',
    MessageBody=json.dumps(payment_event),
    MessageGroupId=f'customer-{customer_id}',
    MessageDeduplicationId=f'payment-{payment_id}'  # SQS deduplicates for 5 minutes
)

Layer 3: Workers — Processing with Safety

The Perfect Worker

Python
import signal
import sys

class GracefulWorker:
    """
    Worker that:
    - Processes one message at a time
    - Acks only on success
    - Nacks permanently broken messages to DLQ
    - Retries transient failures with backoff
    - Shuts down gracefully (finishes current message before stopping)
    """
    
    def __init__(self, channel, queue_name: str):
        self.channel = channel
        self.queue_name = queue_name
        self.running = True
        self.processing = False
        
        # Handle SIGTERM gracefully — finish current message before exiting
        signal.signal(signal.SIGTERM, self._handle_shutdown)
        signal.signal(signal.SIGINT, self._handle_shutdown)
    
    def _handle_shutdown(self, signum, frame):
        self.running = False
        if not self.processing:
            sys.exit(0)
        # If processing, set flag — worker will exit after current message
    
    def run(self):
        self.channel.basic_qos(prefetch_count=1)
        self.channel.basic_consume(
            queue=self.queue_name,
            on_message_callback=self._handle_message
        )
        
        while self.running:
            self.channel.connection.process_data_events(time_limit=1)
        
        sys.exit(0)
    
    def _handle_message(self, ch, method, properties, body):
        self.processing = True
        retry_count = (properties.headers or {}).get('retry_count', 0)
        
        try:
            message = json.loads(body)
            self._process(message)
            ch.basic_ack(delivery_tag=method.delivery_tag)
            
        except TransientError as e:
            log.warning(f"Transient failure (attempt {retry_count + 1}): {e}")
            ch.basic_ack(delivery_tag=method.delivery_tag)  # remove from queue
            retry_message(ch, body, retry_count, self.queue_name)
            
        except PermanentError as e:
            log.error(f"Permanent failure — sending to DLQ: {e}")
            ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
            
        except Exception as e:
            log.exception(f"Unexpected error processing message")
            ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
            
        finally:
            self.processing = False
            if not self.running:
                sys.exit(0)
    
    def _process(self, message: dict):
        raise NotImplementedError("Subclass must implement _process")

Worker Security — Least Privilege at Consume Time

JSON
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "NotificationWorkerOnly",
      "Effect": "Allow",
      "Action": [
        "sqs:ReceiveMessage",
        "sqs:DeleteMessage",
        "sqs:ChangeMessageVisibility",
        "sqs:GetQueueAttributes"
      ],
      "Resource": [
        "arn:aws:sqs:eu-west-1:123456789:notifications-critical",
        "arn:aws:sqs:eu-west-1:123456789:notifications-standard"
      ]
    },
    {
      "Sid": "DLQSendOnly",
      "Effect": "Allow",
      "Action": ["sqs:SendMessage"],
      "Resource": "arn:aws:sqs:eu-west-1:123456789:notifications-dlq"
    }
  ]
}

The notification worker:

  • Can read from its two queues
  • Can delete messages (to acknowledge)
  • Can send to the DLQ (to reject permanently broken messages)
  • Cannot send to any other queue
  • Cannot touch payment queues or order queues
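
The producer side gets the mirror-image policy: publish-only, no receive. A sketch for the booking Lambda, which publishes to the appointment-events SNS topic (account ID and ARN are illustrative):

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "BookingProducerPublishOnly",
      "Effect": "Allow",
      "Action": ["sns:Publish"],
      "Resource": "arn:aws:sns:eu-west-1:123456789:appointment-events"
    }
  ]
}
```

A compromised producer credential can then flood the topic, but it can never read, delete, or redirect messages already in flight.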

Complete Real-World Example: MyBCAT Appointment System

End-to-end flow combining all the patterns above.

Patient books appointment via web app
        │
        ā–¼
API Gateway → Booking Lambda (Producer)
        │
        │ publishes: appointment.confirmed (signed + encrypted)
        ā–¼
SNS Topic: appointment-events
        │
   ā”Œā”€ā”€ā”€ā”€ā”“ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”¬ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”
   ā–¼                       ā–¼                    ā–¼
SQS: notifications     SQS: calendar-sync    SQS: analytics
(critical tier)        (standard tier)       (bulk tier)
   │                       │                    │
   ā–¼                       ā–¼                    ā–¼
Notification Worker    CalSync Worker        Analytics Worker
(Twilio SMS + Email)   (Google Calendar)     (DynamoDB write)
   │
   ā”œā”€ Success: ack, delete message
   ā”œā”€ Transient (Twilio 503): retry with backoff (10s → 60s → 300s)
   └─ Permanent (invalid phone): DLQ + CloudWatch alarm
Python
# Booking Lambda (Producer)
def handler(event, context):
    body = json.loads(event['body'])
    
    # 1. Save to database (with idempotency key)
    appointment = booking_service.create_appointment(
        slot_id=body['slot_id'],
        patient_id=body['patient_id'],
        idempotency_key=body['idempotency_key']
    )
    
    # 2. Sign and publish event
    message = sign_message({
        'message_id': f'apt-confirmed-{appointment.id}',  # stable idempotency key
        'event_type': 'appointment.confirmed',
        'appointment_id': appointment.id,
        'patient_id': appointment.patient_id,
        'provider_id': appointment.provider_id,
        'date': appointment.date.isoformat(),
        'time': appointment.time,
        'practice_id': appointment.practice_id
    })
    
    sns.publish(
        TopicArn=APPOINTMENT_EVENTS_TOPIC,
        Message=json.dumps(message),
        MessageAttributes={
            'event_type': {'DataType': 'String', 'StringValue': 'appointment.confirmed'},
            'priority': {'DataType': 'String', 'StringValue': 'critical'}
        }
    )
    
    return {'statusCode': 201, 'body': json.dumps({'appointment_id': appointment.id})}

# Notification Worker
def process_notification(message: dict):
    # 1. Verify signature (reject tampered messages)
    payload = verify_message(message)
    
    # 2. Idempotency check
    if already_processed(payload['message_id']):
        return  # duplicate delivery, skip safely
    
    # 3. Load patient details
    patient = patient_service.get(payload['patient_id'])
    appointment = appointment_service.get(payload['appointment_id'])
    
    # 4. Send SMS (Twilio)
    try:
        twilio_client.messages.create(
            to=patient.phone,
            from_=TWILIO_FROM,
            body=f"Confirmed: {appointment.provider} on {appointment.date} at {appointment.time}. Ref: {appointment.confirmation_number}"
        )
    except TwilioRestException as e:
        if e.status == 400:  # invalid number — permanent failure
            raise PermanentError(f"Invalid phone number for patient {patient.id}: {e}")
        raise TransientError(f"Twilio error {e.status}: {e}")  # will retry
    
    # 5. Send email
    ses.send_email(...)
    
    # 6. Mark as processed (idempotency record)
    mark_as_processed(payload['message_id'])
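
The already_processed / mark_as_processed pair is the idempotency store. A minimal sketch using SQLite's primary-key constraint as the duplicate guard (a production deployment on this stack would more likely use a DynamoDB conditional write; note that a separate check-then-mark leaves a small race window, so truly concurrent consumers should claim the message_id atomically before processing):

```python
import sqlite3

# In-memory store for illustration; production needs a shared, durable table
_conn = sqlite3.connect(":memory:")
_conn.execute(
    "CREATE TABLE IF NOT EXISTS processed (message_id TEXT PRIMARY KEY, at TEXT)"
)

def already_processed(message_id: str) -> bool:
    row = _conn.execute(
        "SELECT 1 FROM processed WHERE message_id = ?", (message_id,)
    ).fetchone()
    return row is not None

def mark_as_processed(message_id: str) -> None:
    # PRIMARY KEY makes the insert atomic: a concurrent duplicate raises
    # sqlite3.IntegrityError instead of silently double-marking
    _conn.execute(
        "INSERT INTO processed (message_id, at) VALUES (?, datetime('now'))",
        (message_id,),
    )
    _conn.commit()
```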

Production Checklist

Before going live with any queue-based system:

Security
  āœ“ Producers use send-only credentials (cannot read)
  āœ“ Workers use receive-only credentials (cannot send to non-DLQ queues)
  āœ“ All connections over TLS
  āœ“ Queues encrypted at rest (KMS for SQS/SNS; disk-level encryption for RabbitMQ; TLS covers transit, not rest)
  āœ“ Sensitive payloads encrypted at application level
  āœ“ Messages signed with HMAC (tamper detection; pair with timestamps or nonces to prevent replay)

Reliability
  āœ“ All messages durable/persistent (survive broker restart)
  āœ“ Manual acknowledgement (not auto-ack)
  āœ“ Dead letter queue configured for every consumer queue
  āœ“ DLQ monitored with CloudWatch/Prometheus alarm
  āœ“ Retry strategy: exponential backoff with jitter, no infinite loops
  āœ“ Idempotency on all consumers (at-least-once delivery handled)

Prioritization
  āœ“ Critical messages on separate high-priority queues
  āœ“ Workers check high-priority queues first
  āœ“ Reserved concurrency for critical queues (AWS Lambda)
  āœ“ Message TTL on critical queues (stale critical messages are useless)

Observability
  āœ“ Queue depth metrics (alert if queue grows unexpectedly)
  āœ“ Consumer lag monitoring
  āœ“ DLQ depth alerts (every DLQ message triggers a notification)
  āœ“ Message processing time tracked
  āœ“ Correlation ID threaded through all messages for tracing

Operations
  āœ“ DLQ redrive procedure documented in runbook
  āœ“ Graceful shutdown on workers (finish current message before exit)
  āœ“ prefetch_count set to 1 (don't over-fetch)
  āœ“ Visibility timeout > max processing time
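
The retry items above (exponential backoff with jitter, no infinite loops) reduce to one delay function. A sketch with illustrative constants matching the 10s → 60s → 300s ladder from the appointment example:

```python
import random

BASE_DELAY_S = 10    # first retry tier
MAX_DELAY_S = 300    # cap matching the ladder's ceiling
MAX_RETRIES = 5      # past this, route to the DLQ instead of retrying

def backoff_delay(retry_count: int) -> float:
    """Capped exponential backoff with full jitter."""
    capped = min(MAX_DELAY_S, BASE_DELAY_S * (2 ** retry_count))
    # Full jitter: sample uniformly so consumers that failed together
    # don't all retry in the same instant
    return random.uniform(0, capped)

def should_retry(retry_count: int) -> bool:
    return retry_count < MAX_RETRIES
```

Full jitter trades a predictable schedule for better spread: when a downstream outage fails thousands of messages at once, uniform sampling keeps the retries from arriving as a synchronized thundering herd.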
