.NET & C# Development · Lesson 52 of 92

Outbox Pattern — Never Lose a Message on DB Failure

The Dual-Write Problem

Every time you save to the database and publish a message, you have a race condition:

C#
// BROKEN — these two operations are not atomic
await _db.SaveChangesAsync(ct);          // succeeds
await _messageBus.PublishAsync(event);   // crashes here → message lost forever

Or the other way:

C#
await _messageBus.PublishAsync(event);   // succeeds
await _db.SaveChangesAsync(ct);          // crashes here → message sent, DB not saved

Either way you get inconsistency. The outbox pattern solves this by making message publishing part of the database transaction.


The Outbox Table

Add an outbox table to the same database as your domain data:

SQL
CREATE TABLE outbox_messages (
    id          UUID PRIMARY KEY,
    type        VARCHAR(500) NOT NULL,
    payload     JSONB NOT NULL,
    created_at  TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    processed_at TIMESTAMPTZ NULL
);

EF Core entity and configuration:

C#
public class OutboxMessage
{
    public Guid Id { get; set; }
    public string Type { get; set; } = default!;
    public string Payload { get; set; } = default!;
    public DateTime CreatedAt { get; set; }
    public DateTime? ProcessedAt { get; set; }
}

// In AppDbContext.OnModelCreating
modelBuilder.Entity<OutboxMessage>(b =>
{
    b.ToTable("outbox_messages");
    b.HasIndex(x => x.ProcessedAt); // filter index for unprocessed messages
});

Writing to the Outbox in the Same Transaction

C#
public class PlaceOrderHandler : IRequestHandler<PlaceOrderCommand, PlaceOrderResponse>
{
    private readonly AppDbContext _db;

    public PlaceOrderHandler(AppDbContext db) => _db = db;

    public async Task<PlaceOrderResponse> Handle(
        PlaceOrderCommand request,
        CancellationToken ct)
    {
        var order = Order.Create(request.CustomerId, request.Lines);

        var outboxMessage = new OutboxMessage
        {
            Id = Guid.NewGuid(),
            Type = typeof(OrderPlacedEvent).AssemblyQualifiedName!,
            Payload = JsonSerializer.Serialize(new OrderPlacedEvent(
                order.Id,
                order.CustomerId,
                order.Total,
                order.CreatedAt)),
            CreatedAt = DateTime.UtcNow
        };

        _db.Orders.Add(order);
        _db.OutboxMessages.Add(outboxMessage);

        // Single SaveChanges — both the order and the outbox message
        // are persisted atomically. If this fails, nothing is saved.
        // If it succeeds, the message is guaranteed to be delivered eventually.
        await _db.SaveChangesAsync(ct);

        return new PlaceOrderResponse(order.Id, order.Total);
    }
}

Background OutboxProcessor

A BackgroundService polls for unprocessed messages and publishes them:

C#
public class OutboxProcessor : BackgroundService
{
    private readonly IServiceScopeFactory _scopeFactory;
    private readonly ILogger<OutboxProcessor> _logger;
    private static readonly TimeSpan Interval = TimeSpan.FromSeconds(10);

    public OutboxProcessor(
        IServiceScopeFactory scopeFactory,
        ILogger<OutboxProcessor> logger)
    {
        _scopeFactory = scopeFactory;
        _logger = logger;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            await ProcessBatchAsync(stoppingToken);
            await Task.Delay(Interval, stoppingToken);
        }
    }

    private async Task ProcessBatchAsync(CancellationToken ct)
    {
        using var scope = _scopeFactory.CreateScope();
        var db = scope.ServiceProvider.GetRequiredService<AppDbContext>();
        var publisher = scope.ServiceProvider.GetRequiredService<IMessagePublisher>();

        var messages = await db.OutboxMessages
            .Where(m => m.ProcessedAt == null)
            .OrderBy(m => m.CreatedAt)
            .Take(20)
            .ToListAsync(ct);

        foreach (var message in messages)
        {
            try
            {
                var eventType = Type.GetType(message.Type)
                    ?? throw new InvalidOperationException($"Unknown type: {message.Type}");

                var @event = JsonSerializer.Deserialize(message.Payload, eventType)
                    ?? throw new InvalidOperationException("Failed to deserialize message.");

                await publisher.PublishAsync(@event, ct);

                message.ProcessedAt = DateTime.UtcNow;
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Failed to process outbox message {MessageId}", message.Id);
                // Don't mark as processed — will retry on next tick
            }
        }

        await db.SaveChangesAsync(ct);
    }
}

Register it:

C#
builder.Services.AddHostedService<OutboxProcessor>();

Idempotency on the Consumer Side

Because the processor retries on failure, a message may be published more than once (at-least-once delivery). Your consumers must handle duplicate messages:

C#
public class OrderPlacedConsumer : IConsumer<OrderPlacedEvent>
{
    private readonly AppDbContext _db;

    public OrderPlacedConsumer(AppDbContext db) => _db = db;

    public async Task Consume(ConsumeContext<OrderPlacedEvent> context)
    {
        var messageId = context.MessageId?.ToString() ?? context.Message.OrderId.ToString();

        // Check if we've already processed this message
        var alreadyProcessed = await _db.ProcessedMessages
            .AnyAsync(m => m.MessageId == messageId);

        if (alreadyProcessed)
            return;

        // Do the actual work
        await ReserveInventoryAsync(context.Message);

        // Record that we've processed it
        _db.ProcessedMessages.Add(new ProcessedMessage
        {
            MessageId = messageId,
            ProcessedAt = DateTime.UtcNow
        });

        await _db.SaveChangesAsync();
    }
}

Using Quartz for the Processor

BackgroundService with Task.Delay is simple but has no retry scheduling, no distributed locking, and no missed-execution tracking. Quartz.NET solves all three:

Bash
dotnet add package Quartz.AspNetCore
C#
[DisallowConcurrentExecution] // prevents overlapping jobs if one runs long
public class OutboxJob : IJob
{
    private readonly AppDbContext _db;
    private readonly IMessagePublisher _publisher;

    public OutboxJob(AppDbContext db, IMessagePublisher publisher)
    {
        _db = db;
        _publisher = publisher;
    }

    public async Task Execute(IJobExecutionContext context)
    {
        // same batch processing logic as above
    }
}

// Program.cs
builder.Services.AddQuartz(q =>
{
    q.AddJobAndTrigger<OutboxJob>(opts =>
        opts.WithSimpleSchedule(s => s.WithIntervalInSeconds(10).RepeatForever()));
});

builder.Services.AddQuartzHostedService(q => q.WaitForJobsToComplete = true);

MassTransit Transactional Outbox

Building this yourself is educational. In production, use MassTransit's built-in transactional outbox:

Bash
dotnet add package MassTransit.EntityFrameworkCore
C#
// DbContext must implement the outbox interfaces
public class AppDbContext : DbContext, ISagaDbContext
{
    // MassTransit adds its own outbox tables via migrations
}

// Program.cs
builder.Services.AddMassTransit(x =>
{
    x.AddEntityFrameworkOutbox<AppDbContext>(o =>
    {
        o.UsePostgres();
        o.UseBusOutbox(); // routes publishes through the outbox automatically
    });

    x.UsingRabbitMq((ctx, cfg) =>
    {
        cfg.ConfigureEndpoints(ctx);
    });
});

Now any PublishAsync inside a handler that uses AppDbContext is automatically captured in the outbox. No manual outbox entity, no custom processor — MassTransit handles it.

The hand-rolled version is worth understanding. For new production services, use MassTransit's outbox and spend the saved time on business logic.