Learnixo

.NET & C# Development · Lesson 192 of 229

OrderFlow Part 4: Domain Events & Outbox Pattern

OrderFlow: Domain Events and the Outbox Pattern

This is part 4 of the OrderFlow series. CQRS is in place. Now we decouple side effects — sending confirmation emails, updating analytics, notifying fulfilment — from the command handlers that trigger them. Domain events are the mechanism.

Starting point: OrderFlow CQRS complete.


The Problem Domain Events Solve

Without domain events:
  public class CreateOrderCommandHandler(
      IEmailService email,
      IAnalyticsService analytics,
      IFulfilmentService fulfilment,
      ...)
  {
      // The handler knows about 5 downstream services
      // Adding a 6th requires changing this class
      // Testing requires mocking all 5
  }

With domain events:
  public class CreateOrderCommandHandler(OrderFlowDbContext db)
  {
      // Handler knows only about persistence
      // Side effects handled by separate, independent subscribers
  }

Step 1: Define Domain Events

C#
// src/OrderFlow.Core/Events/IDomainEvent.cs
public interface IDomainEvent : INotification
{
    Guid      EventId   { get; }
    DateTime  OccuredAt { get; }
}

// src/OrderFlow.Core/Events/OrderPlacedEvent.cs
public record OrderPlacedEvent(
    int      OrderId,
    int      CustomerId,
    decimal  Total,
    string   CustomerEmail) : IDomainEvent
{
    public Guid     EventId   { get; } = Guid.NewGuid();
    public DateTime OccuredAt { get; } = DateTime.UtcNow;
}

public record OrderCancelledEvent(
    int    OrderId,
    int    CustomerId,
    string Reason,
    bool   WasPaid) : IDomainEvent
{
    public Guid     EventId   { get; } = Guid.NewGuid();
    public DateTime OccuredAt { get; } = DateTime.UtcNow;
}

public record OrderPaidEvent(
    int     OrderId,
    int     CustomerId,
    decimal Amount,
    string  PaymentReference) : IDomainEvent
{
    public Guid     EventId   { get; } = Guid.NewGuid();
    public DateTime OccuredAt { get; } = DateTime.UtcNow;
}

Step 2: Raise Events From the Aggregate

C#
// src/OrderFlow.Core/Entities/Order.cs — aggregate raises its own events
public class Order
{
    public int          Id         { get; set; }
    public int          CustomerId { get; set; }
    public string       Status     { get; set; } = "Pending";
    public decimal      Total      { get; set; }
    public List<OrderLine> Lines   { get; set; } = [];
    public DateTime     CreatedAt  { get; set; }

    // In-memory collection of events raised this session
    private readonly List<IDomainEvent> _events = [];
    public IReadOnlyList<IDomainEvent> DomainEvents => _events.AsReadOnly();

    public void ClearDomainEvents() => _events.Clear();

    // Factory method — raises event as part of creation
    public static Order Place(int customerId, List<OrderLine> lines, string customerEmail)
    {
        var order = new Order
        {
            CustomerId = customerId,
            Lines      = lines,
            Total      = lines.Sum(l => l.Quantity * l.UnitPrice),
            Status     = "Pending",
            CreatedAt  = DateTime.UtcNow,
        };

        order._events.Add(new OrderPlacedEvent(
            order.Id, customerId, order.Total, customerEmail));

        return order;
    }

    public void Cancel(string reason)
    {
        if (Status is "Shipped" or "Delivered")
            throw new ConflictException($"Cannot cancel order in {Status} status.");

        var wasPaid = Status == "Paid";
        Status = "Cancelled";

        _events.Add(new OrderCancelledEvent(Id, CustomerId, reason, wasPaid));
    }
}

Step 3: Dispatch Events After SaveChanges

C#
// src/OrderFlow.Infrastructure/Data/OrderFlowDbContext.cs
// Override SaveChanges to dispatch domain events after persistence
public class OrderFlowDbContext(
    DbContextOptions<OrderFlowDbContext> opts,
    IPublisher publisher)
    : DbContext(opts)
{
    public override async Task<int> SaveChangesAsync(CancellationToken ct = default)
    {
        // Collect events from all tracked aggregates before saving
        var events = ChangeTracker.Entries<Order>()
            .SelectMany(e => e.Entity.DomainEvents)
            .ToList();

        var result = await base.SaveChangesAsync(ct);

        // Dispatch events AFTER successful save — in-process, same transaction scope
        foreach (var @event in events)
            await publisher.Publish(@event, ct);

        // Clear events from aggregates so they don't dispatch again
        ChangeTracker.Entries<Order>()
            .ToList()
            .ForEach(e => e.Entity.ClearDomainEvents());

        return result;
    }
}

Step 4: Event Handlers (Side Effects)

C#
// Email notification — fully decoupled from command handler
public class SendOrderConfirmationEmail(IEmailService email)
    : INotificationHandler<OrderPlacedEvent>
{
    public async Task Handle(OrderPlacedEvent @event, CancellationToken ct)
    {
        await email.SendAsync(new EmailMessage(
            To:      @event.CustomerEmail,
            Subject: $"Order #{@event.OrderId} Confirmed",
            Body:    $"Thank you for your order of {@event.Total:C}. " +
                     $"Order reference: {@event.OrderId}."
        ), ct);
    }
}

