Backend Systems · Intermediate

Message Queues in .NET: RabbitMQ, Azure Service Bus, and MassTransit

Add asynchronous messaging to your .NET backend. Covers message queue fundamentals, RabbitMQ setup, Azure Service Bus, MassTransit abstractions, consumers, sagas, and production patterns.

Learnixo · June 3, 2026 · 8 min read
Tags: .NET, C#, RabbitMQ, Azure Service Bus, MassTransit, Messaging, Microservices

Why Message Queues?

Synchronous HTTP calls couple services tightly: if the downstream service is slow or down, the upstream call fails. Message queues decouple producers from consumers.

Without queues (tight coupling):
OrderService → HTTP → EmailService
              ↳ EmailService down → order fails

With a queue (loose coupling):
OrderService → Queue → EmailService
              ↳ EmailService down → message waits in queue, order succeeds
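
The decoupling in the diagram can be sketched in-process. This is only an illustration under stated assumptions: System.Threading.Channels stands in for a real broker (it is not durable and does not cross process boundaries), and PlaceOrder and the message format are hypothetical names.

```csharp
using System;
using System.Threading.Channels;

var queue = Channel.CreateUnbounded<string>();

// Producer: placing the order only requires that the enqueue succeeds.
bool PlaceOrder(string orderId) => queue.Writer.TryWrite($"send-email:{orderId}");

Console.WriteLine(PlaceOrder("A-1")); // True, even though no consumer is running yet
Console.WriteLine(PlaceOrder("A-2")); // True

// The consumer comes online later and drains the backlog.
queue.Writer.Complete();
var handled = 0;
await foreach (var message in queue.Reader.ReadAllAsync())
{
    handled++; // a real consumer would send the email here
}
Console.WriteLine(handled); // 2
```

The producer never waits on the consumer, which is the property a broker gives you across processes, with durability added.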

When to use message queues:

  • Work that doesn't need to complete before the response (send email, update analytics)
  • Fan-out — one event consumed by multiple services
  • Rate smoothing — absorb traffic spikes, consumers process at their own pace
  • Reliability — guaranteed delivery, retries, dead-letter queues

Core Concepts

| Concept | Meaning |
|---|---|
| Producer | Sends messages to the queue/topic |
| Consumer | Receives and processes messages |
| Queue | Point-to-point: one consumer gets each message |
| Topic / Exchange | Pub/sub: message goes to all subscribers |
| Acknowledgement | Consumer tells broker "I processed it" |
| Dead Letter Queue (DLQ) | Messages that failed processing after retries |
| Poison message | Message that always fails processing |
| Idempotency | Processing the same message twice has the same effect as once |


RabbitMQ

RabbitMQ uses the AMQP protocol. Producers publish to exchanges, which route to queues via bindings.

Exchange Types

| Type | Routing | Use case |
|---|---|---|
| Direct | Exact routing key match | Point-to-point |
| Fanout | Broadcast to all bound queues | Notifications |
| Topic | Wildcard routing key | Category-based routing |
| Headers | Header attributes | Complex routing |
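
To make the wildcard routing of topic exchanges concrete, here is a small sketch of the matching rule. In AMQP topic patterns, `*` matches exactly one dot-separated word and `#` matches zero or more words. TopicMatches is a hypothetical helper written for illustration; it is not part of RabbitMQ.Client, and the broker performs this matching for you.

```csharp
using System;

// Returns true when a topic pattern matches a routing key.
// "*" matches exactly one word, "#" matches zero or more words.
static bool TopicMatches(string pattern, string routingKey)
{
    return Match(pattern.Split('.'), 0, routingKey.Split('.'), 0);

    static bool Match(string[] p, int pi, string[] k, int ki)
    {
        if (pi == p.Length) return ki == k.Length;
        if (p[pi] == "#")
        {
            // "#" may consume zero or more words: try every possible split point
            for (var next = ki; next <= k.Length; next++)
                if (Match(p, pi + 1, k, next)) return true;
            return false;
        }
        if (ki == k.Length) return false;
        return (p[pi] == "*" || p[pi] == k[ki]) && Match(p, pi + 1, k, ki + 1);
    }
}

Console.WriteLine(TopicMatches("order.*", "order.created"));    // True
Console.WriteLine(TopicMatches("order.*", "order.created.eu")); // False
Console.WriteLine(TopicMatches("order.#", "order.created.eu")); // True
Console.WriteLine(TopicMatches("order.#", "order"));            // True
```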

Setup

Bash
# Run RabbitMQ locally
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management
# Management UI: http://localhost:15672 (guest/guest)
Bash
dotnet add package RabbitMQ.Client

Basic Publish

C#
using RabbitMQ.Client;
using System.Text.Json;

public class OrderPublisher : IDisposable
{
    private readonly IConnection _connection;
    private readonly IChannel _channel;

    public OrderPublisher(string connectionString)
    {
        var factory = new ConnectionFactory { Uri = new Uri(connectionString) };
        _connection = factory.CreateConnectionAsync().GetAwaiter().GetResult();
        _channel    = _connection.CreateChannelAsync().GetAwaiter().GetResult();

        _channel.QueueDeclareAsync("orders", durable: true, exclusive: false,
            autoDelete: false).GetAwaiter().GetResult();
    }

    public async Task PublishAsync(OrderCreatedEvent @event)
    {
        var body = JsonSerializer.SerializeToUtf8Bytes(@event);
        var props = new BasicProperties { Persistent = true }; // survive broker restart

        await _channel.BasicPublishAsync(
            exchange: "",
            routingKey: "orders",
            mandatory: false,
            basicProperties: props,
            body: body);
    }

    public void Dispose()
    {
        _channel.Dispose();
        _connection.Dispose();
    }
}

Basic Consumer

C#
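using System.Text.Json;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

public class OrderConsumer : BackgroundService
{
    private readonly IConnectionFactory _factory;
    private readonly ILogger<OrderConsumer> _logger;

    public OrderConsumer(IConnectionFactory factory, ILogger<OrderConsumer> logger)
    {
        _factory = factory;
        _logger  = logger;
    }

    protected override async Task ExecuteAsync(CancellationToken ct)
    {
        // await using closes the channel and connection when the service stops
        await using var connection = await _factory.CreateConnectionAsync(ct);
        await using var channel    = await connection.CreateChannelAsync(cancellationToken: ct);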

        await channel.BasicQosAsync(prefetchSize: 0, prefetchCount: 10, global: false, ct);

        var consumer = new AsyncEventingBasicConsumer(channel);
        consumer.ReceivedAsync += async (_, ea) =>
        {
            try
            {
                var @event = JsonSerializer.Deserialize<OrderCreatedEvent>(ea.Body.Span);
                await HandleAsync(@event!, ct);
                await channel.BasicAckAsync(ea.DeliveryTag, multiple: false, ct);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Failed to process message");
                // Nack and requeue=false → goes to DLQ if configured
                await channel.BasicNackAsync(ea.DeliveryTag, multiple: false, requeue: false, ct);
            }
        };

        await channel.BasicConsumeAsync("orders", autoAck: false, consumer, ct);
        await Task.Delay(Timeout.Infinite, ct); // keep running
    }

    private Task HandleAsync(OrderCreatedEvent @event, CancellationToken ct)
    {
        // business logic
        return Task.CompletedTask;
    }
}

Azure Service Bus

Azure Service Bus is Microsoft's managed messaging service. It has two main primitives: Queues (point-to-point) and Topics + Subscriptions (pub/sub).

Bash
dotnet add package Azure.Messaging.ServiceBus

Send a Message

C#
using System.Text.Json;
using Azure.Messaging.ServiceBus;

public class OrderPublisher
{
    private readonly ServiceBusSender _sender;

    public OrderPublisher(ServiceBusClient client)
    {
        _sender = client.CreateSender("orders-queue");
    }

    public async Task PublishAsync(OrderCreatedEvent @event, CancellationToken ct)
    {
        var body    = JsonSerializer.SerializeToUtf8Bytes(@event);
        var message = new ServiceBusMessage(body)
        {
            ContentType    = "application/json",
            MessageId      = @event.OrderId.ToString(),   // used for duplicate detection, if enabled on the queue
            CorrelationId  = @event.CorrelationId,
            Subject        = nameof(OrderCreatedEvent)
        };

        await _sender.SendMessageAsync(message, ct);
    }
}

Receive Messages

C#
public class OrderConsumer : BackgroundService
{
    private readonly ServiceBusProcessor _processor;
    private readonly ILogger<OrderConsumer> _logger;

    public OrderConsumer(ServiceBusClient client, ILogger<OrderConsumer> logger)
    {
        _logger    = logger;
        _processor = client.CreateProcessor("orders-queue", new ServiceBusProcessorOptions
        {
            MaxConcurrentCalls   = 10,
            AutoCompleteMessages = false  // we'll ack manually
        });
    }

    protected override async Task ExecuteAsync(CancellationToken ct)
    {
        _processor.ProcessMessageAsync += HandleMessageAsync;
        _processor.ProcessErrorAsync   += HandleErrorAsync;
        await _processor.StartProcessingAsync(ct);
        await Task.Delay(Timeout.Infinite, ct);
    }

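    private async Task HandleMessageAsync(ProcessMessageEventArgs args)
    {
        var @event = args.Message.Body.ToObjectFromJson<OrderCreatedEvent>()
            ?? throw new InvalidOperationException("Message body deserialized to null");

        // ProcessOrderAsync is your business logic (not shown).
        // If it throws, the processor abandons the message so it can be redelivered.
        await ProcessOrderAsync(@event, args.CancellationToken);
        await args.CompleteMessageAsync(args.Message, args.CancellationToken);
    }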

    private Task HandleErrorAsync(ProcessErrorEventArgs args)
    {
        _logger.LogError(args.Exception, "Service Bus error: {Source}", args.ErrorSource);
        return Task.CompletedTask;
    }

    public override async Task StopAsync(CancellationToken ct)
    {
        await _processor.StopProcessingAsync(ct);
        await base.StopAsync(ct);
    }
}

Register in DI

C#
builder.Services.AddSingleton(new ServiceBusClient(
    builder.Configuration["ServiceBus:ConnectionString"]));
builder.Services.AddHostedService<OrderConsumer>();

MassTransit — The Abstraction Layer

Working directly with RabbitMQ or Service Bus is verbose. MassTransit provides a consistent API over both, plus built-in retry, sagas, and outbox.

Bash
dotnet add package MassTransit
dotnet add package MassTransit.RabbitMQ
# or
dotnet add package MassTransit.Azure.ServiceBus.Core

Setup

C#
builder.Services.AddMassTransit(x =>
{
    x.AddConsumer<OrderCreatedConsumer>();

    x.UsingRabbitMq((ctx, cfg) =>
    {
        cfg.Host("rabbitmq://localhost");

        cfg.ReceiveEndpoint("orders", e =>
        {
            e.ConfigureConsumer<OrderCreatedConsumer>(ctx);
            e.UseMessageRetry(r => r.Interval(3, TimeSpan.FromSeconds(5)));
            e.UseInMemoryOutbox(ctx);
        });
    });
});

Define Messages as Interfaces

C#
// Messages are interfaces — MassTransit generates the implementation
public interface OrderCreated
{
    Guid   OrderId    { get; }
    string CustomerId { get; }
    decimal Amount    { get; }
}

Publish

C#
public class OrderService
{
    private readonly IPublishEndpoint _publish;

    public OrderService(IPublishEndpoint publish) => _publish = publish;

    public async Task CreateOrderAsync(CreateOrderCommand cmd, CancellationToken ct)
    {
        // ... save order ...
        await _publish.Publish<OrderCreated>(new
        {
            OrderId    = Guid.NewGuid(),
            cmd.CustomerId,
            cmd.Amount
        }, ct);
    }
}

Consumer

C#
public class OrderCreatedConsumer : IConsumer<OrderCreated>
{
    private readonly IEmailService _email;

    public OrderCreatedConsumer(IEmailService email) => _email = email;

    public async Task Consume(ConsumeContext<OrderCreated> context)
    {
        await _email.SendOrderConfirmationAsync(
            context.Message.CustomerId,
            context.Message.OrderId,
            context.CancellationToken);
    }
}

Retry and Dead Letter Queues

MassTransit Retry Policy

C#
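cfg.ReceiveEndpoint("orders", e =>
{
    // Each UseMessageRetry call adds its own retry filter, so the two policies
    // below stack rather than replace each other. In most cases pick one per endpoint.

    // Immediate retries for fast transient faults
    e.UseMessageRetry(r => r
        .Immediate(3)
        .Ignore<ArgumentException>());   // don't retry argument errors

    // Exponential backoff for downstream dependency issues
    // (retry limit, min interval, max interval, interval delta)
    e.UseMessageRetry(r => r
        .Exponential(5, TimeSpan.FromSeconds(1), TimeSpan.FromMinutes(5), TimeSpan.FromSeconds(2)));
});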

After all retries are exhausted, MassTransit moves the message to orders_error, the endpoint's error queue, which serves as its dead-letter queue.
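
To show why backoff avoids hammering a failing dependency, here is a generic capped exponential backoff sketch. It is an assumption-laden illustration, not MassTransit's internal formula: BackoffDelays is a hypothetical helper that doubles the delay on each attempt and caps it at a maximum.

```csharp
using System;
using System.Collections.Generic;

// Generic capped exponential backoff: the delay doubles per attempt and never exceeds max.
// Illustrative only; MassTransit computes its own intervals internally.
static IEnumerable<TimeSpan> BackoffDelays(int retries, TimeSpan min, TimeSpan max)
{
    for (var attempt = 0; attempt < retries; attempt++)
    {
        var seconds = min.TotalSeconds * Math.Pow(2, attempt);
        yield return TimeSpan.FromSeconds(Math.Min(seconds, max.TotalSeconds));
    }
}

foreach (var delay in BackoffDelays(5, TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(10)))
    Console.WriteLine(delay.TotalSeconds);
// prints 1, 2, 4, 8, 10 on separate lines
```

Production policies usually add random jitter on top so that many consumers do not retry in lockstep.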

Checking the DLQ

C#
// Requeue from DLQ after fixing the bug
cfg.ReceiveEndpoint("orders_error", e =>
{
    e.ConfigureConsumer<DeadLetterReprocessConsumer>(ctx);
});

Idempotency — Handle Duplicate Messages

Networks retry. Brokers retry. Your consumers will receive the same message more than once. Design for it.

C#
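public class OrderCreatedConsumer : IConsumer<OrderCreated>
{
    private readonly AppDbContext _db;

    public OrderCreatedConsumer(AppDbContext db) => _db = db;

    public async Task Consume(ConsumeContext<OrderCreated> context)
    {
        // A random fallback ID would never match a stored one and would defeat
        // the duplicate check, so treat a missing MessageId as an error.
        var messageId = context.MessageId
            ?? throw new InvalidOperationException("Message has no MessageId");

        // Check if already processed
        var alreadyProcessed = await _db.ProcessedMessages
            .AnyAsync(m => m.MessageId == messageId, context.CancellationToken);

        if (alreadyProcessed) return; // duplicate delivery: skip

        // ProcessOrderAsync is the business logic (not shown)
        await ProcessOrderAsync(context.Message);

        // A unique index on MessageId closes the race between two concurrent deliveries
        _db.ProcessedMessages.Add(new ProcessedMessage { MessageId = messageId });
        await _db.SaveChangesAsync(context.CancellationToken);
    }
}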
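
The same idea can be tried without a database. In this minimal sketch a HashSet stands in for the ProcessedMessages table, and Handle and emailsSent are hypothetical names. Unlike the consumer above, it claims the ID before doing the work; with a real database you would record the ID and perform the work in one transaction.

```csharp
using System;
using System.Collections.Generic;

var processed = new HashSet<Guid>();   // stands in for the ProcessedMessages table
var emailsSent = 0;

// Returns true if the message was handled, false if it was a duplicate.
bool Handle(Guid messageId)
{
    // HashSet.Add returns false when the ID is already present
    if (!processed.Add(messageId)) return false;
    emailsSent++;                      // the side effect that must not repeat
    return true;
}

var id = Guid.NewGuid();
Console.WriteLine(Handle(id));   // True: first delivery
Console.WriteLine(Handle(id));   // False: redelivery skipped
Console.WriteLine(emailsSent);   // 1
```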

Outbox Pattern (Transactional Messaging)

The problem: you save to the database and then publish a message. If the publish fails, your database is updated but no event was sent. The outbox pattern solves this by writing the message to the database in the same transaction, then publishing from there.

C#
// MassTransit has built-in outbox with EF Core
builder.Services.AddMassTransit(x =>
{
    x.AddEntityFrameworkOutbox<AppDbContext>(o =>
    {
        o.UseSqlServer();
        o.UseBusOutbox(); // route IPublishEndpoint/ISendEndpointProvider calls through the outbox
        o.DuplicateDetectionWindow = TimeSpan.FromMinutes(30);
    });
});

// In your service — message is saved to DB atomically with your entity
public async Task CreateOrderAsync(CreateOrderCommand cmd)
{
    var order = new Order(cmd);
    _db.Orders.Add(order);

    // Property names must match the OrderCreated contract (OrderId, not Id)
    await _publishEndpoint.Publish<OrderCreated>(new { OrderId = order.Id, cmd.CustomerId, cmd.Amount });

    await _db.SaveChangesAsync(); // message + order saved in one transaction
}
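
The two steps of the pattern (write the message alongside the data, then relay it) can be sketched in memory. This is an illustration under stated assumptions, not how MassTransit implements it: plain collections stand in for the database tables and the broker, and CreateOrder and RelayPending are hypothetical names.

```csharp
using System;
using System.Collections.Generic;

var orders = new List<string>();       // stands in for the Orders table
var outbox = new Queue<string>();      // stands in for the outbox table
var broker = new List<string>();       // stands in for the real broker

// Step 1: the business write and the outbox row are stored together.
// In a real system both inserts share one database transaction.
void CreateOrder(string orderId)
{
    orders.Add(orderId);
    outbox.Enqueue($"OrderCreated:{orderId}");
}

// Step 2: a background relay publishes pending rows and removes them.
// A crash between publish and removal causes a re-publish, hence at-least-once delivery.
int RelayPending()
{
    var published = 0;
    while (outbox.TryDequeue(out var payload))
    {
        broker.Add(payload);
        published++;
    }
    return published;
}

CreateOrder("A-1");
CreateOrder("A-2");
Console.WriteLine(RelayPending()); // 2
Console.WriteLine(RelayPending()); // 0: nothing left to send
Console.WriteLine(broker.Count);   // 2
```

Because the relay can publish a row more than once after a crash, the outbox pattern is normally paired with idempotent consumers.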

Production Checklist

  • Durable queues — survive broker restarts
  • Persistent messages — survive broker restarts
  • Manual acknowledgement — don't ack until processing succeeds
  • Prefetch count — limit messages in-flight per consumer
  • Dead letter queue — capture poison messages
  • Retry with backoff — don't hammer failing dependencies
  • Idempotent consumers — handle duplicates safely
  • Outbox pattern — avoid losing events on DB+publish failures
  • Schema versioning — add fields as optional, never remove or rename
  • Correlation IDs — trace messages across services
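
The schema versioning rule can be checked with System.Text.Json, which ignores unknown JSON properties by default. The sketch below assumes two hypothetical contract versions, OrderCreatedV1 and OrderCreatedV2, where V2 adds an optional Currency field; neither type appears in the article's code.

```csharp
using System;
using System.Text.Json;

// A v2 producer added "Currency". A v1 consumer ignores the unknown field.
var v2Json = "{\"OrderId\":\"A-1\",\"Amount\":10.5,\"Currency\":\"EUR\"}";
var asV1 = JsonSerializer.Deserialize<OrderCreatedV1>(v2Json)!;
Console.WriteLine(asV1.OrderId); // A-1

// A v2 consumer reading an old v1 message sees the optional field as null.
var v1Json = "{\"OrderId\":\"A-1\",\"Amount\":10.5}";
var asV2 = JsonSerializer.Deserialize<OrderCreatedV2>(v1Json)!;
Console.WriteLine(asV2.Currency ?? "(none)"); // (none)

// Hypothetical contract versions used only for this illustration
record OrderCreatedV1(string OrderId, decimal Amount);
record OrderCreatedV2(string OrderId, decimal Amount, string? Currency = null);
```

Renaming or removing OrderId, by contrast, would leave old consumers with a null value and no error, which is why the checklist says never to remove or rename fields.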

Interview Questions

Q: What is the difference between a queue and a topic? A queue is point-to-point: each message is consumed by exactly one consumer. A topic is pub/sub: each message is delivered to all subscribers. Use queues for work distribution; use topics for event broadcasting.

Q: What is a dead letter queue? A queue where messages are moved after failing processing (exhausted retries, message expiry, or explicit rejection). It prevents poison messages from blocking the main queue and gives you a place to investigate and reprocess failures.

Q: What is idempotency and why does it matter in messaging? Idempotency means that processing the same message twice produces the same result as processing it once. It matters because most brokers provide at-least-once delivery, so consumers will occasionally receive duplicates. Without idempotency, duplicates cause double charges, duplicate emails, or corrupted state.

Q: What is the outbox pattern? Writing outbound messages to a database table in the same transaction as your business data. A background relay then reads from the outbox and publishes to the broker. This ensures that every committed database change is eventually followed by its event, and that no event is published for a change that was rolled back.

Q: When would you choose Azure Service Bus over RabbitMQ? Choose Azure Service Bus when you are in Azure and want a managed service, or when you need sessions (ordered processing per key), message scheduling, or long message TTLs. Choose RabbitMQ when you want self-hosted control, complex exchange routing, or low latency at high throughput, and you are prepared to manage your own infrastructure.
