Messaging Systems Complete Guide: RabbitMQ, Kafka, Redis, NATS, Azure Service Bus and More
Every major messaging system explained — RabbitMQ exchanges and routing, Kafka partitions and consumer groups, Redis Streams, NATS, Azure Service Bus, AWS SQS/SNS, ZeroMQ. When to use each, real-world examples, code in Python, .NET, and TypeScript, and a decision framework.
Why Messaging Systems Exist
In a synchronous system, Service A calls Service B directly over HTTP. This is simple and easy to reason about — but it creates tight coupling:
- If Service B is down, Service A fails
- If Service B is slow, Service A is slow
- If traffic spikes, both services must scale together
- Adding Service C means changing Service A to call both B and C
A messaging system sits between producers and consumers. Service A publishes a message to the broker. The broker holds it. Service B (and C, and D) consume it when they are ready.
Without messaging (tight coupling):
Service A ──HTTP──► Service B (if B is down, A fails)
With messaging (loose coupling):
Service A ──► Broker ──► Service B (B can be down, message waits)
└──► Service C (C added without changing A)
└──► Service D
The key question when choosing a messaging system: Are you distributing work (task queue), broadcasting events (pub/sub), or streaming data continuously (event stream)?
The Messaging Landscape
Task Queues (point-to-point, one consumer processes each message):
RabbitMQ → most flexible routing, enterprise-grade
Azure Service Bus → managed, deep Azure integration
AWS SQS → managed, simplest, massive scale
Redis Queues → when Redis is already in the stack
Pub/Sub (fan-out, all subscribers receive each message):
RabbitMQ fanout → routing + fan-out combined
Azure Service Bus Topics → managed pub/sub
AWS SNS → managed fan-out
Redis Pub/Sub → ephemeral, in-memory, no persistence
NATS → ultra-low latency, cloud-native
Event Streaming (ordered, replayable, retained log):
Apache Kafka → the industry standard for high-volume streams
Azure Event Hubs → managed Kafka-compatible
AWS Kinesis → managed streaming
Redis Streams → lightweight streaming when Redis is already there
Embedded / In-Process:
ZeroMQ → direct socket messaging, no broker
MassTransit → .NET abstraction over RabbitMQ/Azure/SQS
MediatR → in-process mediator pattern (.NET)
RabbitMQ — The Most Flexible Message Broker
RabbitMQ implements the AMQP (Advanced Message Queuing Protocol) standard. It is the most flexible general-purpose message broker — you can model almost any messaging pattern with it.
Core Concepts
Producer → Exchange → Binding → Queue → Consumer
- Producer: publishes messages to an exchange (never directly to a queue)
- Exchange: receives messages and routes them to queues based on rules
- Binding: a rule linking an exchange to a queue (with an optional routing key)
- Queue: holds messages until a consumer picks them up
- Consumer: reads from a queue and acknowledges each message
The critical design choice in RabbitMQ is the exchange type. This determines how messages are routed.
Exchange Types
Direct Exchange — route by exact key:
Producer → [routing_key: "payment.processed"] → Direct Exchange
│
┌───────────────────────────┤
▼ ▼
Queue: payment-receipts Queue: payment-audit
(binding key: payment.processed) (binding key: payment.processed)
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# Declare exchange
channel.exchange_declare(exchange='payments', exchange_type='direct')
# Declare queues
channel.queue_declare(queue='payment-receipts', durable=True)
channel.queue_declare(queue='payment-audit', durable=True)
# Bind queues to exchange with routing key
channel.queue_bind(exchange='payments', queue='payment-receipts', routing_key='payment.processed')
channel.queue_bind(exchange='payments', queue='payment-audit', routing_key='payment.processed')
# Publish
channel.basic_publish(
exchange='payments',
routing_key='payment.processed',
body='{"amount": 49.99, "currency": "GBP"}',
properties=pika.BasicProperties(
delivery_mode=pika.DeliveryMode.Persistent # survive broker restart
)
)
Fanout Exchange — broadcast to all queues:
Every queue bound to a fanout exchange receives every message, regardless of routing key.
channel.exchange_declare(exchange='order-events', exchange_type='fanout')
# All three queues get every order event
channel.queue_bind(exchange='order-events', queue='inventory-service')
channel.queue_bind(exchange='order-events', queue='notification-service')
channel.queue_bind(exchange='order-events', queue='analytics-service')
# Publish — routing_key is ignored for fanout
channel.basic_publish(exchange='order-events', routing_key='', body='{"orderId": "ORD-123"}')
Real example: Deliveroo uses fanout for order status events — when an order is confirmed, the inventory service, courier assignment service, customer notification service, and analytics service all need to react.
Topic Exchange — route by pattern:
Topic exchanges route by routing key patterns using * (one word) and # (zero or more words).
Routing key pattern: entity.action.region (e.g. order.created.uk)
"order.created.uk" → matches "order.#" and "*.created.*" and "order.created.uk"
"order.cancelled.us" → matches "order.#" and "*.cancelled.*"
"payment.failed.eu" → matches "payment.#" and "*.failed.*" channel.exchange_declare(exchange='events', exchange_type='topic')
# UK team: only UK events
channel.queue_bind(exchange='events', queue='uk-ops', routing_key='#.uk')
# Fraud team: all payment events everywhere
channel.queue_bind(exchange='events', queue='fraud-detection', routing_key='payment.#')
# Audit: everything
channel.queue_bind(exchange='events', queue='audit-log', routing_key='#')
# Publish UK order
channel.basic_publish(exchange='events', routing_key='order.created.uk', body='...')
# → goes to uk-ops and audit-log (not fraud-detection)
# Publish payment failure
channel.basic_publish(exchange='events', routing_key='payment.failed.eu', body='...')
# → goes to fraud-detection and audit-log (not uk-ops)
Headers Exchange — route by message headers (rarely used):
Routes based on message headers instead of routing key. Useful when the routing decision involves multiple attributes.
channel.exchange_declare(exchange='priority-router', exchange_type='headers')
channel.queue_bind(
exchange='priority-router',
queue='high-priority-queue',
arguments={'x-match': 'all', 'priority': 'high', 'region': 'eu'}
)
Consumer Acknowledgements — The Most Important RabbitMQ Concept
This is where most teams make mistakes.
import json

def process_payment(ch, method, properties, body):
try:
data = json.loads(body)
process(data)
# Acknowledge ONLY after successful processing
ch.basic_ack(delivery_tag=method.delivery_tag)
except Exception as e:
# Negative acknowledge — requeue the message for retry
# requeue=False sends to DLQ instead of retrying forever
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
log.error(f"Payment processing failed: {e}")
# prefetch_count=1: don't give me a second message until I ack the first
# Without this, RabbitMQ floods the consumer with messages it can't process fast enough
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='payments', on_message_callback=process_payment)
channel.start_consuming()
What happens without proper acknowledgement:
- Auto-ack mode: message deleted the moment it is delivered, even if processing crashes halfway through. Payment deducted, order never created.
- Never acking: messages pile up as "unacknowledged", memory fills up, broker crashes.
- Always requeue on failure: infinite retry loop on a bad message (poison pill) blocks the entire queue.
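A common defence against the poison-pill case is to cap redeliveries in the consumer and dead-letter the message once the cap is hit. A minimal sketch of that decision logic in Python — RabbitMQ does record prior dead-letterings in an x-death header, but its shape is simplified here, and MAX_ATTEMPTS is an illustrative choice:

```python
# How many times to attempt a message before giving up (illustrative).
MAX_ATTEMPTS = 3

def delivery_attempts(headers):
    """Count prior dead-letter cycles from a simplified x-death header (0 on first delivery)."""
    deaths = (headers or {}).get("x-death") or []
    return sum(entry.get("count", 0) for entry in deaths)

def on_failure(headers):
    """Return 'requeue' while under the cap, 'dead-letter' once it is exhausted."""
    if delivery_attempts(headers) + 1 < MAX_ATTEMPTS:
        return "requeue"
    return "dead-letter"
```

A first failure (no x-death header yet) requeues; once the recorded count reaches the cap, the consumer nacks with requeue=False so the broker routes the message to the DLX instead of retrying forever.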
Dead Letter Exchange (DLQ in RabbitMQ)
Messages that are rejected (nacked with requeue=False), expire via TTL, or overflow the queue's max length are routed through a dead letter exchange (DLX) to a dead letter queue for inspection.
# Main queue — rejected, expired, or overflowed messages are routed to the DLX
channel.queue_declare(
queue='payments',
durable=True,
arguments={
'x-dead-letter-exchange': 'payments.dlx',
'x-dead-letter-routing-key': 'dead',
'x-message-ttl': 300000, # 5 minutes max in queue
'x-max-length': 10000 # max 10k messages before dropping
}
)
# Dead letter exchange and queue
channel.exchange_declare(exchange='payments.dlx', exchange_type='direct')
channel.queue_declare(queue='payments.dead', durable=True)
channel.queue_bind(exchange='payments.dlx', queue='payments.dead', routing_key='dead')
RabbitMQ in .NET with MassTransit
Raw RabbitMQ clients are verbose. MassTransit is the standard .NET abstraction:
// Program.cs
builder.Services.AddMassTransit(x =>
{
x.AddConsumer<PaymentProcessedConsumer>();
x.UsingRabbitMq((context, cfg) =>
{
cfg.Host("rabbitmq://localhost", h =>
{
h.Username("guest");
h.Password("guest");
});
cfg.ReceiveEndpoint("payment-processed", e =>
{
e.ConfigureConsumer<PaymentProcessedConsumer>(context);
e.UseMessageRetry(r => r.Intervals(100, 500, 1000, 5000)); // retry with backoff
// after retries are exhausted, MassTransit moves the faulted message to a payment-processed_error queue
});
});
});
// Publishing a message
public class OrderService
{
private readonly IOrderRepository _orderRepository;
private readonly IPublishEndpoint _publishEndpoint;
public async Task PlaceOrder(Order order)
{
await _orderRepository.SaveAsync(order);
await _publishEndpoint.Publish(new OrderPlaced
{
OrderId = order.Id,
CustomerId = order.CustomerId,
Amount = order.Total,
PlacedAt = DateTimeOffset.UtcNow
});
}
}
// Consumer
public class PaymentProcessedConsumer : IConsumer<PaymentProcessed>
{
private readonly IOrderService _orderService;
public async Task Consume(ConsumeContext<PaymentProcessed> context)
{
var payment = context.Message;
await _orderService.MarkAsPaidAsync(payment.OrderId);
// MassTransit auto-acks on success, auto-nacks on exception
}
}
MassTransit handles: serialisation, retries with exponential backoff, dead letter routing, correlation IDs, and saga state machines — all things you would otherwise build manually.
RabbitMQ Real-World Examples
Shopify — order processing pipeline: When a Shopify order is placed, a message goes to a topic exchange. Multiple consumers process different aspects in parallel: inventory reservation, fraud check, payment capture, fulfilment routing, analytics ingestion. Each runs independently. If the analytics service is slow, it does not delay payment capture.
Healthcare booking (MyBCAT pattern): After a call ends in Amazon Connect → EventBridge event → Lambda → RabbitMQ message published. Three consumers pick it up: transcription service (DeepGram), CRM update service, billing service. The call handler Lambda returns immediately. All three process asynchronously.
Apache Kafka — Event Streaming at Scale
Kafka is fundamentally different from RabbitMQ. RabbitMQ is a message broker — messages are consumed and deleted. Kafka is an event log — events are appended to a log and retained.
RabbitMQ: Queue (messages consumed and gone)
Producer → [msg1, msg2, msg3] → Consumer (reads, acks, deleted)
Kafka: Log (events retained, multiple consumers, replayable)
Producer → [evt1, evt2, evt3, evt4, evt5] (retained 7 days)
↑
Consumer A: read up to evt3 (offset 3)
Consumer B: read up to evt5 (offset 5)
Consumer C: re-reading from evt1 (replaying)
Core Concepts
Topic: A named log. Producers write to a topic. Consumers read from a topic.
Partition: A topic is split into partitions. Each partition is an ordered, immutable sequence of events. Partitions enable parallelism — multiple consumers can read from different partitions simultaneously.
Offset: Every event in a partition has a sequential number (offset). Consumers track their position by offset. This is how Kafka enables replay — a consumer can reset its offset to 0 and re-read everything.
Consumer Group: A group of consumers that share the work of reading a topic. Each partition is assigned to exactly one consumer in the group. Add more consumers = more parallelism (up to the number of partitions).
Topic: payments (3 partitions)
Partition 0: [e0, e1, e2, e3, e4]
Partition 1: [e0, e1, e2]
Partition 2: [e0, e1, e2, e3]
Consumer Group A (payment-processor, 3 instances):
Instance 1 → reads Partition 0
Instance 2 → reads Partition 1
Instance 3 → reads Partition 2
Consumer Group B (analytics, 1 instance):
Instance 1 → reads ALL 3 partitions (one group, three partitions)
Both groups read independently. Group A processing payments does not affect Group B doing analytics.
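The "same key → same partition" guarantee comes from deterministic key hashing on the producer side. Real clients use murmur2 (Java) or crc32 (librdkafka), but the invariant can be sketched with any stable hash:

```python
import zlib

def partition_for(key: str, num_partitions: int) -> int:
    # Stable hash of the key modulo partition count: the same key always
    # lands on the same partition, so per-key ordering is preserved.
    # (Illustrative — real Kafka clients use murmur2 or crc32 variants.)
    return zlib.crc32(key.encode()) % num_partitions

# Every event for customer-123 goes to one partition, in append order:
assert partition_for("customer-123", 3) == partition_for("customer-123", 3)
```

This is also why changing the partition count of a topic breaks key-to-partition mapping: the modulus changes, so existing keys may hash to different partitions.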
Producing and Consuming in Python
import json
from confluent_kafka import Producer, Consumer
# Producer
producer = Producer({'bootstrap.servers': 'localhost:9092'})
def delivery_callback(err, msg):
if err:
print(f'Message delivery failed: {err}')
else:
print(f'Delivered to {msg.topic()} partition {msg.partition()} offset {msg.offset()}')
producer.produce(
topic='payments',
key='customer-123', # same key → same partition (ordering per customer)
value='{"amount": 49.99}',
callback=delivery_callback
)
producer.flush() # wait for all messages to be delivered
# Consumer
consumer = Consumer({
'bootstrap.servers': 'localhost:9092',
'group.id': 'payment-processor',
'auto.offset.reset': 'earliest', # start from beginning if no committed offset
'enable.auto.commit': False # commit manually after processing → at-least-once delivery
})
consumer.subscribe(['payments'])
try:
while True:
msg = consumer.poll(timeout=1.0)
if msg is None:
continue
if msg.error():
raise Exception(msg.error())
data = json.loads(msg.value())
process_payment(data)
# Commit offset only after successful processing
consumer.commit(asynchronous=False)
finally:
consumer.close()
Kafka in .NET with Confluent.Kafka
// Producer
var config = new ProducerConfig { BootstrapServers = "localhost:9092" };
using var producer = new ProducerBuilder<string, string>(config).Build();
var result = await producer.ProduceAsync("payments", new Message<string, string>
{
Key = customerId, // partition by customer for ordering
Value = JsonSerializer.Serialize(paymentEvent)
});
Console.WriteLine($"Delivered to partition {result.Partition}, offset {result.Offset}");
// Consumer
var consumerConfig = new ConsumerConfig
{
BootstrapServers = "localhost:9092",
GroupId = "payment-processor",
AutoOffsetReset = AutoOffsetReset.Earliest,
EnableAutoCommit = false
};
using var consumer = new ConsumerBuilder<string, string>(consumerConfig).Build();
consumer.Subscribe("payments");
while (!cancellationToken.IsCancellationRequested)
{
var result = consumer.Consume(cancellationToken);
var payment = JsonSerializer.Deserialize<PaymentEvent>(result.Message.Value);
await ProcessPaymentAsync(payment);
consumer.Commit(result); // only commit after processing
}
When Kafka is the Right Choice
Kafka is the right choice when you need:
- Replay — re-process historical events (audit, reindex, debug production issues)
- Multiple independent consumers — analytics team reads same events as payment processor
- High throughput — millions of events per second
- Event sourcing — the event log IS the source of truth
Kafka is overkill when:
- You have simple task distribution (use RabbitMQ or SQS)
- Messages should be processed once and deleted
- You need complex routing logic (RabbitMQ topic exchanges are better)
- You have low volume (< 10k messages/day)
Real example — LinkedIn (Kafka's origin): LinkedIn built Kafka because they needed to capture every user action (view, click, search, connection request) for real-time analytics and feed ranking. The data volume was too high for a traditional message queue, and they needed multiple teams (analytics, recommendation engine, spam detection) to read the same event stream independently without affecting each other.
Real example — Uber: Uber uses Kafka to track driver and rider location updates. 1 million location updates per second across all drivers worldwide. Multiple consumers: surge pricing calculation, ETA calculation, dispatch matching, analytics. Each consumer reads independently.
Redis Pub/Sub and Redis Streams
Redis is primarily a cache, but it has two messaging features built in.
Redis Pub/Sub — Ephemeral Fan-Out
Redis Pub/Sub is the simplest possible pub/sub system: publish to a channel, all current subscribers receive it instantly. No persistence, no replay, no delivery guarantees.
import json
import redis
r = redis.Redis(host='localhost', port=6379)
# Publisher
r.publish('notifications', json.dumps({
'type': 'appointment_confirmed',
'patient_id': 'pat_123',
'appointment_id': 'apt_456'
}))
# Subscriber
pubsub = r.pubsub()
pubsub.subscribe('notifications')
for message in pubsub.listen():
if message['type'] == 'message':
data = json.loads(message['data'])
handle_notification(data)
The critical limitation: If a subscriber is offline when the message is published, the message is gone. There is no queue, no persistence, no retry.
When to use Redis Pub/Sub:
- Real-time dashboard updates (if they miss one update, the next update will correct it)
- Cache invalidation signals ("this key changed, invalidate your local copy")
- Live chat presence ("user is typing")
- WebSocket broadcast to connected users
When NOT to use Redis Pub/Sub:
- Anything where message delivery matters (payments, orders, notifications that must arrive)
- Long-running background jobs
- Any situation where the consumer might be offline
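The at-most-once behaviour is easy to model in a few lines (a toy model, not Redis itself): delivery happens only to subscribers registered at publish time, and nothing is retained.

```python
class ToyPubSub:
    """At-most-once fan-out: no queue, no persistence, no retry."""

    def __init__(self):
        self.subscribers = {}  # channel name -> list of callbacks

    def subscribe(self, channel, callback):
        self.subscribers.setdefault(channel, []).append(callback)

    def publish(self, channel, message):
        # Delivered only to subscribers connected *right now*;
        # returns the receiver count (like the Redis PUBLISH reply).
        handlers = self.subscribers.get(channel, [])
        for cb in handlers:
            cb(message)
        return len(handlers)

bus = ToyPubSub()
lost = bus.publish("notifications", "missed")   # no subscribers yet → 0 receivers
received = []
bus.subscribe("notifications", received.append)
bus.publish("notifications", "seen")            # only "seen" ever arrives
```

The message published before the subscription existed is simply gone — which is exactly why payments and orders do not belong on Redis Pub/Sub.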
Redis Streams — Lightweight Persistent Streaming
Redis Streams (added in Redis 5.0) is a persistent, replayable message log. It is essentially a lightweight Kafka inside Redis.
r = redis.Redis(host='localhost', port=6379)
# Producer — XADD appends to the stream
r.xadd('appointments', {
'event_type': 'appointment_confirmed',
'appointment_id': 'apt_456',
'patient_id': 'pat_123',
'timestamp': '2026-04-21T09:00:00Z'
})
# Consumer with consumer group (like Kafka consumer groups)
# Create group if not exists
try:
r.xgroup_create('appointments', 'notification-service', id='0', mkstream=True)
except redis.exceptions.ResponseError:
pass # group already exists
# Read messages (XREADGROUP — messages assigned to this consumer)
messages = r.xreadgroup(
groupname='notification-service',
consumername='instance-1',
streams={'appointments': '>'}, # '>' means undelivered messages
count=10,
block=5000 # block for 5 seconds if no messages
)
for stream_name, events in messages:
for event_id, data in events:
try:
send_confirmation_notification(data)
r.xack('appointments', 'notification-service', event_id) # acknowledge
except Exception as e:
log.error(f"Failed to process {event_id}: {e}")
# Message stays in the pending list — reclaim it later with XCLAIM/XAUTOCLAIM
Redis Streams vs Kafka:
| | Redis Streams | Kafka |
|---|---|---|
| Throughput | ~100k msg/sec | Millions/sec |
| Retention | Up to Redis memory limit | Configurable (days/TB) |
| Replay | Yes, from any ID | Yes, from any offset |
| Consumer groups | Yes | Yes |
| Cluster | Redis Cluster | Kafka Cluster |
| Operational complexity | Low (Redis already in stack) | High (ZooKeeper/KRaft) |
| Use when | Redis already there, moderate volume | High volume, long retention needed |
NATS — The Lightweight Cloud-Native Broker
NATS is a messaging system built for cloud-native architectures. It is written in Go, extremely fast (tens of millions of messages per second), and has a very small footprint (single binary, ~10MB).
NATS is used heavily in Kubernetes environments for microservice-to-microservice communication.
Core NATS Patterns
Basic Pub/Sub:
import asyncio
import json
import nats
async def main():
nc = await nats.connect("nats://localhost:4222")
# Subscribe
async def message_handler(msg):
data = json.loads(msg.data.decode())
print(f"Received on {msg.subject}: {data}")
await nc.subscribe("appointments.confirmed", cb=message_handler)
# Publish
await nc.publish("appointments.confirmed", json.dumps({
"appointment_id": "apt_456",
"patient_id": "pat_123"
}).encode())
await asyncio.sleep(1)
await nc.close()
asyncio.run(main())
Queue Groups (load balancing):
In NATS, multiple subscribers on the same subject all receive the message (pub/sub). Queue groups change this — only one member of the group receives each message.
# Three instances of the notification service, all in the same queue group
# Each message goes to exactly ONE instance (load balanced)
await nc.subscribe("notifications", queue="notification-workers", cb=handler)
Request/Reply (RPC over NATS):
# Server (handler)
async def handle_slot_check(msg):
request = json.loads(msg.data)
available_slots = get_available_slots(request['date'])
await msg.respond(json.dumps(available_slots).encode())
await nc.subscribe("slots.available", cb=handle_slot_check)
# Client (requester)
response = await nc.request(
"slots.available",
json.dumps({"date": "2026-04-22", "practice_id": "prc_123"}).encode(),
timeout=5.0
)
slots = json.loads(response.data)
JetStream — persistence for NATS:
NATS core is fire-and-forget (like Redis Pub/Sub). JetStream adds persistence, replay, and consumer groups:
js = nc.jetstream()
# Create a stream (persistent log)
await js.add_stream(name="APPOINTMENTS", subjects=["appointments.>"])
# Publish to persistent stream
await js.publish("appointments.confirmed", json.dumps(event).encode())
# Durable consumer (survives restarts)
consumer = await js.subscribe(
"appointments.confirmed",
durable="notification-service",
manual_ack=True
)
async for msg in consumer.messages:
process(json.loads(msg.data))
await msg.ack()
NATS real-world use:
- Kubernetes sidecar communication (Istio-less service mesh)
- IoT device messaging (millions of devices, lightweight protocol)
- Financial trading platforms (ultra-low latency pub/sub)
- Synadia (NATS creators) runs the global NATS cloud infrastructure
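NATS subjects are dot-separated tokens, with * matching exactly one token and > matching one or more trailing tokens (as in the appointments.> stream subject used with JetStream above). A sketch of the matching rules, assuming standard NATS wildcard semantics:

```python
def subject_matches(pattern: str, subject: str) -> bool:
    """NATS-style wildcard match: '*' = exactly one token, '>' = one or more trailing tokens."""
    p_tokens = pattern.split(".")
    s_tokens = subject.split(".")
    for i, p in enumerate(p_tokens):
        if p == ">":
            # '>' must consume at least one remaining token
            return len(s_tokens) > i
        if i >= len(s_tokens):
            return False  # subject ran out of tokens
        if p != "*" and p != s_tokens[i]:
            return False  # literal token mismatch
    return len(p_tokens) == len(s_tokens)
```

So appointments.> matches appointments.confirmed and appointments.confirmed.uk, but not the bare appointments; appointments.* matches only two-token subjects.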
Azure Service Bus — Managed Enterprise Messaging
Azure Service Bus is the managed messaging service on Azure. It supports both queues (point-to-point) and topics/subscriptions (pub/sub).
Queues vs Topics
Queue (point-to-point):
Sender → Queue → One Receiver
Topic (pub/sub):
Sender → Topic → Subscription 1 → Receiver A
→ Subscription 2 → Receiver B
→ Subscription 3 → Receiver C (with filter: only type=urgent)
.NET with Azure.Messaging.ServiceBus
// Sending a message
var client = new ServiceBusClient(connectionString);
var sender = client.CreateSender("appointment-events");
var message = new ServiceBusMessage(JsonSerializer.Serialize(appointmentEvent))
{
ContentType = "application/json",
MessageId = Guid.NewGuid().ToString(), // idempotency
Subject = "appointment.confirmed",
TimeToLive = TimeSpan.FromHours(24), // expire unprocessed messages
ScheduledEnqueueTime = DateTimeOffset.UtcNow.AddMinutes(5) // delay delivery
};
// Add custom properties for subscription filters
message.ApplicationProperties.Add("region", "uk");
message.ApplicationProperties.Add("priority", "high");
await sender.SendMessageAsync(message);
// Receiving messages (processor — recommended for production)
var processor = client.CreateProcessor("appointment-events", "notification-subscription", new ServiceBusProcessorOptions
{
MaxConcurrentCalls = 5,
AutoCompleteMessages = false // manual complete for control
});
processor.ProcessMessageAsync += async (args) =>
{
var appointment = JsonSerializer.Deserialize<AppointmentEvent>(args.Message.Body);
try
{
await SendConfirmationAsync(appointment);
await args.CompleteMessageAsync(args.Message); // remove from queue
}
catch (TransientException)
{
await args.AbandonMessageAsync(args.Message); // retry later
}
catch (PermanentException ex)
{
await args.DeadLetterMessageAsync(args.Message, "PermanentFailure", ex.Message);
}
};
processor.ProcessErrorAsync += async (args) =>
{
log.LogError(args.Exception, "Message processing error");
};
await processor.StartProcessingAsync();
Subscription Filters
Subscription filters are one of Azure Service Bus's most powerful features — they let different subscribers receive different subsets of messages from the same topic.
// Create subscription with SQL filter
var adminClient = new ServiceBusAdministrationClient(connectionString);
// UK team: only UK messages
await adminClient.CreateRuleAsync("appointment-events", "uk-subscription", new CreateRuleOptions
{
Name = "uk-only",
Filter = new SqlRuleFilter("region = 'uk'")
});
// High priority: urgent messages
await adminClient.CreateRuleAsync("appointment-events", "urgent-subscription", new CreateRuleOptions
{
Name = "high-priority",
Filter = new SqlRuleFilter("priority = 'high' AND event_type = 'appointment.cancelled'")
});
// Correlation filter (faster, index-based):
await adminClient.CreateRuleAsync("appointment-events", "payment-subscription", new CreateRuleOptions
{
Name = "payments-only",
Filter = new CorrelationRuleFilter { Subject = "payment.processed" }
});
Azure equivalent of RabbitMQ topic exchange: Azure Service Bus topics with SQL filters.
Sessions — Ordered Processing
Service Bus Sessions guarantee message ordering for a session ID. All messages with the same session ID are delivered to the same consumer in order.
// Sending with session (all messages for one patient go to same consumer)
var message = new ServiceBusMessage(payload)
{
SessionId = $"patient-{patientId}" // all this patient's messages → same consumer instance
};
// Session-aware receiver
var sessionProcessor = client.CreateSessionProcessor("appointment-events", new ServiceBusSessionProcessorOptions
{
MaxConcurrentSessions = 10
});
sessionProcessor.ProcessMessageAsync += async (args) =>
{
// All messages for the same SessionId arrive here in order
// args.SessionId contains the session identifier
};
AWS SQS and SNS — Managed Cloud Messaging
(Covered in depth in the Event-Driven Architecture guide. Summary here for completeness.)
SQS Patterns in Code
import json
import boto3
sqs = boto3.client('sqs', region_name='eu-west-1')
# Send message with deduplication (FIFO queue)
response = sqs.send_message(
QueueUrl='https://sqs.eu-west-1.amazonaws.com/123456789/appointments.fifo',
MessageBody=json.dumps(appointment_event),
MessageGroupId=f'practice-{practice_id}', # ordering per practice
MessageDeduplicationId=idempotency_key # prevent duplicates
)
# Receive and process
response = sqs.receive_message(
QueueUrl=queue_url,
MaxNumberOfMessages=10,
WaitTimeSeconds=20, # long polling (cheaper, faster than short poll)
VisibilityTimeout=60 # 60s to process before message reappears
)
for message in response.get('Messages', []):
try:
process(json.loads(message['Body']))
sqs.delete_message(
QueueUrl=queue_url,
ReceiptHandle=message['ReceiptHandle']
)
except Exception:
# Don't delete → message reappears after VisibilityTimeout → retry
pass
SNS Fan-Out with Filters
sns = boto3.client('sns', region_name='eu-west-1')
# Publish with message attributes for filtering
sns.publish(
TopicArn='arn:aws:sns:eu-west-1:123456789:appointment-events',
Message=json.dumps(event),
MessageAttributes={
'event_type': {
'DataType': 'String',
'StringValue': 'appointment.confirmed'
},
'region': {
'DataType': 'String',
'StringValue': 'uk'
}
}
)
# Terraform — SNS subscription filter (only UK confirmations)
resource "aws_sns_topic_subscription" "uk_notifications" {
topic_arn = aws_sns_topic.appointment_events.arn
protocol = "sqs"
endpoint = aws_sqs_queue.uk_notifications.arn
filter_policy = jsonencode({
event_type = ["appointment.confirmed"]
region = ["uk"]
})
}
ZeroMQ — No-Broker Messaging
ZeroMQ is different from everything above — there is no broker. Messages are sent directly between processes over sockets, using TCP, IPC (inter-process communication), or in-process.
This makes it extremely fast (no network hop to a broker) but also means there is no persistence, no management UI, and no delivery guarantees beyond the socket layer.
import zmq
# Pub side
context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://*:5555")
socket.send_string("temperature 22.5") # topic prefix; note: messages sent before subscribers connect are dropped (slow-joiner problem)
socket.send_string("humidity 65.2")
# Sub side
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect("tcp://localhost:5555")
socket.setsockopt_string(zmq.SUBSCRIBE, "temperature") # only temperature messages
while True:
message = socket.recv_string()
print(f"Received: {message}")
Push/Pull — work distribution without a broker:
# Worker distributor (PUSH)
context = zmq.Context()
pusher = context.socket(zmq.PUSH)
pusher.bind("tcp://*:5557")
for task in tasks:
pusher.send_json(task)
# Worker (PULL)
puller = context.socket(zmq.PULL)
puller.connect("tcp://localhost:5557")
while True:
task = puller.recv_json()
process(task)
When ZeroMQ makes sense:
- High-frequency sensor data (IoT, trading) where broker overhead is unacceptable
- Inter-process communication on the same machine
- Embedded systems where running a broker is not possible
- When you need microsecond latency
When ZeroMQ is wrong:
- Any situation where message loss is unacceptable
- Cross-datacenter messaging
- When you need routing, filtering, or persistence
MediatR — In-Process Messaging (.NET)
MediatR is not a distributed message broker. It is an in-process mediator that decouples the caller from the handler within a single application, and it is commonly used to implement the CQRS pattern inside one process.
// Command (write)
public record BookAppointmentCommand(
string SlotId,
string PatientId,
string Reason
) : IRequest<AppointmentResult>;
// Command handler
public class BookAppointmentHandler : IRequestHandler<BookAppointmentCommand, AppointmentResult>
{
private readonly IAppointmentRepository _repo;
private readonly IMediator _mediator; // in-process events; use RabbitMQ/Service Bus for events that must leave the process
public async Task<AppointmentResult> Handle(
BookAppointmentCommand command,
CancellationToken ct)
{
var appointment = await _repo.BookSlotAsync(command.SlotId, command.PatientId);
// Publish domain event within same process (in-process)
await _mediator.Publish(new AppointmentBookedEvent(appointment.Id));
return new AppointmentResult(appointment.Id, appointment.ConfirmationNumber);
}
}
// Notification handler (in-process subscriber)
public class SendConfirmationOnBooking : INotificationHandler<AppointmentBookedEvent>
{
public async Task Handle(AppointmentBookedEvent notification, CancellationToken ct)
{
// Runs in the same HTTP request lifecycle
await _emailService.SendConfirmationAsync(notification.AppointmentId);
}
}
// Controller — just dispatches, knows nothing about handlers
[HttpPost("appointments")]
public async Task<IActionResult> Book([FromBody] BookAppointmentRequest request)
{
var result = await _mediator.Send(new BookAppointmentCommand(
request.SlotId,
request.PatientId,
request.Reason
));
return Created($"/appointments/{result.Id}", result);
}
MediatR pipeline behaviors (middleware for commands):
public class ValidationBehavior<TRequest, TResponse>
: IPipelineBehavior<TRequest, TResponse>
{
private readonly IEnumerable<IValidator<TRequest>> _validators;
public async Task<TResponse> Handle(
TRequest request,
RequestHandlerDelegate<TResponse> next,
CancellationToken ct)
{
var failures = _validators
.Select(v => v.Validate(request))
.SelectMany(r => r.Errors)
.Where(f => f != null)
.ToList();
if (failures.Any())
throw new ValidationException(failures);
return await next(); // continue to actual handler
}
}
// Register behaviors (run in order: logging → validation → handler)
builder.Services.AddMediatR(cfg =>
{
cfg.RegisterServicesFromAssembly(typeof(Program).Assembly);
cfg.AddBehavior(typeof(IPipelineBehavior<,>), typeof(LoggingBehavior<,>));
cfg.AddBehavior(typeof(IPipelineBehavior<,>), typeof(ValidationBehavior<,>));
});
Comparing All Messaging Systems
| System | Type | Persistence | Throughput | Routing | Managed | Best For |
|--------|------|-------------|------------|---------|---------|----------|
| RabbitMQ | Broker | Yes | High | Very flexible (exchanges) | Self-hosted (CloudAMQP managed) | Complex routing, enterprise workflows |
| Kafka | Event log | Yes (retained) | Very high | By partition key | Confluent Cloud | Event streaming, replay, multiple consumers |
| Azure Service Bus | Broker | Yes | High | SQL filters, sessions | Yes (Azure) | Enterprise .NET, Azure ecosystem |
| AWS SQS | Queue | Yes | Very high | None (basic) | Yes (AWS) | Simple decoupling, Lambda triggers |
| AWS SNS | Pub/sub | No | Very high | Attribute filters | Yes (AWS) | Fan-out, cross-service broadcast |
| Redis Pub/Sub | Pub/sub | No | Very high | Channel name | Redis Cloud | Cache invalidation, ephemeral real-time |
| Redis Streams | Event log | Yes (in memory) | High | Consumer groups | Redis Cloud | When Redis already in stack, moderate volume |
| NATS | Broker + log | Optional (JetStream) | Very high | Subject wildcards | Synadia Cloud | Cloud-native, Kubernetes, IoT, low latency |
| ZeroMQ | Brokerless | No | Extreme | None | Self-hosted | Sensor data, inter-process, embedded |
| MediatR | In-process | No | In-process | Handler types | N/A | CQRS within a monolith or service |
Decision Framework
Ask these questions in order:
1. Does it need to survive restarts and be replayable?
No → Redis Pub/Sub, NATS core, ZeroMQ
Yes → continue
2. Is it one message → one processor (task queue)?
Yes → RabbitMQ, Azure Service Bus Queue, AWS SQS
No → continue
3. Is it one event → many consumers (fan-out)?
Yes, complex routing needed → RabbitMQ topic exchange, Azure Service Bus Topics
Yes, simple → AWS SNS, Azure Event Grid
No → continue
4. Is it a continuous stream with multiple independent consumers and replay?
Yes, high volume → Apache Kafka, Azure Event Hubs
Yes, moderate volume → Redis Streams, NATS JetStream
5. Are you on Azure?
→ Azure Service Bus (queues for tasks, topics for events)
→ Azure Event Hubs if Kafka-compatible streaming
6. Are you on AWS?
→ SQS for task queues
→ SNS + SQS for fan-out
→ Kinesis or MSK (managed Kafka) for streaming
7. Is this within a single .NET application?
→ MediatR for in-process CQRS
Production Patterns
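The questions above can be condensed into a small illustrative Python function. The categories and return values are just this article's recommendations encoded as code, not an official taxonomy:

```python
def recommend(durable, consumers, volume="high", cloud=None):
    """Map the decision-framework questions to candidate systems.

    durable   -- must messages survive restarts / be replayable?
    consumers -- "one" (task queue), "many" (fan-out), or "stream"
    volume    -- "high" or "moderate" (only matters for streams)
    cloud     -- "azure", "aws", or None (prefer that cloud's managed options)
    """
    if not durable:
        return ["Redis Pub/Sub", "NATS core", "ZeroMQ"]
    if cloud == "azure":
        return ["Azure Event Hubs"] if consumers == "stream" else ["Azure Service Bus"]
    if cloud == "aws":
        return {"one": ["AWS SQS"],
                "many": ["AWS SNS + SQS"],
                "stream": ["Kinesis", "MSK"]}[consumers]
    if consumers == "one":
        return ["RabbitMQ", "Azure Service Bus Queue", "AWS SQS"]
    if consumers == "many":
        return ["RabbitMQ topic exchange", "Azure Service Bus Topics", "AWS SNS"]
    # continuous stream with replay
    if volume == "high":
        return ["Apache Kafka", "Azure Event Hubs"]
    return ["Redis Streams", "NATS JetStream"]
```

The point of writing it this way is that durability is the first branch, not an afterthought: everything else in the framework only applies once you know whether losing messages is acceptable.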
Pattern 1: The Outbox + RabbitMQ (Guaranteed Delivery)
# Inside a database transaction:
with db.transaction():
    db.execute("INSERT INTO appointments ...")
    db.execute("""
        INSERT INTO outbox (event_type, payload, status, created_at)
        VALUES ('appointment.confirmed', ?, 'PENDING', NOW())
    """, [json.dumps(event)])

# Separate outbox poller (runs every second):
def publish_outbox_events():
    events = db.query("SELECT * FROM outbox WHERE status = 'PENDING' LIMIT 100")
    for event in events:
        try:
            channel.basic_publish(
                exchange='appointments',
                routing_key=event['event_type'],
                body=event['payload']
            )
            db.execute("UPDATE outbox SET status='PUBLISHED' WHERE id=?", [event['id']])
        except Exception:
            db.execute("UPDATE outbox SET retries = retries + 1 WHERE id=?", [event['id']])
This guarantees the appointment row and the outbox event are written atomically: either both commit or neither does, so the broker being down does not lose the event. Strictly speaking, the poller delivers at-least-once rather than exactly-once (a crash between basic_publish and the status update re-sends the event), so consumers should be idempotent.
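Because a poller crash between publishing and marking the row PUBLISHED re-sends the event, consumers on the receiving side typically deduplicate by event id. A minimal in-memory sketch of that idea (a real service would record processed ids in its own database, in the same transaction as its side effects):

```python
processed_ids = set()  # in production: a table of processed event ids

def handle_event(event):
    """Process an event at most once; returns True if it did new work."""
    if event["id"] in processed_ids:
        return False  # duplicate delivery, safely ignored
    # ... perform the actual side effect here ...
    processed_ids.add(event["id"])
    return True
```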
Pattern 2: Competing Consumers with Backoff
import json
import random
import time

def process_with_backoff(max_retries=5):
    retries = 0  # consecutive failures for this consumer; reset on success

    def callback(ch, method, properties, body):
        nonlocal retries
        try:
            process(json.loads(body))
            ch.basic_ack(delivery_tag=method.delivery_tag)
            retries = 0
        except RetryableException:
            retries += 1
            if retries > max_retries:
                # Give up, send to DLQ
                ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
                return
            # Exponential backoff with jitter
            delay = (2 ** retries) + random.uniform(0, 1)
            time.sleep(delay)
            ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
        except PermanentException:
            # No retry — straight to DLQ
            ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)

    return callback
Pattern 3: Saga — Distributed Transactions Over Messaging
A saga coordinates a multi-step transaction where each step is a separate service.
Order Placement Saga:
1. Payment Service: charge card → success → continue
2. Inventory Service: reserve items → success → continue
3. Fulfilment Service: create order → success → done
If step 3 fails:
→ Compensation: Inventory Service: release reservation
→ Compensation: Payment Service: refund charge
Using MassTransit Saga State Machine:
public class OrderSaga : MassTransitStateMachine<OrderSagaState>
{
    public State Pending { get; set; }
    public State PaymentTaken { get; set; }
    public State Completed { get; set; }
    public State Failed { get; set; }

    public Event<OrderPlaced> OrderPlaced { get; set; }
    public Event<PaymentProcessed> PaymentProcessed { get; set; }
    public Event<PaymentFailed> PaymentFailed { get; set; }
    public Event<InventoryReserved> InventoryReserved { get; set; }

    public OrderSaga()
    {
        InstanceState(x => x.CurrentState);

        Initially(
            When(OrderPlaced)
                .Then(ctx => ctx.Saga.OrderId = ctx.Message.OrderId)
                .PublishAsync(ctx => ctx.Init<ProcessPayment>(new { ctx.Message.Amount }))
                .TransitionTo(Pending)
        );

        During(Pending,
            When(PaymentProcessed)
                .PublishAsync(ctx => ctx.Init<ReserveInventory>(new { ctx.Saga.OrderId }))
                .TransitionTo(PaymentTaken),
            When(PaymentFailed)
                .TransitionTo(Failed)
        );

        During(PaymentTaken,
            When(InventoryReserved)
                .PublishAsync(ctx => ctx.Init<CreateFulfilment>(new { ctx.Saga.OrderId }))
                .TransitionTo(Completed)
        );
    }
}
Interview Answers
"RabbitMQ vs Kafka — when do you use each?"
"The core difference is what happens to a message after it is consumed. In RabbitMQ, a message is deleted after the consumer acknowledges it — it is a task queue. In Kafka, a message is retained in a log for a configured retention period, and multiple independent consumer groups can read it at different offsets — it is an event stream.
I use RabbitMQ when I have discrete units of work to distribute — booking a slot, sending a notification, processing a payment. I use Kafka when I need multiple teams or services to independently consume the same events, when I need to replay events for debugging or reindexing, or when I am processing millions of events per second. For most application-level messaging, RabbitMQ is simpler and sufficient. Kafka is not a drop-in replacement — it is a different tool for a different problem."
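The "retained log with independent offsets" idea is easy to show with a toy in-memory log. This is purely illustrative, not the Kafka API: two named "consumer groups" each track their own read position, and rewinding one does not affect the other:

```python
class ToyLog:
    """A Kafka-style retained log: reads never delete, each group keeps its own offset."""

    def __init__(self):
        self.events = []   # retained; consuming does not remove anything
        self.offsets = {}  # group name -> next index to read

    def publish(self, event):
        self.events.append(event)

    def poll(self, group):
        """Return this group's unread events and advance its offset."""
        start = self.offsets.get(group, 0)
        batch = self.events[start:]
        self.offsets[group] = len(self.events)
        return batch

    def rewind(self, group, offset=0):
        """Replay: move one group's offset back without touching other groups."""
        self.offsets[group] = offset

log = ToyLog()
log.publish("order-1")
log.publish("order-2")
analytics = log.poll("analytics")  # both events
billing = log.poll("billing")      # same events, independent offset
log.rewind("analytics")            # analytics replays from the start
replayed = log.poll("analytics")   # both events again; billing is unaffected
```

A RabbitMQ queue behaves like `events.pop()` instead: once acknowledged, the message is gone for everyone, which is exactly why replay and multiple independent readers need a log, not a queue.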
"What is the difference between pub/sub and a task queue?"
"In a task queue, each message is processed by exactly one consumer. The queue distributes work — if I have 10 jobs and 3 workers, each job goes to one worker. In pub/sub, each message goes to all subscribers simultaneously. If I publish an OrderPlaced event, the inventory service, the notification service, and the analytics service all receive it independently.
The practical consequence: task queues are for distributing work, pub/sub is for broadcasting events. An order confirmation email is a task — one worker sends it. An order placed event is a pub/sub notification — every service that cares gets a copy."
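That consequence can be made concrete in a few lines of plain Python, using in-memory stand-ins rather than a real broker:

```python
from collections import deque

# Task queue: each message goes to exactly one worker
jobs = deque(["job-1", "job-2", "job-3"])
workers = {"w1": [], "w2": [], "w3": []}
for name in workers:                  # round-robin style distribution
    workers[name].append(jobs.popleft())
# every job ends up with exactly one worker; the queue is now empty

# Pub/sub: each event goes to every subscriber
subscribers = {"inventory": [], "notifications": [], "analytics": []}

def publish(event):
    for inbox in subscribers.values():
        inbox.append(event)           # each subscriber gets its own copy

publish("OrderPlaced")
# all three subscribers now hold "OrderPlaced" independently
```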
"How do you guarantee a message is not lost?"
"Three things. First, durable/persistent messages — the message is written to disk on the broker so a broker restart doesn't lose it. Second, manual acknowledgement — the consumer only acknowledges the message after successfully processing it, so a consumer crash before processing is complete doesn't lose it. Third, the Outbox Pattern — publish the event inside the same database transaction that creates the data, then a separate process reads from the outbox and publishes to the broker. This means the data write and the event publication are atomic — they either both happen or neither does."
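The second point, manual acknowledgement, is the one most often gotten wrong, so here is a toy in-memory queue (not a real broker API) showing why a consumer crash before ack does not lose the message:

```python
class ToyQueue:
    """Manual-ack semantics: a message is only removed once acknowledged."""

    def __init__(self):
        self.ready = []    # messages waiting for a consumer
        self.unacked = {}  # delivery_tag -> message currently in flight
        self._tag = 0

    def publish(self, msg):
        self.ready.append(msg)

    def deliver(self):
        """Hand the next message to a consumer; it stays tracked until acked."""
        msg = self.ready.pop(0)
        self._tag += 1
        self.unacked[self._tag] = msg
        return self._tag, msg

    def ack(self, tag):
        del self.unacked[tag]  # processing succeeded; now the message is gone

    def consumer_crashed(self):
        """Unacked messages return to the queue instead of being lost."""
        self.ready = list(self.unacked.values()) + self.ready
        self.unacked.clear()

q = ToyQueue()
q.publish("send-confirmation-email")
tag, msg = q.deliver()
q.consumer_crashed()      # crash before ack: the message is requeued, not lost
tag2, msg2 = q.deliver()  # redelivered
q.ack(tag2)               # only now is it removed for good
```

With auto-ack (delete on delivery), the `consumer_crashed` call would lose the message, which is precisely the failure mode manual acknowledgement exists to prevent.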