.NET & C# Development · Lesson 192 of 229
OrderFlow Part 4: Domain Events & Outbox Pattern
OrderFlow: Domain Events and the Outbox Pattern
This is part 4 of the OrderFlow series. CQRS is in place. Now we decouple side effects — sending confirmation emails, updating analytics, notifying fulfilment — from the command handlers that trigger them. Domain events are the mechanism.
Starting point: OrderFlow CQRS complete.
The Problem Domain Events Solve
Without domain events:
public class CreateOrderCommandHandler(
IEmailService email,
IAnalyticsService analytics,
IFulfilmentService fulfilment,
...)
{
// The handler knows about 5 downstream services
// Adding a 6th requires changing this class
// Testing requires mocking all 5
}
With domain events:
public class CreateOrderCommandHandler(OrderFlowDbContext db)
{
// Handler knows only about persistence
// Side effects handled by separate, independent subscribers
}Step 1: Define Domain Events
C#
// src/OrderFlow.Core/Events/IDomainEvent.cs
public interface IDomainEvent : INotification
{
Guid EventId { get; }
DateTime OccuredAt { get; }
}
// src/OrderFlow.Core/Events/OrderPlacedEvent.cs
public record OrderPlacedEvent(
int OrderId,
int CustomerId,
decimal Total,
string CustomerEmail) : IDomainEvent
{
public Guid EventId { get; } = Guid.NewGuid();
public DateTime OccuredAt { get; } = DateTime.UtcNow;
}
public record OrderCancelledEvent(
int OrderId,
int CustomerId,
string Reason,
bool WasPaid) : IDomainEvent
{
public Guid EventId { get; } = Guid.NewGuid();
public DateTime OccuredAt { get; } = DateTime.UtcNow;
}
public record OrderPaidEvent(
int OrderId,
int CustomerId,
decimal Amount,
string PaymentReference) : IDomainEvent
{
public Guid EventId { get; } = Guid.NewGuid();
public DateTime OccuredAt { get; } = DateTime.UtcNow;
}Step 2: Raise Events From the Aggregate
C#
// src/OrderFlow.Core/Entities/Order.cs — aggregate raises its own events
public class Order
{
public int Id { get; set; }
public int CustomerId { get; set; }
public string Status { get; set; } = "Pending";
public decimal Total { get; set; }
public List<OrderLine> Lines { get; set; } = [];
public DateTime CreatedAt { get; set; }
// In-memory collection of events raised this session
private readonly List<IDomainEvent> _events = [];
public IReadOnlyList<IDomainEvent> DomainEvents => _events.AsReadOnly();
public void ClearDomainEvents() => _events.Clear();
// Factory method — raises event as part of creation
public static Order Place(int customerId, List<OrderLine> lines, string customerEmail)
{
var order = new Order
{
CustomerId = customerId,
Lines = lines,
Total = lines.Sum(l => l.Quantity * l.UnitPrice),
Status = "Pending",
CreatedAt = DateTime.UtcNow,
};
order._events.Add(new OrderPlacedEvent(
order.Id, customerId, order.Total, customerEmail));
return order;
}
public void Cancel(string reason)
{
if (Status is "Shipped" or "Delivered")
throw new ConflictException($"Cannot cancel order in {Status} status.");
var wasPaid = Status == "Paid";
Status = "Cancelled";
_events.Add(new OrderCancelledEvent(Id, CustomerId, reason, wasPaid));
}
}Step 3: Dispatch Events After SaveChanges
C#
// src/OrderFlow.Infrastructure/Data/OrderFlowDbContext.cs
// Override SaveChanges to dispatch domain events after persistence
public class OrderFlowDbContext(
DbContextOptions<OrderFlowDbContext> opts,
IPublisher publisher)
: DbContext(opts)
{
public override async Task<int> SaveChangesAsync(CancellationToken ct = default)
{
// Collect events from all tracked aggregates before saving
var events = ChangeTracker.Entries<Order>()
.SelectMany(e => e.Entity.DomainEvents)
.ToList();
var result = await base.SaveChangesAsync(ct);
// Dispatch events AFTER successful save — in-process, same transaction scope
foreach (var @event in events)
await publisher.Publish(@event, ct);
// Clear events from aggregates so they don't dispatch again
ChangeTracker.Entries<Order>()
.ToList()
.ForEach(e => e.Entity.ClearDomainEvents());
return result;
}
}Step 4: Event Handlers (Side Effects)
C#
// Email notification — fully decoupled from command handler
public class SendOrderConfirmationEmail(IEmailService email)
: INotificationHandler<OrderPlacedEvent>
{
public async Task Handle(OrderPlacedEvent @event, CancellationToken ct)
{
await email.SendAsync(new EmailMessage(
To: @event.CustomerEmail,
Subject: $"Order #{@event.OrderId} Confirmed",
Body: $"Thank you for your order of {@event.Total:C}. " +
$"Order reference: {@event.OrderId}."
), ct);
}
}
// Analytics tracking
public class TrackOrderPlacedAnalytics(IAnalyticsService analytics)
: INotificationHandler<OrderPlacedEvent>
{
public async Task Handle(OrderPlacedEvent @event, CancellationToken ct)
{
await analytics.TrackAsync("order_placed", new
{
order_id = @event.OrderId,
customer_id = @event.CustomerId,
total = @event.Total,
}, ct);
}
}
// Refund trigger on cancellation
public class TriggerRefundOnCancellation(IPaymentService payments)
: INotificationHandler<OrderCancelledEvent>
{
public async Task Handle(OrderCancelledEvent @event, CancellationToken ct)
{
if (!@event.WasPaid) return; // nothing to refund
await payments.IssueRefundAsync(@event.OrderId, ct);
}
}Step 5: The Outbox Pattern — Survive Crashes
The in-process approach above is great but has a flaw: if the process crashes between SaveChanges and dispatching events, those events are lost.
C#
// Outbox: persist events in the same DB transaction as the aggregate
// A background worker reads and dispatches them durably
// src/OrderFlow.Core/Entities/OutboxMessage.cs
public class OutboxMessage
{
public Guid Id { get; set; } = Guid.NewGuid();
public string Type { get; set; } = ""; // fully qualified type name
public string Payload { get; set; } = ""; // JSON-serialised event
public DateTime CreatedAt { get; set; } = DateTime.UtcNow;
public DateTime? ProcessedAt { get; set; }
public string? Error { get; set; }
}C#
// Interceptor — writes to outbox instead of dispatching in-process
public class DomainEventToOutboxInterceptor(IPublisher publisher)
: SaveChangesInterceptor
{
public override async ValueTask<int> SavedChangesAsync(
SaveChangesCompletedEventData eventData,
int result,
CancellationToken ct = default)
{
var context = eventData.Context;
if (context is null) return result;
// Collect events from all tracked aggregates
var events = context.ChangeTracker
.Entries<Order>()
.SelectMany(e => e.Entity.DomainEvents)
.ToList();
if (events.Count == 0) return result;
// Write outbox messages in a second save (within the same connection)
var messages = events.Select(e => new OutboxMessage
{
Type = e.GetType().AssemblyQualifiedName!,
Payload = JsonSerializer.Serialize(e, e.GetType()),
}).ToList();
context.Set<OutboxMessage>().AddRange(messages);
await context.SaveChangesAsync(ct);
// Clear after storing
context.ChangeTracker.Entries<Order>()
.ToList()
.ForEach(e => e.Entity.ClearDomainEvents());
return result;
}
}C#
// Background worker — reads unprocessed outbox messages and dispatches
public class OutboxWorker(
IServiceScopeFactory scopeFactory,
ILogger<OutboxWorker> logger)
: BackgroundService
{
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
await ProcessBatchAsync(stoppingToken);
await Task.Delay(TimeSpan.FromSeconds(5), stoppingToken);
}
}
private async Task ProcessBatchAsync(CancellationToken ct)
{
await using var scope = scopeFactory.CreateAsyncScope();
var db = scope.ServiceProvider.GetRequiredService<OrderFlowDbContext>();
var publisher = scope.ServiceProvider.GetRequiredService<IPublisher>();
var messages = await db.Set<OutboxMessage>()
.Where(m => m.ProcessedAt == null && m.Error == null)
.OrderBy(m => m.CreatedAt)
.Take(20)
.ToListAsync(ct);
foreach (var msg in messages)
{
try
{
var type = Type.GetType(msg.Type)!;
var @event = (IDomainEvent)JsonSerializer.Deserialize(msg.Payload, type)!;
await publisher.Publish(@event, ct);
msg.ProcessedAt = DateTime.UtcNow;
}
catch (Exception ex)
{
logger.LogError(ex, "Failed to process outbox message {Id}", msg.Id);
msg.Error = ex.Message;
}
}
await db.SaveChangesAsync(ct);
}
}C#
// Register everything
builder.Services.AddDbContext<OrderFlowDbContext>(opts =>
{
opts.UseNpgsql(connectionString);
opts.AddInterceptors(new DomainEventToOutboxInterceptor(/* ... */));
});
builder.Services.AddHostedService<OutboxWorker>();What We Achieved
Command handler (CreateOrderCommandHandler):
- Knows only about: OrderFlowDbContext
- Does: creates order, saves it
- Does NOT know about: email, analytics, refunds, fulfilment
Side effects (separate handlers):
- SendOrderConfirmationEmail — handles OrderPlacedEvent
- TrackOrderPlacedAnalytics — handles OrderPlacedEvent
- TriggerRefundOnCancellation — handles OrderCancelledEvent
Adding a new side effect:
- Create a new INotificationHandler
- Register it in DI
- Done — zero changes to existing code What's Next
Next: OrderFlow Caching — add Redis caching for hot queries (product catalogue, order summaries) using the cache-aside pattern with automatic invalidation via domain events.