Message Broker Patterns in .NET — MassTransit, Competing Consumers, Dead Letters
Production messaging patterns in .NET with MassTransit: competing consumers, publish-subscribe, dead letter queues, sagas, outbox pattern, and RabbitMQ vs Azure Service Bus configuration.
Message Broker Patterns in .NET — MassTransit, Competing Consumers, Dead Letters
Async messaging decouples services: the sender doesn't wait for the receiver. MassTransit is the de-facto .NET messaging library — it abstracts RabbitMQ, Azure Service Bus, Amazon SQS, and others behind a consistent API.
Core Messaging Patterns
Pattern 1 — Command (point-to-point):
OrderService → [order.process] → PaymentService
One sender, one receiver. Used to tell a specific service to do something.
Pattern 2 — Event (publish-subscribe):
OrderService publishes OrderPlaced
→ PaymentService subscribes
→ NotificationService subscribes
→ AnalyticsService subscribes
One sender, many receivers. Used to announce that something happened.
Pattern 3 — Request-Response:
ApiService sends GetOrderStatus, waits for OrderStatusResponse
Like an HTTP call but over the message bus. Use sparingly.
Pattern 4 — Competing Consumers:
Multiple instances of a service all consume from the same queue
→ Load balancing with durability
→ Scale by adding more consumer instancesStep 1: Install MassTransit
<PackageReference Include="MassTransit" Version="8.*" />
<PackageReference Include="MassTransit.RabbitMQ" Version="8.*" />
<!-- OR Azure Service Bus: -->
<PackageReference Include="MassTransit.Azure.ServiceBus.Core" Version="8.*" />Step 2: Register with RabbitMQ
// Program.cs
builder.Services.AddMassTransit(x =>
{
// Register all consumers in this assembly
x.AddConsumers(typeof(Program).Assembly);
x.UsingRabbitMq((ctx, cfg) =>
{
cfg.Host("rabbitmq://localhost", h =>
{
h.Username("guest");
h.Password("guest");
});
// Configure message retry at the transport level
cfg.UseMessageRetry(r =>
{
r.Exponential(5,
minInterval: TimeSpan.FromSeconds(1),
maxInterval: TimeSpan.FromMinutes(5),
intervalDelta: TimeSpan.FromSeconds(2));
// Don't retry on domain exceptions
r.Ignore<ArgumentException>();
r.Ignore<InvalidOperationException>();
});
// Inboxes and outboxes for exactly-once processing
cfg.UseInMemoryOutbox(ctx);
// Map consumers to queues
cfg.ConfigureEndpoints(ctx);
});
});Step 3: Competing Consumers
// Multiple instances of this consumer all read from the same queue
// RabbitMQ delivers each message to exactly one consumer — load balanced
[Consumer("order-processing")]
public class ProcessOrderConsumer(
IOrderRepository orders,
IInventoryService inventory,
IPublishEndpoint bus)
: IConsumer<ProcessOrder>
{
public async Task Consume(ConsumeContext<ProcessOrder> context)
{
var cmd = context.Message;
// Idempotency check — message may be delivered more than once
if (await orders.ExistsAsync(cmd.OrderId, context.CancellationToken))
{
// Already processed — acknowledge without re-processing
return;
}
await orders.CreateAsync(new Order
{
Id = cmd.OrderId,
CustomerId = cmd.CustomerId,
Lines = cmd.Lines,
Total = cmd.Total,
}, context.CancellationToken);
await bus.Publish(new OrderCreated(cmd.OrderId, cmd.CustomerId), context.CancellationToken);
}
}
// Scale consumers via DI — each service instance gets its own consumer
// The queue is shared; messages are distributed across instancesStep 4: Dead Letter Queue
// Messages that fail all retries go to the dead letter queue (DLQ)
// In RabbitMQ: queue name is "{queue}_error"
// In Azure Service Bus: subqueue named "$DeadLetterQueue"
public class OrderProcessingConsumer : IConsumer<ProcessOrder>
{
public async Task Consume(ConsumeContext<ProcessOrder> context)
{
try
{
// ... process
}
catch (UnrecoverableOrderException ex)
{
// Move to DLQ immediately — don't retry
throw new Exception($"Unrecoverable: {ex.Message}", ex);
}
}
}
// DLQ consumer — reprocess or alert
[Consumer("order-processing_error")]
public class OrderDlqConsumer(
ILogger<OrderDlqConsumer> logger,
IAlertService alerts)
: IConsumer<ProcessOrder>
{
public async Task Consume(ConsumeContext<ProcessOrder> context)
{
// Log the dead-lettered message with full context
logger.LogError(
"Dead-lettered: OrderId={OrderId} Reason={Reason} RetryCount={Retries}",
context.Message.OrderId,
context.Headers.Get<string>("MT-Fault-Message"),
context.Headers.Get<int>("MT-Fault-Retry-Count"));
// Alert the on-call team
await alerts.SendAsync(
$"Order {context.Message.OrderId} dead-lettered: check DLQ",
context.CancellationToken);
}
}Step 5: Transactional Outbox Pattern
// Outbox: publish events atomically with your database transaction
// Prevents lost messages if the service crashes after SaveChanges but before Publish
builder.Services.AddMassTransit(x =>
{
x.AddConsumers(typeof(Program).Assembly);
// Outbox stores messages in your DB, delivers them after commit
x.AddEntityFrameworkOutbox<AppDbContext>(o =>
{
o.UsePostgres();
o.UseBusOutbox(); // deliver via the bus background service
});
x.UsingRabbitMq((ctx, cfg) =>
{
cfg.Host("rabbitmq://localhost");
cfg.UseMessageRetry(r => r.Exponential(5,
TimeSpan.FromSeconds(1), TimeSpan.FromMinutes(5), TimeSpan.FromSeconds(2)));
cfg.UseEntityFrameworkOutbox<AppDbContext>(ctx);
cfg.ConfigureEndpoints(ctx);
});
});
// In your command handler — publish is safe now
public class CreateOrderHandler(AppDbContext db, IPublishEndpoint bus)
{
public async Task HandleAsync(CreateOrderCommand cmd, CancellationToken ct)
{
var order = new Order { Id = cmd.OrderId, /* ... */ };
db.Orders.Add(order);
// This doesn't actually publish yet — writes to outbox table
await bus.Publish(new OrderCreated(order.Id, order.CustomerId), ct);
// Both the order AND the outbox message save atomically
await db.SaveChangesAsync(ct);
// → After commit, the outbox worker delivers the message to RabbitMQ
}
}
// Add outbox tables in EF Core migration
public class AppDbContext : DbContext
{
protected override void OnModelCreating(ModelBuilder model)
{
model.AddInboxStateEntity(); // tracks consumed message IDs (exactly-once)
model.AddOutboxStateEntity(); // tracks outbox state
model.AddOutboxMessageEntity(); // stores pending outbound messages
}
}Step 6: Request-Response
// Synchronous-style communication over the bus
// Useful for internal service queries where HTTP would be overkill
public class GetOrderStatusConsumer(IOrderRepository orders)
: IConsumer<GetOrderStatus>
{
public async Task Consume(ConsumeContext<GetOrderStatus> context)
{
var order = await orders.GetByIdAsync(context.Message.OrderId, context.CancellationToken);
await context.RespondAsync(order is null
? new OrderStatusResponse(context.Message.OrderId, "NotFound", null)
: new OrderStatusResponse(order.Id, order.Status, order.UpdatedAt));
}
}
// Caller (API layer)
public class OrderApiService(IRequestClient<GetOrderStatus> client)
{
public async Task<OrderStatusResponse> GetStatusAsync(int orderId, CancellationToken ct)
{
var response = await client.GetResponse<OrderStatusResponse>(
new GetOrderStatus(orderId), ct,
timeout: RequestTimeout.After(s: 10));
return response.Message;
}
}Step 7: Azure Service Bus Configuration
// Drop-in swap for RabbitMQ — same consumer code, different transport
builder.Services.AddMassTransit(x =>
{
x.AddConsumers(typeof(Program).Assembly);
x.UsingAzureServiceBus((ctx, cfg) =>
{
cfg.Host(builder.Configuration.GetConnectionString("AzureServiceBus"));
// Topics for pub/sub, queues for competing consumers
cfg.Message<OrderCreated>(m => m.SetEntityName("order-created")); // → topic
cfg.Message<ProcessOrder>(m => m.SetEntityName("order-processing")); // → queue
// Service Bus specific: sessions for ordered processing
cfg.ReceiveEndpoint("order-processing", e =>
{
e.RequiresSession = true; // FIFO per SessionId (e.g., per customer)
e.ConfigureConsumer<ProcessOrderConsumer>(ctx);
});
cfg.ConfigureEndpoints(ctx);
});
});Retry vs Dead Letter Decision
Retry: Transient failure — network blip, database timeout, rate limit
Dead Letter: Permanent failure — bad message format, business rule violation,
consumer bug requiring code fix before reprocessing
Rules:
- Never retry ArgumentException, InvalidOperationException
- Always retry HttpRequestException, TaskCanceledException, SqlException (transient)
- Cap retries at 5 with exponential backoff (max 5 minutes between attempts)
- After max retries → DLQ → alert → investigate → requeue when fixedInterview Answer
"MassTransit is the standard .NET messaging abstraction — same code works with RabbitMQ, Azure Service Bus, or Amazon SQS. Key patterns: competing consumers (multiple instances share a queue, each message processed once — natural horizontal scaling), publish-subscribe (event reaches all interested consumers via exchange/topic), and the transactional outbox (publish atomically with your DB write — prevents lost messages on crash). Dead letter queues catch messages that fail all retries: distinguish transient failures (retry) from permanent ones (DLQ immediately). The outbox pattern: MassTransit writes messages to an outbox table in the same transaction as your SaveChanges, then a background worker delivers them after commit — guarantees at-least-once delivery without two-phase commit. For exactly-once processing add inbox state: each consumed message ID is stored; duplicates are silently dropped. Retry policy: exponential backoff up to 5 attempts, ignore domain exceptions (ArgumentException, InvalidOperationException) — these won't succeed on retry."
RabbitMQ & Messaging Knowledge Check
5 questions · Test what you just learned · Instant explanations
Enjoyed this article?
Explore the Backend Systems learning path for more.
Found this helpful?
Leave a comment
Have a question, correction, or just found this helpful? Leave a note below.