Backend Systems · Advanced

Messaging Systems Complete Guide: RabbitMQ, Kafka, Redis, NATS, Azure Service Bus and More

Every major messaging system explained — RabbitMQ exchanges and routing, Kafka partitions and consumer groups, Redis Streams, NATS, Azure Service Bus, AWS SQS/SNS, ZeroMQ. When to use each, real-world examples, code in Python, .NET, and TypeScript, and a decision framework.

SystemForge · April 21, 2026 · 25 min read
RabbitMQ · Kafka · Redis · NATS · Azure Service Bus · Messaging · Event-Driven Architecture · Microservices · System Design · Interview Prep

Why Messaging Systems Exist

In a synchronous system, Service A calls Service B directly over HTTP. This is simple and easy to reason about — but it creates tight coupling:

  • If Service B is down, Service A fails
  • If Service B is slow, Service A is slow
  • If traffic spikes, both services must scale together
  • Adding Service C means changing Service A to call both B and C

A messaging system sits between producers and consumers. Service A publishes a message to the broker. The broker holds it. Service B (and C, and D) consume it when they are ready.

Without messaging (tight coupling):
Service A ──HTTP──► Service B  (if B is down, A fails)

With messaging (loose coupling):
Service A ──► Broker ──► Service B  (B can be down, message waits)
                    └──► Service C  (C added without changing A)
                    └──► Service D

The key question when choosing a messaging system: Are you distributing work (task queue), broadcasting events (pub/sub), or streaming data continuously (event stream)?


The Messaging Landscape

Task Queues (point-to-point, one consumer processes each message):
  RabbitMQ          → most flexible routing, enterprise-grade
  Azure Service Bus → managed, deep Azure integration
  AWS SQS           → managed, simplest, massive scale
  Redis Queues      → when Redis is already in the stack

Pub/Sub (fan-out, all subscribers receive each message):
  RabbitMQ fanout   → routing + fan-out combined
  Azure Service Bus Topics → managed pub/sub
  AWS SNS           → managed fan-out
  Redis Pub/Sub     → ephemeral, in-memory, no persistence
  NATS              → ultra-low latency, cloud-native

Event Streaming (ordered, replayable, retained log):
  Apache Kafka      → the industry standard for high-volume streams
  Azure Event Hubs  → managed Kafka-compatible
  AWS Kinesis       → managed streaming
  Redis Streams     → lightweight streaming when Redis is already there

Embedded / In-Process:
  ZeroMQ            → direct socket messaging, no broker
  MassTransit       → .NET abstraction over RabbitMQ/Azure/SQS
  MediatR           → in-process mediator pattern (.NET)

RabbitMQ — The Most Flexible Message Broker

RabbitMQ implements the AMQP (Advanced Message Queuing Protocol) standard. It is the most flexible general-purpose message broker — you can model almost any messaging pattern with it.

Core Concepts

Producer → Exchange → Binding → Queue → Consumer
  • Producer: publishes messages to an exchange (never directly to a queue)
  • Exchange: receives messages and routes them to queues based on rules
  • Binding: a rule linking an exchange to a queue (with an optional routing key)
  • Queue: holds messages until a consumer picks them up
  • Consumer: reads from a queue and acknowledges each message

The critical design choice in RabbitMQ is the exchange type. This determines how messages are routed.

Exchange Types

Direct Exchange — route by exact key:

Producer → [routing_key: "payment.processed"] → Direct Exchange
                                                    │
                        ┌───────────────────────────┤
                        ▼                           ▼
              Queue: payment-receipts      Queue: payment-audit
              (binding key: payment.processed)  (binding key: payment.processed)
Python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# Declare exchange
channel.exchange_declare(exchange='payments', exchange_type='direct')

# Declare queues
channel.queue_declare(queue='payment-receipts', durable=True)
channel.queue_declare(queue='payment-audit', durable=True)

# Bind queues to exchange with routing key
channel.queue_bind(exchange='payments', queue='payment-receipts', routing_key='payment.processed')
channel.queue_bind(exchange='payments', queue='payment-audit', routing_key='payment.processed')

# Publish
channel.basic_publish(
    exchange='payments',
    routing_key='payment.processed',
    body='{"amount": 49.99, "currency": "GBP"}',
    properties=pika.BasicProperties(
        delivery_mode=pika.DeliveryMode.Persistent  # survive broker restart
    )
)

Fanout Exchange — broadcast to all queues:

Every queue bound to a fanout exchange receives every message, regardless of routing key.

Python
channel.exchange_declare(exchange='order-events', exchange_type='fanout')

# All three queues get every order event
channel.queue_bind(exchange='order-events', queue='inventory-service')
channel.queue_bind(exchange='order-events', queue='notification-service')
channel.queue_bind(exchange='order-events', queue='analytics-service')

# Publish (routing_key is ignored for fanout)
channel.basic_publish(exchange='order-events', routing_key='', body='{"orderId": "ORD-123"}')

Real example: Deliveroo uses fanout for order status events — when an order is confirmed, the inventory service, courier assignment service, customer notification service, and analytics service all need to react.

Topic Exchange — route by pattern:

Topic exchanges route by routing key patterns using * (one word) and # (zero or more words).

Routing key pattern: <entity>.<action>.<region>

"order.created.uk"     → matches "order.#" and "*.created.*" and "order.created.uk"
"order.cancelled.us"   → matches "order.#" and "*.cancelled.*"
"payment.failed.eu"    → matches "payment.#" and "*.failed.*"
Python
channel.exchange_declare(exchange='events', exchange_type='topic')

# UK team: only UK events
channel.queue_bind(exchange='events', queue='uk-ops', routing_key='#.uk')

# Fraud team: all payment events everywhere
channel.queue_bind(exchange='events', queue='fraud-detection', routing_key='payment.#')

# Audit: everything
channel.queue_bind(exchange='events', queue='audit-log', routing_key='#')

# Publish UK order
channel.basic_publish(exchange='events', routing_key='order.created.uk', body='...')
# → goes to uk-ops and audit-log (not fraud-detection)

# Publish payment failure
channel.basic_publish(exchange='events', routing_key='payment.failed.eu', body='...')
# → goes to fraud-detection and audit-log (not uk-ops)
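The * and # rules above can be pinned down with a small reference matcher, a sketch in plain Python (the function name topic_match is ours, not part of any client library):

```python
def topic_match(pattern, key):
    """Return True if a routing key matches a binding pattern.
    '*' matches exactly one word, '#' matches zero or more words."""
    p, k = pattern.split('.'), key.split('.')

    def rec(i, j):
        if i == len(p):
            return j == len(k)          # pattern exhausted: key must be too
        if p[i] == '#':
            # '#' can absorb zero or more remaining words
            return any(rec(i + 1, j2) for j2 in range(j, len(k) + 1))
        if j == len(k):
            return False                # key exhausted, pattern still has words
        if p[i] == '*' or p[i] == k[j]:
            return rec(i + 1, j + 1)    # '*' or exact word match
        return False

    return rec(0, 0)

print(topic_match('order.#', 'order.created.uk'))    # True
print(topic_match('#.uk', 'order.created.uk'))       # True
print(topic_match('payment.#', 'order.created.uk'))  # False
```

Note that '#' matches zero words too: the pattern 'order.#' matches the bare key 'order', while 'order.*' does not.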

Headers Exchange — route by message headers (rarely used):

Routes based on message headers instead of routing key. Useful when the routing decision involves multiple attributes.

Python
channel.exchange_declare(exchange='priority-router', exchange_type='headers')

channel.queue_bind(
    exchange='priority-router',
    queue='high-priority-queue',
    arguments={'x-match': 'all', 'priority': 'high', 'region': 'eu'}
)

Consumer Acknowledgements — The Most Important RabbitMQ Concept

This is where most teams make mistakes.

Python
import json
import logging

log = logging.getLogger(__name__)

def process_payment(ch, method, properties, body):
    try:
        data = json.loads(body)
        process(data)
        
        # Acknowledge ONLY after successful processing
        ch.basic_ack(delivery_tag=method.delivery_tag)
        
    except Exception as e:
        # Negative acknowledge. requeue=True would retry immediately;
        # requeue=False routes the message to the DLX instead of retrying forever
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
        log.error(f"Payment processing failed: {e}")

# prefetch_count=1: don't give me a second message until I ack the first
# Without this, RabbitMQ floods the consumer with messages it can't process fast enough
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='payments', on_message_callback=process_payment)
channel.start_consuming()

What happens without proper acknowledgement:

  • Auto-ack mode: message deleted the moment it is delivered, even if processing crashes halfway through. Payment deducted, order never created.
  • Never acking: messages pile up as "unacknowledged", memory fills up, broker crashes.
  • Always requeue on failure: infinite retry loop on a bad message (poison pill) blocks the entire queue.
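One common fix for the poison pill problem is to cap retries by inspecting RabbitMQ's x-death header, which the broker adds each time a message is dead-lettered and redelivered. A hedged sketch (the helper name and the limit of 3 are our choices, not a library API):

```python
def should_requeue(headers, max_retries=3):
    """Decide nack(requeue=True) vs nack(requeue=False) based on how many
    times the broker has already dead-lettered this message."""
    deaths = (headers or {}).get('x-death', [])
    # x-death is a list of dicts; the first entry reflects the most recent queue
    count = deaths[0].get('count', 0) if deaths else 0
    return count < max_retries

# First failure: no x-death header yet, so retry
print(should_requeue(None))                           # True
# Retries exhausted: let the message fall through to the DLQ
print(should_requeue({'x-death': [{'count': 3}]}))    # False
```

In the consumer callback, this decides the `requeue` argument to `basic_nack`, so a bad message retries a bounded number of times instead of blocking the queue forever.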

Dead Letter Exchange (DLQ in RabbitMQ)

Messages that fail processing land in the Dead Letter Exchange and get routed to a dead letter queue for inspection.

Python
# Main queue: rejected (requeue=False), expired, or overflowed messages route to the DLX
channel.queue_declare(
    queue='payments',
    durable=True,
    arguments={
        'x-dead-letter-exchange': 'payments.dlx',
        'x-dead-letter-routing-key': 'dead',
        'x-message-ttl': 300000,  # 5 minutes max in queue
        'x-max-length': 10000     # max 10k messages before dropping
    }
)

# Dead letter exchange and queue
channel.exchange_declare(exchange='payments.dlx', exchange_type='direct')
channel.queue_declare(queue='payments.dead', durable=True)
channel.queue_bind(exchange='payments.dlx', queue='payments.dead', routing_key='dead')

RabbitMQ in .NET with MassTransit

Raw RabbitMQ clients are verbose. MassTransit is the standard .NET abstraction:

C#
// Program.cs
builder.Services.AddMassTransit(x =>
{
    x.AddConsumer<PaymentProcessedConsumer>();
    
    x.UsingRabbitMq((context, cfg) =>
    {
        cfg.Host("rabbitmq://localhost", h =>
        {
            h.Username("guest");
            h.Password("guest");
        });
        
        cfg.ReceiveEndpoint("payment-processed", e =>
        {
            e.ConfigureConsumer<PaymentProcessedConsumer>(context);
            e.UseMessageRetry(r => r.Intervals(100, 500, 1000, 5000)); // retry with backoff
            // once retries are exhausted, MassTransit moves the message
            // to the payment-processed_error queue automatically
        });
    });
});

// Publishing a message
public class OrderService
{
    private readonly IPublishEndpoint _publishEndpoint;
    
    public async Task PlaceOrder(Order order)
    {
        await _orderRepository.SaveAsync(order);
        
        await _publishEndpoint.Publish(new OrderPlaced
        {
            OrderId = order.Id,
            CustomerId = order.CustomerId,
            Amount = order.Total,
            PlacedAt = DateTimeOffset.UtcNow
        });
    }
}

// Consumer
public class PaymentProcessedConsumer : IConsumer<PaymentProcessed>
{
    public async Task Consume(ConsumeContext<PaymentProcessed> context)
    {
        var payment = context.Message;
        await _orderService.MarkAsPaidAsync(payment.OrderId);
        // MassTransit auto-acks on success, auto-nacks on exception
    }
}

MassTransit handles: serialisation, retries with exponential backoff, dead letter routing, correlation IDs, and saga state machines — all things you would otherwise build manually.
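The retry intervals above follow a rough exponential backoff. As a generic sketch (just the arithmetic, not MassTransit API), capped exponential backoff looks like:

```python
def backoff_intervals(base_ms=100, factor=2, attempts=5, cap_ms=5000):
    """Delay before each retry attempt: base * factor^n, capped at cap_ms."""
    return [min(base_ms * factor ** n, cap_ms) for n in range(attempts)]

print(backoff_intervals())  # [100, 200, 400, 800, 1600]
```

The cap matters: without it, attempt 10 at factor 2 would wait nearly two minutes, and a transient outage would turn into a very long recovery.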

RabbitMQ Real-World Examples

Shopify — order processing pipeline: When a Shopify order is placed, a message goes to a topic exchange. Multiple consumers process different aspects in parallel: inventory reservation, fraud check, payment capture, fulfilment routing, analytics ingestion. Each runs independently. If the analytics service is slow, it does not delay payment capture.

Healthcare booking (MyBCAT pattern): After a call ends in Amazon Connect → EventBridge event → Lambda → RabbitMQ message published. Three consumers pick it up: transcription service (DeepGram), CRM update service, billing service. The call handler Lambda returns immediately. All three process asynchronously.


Apache Kafka — Event Streaming at Scale

Kafka is fundamentally different from RabbitMQ. RabbitMQ is a message broker — messages are consumed and deleted. Kafka is an event log — events are appended to a log and retained.

RabbitMQ: Queue (messages consumed and gone)
  Producer → [msg1, msg2, msg3] → Consumer (reads, acks, deleted)

Kafka: Log (events retained, multiple consumers, replayable)
  Producer → [evt1, evt2, evt3, evt4, evt5] (retained 7 days)
                                               ↑
                              Consumer A: read up to evt3 (offset 3)
                              Consumer B: read up to evt5 (offset 5)
                              Consumer C: re-reading from evt1 (replaying)
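The queue-versus-log distinction can be made concrete with a toy in-memory log, a sketch only: events are never deleted on read, each consumer tracks its own offset, and replay is just resetting that offset.

```python
class EventLog:
    """Toy Kafka-style log: append-only, consumers track their own offsets."""
    def __init__(self):
        self.events = []

    def append(self, event):
        self.events.append(event)
        return len(self.events) - 1      # the event's offset

    def read_from(self, offset):
        return self.events[offset:]      # reading deletes nothing

event_log = EventLog()
for e in ['evt1', 'evt2', 'evt3']:
    event_log.append(e)

consumer_a_offset = 2                    # A has processed evt1 and evt2
print(event_log.read_from(consumer_a_offset))  # ['evt3']
print(event_log.read_from(0))                  # replay: ['evt1', 'evt2', 'evt3']
```

A RabbitMQ queue, by contrast, would have removed evt1 and evt2 on acknowledgement; there is nothing left to replay.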

Core Concepts

Topic: A named log. Producers write to a topic. Consumers read from a topic.

Partition: A topic is split into partitions. Each partition is an ordered, immutable sequence of events. Partitions enable parallelism — multiple consumers can read from different partitions simultaneously.

Offset: Every event in a partition has a sequential number (offset). Consumers track their position by offset. This is how Kafka enables replay — a consumer can reset its offset to 0 and re-read everything.

Consumer Group: A group of consumers that share the work of reading a topic. Each partition is assigned to exactly one consumer in the group. Add more consumers = more parallelism (up to the number of partitions).

Topic: payments (3 partitions)

Partition 0: [e0, e1, e2, e3, e4]
Partition 1: [e0, e1, e2]
Partition 2: [e0, e1, e2, e3]

Consumer Group A (payment-processor, 3 instances):
  Instance 1 → reads Partition 0
  Instance 2 → reads Partition 1
  Instance 3 → reads Partition 2

Consumer Group B (analytics, 1 instance):
  Instance 1 → reads ALL 3 partitions (one consumer, three partitions)

Both groups read independently. Group A processing payments does not affect Group B doing analytics.
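The per-key ordering guarantee works because the producer hashes the key to pick a partition, so every event for one customer lands on the same partition. Kafka's default partitioner uses murmur2; the sketch below uses crc32 purely for illustration:

```python
import zlib

def partition_for(key, num_partitions):
    """Toy partitioner: the same key always maps to the same partition.
    (Kafka's default partitioner uses murmur2; crc32 here is illustrative.)"""
    return zlib.crc32(key.encode()) % num_partitions

# All of customer-123's events go to one partition, preserving their order
p1 = partition_for('customer-123', 3)
p2 = partition_for('customer-123', 3)
print(p1 == p2)  # True
```

This is also why changing the partition count of a live topic breaks per-key ordering: the modulus changes, so existing keys remap to different partitions.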

Producing and Consuming in Python

Python
import json

from confluent_kafka import Producer, Consumer

# Producer
producer = Producer({'bootstrap.servers': 'localhost:9092'})

def delivery_callback(err, msg):
    if err:
        print(f'Message delivery failed: {err}')
    else:
        print(f'Delivered to {msg.topic()} partition {msg.partition()} offset {msg.offset()}')

producer.produce(
    topic='payments',
    key='customer-123',          # same key → same partition (ordering per customer)
    value='{"amount": 49.99}',
    callback=delivery_callback
)
producer.flush()  # wait for all messages to be delivered

# Consumer
consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'payment-processor',
    'auto.offset.reset': 'earliest',  # start from beginning if no committed offset
    'enable.auto.commit': False        # manual commit for at-least-once processing
})

consumer.subscribe(['payments'])

try:
    while True:
        msg = consumer.poll(timeout=1.0)
        
        if msg is None:
            continue
        if msg.error():
            raise Exception(msg.error())
        
        data = json.loads(msg.value())
        process_payment(data)
        
        # Commit offset only after successful processing
        consumer.commit(asynchronous=False)
        
finally:
    consumer.close()

Kafka in .NET with Confluent.Kafka

C#
// Producer
var config = new ProducerConfig { BootstrapServers = "localhost:9092" };

using var producer = new ProducerBuilder<string, string>(config).Build();

var result = await producer.ProduceAsync("payments", new Message<string, string>
{
    Key = customerId,       // partition by customer for ordering
    Value = JsonSerializer.Serialize(paymentEvent)
});

Console.WriteLine($"Delivered to partition {result.Partition}, offset {result.Offset}");

// Consumer
var consumerConfig = new ConsumerConfig
{
    BootstrapServers = "localhost:9092",
    GroupId = "payment-processor",
    AutoOffsetReset = AutoOffsetReset.Earliest,
    EnableAutoCommit = false
};

using var consumer = new ConsumerBuilder<string, string>(consumerConfig).Build();
consumer.Subscribe("payments");

while (!cancellationToken.IsCancellationRequested)
{
    var result = consumer.Consume(cancellationToken);
    
    var payment = JsonSerializer.Deserialize<PaymentEvent>(result.Message.Value);
    await ProcessPaymentAsync(payment);
    
    consumer.Commit(result);  // only commit after processing
}

When Kafka is the Right Choice

Kafka is the right choice when you need:

  1. Replay — re-process historical events (audit, reindex, debug production issues)
  2. Multiple independent consumers — analytics team reads same events as payment processor
  3. High throughput — millions of events per second
  4. Event sourcing — the event log IS the source of truth

Kafka is overkill when:

  • You have simple task distribution (use RabbitMQ or SQS)
  • Messages should be processed once and deleted
  • You need complex routing logic (RabbitMQ topic exchanges are better)
  • You have low volume (< 10k messages/day)

Real example — LinkedIn (Kafka's origin): LinkedIn built Kafka because they needed to capture every user action (view, click, search, connection request) for real-time analytics and feed ranking. The data volume was too high for a traditional message queue, and they needed multiple teams (analytics, recommendation engine, spam detection) to read the same event stream independently without affecting each other.

Real example — Uber: Uber uses Kafka to track driver and rider location updates. 1 million location updates per second across all drivers worldwide. Multiple consumers: surge pricing calculation, ETA calculation, dispatch matching, analytics. Each consumer reads independently.


Redis Pub/Sub and Redis Streams

Redis is primarily a cache, but it has two messaging features built in.

Redis Pub/Sub — Ephemeral Fan-Out

Redis Pub/Sub is the simplest possible pub/sub system: publish to a channel, all current subscribers receive it instantly. No persistence, no replay, no delivery guarantees.

Python
import json
import redis

r = redis.Redis(host='localhost', port=6379)

# Publisher
r.publish('notifications', json.dumps({
    'type': 'appointment_confirmed',
    'patient_id': 'pat_123',
    'appointment_id': 'apt_456'
}))

# Subscriber
pubsub = r.pubsub()
pubsub.subscribe('notifications')

for message in pubsub.listen():
    if message['type'] == 'message':
        data = json.loads(message['data'])
        handle_notification(data)

The critical limitation: If a subscriber is offline when the message is published, the message is gone. There is no queue, no persistence, no retry.

When to use Redis Pub/Sub:

  • Real-time dashboard updates (if they miss one update, the next update will correct it)
  • Cache invalidation signals ("this key changed, invalidate your local copy")
  • Live chat presence ("user is typing")
  • WebSocket broadcast to connected users

When NOT to use Redis Pub/Sub:

  • Anything where message delivery matters (payments, orders, notifications that must arrive)
  • Long-running background jobs
  • Any situation where the consumer might be offline
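The at-most-once nature of Redis Pub/Sub is easy to see in a toy model (a sketch, not the Redis implementation): delivery only happens to subscribers present at publish time.

```python
class EphemeralPubSub:
    """Toy model of Redis Pub/Sub: no queue, no persistence, no retry."""
    def __init__(self):
        self.subscribers = {}            # channel -> list of callbacks

    def subscribe(self, channel, callback):
        self.subscribers.setdefault(channel, []).append(callback)

    def publish(self, channel, message):
        subs = self.subscribers.get(channel, [])
        for cb in subs:
            cb(message)
        return len(subs)                 # 0 means the message is simply gone

bus = EphemeralPubSub()
print(bus.publish('notifications', 'missed'))   # 0: nobody listening, message lost

received = []
bus.subscribe('notifications', received.append)
print(bus.publish('notifications', 'seen'))     # 1
print(received)                                  # ['seen']
```

Real `PUBLISH` behaves the same way: its return value is the number of clients that received the message, and zero means it went nowhere.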

Redis Streams — Lightweight Persistent Streaming

Redis Streams (added in Redis 5.0) is a persistent, replayable message log. It is essentially a lightweight Kafka inside Redis.

Python
r = redis.Redis(host='localhost', port=6379)

# Producer: XADD appends to the stream
r.xadd('appointments', {
    'event_type': 'appointment_confirmed',
    'appointment_id': 'apt_456',
    'patient_id': 'pat_123',
    'timestamp': '2026-04-21T09:00:00Z'
})

# Consumer with consumer group (like Kafka consumer groups)
# Create group if not exists
try:
    r.xgroup_create('appointments', 'notification-service', id='0', mkstream=True)
except redis.exceptions.ResponseError:
    pass  # group already exists

# Read messages (XREADGROUP delivers messages assigned to this consumer)
messages = r.xreadgroup(
    groupname='notification-service',
    consumername='instance-1',
    streams={'appointments': '>'},  # '>' means undelivered messages
    count=10,
    block=5000  # block for 5 seconds if no messages
)

for stream_name, events in messages:
    for event_id, data in events:
        try:
            send_confirmation_notification(data)
            r.xack('appointments', 'notification-service', event_id)  # acknowledge
        except Exception as e:
            log.error(f"Failed to process {event_id}: {e}")
            # Message stays pending; another consumer can claim it later via XCLAIM/XAUTOCLAIM
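The pending-until-acknowledged behaviour above can be modelled in a few lines, a toy sketch of the consumer-group mechanics, not the Redis implementation:

```python
class ToyStreamGroup:
    """Toy model of a Redis Streams consumer group with a pending list."""
    def __init__(self):
        self.entries = []        # (entry_id, data), append-only
        self.delivered = 0       # next undelivered index, i.e. what '>' reads
        self.pending = {}        # entry_id -> consumer name

    def xadd(self, data):
        entry_id = f'{len(self.entries)}-0'
        self.entries.append((entry_id, data))
        return entry_id

    def xreadgroup(self, consumer, count=10):
        batch = self.entries[self.delivered:self.delivered + count]
        self.delivered += len(batch)
        for entry_id, _ in batch:
            self.pending[entry_id] = consumer   # stays pending until xack
        return batch

    def xack(self, entry_id):
        self.pending.pop(entry_id, None)

    def xclaim(self, entry_id, new_consumer):
        self.pending[entry_id] = new_consumer   # hand a stuck message to another worker

g = ToyStreamGroup()
e1 = g.xadd({'event': 'confirmed'})
g.xreadgroup('instance-1')
print(list(g.pending))   # ['0-0']: delivered but not acknowledged
g.xack(e1)
print(list(g.pending))   # []: acknowledged, removed from the pending list
```

This is the property Redis calls the Pending Entries List (PEL): a crashed consumer's unacked entries stay visible, and XCLAIM moves them to a healthy one.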

Redis Streams vs Kafka:

| | Redis Streams | Kafka |
|---|---|---|
| Throughput | ~100k msg/sec | Millions/sec |
| Retention | Up to Redis memory limit | Configurable (days/TB) |
| Replay | Yes, from any ID | Yes, from any offset |
| Consumer groups | Yes | Yes |
| Cluster | Redis Cluster | Kafka Cluster |
| Operational complexity | Low (Redis already in stack) | High (ZooKeeper/KRaft) |
| Use when | Redis already there, moderate volume | High volume, long retention needed |


NATS — The Lightweight Cloud-Native Broker

NATS is a messaging system built for cloud-native architectures. It is written in Go, extremely fast (tens of millions of messages per second), and has a very small footprint (single binary, ~10MB).

NATS is used heavily in Kubernetes environments for microservice-to-microservice communication.

Core NATS Patterns

Basic Pub/Sub:

Python
import asyncio
import json

import nats

async def main():
    nc = await nats.connect("nats://localhost:4222")
    
    # Subscribe
    async def message_handler(msg):
        data = json.loads(msg.data.decode())
        print(f"Received on {msg.subject}: {data}")
    
    await nc.subscribe("appointments.confirmed", cb=message_handler)
    
    # Publish
    await nc.publish("appointments.confirmed", json.dumps({
        "appointment_id": "apt_456",
        "patient_id": "pat_123"
    }).encode())
    
    await asyncio.sleep(1)
    await nc.close()

asyncio.run(main())

Queue Groups (load balancing):

In NATS, multiple subscribers on the same subject all receive the message (pub/sub). Queue groups change this — only one member of the group receives each message.

Python
# Three instances of the notification service, all in the same queue group
# Each message goes to exactly ONE instance (load balanced)
await nc.subscribe("notifications", queue="notification-workers", cb=handler)
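The difference between plain subscribers and queue groups can be sketched as a toy dispatcher (our model, not NATS internals): plain subscribers all receive every message; each queue group receives it exactly once, load-balanced across members.

```python
class ToyNats:
    """Toy dispatcher: fan-out to plain subs, round-robin within queue groups."""
    def __init__(self):
        self.plain = []      # callbacks that receive everything
        self.groups = {}     # group name -> list of member callbacks

    def subscribe(self, callback, queue=None):
        if queue is None:
            self.plain.append(callback)
        else:
            self.groups.setdefault(queue, []).append(callback)

    def publish(self, message):
        for cb in self.plain:
            cb(message)                     # every plain subscriber gets it
        for members in self.groups.values():
            members[0](message)             # exactly ONE member per group
            members.append(members.pop(0))  # rotate for load balancing

seen = {'a': [], 'b': [], 'c': []}
bus = ToyNats()
bus.subscribe(seen['a'].append, queue='workers')
bus.subscribe(seen['b'].append, queue='workers')
bus.subscribe(seen['c'].append)             # plain subscriber

bus.publish('m1')
bus.publish('m2')
print(seen['a'], seen['b'], seen['c'])      # ['m1'] ['m2'] ['m1', 'm2']
```

Real NATS distributes within a queue group roughly randomly rather than strict round-robin, but the delivery guarantee is the same: one member per group per message.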

Request/Reply (RPC over NATS):

Python
# Server (handler)
async def handle_slot_check(msg):
    request = json.loads(msg.data)
    available_slots = get_available_slots(request['date'])
    await msg.respond(json.dumps(available_slots).encode())

await nc.subscribe("slots.available", cb=handle_slot_check)

# Client (requester)
response = await nc.request(
    "slots.available",
    json.dumps({"date": "2026-04-22", "practice_id": "prc_123"}).encode(),
    timeout=5.0
)
slots = json.loads(response.data)

JetStream — persistence for NATS:

NATS core is fire-and-forget (like Redis Pub/Sub). JetStream adds persistence, replay, and consumer groups:

Python
js = nc.jetstream()

# Create a stream (persistent log)
await js.add_stream(name="APPOINTMENTS", subjects=["appointments.>"])

# Publish to persistent stream
await js.publish("appointments.confirmed", json.dumps(event).encode())

# Durable consumer (survives restarts)
consumer = await js.subscribe(
    "appointments.confirmed",
    durable="notification-service",
    manual_ack=True
)

async for msg in consumer.messages:
    process(json.loads(msg.data))
    await msg.ack()

NATS real-world use:

  • Kubernetes sidecar communication (Istio-less service mesh)
  • IoT device messaging (millions of devices, lightweight protocol)
  • Financial trading platforms (ultra-low latency pub/sub)
  • Synadia (NATS creators) runs the global NATS cloud infrastructure

Azure Service Bus — Managed Enterprise Messaging

Azure Service Bus is the managed messaging service on Azure. It supports both queues (point-to-point) and topics/subscriptions (pub/sub).

Queues vs Topics

Queue (point-to-point):
  Sender → Queue → One Receiver

Topic (pub/sub):
  Sender → Topic → Subscription 1 → Receiver A
                 → Subscription 2 → Receiver B
                 → Subscription 3 → Receiver C (with filter: only type=urgent)

.NET with Azure.Messaging.ServiceBus

C#
// Sending a message
var client = new ServiceBusClient(connectionString);
var sender = client.CreateSender("appointment-events");

var message = new ServiceBusMessage(JsonSerializer.Serialize(appointmentEvent))
{
    ContentType = "application/json",
    MessageId = Guid.NewGuid().ToString(),   // idempotency
    Subject = "appointment.confirmed",
    TimeToLive = TimeSpan.FromHours(24),     // expire unprocessed messages
    ScheduledEnqueueTime = DateTimeOffset.UtcNow.AddMinutes(5) // delay delivery
};

// Add custom properties for subscription filters
message.ApplicationProperties.Add("region", "uk");
message.ApplicationProperties.Add("priority", "high");

await sender.SendMessageAsync(message);

// Receiving messages (processor — recommended for production)
var processor = client.CreateProcessor("appointment-events", "notification-subscription", new ServiceBusProcessorOptions
{
    MaxConcurrentCalls = 5,
    AutoCompleteMessages = false  // manual complete for control
});

processor.ProcessMessageAsync += async (args) =>
{
    var appointment = JsonSerializer.Deserialize<AppointmentEvent>(args.Message.Body);
    
    try
    {
        await SendConfirmationAsync(appointment);
        await args.CompleteMessageAsync(args.Message);  // remove from queue
    }
    catch (TransientException)
    {
        await args.AbandonMessageAsync(args.Message);   // retry later
    }
    catch (PermanentException ex)
    {
        await args.DeadLetterMessageAsync(args.Message, "PermanentFailure", ex.Message);
    }
};

processor.ProcessErrorAsync += (args) =>
{
    log.LogError(args.Exception, "Message processing error");
    return Task.CompletedTask;
};

await processor.StartProcessingAsync();

Subscription Filters

Subscription filters are one of Azure Service Bus's most powerful features — they let different subscribers receive different subsets of messages from the same topic.

C#
// Create subscription with SQL filter
var adminClient = new ServiceBusAdministrationClient(connectionString);

// UK team: only UK messages
await adminClient.CreateRuleAsync("appointment-events", "uk-subscription", new CreateRuleOptions
{
    Name = "uk-only",
    Filter = new SqlRuleFilter("region = 'uk'")
});

// High priority: urgent messages
await adminClient.CreateRuleAsync("appointment-events", "urgent-subscription", new CreateRuleOptions
{
    Name = "high-priority",
    Filter = new SqlRuleFilter("priority = 'high' AND event_type = 'appointment.cancelled'")
});

// Correlation filter (faster, index-based):
await adminClient.CreateRuleAsync("appointment-events", "payment-subscription", new CreateRuleOptions
{
    Name = "payments-only",
    Filter = new CorrelationRuleFilter { Subject = "payment.processed" }
});

Azure equivalent of RabbitMQ topic exchange: Azure Service Bus topics with SQL filters.

Sessions — Ordered Processing

Service Bus Sessions guarantee message ordering for a session ID. All messages with the same session ID are delivered to the same consumer in order.

C#
// Sending with session (all messages for one patient go to same consumer)
var message = new ServiceBusMessage(payload)
{
    SessionId = $"patient-{patientId}"  // all this patient's messages → same consumer instance
};

// Session-aware receiver
var sessionProcessor = client.CreateSessionProcessor("appointment-events", new ServiceBusSessionProcessorOptions
{
    MaxConcurrentSessions = 10
});

sessionProcessor.ProcessMessageAsync += async (args) =>
{
    // All messages for the same SessionId arrive here in order
    // args.SessionId contains the session identifier
};

AWS SQS and SNS — Managed Cloud Messaging

(Covered in depth in the Event-Driven Architecture guide. Summary here for completeness.)

SQS Patterns in Code

Python
import json

import boto3

sqs = boto3.client('sqs', region_name='eu-west-1')

# Send message with deduplication (FIFO queue)
response = sqs.send_message(
    QueueUrl='https://sqs.eu-west-1.amazonaws.com/123456789/appointments.fifo',
    MessageBody=json.dumps(appointment_event),
    MessageGroupId=f'practice-{practice_id}',       # ordering per practice
    MessageDeduplicationId=idempotency_key           # prevent duplicates
)

# Receive and process
response = sqs.receive_message(
    QueueUrl=queue_url,
    MaxNumberOfMessages=10,
    WaitTimeSeconds=20,         # long polling (cheaper, faster than short poll)
    VisibilityTimeout=60        # 60s to process before message reappears
)

for message in response.get('Messages', []):
    try:
        process(json.loads(message['Body']))
        sqs.delete_message(
            QueueUrl=queue_url,
            ReceiptHandle=message['ReceiptHandle']
        )
    except Exception:
        # Don't delete → message reappears after VisibilityTimeout → retry
        pass
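Because SQS is at-least-once, the same message can be delivered twice (for example, after a visibility timeout expires mid-processing). A minimal idempotency guard, a sketch using an in-memory set where production code would use a database table or Redis:

```python
processed_ids = set()   # in production: a database table or Redis SET with a TTL

def handle_once(message_id, payload, handler):
    """Run handler at most once per message_id, tolerating redelivery."""
    if message_id in processed_ids:
        return False                  # duplicate delivery: skip silently
    handler(payload)
    processed_ids.add(message_id)     # record only AFTER success
    return True

results = []
print(handle_once('msg-1', 'pay £49.99', results.append))  # True
print(handle_once('msg-1', 'pay £49.99', results.append))  # False (duplicate)
print(results)                                              # ['pay £49.99']
```

Recording the ID only after the handler succeeds means a crash mid-processing leads to a retry, not a lost message; the trade-off is that a crash between the handler and the record can still cause one duplicate, which is why the handler itself should be idempotent too.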

SNS Fan-Out with Filters

Python
sns = boto3.client('sns', region_name='eu-west-1')

# Publish with message attributes for filtering
sns.publish(
    TopicArn='arn:aws:sns:eu-west-1:123456789:appointment-events',
    Message=json.dumps(event),
    MessageAttributes={
        'event_type': {
            'DataType': 'String',
            'StringValue': 'appointment.confirmed'
        },
        'region': {
            'DataType': 'String',
            'StringValue': 'uk'
        }
    }
)
HCL
# Terraform: SNS subscription filter (only UK confirmations)
resource "aws_sns_topic_subscription" "uk_notifications" {
  topic_arn = aws_sns_topic.appointment_events.arn
  protocol  = "sqs"
  endpoint  = aws_sqs_queue.uk_notifications.arn

  filter_policy = jsonencode({
    event_type = ["appointment.confirmed"]
    region     = ["uk"]
  })
}

ZeroMQ — No-Broker Messaging

ZeroMQ is different from everything above — there is no broker. Messages are sent directly between processes over sockets, using TCP, IPC (inter-process communication), or in-process.

This makes it extremely fast (no network hop to a broker) but also means there is no persistence, no management UI, and no delivery guarantees beyond the socket layer.

Python
import time
import zmq

# Pub side
context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://*:5555")

time.sleep(0.5)  # PUB drops messages sent before any subscriber connects (the "slow joiner" problem)

socket.send_string("temperature 22.5")  # topic prefix
socket.send_string("humidity 65.2")

# Sub side
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect("tcp://localhost:5555")
socket.setsockopt_string(zmq.SUBSCRIBE, "temperature")  # only temperature messages

while True:
    message = socket.recv_string()
    print(f"Received: {message}")

Push/Pull — work distribution without a broker:

Python
# Worker distributor (PUSH)
context = zmq.Context()
pusher = context.socket(zmq.PUSH)
pusher.bind("tcp://*:5557")

for task in tasks:
    pusher.send_json(task)

# Worker (PULL)
puller = context.socket(zmq.PULL)
puller.connect("tcp://localhost:5557")

while True:
    task = puller.recv_json()
    process(task)

When ZeroMQ makes sense:

  • High-frequency sensor data (IoT, trading) where broker overhead is unacceptable
  • Inter-process communication on the same machine
  • Embedded systems where running a broker is not possible
  • When you need microsecond latency

When ZeroMQ is wrong:

  • Any situation where message loss is unacceptable
  • Cross-datacenter messaging
  • When you need routing, filtering, or persistence

MediatR — In-Process Messaging (.NET)

MediatR is not a distributed message broker. It is an in-process mediator that decouples the caller from the handler within a single application. It implements the CQRS pattern inside one process.

C#
// Command (write)
public record BookAppointmentCommand(
    string SlotId, 
    string PatientId, 
    string Reason
) : IRequest<AppointmentResult>;

// Command handler
public class BookAppointmentHandler : IRequestHandler<BookAppointmentCommand, AppointmentResult>
{
    private readonly IAppointmentRepository _repo;
    private readonly IMediator _mediator;  // in-process events (use IPublishEndpoint for external brokers)
    
    public async Task<AppointmentResult> Handle(
        BookAppointmentCommand command, 
        CancellationToken ct)
    {
        var appointment = await _repo.BookSlotAsync(command.SlotId, command.PatientId);
        
        // Publish domain event within the same process
        await _mediator.Publish(new AppointmentBookedEvent(appointment.Id), ct);
        
        return new AppointmentResult(appointment.Id, appointment.ConfirmationNumber);
    }
}

// Notification handler (in-process subscriber)
public class SendConfirmationOnBooking : INotificationHandler<AppointmentBookedEvent>
{
    private readonly IEmailService _emailService;

    public async Task Handle(AppointmentBookedEvent notification, CancellationToken ct)
    {
        // Runs in the same HTTP request lifecycle
        await _emailService.SendConfirmationAsync(notification.AppointmentId);
    }
}

// Controller — just dispatches, knows nothing about handlers
[HttpPost("appointments")]
public async Task<IActionResult> Book([FromBody] BookAppointmentRequest request)
{
    var result = await _mediator.Send(new BookAppointmentCommand(
        request.SlotId, 
        request.PatientId, 
        request.Reason
    ));
    
    return Created($"/appointments/{result.Id}", result);
}

MediatR pipeline behaviors (middleware for commands):

C#
public class ValidationBehavior<TRequest, TResponse> 
    : IPipelineBehavior<TRequest, TResponse> where TRequest : notnull
{
    private readonly IEnumerable<IValidator<TRequest>> _validators;
    
    public ValidationBehavior(IEnumerable<IValidator<TRequest>> validators)
        => _validators = validators;
    
    public async Task<TResponse> Handle(
        TRequest request, 
        RequestHandlerDelegate<TResponse> next, 
        CancellationToken ct)
    {
        var failures = _validators
            .Select(v => v.Validate(request))
            .SelectMany(r => r.Errors)
            .Where(f => f != null)
            .ToList();
        
        if (failures.Any())
            throw new ValidationException(failures);
        
        return await next();  // continue to actual handler
    }
}

// Register behaviors (run in order: logging → validation → handler)
builder.Services.AddMediatR(cfg =>
{
    cfg.RegisterServicesFromAssembly(typeof(Program).Assembly);
    cfg.AddOpenBehavior(typeof(LoggingBehavior<,>));
    cfg.AddOpenBehavior(typeof(ValidationBehavior<,>));
});

Comparing All Messaging Systems

| System | Type | Persistence | Throughput | Routing | Managed | Best For |
|--------|------|-------------|------------|---------|---------|----------|
| RabbitMQ | Broker | Yes | High | Very flexible (exchanges) | Self-hosted (CloudAMQP managed) | Complex routing, enterprise workflows |
| Kafka | Event log | Yes (retained) | Very high | By partition key | Confluent Cloud | Event streaming, replay, multiple consumers |
| Azure Service Bus | Broker | Yes | High | SQL filters, sessions | Yes (Azure) | Enterprise .NET, Azure ecosystem |
| AWS SQS | Queue | Yes | Very high | None (basic) | Yes (AWS) | Simple decoupling, Lambda triggers |
| AWS SNS | Pub/sub | No | Very high | Attribute filters | Yes (AWS) | Fan-out, cross-service broadcast |
| Redis Pub/Sub | Pub/sub | No | Very high | Channel name | Redis Cloud | Cache invalidation, ephemeral real-time |
| Redis Streams | Event log | Yes (in memory) | High | Consumer groups | Redis Cloud | When Redis already in stack, moderate volume |
| NATS | Broker + log | Optional (JetStream) | Very high | Subject wildcards | Synadia Cloud | Cloud-native, Kubernetes, IoT, low latency |
| ZeroMQ | Brokerless | No | Extreme | None | Self-hosted | Sensor data, inter-process, embedded |
| MediatR | In-process | No | In-process | Handler types | N/A | CQRS within a monolith or service |


Decision Framework

Ask these questions in order:

1. Does it need to survive restarts and be replayable?
   No  → Redis Pub/Sub, NATS core, ZeroMQ
   Yes → continue

2. Is it one message → one processor (task queue)?
   Yes → RabbitMQ, Azure Service Bus Queue, AWS SQS
   No  → continue

3. Is it one event → many consumers (fan-out)?
   Yes, complex routing needed → RabbitMQ topic exchange, Azure Service Bus Topics
   Yes, simple → AWS SNS, Azure Event Grid
   No  → continue

4. Is it a continuous stream with multiple independent consumers and replay?
   Yes, high volume → Apache Kafka, Azure Event Hubs
   Yes, moderate volume → Redis Streams, NATS JetStream

5. Are you on Azure?
   → Azure Service Bus (queues for tasks, topics for events)
   → Azure Event Hubs if Kafka-compatible streaming

6. Are you on AWS?
   → SQS for task queues
   → SNS + SQS for fan-out
   → Kinesis or MSK (managed Kafka) for streaming

7. Is this within a single .NET application?
   → MediatR for in-process CQRS
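The checklist above can be condensed into a first-pass helper. This is a sketch only: the function name, parameters, and return strings are illustrative, and real selection involves more nuance than a decision tree.

```python
def recommend(durable: bool, one_consumer_per_message: bool,
              fan_out: bool, streaming: bool, cloud: str = "") -> str:
    """First-pass broker recommendation following the checklist above."""
    if not durable:
        # No persistence or replay needed: fire-and-forget options
        return "Redis Pub/Sub, NATS core, or ZeroMQ"
    if one_consumer_per_message:
        # Task queue: one message, one processor
        return {"azure": "Azure Service Bus Queue",
                "aws": "AWS SQS"}.get(cloud, "RabbitMQ")
    if fan_out:
        # One event, many consumers
        return {"azure": "Azure Service Bus Topics",
                "aws": "SNS + SQS"}.get(cloud, "RabbitMQ topic exchange")
    if streaming:
        # Continuous stream with independent consumers and replay
        return {"azure": "Azure Event Hubs",
                "aws": "Kinesis or MSK"}.get(cloud, "Apache Kafka")
    return "MediatR (in-process CQRS)"

print(recommend(durable=True, one_consumer_per_message=True,
                fan_out=False, streaming=False))  # prints: RabbitMQ
```

Treat the output as a shortlist to evaluate, not a final answer.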

Production Patterns

Pattern 1: The Outbox + RabbitMQ (Guaranteed Delivery)

Python
# Inside a database transaction:
with db.transaction():
    db.execute("INSERT INTO appointments ...")
    db.execute("""
        INSERT INTO outbox (event_type, payload, status, created_at)
        VALUES ('appointment.confirmed', ?, 'PENDING', NOW())
    """, [json.dumps(event)])

# Separate outbox poller (runs every second):
def publish_outbox_events():
    events = db.query("SELECT * FROM outbox WHERE status = 'PENDING' LIMIT 100")
    
    for event in events:
        try:
            channel.basic_publish(
                exchange='appointments',
                routing_key=event['event_type'],
                body=event['payload']
            )
            db.execute("UPDATE outbox SET status='PUBLISHED' WHERE id=?", [event['id']])
        except Exception as e:
            db.execute("UPDATE outbox SET retries = retries + 1 WHERE id=?", [event['id']])

This makes the appointment creation and the event queuing atomic: either both happen or neither does, and a broker outage does not lose the event. Strictly, this is at-least-once publishing rather than exactly-once — the poller can crash after publishing but before marking the row PUBLISHED, so the same event may be delivered twice and consumers must deduplicate.
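Because the outbox poller can crash between publishing and updating the row's status, the same event can arrive twice, so the consuming side should be idempotent. A minimal sketch, using an in-memory set to stand in for a `processed_events` table (the names are illustrative):

```python
processed = set()  # stand-in for a processed_events table with a unique index

def handle_event(event: dict) -> bool:
    """Process an outbox event at most once; duplicate deliveries are dropped.

    Returns True if the event was processed, False if it was a duplicate.
    """
    event_id = event["id"]
    if event_id in processed:   # duplicate delivery from the outbox poller
        return False
    # ... real work here: send the email, update the read model, etc. ...
    processed.add(event_id)     # in production: INSERT into processed_events,
                                # unique-keyed, in the same transaction as the work
    return True

assert handle_event({"id": "evt-1"}) is True
assert handle_event({"id": "evt-1"}) is False  # redelivery is a no-op
```

In a real system the dedup record and the side effect belong in the same database transaction, so a crash cannot mark an event processed without doing the work.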

Pattern 2: Competing Consumers with Backoff

Python
import json
import random
import time

def process_with_backoff(max_retries=5):
    retries = 0
    
    def callback(ch, method, properties, body):
        nonlocal retries
        try:
            process(json.loads(body))
            ch.basic_ack(delivery_tag=method.delivery_tag)
            retries = 0
            
        except RetryableException as e:
            retries += 1
            if retries > max_retries:
                # Give up, send to DLQ
                ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
                return
            
            # Exponential backoff with jitter
            delay = (2 ** retries) + random.uniform(0, 1)
            time.sleep(delay)
            ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
            
        except PermanentException:
            # No retry: straight to DLQ
            ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
    
    return callback
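One limitation of the sketch above: `basic_nack(requeue=True)` puts the message straight back at the head of the queue, and the retry counter lives in the consumer rather than on the message, so it is shared across messages and lost on redelivery to another worker. A common alternative is to republish the message with an explicit retry-count header and derive the delay from it. A broker-agnostic sketch of that decision logic (the `x-retry-count` header name is illustrative):

```python
import random

MAX_RETRIES = 5

def next_delay(retry_count: int):
    """Exponential backoff with jitter; None means give up (dead-letter)."""
    if retry_count > MAX_RETRIES:
        return None
    return (2 ** retry_count) + random.uniform(0, 1)

def on_failure(headers: dict):
    """Decide the fate of a failed message from its retry-count header."""
    count = headers.get("x-retry-count", 0) + 1
    delay = next_delay(count)
    if delay is None:
        return ("dlq", None)              # retries exhausted: dead-letter it
    headers["x-retry-count"] = count      # republish with the updated header
    return ("retry", delay)

action, delay = on_failure({})            # first failure
assert action == "retry" and 2 <= delay < 3
assert on_failure({"x-retry-count": 5}) == ("dlq", None)
```

In RabbitMQ this pairs naturally with a delay queue (a TTL queue whose dead-letter exchange routes back to the work queue), so the consumer never sleeps while holding a message.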

Pattern 3: Saga — Distributed Transactions Over Messaging

A saga coordinates a multi-step transaction where each step is a separate service.

Order Placement Saga:
  1. Payment Service: charge card     → success → continue
  2. Inventory Service: reserve items → success → continue
  3. Fulfilment Service: create order → success → done

  If step 3 fails:
  → Compensation: Inventory Service: release reservation
  → Compensation: Payment Service: refund charge

Using MassTransit Saga State Machine:

C#
public class OrderSaga : MassTransitStateMachine<OrderSagaState>
{
    public State Pending { get; set; }
    public State PaymentTaken { get; set; }
    public State Completed { get; set; }
    public State Failed { get; set; }

    public Event<OrderPlaced> OrderPlaced { get; set; }
    public Event<PaymentProcessed> PaymentProcessed { get; set; }
    public Event<PaymentFailed> PaymentFailed { get; set; }
    public Event<InventoryReserved> InventoryReserved { get; set; }

    public OrderSaga()
    {
        InstanceState(x => x.CurrentState);

        Initially(
            When(OrderPlaced)
                .Then(ctx => ctx.Saga.OrderId = ctx.Message.OrderId)
                .PublishAsync(ctx => ctx.Init<ProcessPayment>(new { ctx.Message.Amount }))
                .TransitionTo(Pending)
        );

        During(Pending,
            When(PaymentProcessed)
                .PublishAsync(ctx => ctx.Init<ReserveInventory>(new { ctx.Saga.OrderId }))
                .TransitionTo(PaymentTaken),
            When(PaymentFailed)
                .TransitionTo(Failed)
        );

        During(PaymentTaken,
            When(InventoryReserved)
                .PublishAsync(ctx => ctx.Init<CreateFulfilment>(new { ctx.Saga.OrderId }))
                .TransitionTo(Completed)
        );
    }
}

Interview Answers

"RabbitMQ vs Kafka — when do you use each?"

"The core difference is what happens to a message after it is consumed. In RabbitMQ, a message is deleted after the consumer acknowledges it — it is a task queue. In Kafka, a message is retained in a log for a configured retention period, and multiple independent consumer groups can read it at different offsets — it is an event stream.

I use RabbitMQ when I have discrete units of work to distribute — booking a slot, sending a notification, processing a payment. I use Kafka when I need multiple teams or services to independently consume the same events, when I need to replay events for debugging or reindexing, or when I am processing millions of events per second. For most application-level messaging, RabbitMQ is simpler and sufficient. Kafka is not a drop-in replacement — it is a different tool for a different problem."

"What is the difference between pub/sub and a task queue?"

"In a task queue, each message is processed by exactly one consumer. The queue distributes work — if I have 10 jobs and 3 workers, each job goes to one worker. In pub/sub, each message goes to all subscribers simultaneously. If I publish an OrderPlaced event, the inventory service, the notification service, and the analytics service all receive it independently.

The practical consequence: task queues are for distributing work, pub/sub is for broadcasting events. An order confirmation email is a task — one worker sends it. An order placed event is a pub/sub notification — every service that cares gets a copy."

"How do you guarantee a message is not lost?"

"Three things. First, durable/persistent messages — the message is written to disk on the broker so a broker restart doesn't lose it. Second, manual acknowledgement — the consumer only acknowledges the message after successfully processing it, so a consumer crash before processing is complete doesn't lose it. Third, the Outbox Pattern — publish the event inside the same database transaction that creates the data, then a separate process reads from the outbox and publishes to the broker. This means the data write and the event publication are atomic — they either both happen or neither does."
