.NET & C# Development · Lesson 52 of 92
Outbox Pattern — Never Lose a Message on DB Failure
The Dual-Write Problem
Every time you save to the database and publish a message, you have a race condition:
// BROKEN — these two operations are not atomic
await _db.SaveChangesAsync(ct); // succeeds
await _messageBus.PublishAsync(event); // crashes here → message lost foreverOr the other way:
await _messageBus.PublishAsync(event); // succeeds
await _db.SaveChangesAsync(ct); // crashes here → message sent, DB not savedEither way you get inconsistency. The outbox pattern solves this by making message publishing part of the database transaction.
The Outbox Table
Add an outbox table to the same database as your domain data:
CREATE TABLE outbox_messages (
id UUID PRIMARY KEY,
type VARCHAR(500) NOT NULL,
payload JSONB NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
processed_at TIMESTAMPTZ NULL
);EF Core entity and configuration:
public class OutboxMessage
{
public Guid Id { get; set; }
public string Type { get; set; } = default!;
public string Payload { get; set; } = default!;
public DateTime CreatedAt { get; set; }
public DateTime? ProcessedAt { get; set; }
}
// In AppDbContext.OnModelCreating
modelBuilder.Entity<OutboxMessage>(b =>
{
b.ToTable("outbox_messages");
b.HasIndex(x => x.ProcessedAt); // filter index for unprocessed messages
});Writing to the Outbox in the Same Transaction
public class PlaceOrderHandler : IRequestHandler<PlaceOrderCommand, PlaceOrderResponse>
{
private readonly AppDbContext _db;
public PlaceOrderHandler(AppDbContext db) => _db = db;
public async Task<PlaceOrderResponse> Handle(
PlaceOrderCommand request,
CancellationToken ct)
{
var order = Order.Create(request.CustomerId, request.Lines);
var outboxMessage = new OutboxMessage
{
Id = Guid.NewGuid(),
Type = typeof(OrderPlacedEvent).AssemblyQualifiedName!,
Payload = JsonSerializer.Serialize(new OrderPlacedEvent(
order.Id,
order.CustomerId,
order.Total,
order.CreatedAt)),
CreatedAt = DateTime.UtcNow
};
_db.Orders.Add(order);
_db.OutboxMessages.Add(outboxMessage);
// Single SaveChanges — both the order and the outbox message
// are persisted atomically. If this fails, nothing is saved.
// If it succeeds, the message is guaranteed to be delivered eventually.
await _db.SaveChangesAsync(ct);
return new PlaceOrderResponse(order.Id, order.Total);
}
}Background OutboxProcessor
A BackgroundService polls for unprocessed messages and publishes them:
public class OutboxProcessor : BackgroundService
{
private readonly IServiceScopeFactory _scopeFactory;
private readonly ILogger<OutboxProcessor> _logger;
private static readonly TimeSpan Interval = TimeSpan.FromSeconds(10);
public OutboxProcessor(
IServiceScopeFactory scopeFactory,
ILogger<OutboxProcessor> logger)
{
_scopeFactory = scopeFactory;
_logger = logger;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
await ProcessBatchAsync(stoppingToken);
await Task.Delay(Interval, stoppingToken);
}
}
private async Task ProcessBatchAsync(CancellationToken ct)
{
using var scope = _scopeFactory.CreateScope();
var db = scope.ServiceProvider.GetRequiredService<AppDbContext>();
var publisher = scope.ServiceProvider.GetRequiredService<IMessagePublisher>();
var messages = await db.OutboxMessages
.Where(m => m.ProcessedAt == null)
.OrderBy(m => m.CreatedAt)
.Take(20)
.ToListAsync(ct);
foreach (var message in messages)
{
try
{
var eventType = Type.GetType(message.Type)
?? throw new InvalidOperationException($"Unknown type: {message.Type}");
var @event = JsonSerializer.Deserialize(message.Payload, eventType)
?? throw new InvalidOperationException("Failed to deserialize message.");
await publisher.PublishAsync(@event, ct);
message.ProcessedAt = DateTime.UtcNow;
}
catch (Exception ex)
{
_logger.LogError(ex, "Failed to process outbox message {MessageId}", message.Id);
// Don't mark as processed — will retry on next tick
}
}
await db.SaveChangesAsync(ct);
}
}Register it:
builder.Services.AddHostedService<OutboxProcessor>();Idempotency on the Consumer Side
Because the processor retries on failure, a message may be published more than once (at-least-once delivery). Your consumers must handle duplicate messages:
public class OrderPlacedConsumer : IConsumer<OrderPlacedEvent>
{
private readonly AppDbContext _db;
public OrderPlacedConsumer(AppDbContext db) => _db = db;
public async Task Consume(ConsumeContext<OrderPlacedEvent> context)
{
var messageId = context.MessageId?.ToString() ?? context.Message.OrderId.ToString();
// Check if we've already processed this message
var alreadyProcessed = await _db.ProcessedMessages
.AnyAsync(m => m.MessageId == messageId);
if (alreadyProcessed)
return;
// Do the actual work
await ReserveInventoryAsync(context.Message);
// Record that we've processed it
_db.ProcessedMessages.Add(new ProcessedMessage
{
MessageId = messageId,
ProcessedAt = DateTime.UtcNow
});
await _db.SaveChangesAsync();
}
}Using Quartz for the Processor
BackgroundService with Task.Delay is simple but has no retry scheduling, no distributed locking, and no missed-execution tracking. Quartz.NET solves all three:
dotnet add package Quartz.AspNetCore[DisallowConcurrentExecution] // prevents overlapping jobs if one runs long
public class OutboxJob : IJob
{
private readonly AppDbContext _db;
private readonly IMessagePublisher _publisher;
public OutboxJob(AppDbContext db, IMessagePublisher publisher)
{
_db = db;
_publisher = publisher;
}
public async Task Execute(IJobExecutionContext context)
{
// same batch processing logic as above
}
}
// Program.cs
builder.Services.AddQuartz(q =>
{
q.AddJobAndTrigger<OutboxJob>(opts =>
opts.WithSimpleSchedule(s => s.WithIntervalInSeconds(10).RepeatForever()));
});
builder.Services.AddQuartzHostedService(q => q.WaitForJobsToComplete = true);MassTransit Transactional Outbox
Building this yourself is educational. In production, use MassTransit's built-in transactional outbox:
dotnet add package MassTransit.EntityFrameworkCore// DbContext must implement the outbox interfaces
public class AppDbContext : DbContext, ISagaDbContext
{
// MassTransit adds its own outbox tables via migrations
}
// Program.cs
builder.Services.AddMassTransit(x =>
{
x.AddEntityFrameworkOutbox<AppDbContext>(o =>
{
o.UsePostgres();
o.UseBusOutbox(); // routes publishes through the outbox automatically
});
x.UsingRabbitMq((ctx, cfg) =>
{
cfg.ConfigureEndpoints(ctx);
});
});Now any PublishAsync inside a handler that uses AppDbContext is automatically captured in the outbox. No manual outbox entity, no custom processor — MassTransit handles it.
The hand-rolled version is worth understanding. For new production services, use MassTransit's outbox and spend the saved time on business logic.