
Message Broker Patterns in .NET — MassTransit, Competing Consumers, Dead Letters

Production messaging patterns in .NET with MassTransit: competing consumers, publish-subscribe, dead letter queues, sagas, outbox pattern, and RabbitMQ vs Azure Service Bus configuration.

Asma Hafeez Khan · May 25, 2026 · 6 min read
Tags: .NET, C#, MassTransit, RabbitMQ, Azure Service Bus, messaging, microservices

Async messaging decouples services: the sender doesn't wait for the receiver. MassTransit is the de-facto .NET messaging library — it abstracts RabbitMQ, Azure Service Bus, Amazon SQS, and others behind a consistent API.


Core Messaging Patterns

Pattern 1 — Command (point-to-point):
  OrderService → [order.process] → PaymentService
  One sender, one receiver. Used to tell a specific service to do something.

Pattern 2 — Event (publish-subscribe):
  OrderService publishes OrderPlaced
  → PaymentService subscribes
  → NotificationService subscribes
  → AnalyticsService subscribes
  One sender, many receivers. Used to announce that something happened.

Pattern 3 — Request-Response:
  ApiService sends GetOrderStatus, waits for OrderStatusResponse
  Like an HTTP call but over the message bus. Use sparingly.

Pattern 4 — Competing Consumers:
  Multiple instances of a service all consume from the same queue
  → Load balancing with durability
  → Scale by adding more consumer instances
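
The difference between a command and an event shows up directly in the API: a command is sent to one named queue, while an event is published without knowing who subscribes. A minimal sketch, assuming hypothetical record contracts (the property lists are illustrative) and a queue named order-processing:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using MassTransit;

// Hypothetical contracts; the properties are assumptions for illustration.
public record ProcessOrder(int OrderId, int CustomerId);
public record OrderPlaced(int OrderId, int CustomerId);

public class OrderMessenger(ISendEndpointProvider sender, IPublishEndpoint publisher)
{
    // Command: addressed to one queue and handled by one service.
    public async Task SendCommandAsync(ProcessOrder cmd, CancellationToken ct)
    {
        var endpoint = await sender.GetSendEndpoint(new Uri("queue:order-processing"));
        await endpoint.Send(cmd, ct);
    }

    // Event: delivered to every endpoint that subscribes to OrderPlaced.
    public Task PublishEventAsync(OrderPlaced evt, CancellationToken ct)
        => publisher.Publish(evt, ct);
}
```

The sender of a command has to know the destination queue; the publisher of an event does not, which is what keeps subscribers loosely coupled.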

Step 1: Install MassTransit

XML
<PackageReference Include="MassTransit"              Version="8.*" />
<PackageReference Include="MassTransit.RabbitMQ"     Version="8.*" />
<!-- OR Azure Service Bus: -->
<PackageReference Include="MassTransit.Azure.ServiceBus.Core" Version="8.*" />

Step 2: Register with RabbitMQ

C#
// Program.cs
builder.Services.AddMassTransit(x =>
{
    // Register all consumers in this assembly
    x.AddConsumers(typeof(Program).Assembly);

    x.UsingRabbitMq((ctx, cfg) =>
    {
        cfg.Host("rabbitmq://localhost", h =>
        {
            h.Username("guest");
            h.Password("guest");
        });

        // Retry failed messages in memory before they are moved to the error queue
        // (configured on the bus, so it applies to every receive endpoint)
        cfg.UseMessageRetry(r =>
        {
            r.Exponential(5,
                minInterval:  TimeSpan.FromSeconds(1),
                maxInterval:  TimeSpan.FromMinutes(5),
                intervalDelta: TimeSpan.FromSeconds(2));

            // Don't retry on domain exceptions
            r.Ignore<ArgumentException>();
            r.Ignore<InvalidOperationException>();
        });

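        // In-memory outbox: holds messages published or sent by a consumer until the
        // consumer completes successfully. On its own it does not give exactly-once processing.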
        cfg.UseInMemoryOutbox(ctx);

        // Map consumers to queues
        cfg.ConfigureEndpoints(ctx);
    });
});
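
To get a feel for how an exponential policy spaces out attempts, here is a small standalone sketch. It is not MassTransit's internal formula (the library computes its own intervals); it is only an illustrative capped exponential curve, and the class and method names are made up for this example.

```csharp
using System;
using System.Collections.Generic;

public static class BackoffSketch
{
    // Illustrative curve: min + delta * (2^attempt - 1), capped at max.
    // An assumption for teaching purposes, not the library's actual calculation.
    public static IReadOnlyList<TimeSpan> Intervals(
        int retries, TimeSpan min, TimeSpan max, TimeSpan delta)
    {
        var result = new List<TimeSpan>(retries);
        for (var attempt = 0; attempt < retries; attempt++)
        {
            var growth = delta.TotalSeconds * (Math.Pow(2, attempt) - 1);
            var seconds = Math.Min(min.TotalSeconds + growth, max.TotalSeconds);
            result.Add(TimeSpan.FromSeconds(seconds));
        }
        return result;
    }
}
```

With the values used above (5 retries, 1 second minimum, 5 minute maximum, 2 second delta), this curve yields waits of 1, 3, 7, 15 and 31 seconds, so the cap only matters for larger retry counts.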

Step 3: Competing Consumers

C#
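// Multiple instances of this service all read from the same queue.
// RabbitMQ delivers each message to one consumer at a time, so the load is balanced.
// The queue name is set in a consumer definition, which AddConsumers picks up from the assembly.
public class ProcessOrderConsumerDefinition : ConsumerDefinition<ProcessOrderConsumer>
{
    public ProcessOrderConsumerDefinition() => EndpointName = "order-processing";
}
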
public class ProcessOrderConsumer(
    IOrderRepository orders,
    IInventoryService inventory,
    IPublishEndpoint bus)
    : IConsumer<ProcessOrder>
{
    public async Task Consume(ConsumeContext<ProcessOrder> context)
    {
        var cmd = context.Message;

        // Idempotency check — message may be delivered more than once
        if (await orders.ExistsAsync(cmd.OrderId, context.CancellationToken))
        {
            // Already processed — acknowledge without re-processing
            return;
        }

        await orders.CreateAsync(new Order
        {
            Id         = cmd.OrderId,
            CustomerId = cmd.CustomerId,
            Lines      = cmd.Lines,
            Total      = cmd.Total,
        }, context.CancellationToken);

        await bus.Publish(new OrderCreated(cmd.OrderId, cmd.CustomerId), context.CancellationToken);
    }
}

// Scale out by running more instances of the service: every instance connects to the
// same queue, and the broker distributes messages across them
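
The competing-consumers idea can be seen without a broker. The sketch below is an in-process analogy using System.Threading.Channels: several workers read from one shared channel and each message is handled by exactly one of them. Unlike RabbitMQ there are no acknowledgements or redeliveries here, and all names are invented for the example.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class CompetingConsumersSketch
{
    // Runs `consumerCount` workers against one shared channel and returns a map
    // of message number to the id of the worker that processed it.
    public static async Task<ConcurrentDictionary<int, int>> RunAsync(
        int messageCount, int consumerCount)
    {
        var queue = Channel.CreateUnbounded<int>();
        var processedBy = new ConcurrentDictionary<int, int>();

        var workers = Enumerable.Range(0, consumerCount)
            .Select(workerId => Task.Run(async () =>
            {
                // Each message goes to whichever worker reads it first.
                await foreach (var message in queue.Reader.ReadAllAsync())
                {
                    if (!processedBy.TryAdd(message, workerId))
                        throw new InvalidOperationException(
                            $"Message {message} was delivered twice");
                }
            }))
            .ToArray();

        for (var i = 0; i < messageCount; i++)
            await queue.Writer.WriteAsync(i);
        queue.Writer.Complete();   // lets the workers finish once the queue drains

        await Task.WhenAll(workers);
        return processedBy;
    }
}
```

Adding workers raises throughput without changing the producer, which is the same property that makes adding service instances the natural way to scale a queue consumer.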

Step 4: Dead Letter Queue

C#
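// Messages that fail all retries are moved to an error queue, which serves as the dead letter queue (DLQ)
// MassTransit names it "{queue}_error" by default, on RabbitMQ and on Azure Service Bus
// Azure Service Bus also has a native "$DeadLetterQueue" subqueue, but MassTransit only uses it if configured to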

public class OrderProcessingConsumer : IConsumer<ProcessOrder>
{
    public async Task Consume(ConsumeContext<ProcessOrder> context)
    {
        try
        {
            // ... process
        }
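        catch (UnrecoverableOrderException)
        {
            // Rethrow unchanged. Wrapping it in a generic Exception hides the type from the
            // retry policy, so the message would be retried. With
            // r.Ignore<UnrecoverableOrderException>() in the retry configuration, the
            // message skips the retries and goes straight to the error queue.
            throw;
        }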
    }
}

// Fault consumer: MassTransit publishes Fault<T> when a consumer fails after its retries.
// Subscribing to it lets you log and alert without reading the _error queue directly.
public class ProcessOrderFaultConsumer(
    ILogger<ProcessOrderFaultConsumer> logger,
    IAlertService alerts)
    : IConsumer<Fault<ProcessOrder>>
{
    public async Task Consume(ConsumeContext<Fault<ProcessOrder>> context)
    {
        var failed = context.Message.Message;   // the original ProcessOrder
        var reason = context.Message.Exceptions.FirstOrDefault()?.Message;

        // Log the failed message with the reason it faulted
        logger.LogError(
            "Dead-lettered: OrderId={OrderId} Reason={Reason}",
            failed.OrderId,
            reason);

        // Alert the on-call team
        await alerts.SendAsync(
            $"Order {failed.OrderId} dead-lettered: check the order-processing_error queue",
            context.CancellationToken);
    }
}
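
For a permanent failure to bypass the retries, the retry policy has to be told to ignore that exception type. A sketch of the relevant fragment, meant to sit inside the UsingRabbitMq callback from Step 2 and reusing the UnrecoverableOrderException type from the consumer above:

```csharp
// Inside x.UsingRabbitMq((ctx, cfg) => { ... })
cfg.UseMessageRetry(r =>
{
    r.Exponential(5,
        TimeSpan.FromSeconds(1),
        TimeSpan.FromMinutes(5),
        TimeSpan.FromSeconds(2));

    // Permanent failures: skip the retries and fault immediately,
    // which moves the message to the error queue on the first attempt
    r.Ignore<UnrecoverableOrderException>();
});
```

Any exception type that is not ignored, including a generic Exception that wraps the original, still goes through the full retry sequence.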

Step 5: Transactional Outbox Pattern

C#
// Outbox: publish events atomically with your database transaction
// Prevents lost messages if the service crashes after SaveChanges but before Publish

builder.Services.AddMassTransit(x =>
{
    x.AddConsumers(typeof(Program).Assembly);

    // Outbox stores messages in your DB, delivers them after commit
    x.AddEntityFrameworkOutbox<AppDbContext>(o =>
    {
        o.UsePostgres();
        o.UseBusOutbox();   // deliver via the bus background service
    });

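    // The inbox/outbox is an endpoint-level filter, so apply it to every
    // receive endpoint that ConfigureEndpoints creates
    x.AddConfigureEndpointsCallback((context, name, cfg) =>
    {
        cfg.UseEntityFrameworkOutbox<AppDbContext>(context);
    });

    x.UsingRabbitMq((ctx, cfg) =>
    {
        cfg.Host("rabbitmq://localhost");
        cfg.UseMessageRetry(r => r.Exponential(5,
            TimeSpan.FromSeconds(1), TimeSpan.FromMinutes(5), TimeSpan.FromSeconds(2)));
        cfg.ConfigureEndpoints(ctx);
    });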
});

// In your command handler — publish is safe now
public class CreateOrderHandler(AppDbContext db, IPublishEndpoint bus)
{
    public async Task HandleAsync(CreateOrderCommand cmd, CancellationToken ct)
    {
        var order = new Order { Id = cmd.OrderId, /* ... */ };
        db.Orders.Add(order);

        // This doesn't actually publish yet — writes to outbox table
        await bus.Publish(new OrderCreated(order.Id, order.CustomerId), ct);

        // Both the order AND the outbox message save atomically
        await db.SaveChangesAsync(ct);
        // → After commit, the outbox worker delivers the message to RabbitMQ
    }
}

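// Map the inbox and outbox tables in the EF Core model, then add a migration to create them
public class AppDbContext(DbContextOptions<AppDbContext> options) : DbContext(options)
{
    public DbSet<Order> Orders => Set<Order>();

    protected override void OnModelCreating(ModelBuilder model)
    {
        base.OnModelCreating(model);

        model.AddInboxStateEntity();     // tracks consumed message IDs so duplicates can be detected
        model.AddOutboxStateEntity();    // tracks outbox delivery state
        model.AddOutboxMessageEntity();  // stores pending outbound messages
    }
}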
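
The inbox side of the pattern comes down to remembering which message IDs have already been handled. The sketch below shows that idea in memory only; it is not MassTransit's implementation, which persists the state in the database alongside the consumer's own writes. The class name is hypothetical.

```csharp
using System;
using System.Collections.Generic;

// In-memory illustration of inbox-style deduplication.
// A real inbox stores the IDs durably; this one forgets them on restart.
public sealed class InMemoryInbox
{
    private readonly HashSet<Guid> _seen = new();
    private readonly object _gate = new();

    // Returns true the first time a message ID is seen and false for duplicates.
    public bool TryAccept(Guid messageId)
    {
        lock (_gate)
        {
            return _seen.Add(messageId);
        }
    }
}
```

A consumer would call TryAccept with the message ID and return early when it gets false, which is the same shape as the explicit idempotency check in Step 3.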

Step 6: Request-Response

C#
// Synchronous-style communication over the bus
// Useful for internal service queries where HTTP would be overkill
public class GetOrderStatusConsumer(IOrderRepository orders)
    : IConsumer<GetOrderStatus>
{
    public async Task Consume(ConsumeContext<GetOrderStatus> context)
    {
        var order = await orders.GetByIdAsync(context.Message.OrderId, context.CancellationToken);

        await context.RespondAsync(order is null
            ? new OrderStatusResponse(context.Message.OrderId, "NotFound", null)
            : new OrderStatusResponse(order.Id, order.Status, order.UpdatedAt));
    }
}

// Caller (API layer)
public class OrderApiService(IRequestClient<GetOrderStatus> client)
{
    public async Task<OrderStatusResponse> GetStatusAsync(int orderId, CancellationToken ct)
    {
        var response = await client.GetResponse<OrderStatusResponse>(
            new GetOrderStatus(orderId), ct,
            timeout: RequestTimeout.After(s: 10));

        return response.Message;
    }
}
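
Because the caller blocks on a reply, it should decide what happens when none arrives. A sketch of one option, assuming the record shapes below (inferred from how the contracts are constructed above, so treat them as hypothetical) and returning null on timeout:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using MassTransit;

// Assumed contract shapes, for illustration only.
public record GetOrderStatus(int OrderId);
public record OrderStatusResponse(int OrderId, string Status, DateTime? UpdatedAt);

public class SafeOrderStatusClient(IRequestClient<GetOrderStatus> client)
{
    // Returns null when no consumer responds within the timeout.
    public async Task<OrderStatusResponse?> TryGetStatusAsync(int orderId, CancellationToken ct)
    {
        try
        {
            var response = await client.GetResponse<OrderStatusResponse>(
                new GetOrderStatus(orderId), ct, RequestTimeout.After(s: 5));
            return response.Message;
        }
        catch (RequestTimeoutException)
        {
            // No reply in time: the consumer may be down or the queue backed up.
            return null;
        }
    }
}
```

Whether to return null, a cached value, or an error to the API caller is a design choice; the point is that a request over the bus can time out just like an HTTP call.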

Step 7: Azure Service Bus Configuration

C#
// Drop-in swap for RabbitMQ — same consumer code, different transport
builder.Services.AddMassTransit(x =>
{
    x.AddConsumers(typeof(Program).Assembly);

    x.UsingAzureServiceBus((ctx, cfg) =>
    {
        cfg.Host(builder.Configuration.GetConnectionString("AzureServiceBus"));

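        // Topics carry published events; queues carry commands for competing consumers
        cfg.Message<OrderCreated>(m => m.SetEntityName("order-created"));   // topic name
        // ProcessOrder is a command: send it to "queue:order-processing" instead of giving it
        // an entity name. SetEntityName names a topic, and Service Bus does not allow a topic
        // and a queue with the same name in one namespace.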

        // Service Bus specific: sessions for ordered processing
        cfg.ReceiveEndpoint("order-processing", e =>
        {
            e.RequiresSession = true;   // FIFO per SessionId (e.g., per customer)
            e.ConfigureConsumer<ProcessOrderConsumer>(ctx);
        });

        cfg.ConfigureEndpoints(ctx);
    });
});

Retry vs Dead Letter Decision

Retry:          Transient failure — network blip, database timeout, rate limit
Dead Letter:    Permanent failure — bad message format, business rule violation,
                consumer bug requiring code fix before reprocessing

Rules:
  - Never retry ArgumentException, InvalidOperationException
  - Always retry HttpRequestException, TaskCanceledException, SqlException (transient)
  - Cap retries at 5 with exponential backoff (max 5 minutes between attempts)
  - After max retries → DLQ → alert → investigate → requeue when fixed
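
The rules above can be written down as a small classifier. This is a sketch with invented names, not part of MassTransit; SqlException is left out so the example needs no database package, and unknown exceptions default to retry, matching a policy that only ignores the listed types.

```csharp
using System;
using System.Net.Http;
using System.Threading.Tasks;

public enum FailureAction { Retry, DeadLetter }

public static class FailureClassifier
{
    public static FailureAction Classify(Exception ex) => ex switch
    {
        // Transient: likely to succeed on a later attempt
        HttpRequestException or TaskCanceledException => FailureAction.Retry,

        // Permanent: retrying the same message cannot fix it
        ArgumentException or InvalidOperationException => FailureAction.DeadLetter,

        // Unknown: retry up to the cap, then let it dead-letter
        _ => FailureAction.Retry,
    };
}
```

Note that derived types follow their base: an ArgumentNullException is treated as permanent because it inherits from ArgumentException.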

Interview Answer

"MassTransit is the standard .NET messaging abstraction — same code works with RabbitMQ, Azure Service Bus, or Amazon SQS. Key patterns: competing consumers (multiple instances share a queue, each message processed once — natural horizontal scaling), publish-subscribe (event reaches all interested consumers via exchange/topic), and the transactional outbox (publish atomically with your DB write — prevents lost messages on crash). Dead letter queues catch messages that fail all retries: distinguish transient failures (retry) from permanent ones (DLQ immediately). The outbox pattern: MassTransit writes messages to an outbox table in the same transaction as your SaveChanges, then a background worker delivers them after commit — guarantees at-least-once delivery without two-phase commit. For exactly-once processing add inbox state: each consumed message ID is stored; duplicates are silently dropped. Retry policy: exponential backoff up to 5 attempts, ignore domain exceptions (ArgumentException, InvalidOperationException) — these won't succeed on retry."