// Analytics tracking
public class TrackOrderPlacedAnalytics(IAnalyticsService analytics)
    : INotificationHandler<OrderPlacedEvent>
{
    public async Task Handle(OrderPlacedEvent @event, CancellationToken ct)
    {
        await analytics.TrackAsync("order_placed", new
        {
            order_id    = @event.OrderId,
            customer_id = @event.CustomerId,
            total       = @event.Total,
        }, ct);
    }
}

// Refund trigger on cancellation
public class TriggerRefundOnCancellation(IPaymentService payments)
    : INotificationHandler<OrderCancelledEvent>
{
    public async Task Handle(OrderCancelledEvent @event, CancellationToken ct)
    {
        if (!@event.WasPaid) return;   // nothing to refund

        await payments.IssueRefundAsync(@event.OrderId, ct);
    }
}

Step 5: The Outbox Pattern — Survive Crashes

The in-process approach above is great but has a flaw: if the process crashes between SaveChanges and dispatching events, those events are lost.

C#
// Outbox: persist events in the same DB transaction as the aggregate
// A background worker reads and dispatches them durably

// src/OrderFlow.Core/Entities/OutboxMessage.cs
public class OutboxMessage
{
    public Guid      Id          { get; set; } = Guid.NewGuid();
    public string    Type        { get; set; } = "";   // fully qualified type name
    public string    Payload     { get; set; } = "";   // JSON-serialised event
    public DateTime  CreatedAt   { get; set; } = DateTime.UtcNow;
    public DateTime? ProcessedAt { get; set; }
    public string?   Error       { get; set; }
}
C#
// Interceptor — writes to outbox instead of dispatching in-process
public class DomainEventToOutboxInterceptor(IPublisher publisher)
    : SaveChangesInterceptor
{
    public override async ValueTask<int> SavedChangesAsync(
        SaveChangesCompletedEventData eventData,
        int result,
        CancellationToken ct = default)
    {
        var context = eventData.Context;
        if (context is null) return result;

        // Collect events from all tracked aggregates
        var events = context.ChangeTracker
            .Entries<Order>()
            .SelectMany(e => e.Entity.DomainEvents)
            .ToList();

        if (events.Count == 0) return result;

        // Write outbox messages in a second save (within the same connection)
        var messages = events.Select(e => new OutboxMessage
        {
            Type    = e.GetType().AssemblyQualifiedName!,
            Payload = JsonSerializer.Serialize(e, e.GetType()),
        }).ToList();

        context.Set<OutboxMessage>().AddRange(messages);
        await context.SaveChangesAsync(ct);

        // Clear after storing
        context.ChangeTracker.Entries<Order>()
            .ToList()
            .ForEach(e => e.Entity.ClearDomainEvents());

        return result;
    }
}
C#
// Background worker — reads unprocessed outbox messages and dispatches
public class OutboxWorker(
    IServiceScopeFactory scopeFactory,
    ILogger<OutboxWorker> logger)
    : BackgroundService
{
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            await ProcessBatchAsync(stoppingToken);
            await Task.Delay(TimeSpan.FromSeconds(5), stoppingToken);
        }
    }

    private async Task ProcessBatchAsync(CancellationToken ct)
    {
        await using var scope = scopeFactory.CreateAsyncScope();
        var db        = scope.ServiceProvider.GetRequiredService<OrderFlowDbContext>();
        var publisher = scope.ServiceProvider.GetRequiredService<IPublisher>();

        var messages = await db.Set<OutboxMessage>()
            .Where(m => m.ProcessedAt == null && m.Error == null)
            .OrderBy(m => m.CreatedAt)
            .Take(20)
            .ToListAsync(ct);

        foreach (var msg in messages)
        {
            try
            {
                var type  = Type.GetType(msg.Type)!;
                var @event = (IDomainEvent)JsonSerializer.Deserialize(msg.Payload, type)!;
                await publisher.Publish(@event, ct);
                msg.ProcessedAt = DateTime.UtcNow;
            }
            catch (Exception ex)
            {
                logger.LogError(ex, "Failed to process outbox message {Id}", msg.Id);
                msg.Error = ex.Message;
            }
        }

        await db.SaveChangesAsync(ct);
    }
}
C#
// Register everything
builder.Services.AddDbContext<OrderFlowDbContext>(opts =>
{
    opts.UseNpgsql(connectionString);
    opts.AddInterceptors(new DomainEventToOutboxInterceptor(/* ... */));
});

builder.Services.AddHostedService<OutboxWorker>();

What We Achieved

Command handler (CreateOrderCommandHandler):
  - Knows only about: OrderFlowDbContext
  - Does: creates order, saves it
  - Does NOT know about: email, analytics, refunds, fulfilment

Side effects (separate handlers):
  - SendOrderConfirmationEmail    — handles OrderPlacedEvent
  - TrackOrderPlacedAnalytics     — handles OrderPlacedEvent
  - TriggerRefundOnCancellation   — handles OrderCancelledEvent

Adding a new side effect:
  - Create a new INotificationHandler
  - Register it in DI
  - Done — zero changes to existing code

What's Next

Next: OrderFlow Caching — add Redis caching for hot queries (product catalogue, order summaries) using the cache-aside pattern with automatic invalidation via domain events.