Event-Driven Architecture Patterns — Outbox, Choreography, and Idempotency
Event Types — Not All Events Are Equal
The word "event" is overloaded. In a mature event-driven system, three distinct concepts exist and conflating them causes design errors.
Domain Events
A domain event represents something that happened within the boundary of a single aggregate. It is a fact — immutable, past-tense, internal to the bounded context.
- OrderPlaced
- InventoryDecremented
- PaymentAuthorised
Domain events are raised by the domain model and handled within the same process or same transaction. They are not published to external message brokers. They drive side effects like sending a confirmation email or updating a read model — but those effects can be part of the same unit of work.
public class Order : AggregateRoot
{
    private readonly List<IDomainEvent> _events = [];
    public IReadOnlyList<IDomainEvent> DomainEvents => _events;

    public void Place(Guid customerId, IReadOnlyList<OrderLine> lines)
    {
        // Validate invariants...
        Status = OrderStatus.Placed;
        _events.Add(new OrderPlaced(Id, customerId, lines, DateTimeOffset.UtcNow));
    }
}

public record OrderPlaced(
    Guid OrderId,
    Guid CustomerId,
    IReadOnlyList<OrderLine> Lines,
    DateTimeOffset OccurredAt) : IDomainEvent;
Integration Events
An integration event crosses a service boundary. It is published to a message broker (RabbitMQ, Azure Service Bus, Kafka) so other bounded contexts can react. Integration events must be versioned — external consumers cannot be redeployed atomically with the producer.
- OrderConfirmedV1 (published by the Order service, consumed by the Shipping and Email services)
Integration events are typically created from domain events but are not the same thing. The Order service listens to its own OrderPlaced domain event and, as a side effect, publishes OrderConfirmedV1 to the broker via the outbox.
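The translation from a domain event to a versioned integration event can be sketched as follows. This is a minimal illustration rather than the lesson's implementation: the fields of OrderConfirmedV1, the simplified OrderPlaced record, and the OrderIntegrationEventMapper class are assumptions made for the example.

```csharp
using System;

// Simplified stand-in for the lesson's OrderPlaced domain event (Lines omitted for brevity).
public record OrderPlaced(Guid OrderId, Guid CustomerId, DateTimeOffset OccurredAt);

// Hypothetical public contract. It carries only what external consumers need;
// a breaking change would ship as a new OrderConfirmedV2 type alongside this one.
public record OrderConfirmedV1(Guid OrderId, Guid CustomerId, DateTimeOffset ConfirmedAt);

public static class OrderIntegrationEventMapper
{
    // Translate the internal fact into the public contract so that
    // domain types never leak across the service boundary.
    public static OrderConfirmedV1 ToIntegrationEvent(OrderPlaced e)
        => new(e.OrderId, e.CustomerId, e.OccurredAt);
}
```

Keeping the two types separate lets the domain model change shape freely, as long as the mapper keeps producing a valid OrderConfirmedV1.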
Commands
A command is a request — imperative, directed at a specific handler, may fail. Unlike events (which notify that something happened), commands request that something should happen.
- ShipOrder (directed to the Shipping service)
- RefundPayment (directed to the Payment service)
Commands are sent point-to-point, not broadcast. If the command fails, the sender can retry, compensate, or surface the error. Do not use commands where events belong — a command implies the sender cares about the outcome; an event implies the publisher does not.
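The difference in contract between a command and an event can be sketched in plain C#. Everything here is hypothetical and in-process (ShippingHandler, EventBus, and the message records are illustrative names, not a real messaging API); the point is only that a command has one handler and an outcome, while an event has any number of subscribers and no return value.

```csharp
using System;
using System.Collections.Generic;

public record ShipOrder(Guid OrderId);      // command: imperative, one handler, may be rejected
public record OrderShipped(Guid OrderId);   // event: past tense, zero or more subscribers

public class ShippingHandler
{
    private readonly HashSet<Guid> _shipped = new();

    // A command handler reports an outcome so the sender can retry or compensate.
    // Here a second ShipOrder for the same order is rejected.
    public bool Handle(ShipOrder command) => _shipped.Add(command.OrderId);
}

public class EventBus
{
    private readonly List<Action<OrderShipped>> _subscribers = new();

    public void Subscribe(Action<OrderShipped> handler) => _subscribers.Add(handler);

    // Publishing returns nothing: the publisher does not learn who reacted or how.
    public void Publish(OrderShipped e)
    {
        foreach (var subscriber in _subscribers) subscriber(e);
    }
}
```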
Choreography vs Orchestration
Both are approaches to coordinating multi-step workflows across services. The distinction is who knows the overall flow.
Choreography — Reactive, Decentralised
Each service reacts to events from other services. No service knows the overall flow. Services are loosely coupled — the Order service publishes OrderConfirmed without knowing that Shipping, Inventory, and Email each listen.
Scenario: B2B order processing
OrderService ── OrderConfirmed ──┐
                                 ▼
                     InventoryService ── StockAllocated ──┐
                                                          ▼
                                          ShippingService ── ShipmentScheduled
                                                                    │
                                                  EmailService ◄────┘
                                          (listens to ShipmentScheduled)
Pros:
- Services are independently deployable and testable.
- No single point of failure (no orchestrator to crash).
- Natural fit for pub/sub infrastructure.
Cons:
- The overall business flow is implicit — it lives in the sum of all subscriptions across all services. Debugging a failed flow requires tracing across multiple services.
- Adding a new step (e.g., fraud check between Order and Inventory) requires modifying the event chain, which can cascade through consumers.
- Monitoring end-to-end workflow completion is hard without additional tooling (correlation IDs, distributed tracing).
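The choreographed flow can be simulated in-process to show how the chain emerges from independent subscriptions. This is a sketch under stated assumptions: InMemoryBus is a hypothetical stand-in for a real broker, and the CorrelationId field on each event is an illustrative addition showing the kind of identifier that tracing relies on.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical events for the B2B scenario; each carries a CorrelationId so the flow can be traced.
public record OrderConfirmed(Guid CorrelationId, Guid OrderId);
public record StockAllocated(Guid CorrelationId, Guid OrderId);
public record ShipmentScheduled(Guid CorrelationId, Guid OrderId);

// Minimal in-process stand-in for a broker: handlers are keyed by event type.
public class InMemoryBus
{
    private readonly Dictionary<Type, List<Action<object>>> _handlers = new();
    public List<string> Trace { get; } = new();

    public void Subscribe<T>(Action<T> handler)
    {
        if (!_handlers.TryGetValue(typeof(T), out var list))
            _handlers[typeof(T)] = list = new List<Action<object>>();
        list.Add(e => handler((T)e));
    }

    public void Publish<T>(T e) where T : notnull
    {
        Trace.Add(typeof(T).Name);
        if (_handlers.TryGetValue(typeof(T), out var list))
            foreach (var handler in list) handler(e);
    }
}

public static class Choreography
{
    // Each "service" knows only the event it listens to and the event it emits.
    public static void Wire(InMemoryBus bus)
    {
        // Inventory
        bus.Subscribe<OrderConfirmed>(e => bus.Publish(new StockAllocated(e.CorrelationId, e.OrderId)));
        // Shipping
        bus.Subscribe<StockAllocated>(e => bus.Publish(new ShipmentScheduled(e.CorrelationId, e.OrderId)));
        // Email
        bus.Subscribe<ShipmentScheduled>(e => bus.Trace.Add($"email:{e.OrderId}"));
    }
}
```

No single line of this code states the whole workflow; it only becomes visible in the Trace list after an OrderConfirmed is published, which is the debugging cost listed in the cons above.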
Orchestration — Centralised, Explicit
A saga orchestrator (or process manager) knows every step and directs services via commands. Services respond with events, and the orchestrator decides what to do next.
Same scenario with orchestration:
┌────────────────────────────┐
│ OrderSagaOrchestrator      │
│ 1. Reserve inventory       │
│ 2. If ok → schedule ship   │
│ 3. If ok → send email      │
│ 4. If any fail → compensate│
└────────────────────────────┘
      │            │           │
 ReserveStock ScheduleShip SendEmail
      │            │           │
  Inventory     Shipping     Email
Pros:
- The business flow is explicit and readable in one place.
- Compensation logic (rollback) is clear — the orchestrator knows which steps completed and calls their compensating commands.
- Easier to add steps without touching existing services.
Cons:
- The orchestrator is a central dependency — it must be highly available.
- Orchestrators can become a "god service" that knows too much about other bounded contexts.
- Tighter coupling between orchestrator and participant services.
In practice: use choreography for simple, well-understood flows with two or three steps. Switch to orchestration (with MassTransit's Saga state machine or NServiceBus saga) when compensation logic becomes complex or when you need observable workflow state.
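The core of an orchestrator, running steps in order and compensating completed steps in reverse on failure, can be sketched without any messaging library. This is a simplified, synchronous, in-process illustration: SagaStep and OrderSagaSketch are hypothetical names, and a real saga (for example in MassTransit or NServiceBus) would be message-driven and would persist its state between steps.

```csharp
using System;
using System.Collections.Generic;

// A saga step pairs an action with the compensating action that undoes it.
public record SagaStep(string Name, Func<bool> Execute, Action Compensate);

public static class OrderSagaSketch
{
    // Runs steps in order. On the first failure, compensates the completed
    // steps in reverse order and stops. Returns a log of what happened.
    public static List<string> Run(IEnumerable<SagaStep> steps)
    {
        var log = new List<string>();
        var completed = new Stack<SagaStep>();

        foreach (var step in steps)
        {
            if (step.Execute())
            {
                log.Add($"done:{step.Name}");
                completed.Push(step);
                continue;
            }

            log.Add($"failed:{step.Name}");
            while (completed.Count > 0)
            {
                var undo = completed.Pop();
                undo.Compensate();
                log.Add($"compensated:{undo.Name}");
            }
            break;
        }

        return log;
    }
}
```

Because the orchestrator owns the list of completed steps, the compensation path is explicit and the returned log doubles as observable workflow state.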
The Dual-Write Problem
The most dangerous anti-pattern in event-driven architecture is the dual write: updating the database and publishing to the broker in the same method, without atomicity.
// DANGEROUS: do not do this
public async Task PlaceOrderAsync(PlaceOrderCommand cmd)
{
    var order = Order.Create(cmd);
    await _db.Orders.AddAsync(order);
    await _db.SaveChangesAsync();                      // ← step 1
    await _bus.Publish(new OrderConfirmed(order.Id));  // ← step 2
    // If the process crashes between step 1 and step 2, the event is never sent.
    // If the broker is unavailable, the event is lost.
    // The DB and broker are now inconsistent.
}
If the application crashes, the network drops, or the broker is temporarily unavailable between steps 1 and 2, the database has the order but no integration event was published. Downstream services never learn of the order. The inconsistency is permanent and silent.
Outbox Pattern — Guaranteed At-Least-Once Delivery
The outbox pattern solves the dual-write problem by writing the event in the same database transaction as the domain change. A separate background worker (the outbox processor) then reads unpublished events and sends them to the broker. If the processor crashes after publishing a message but before marking its row as sent, it publishes that row again on restart, so every event is delivered at least once.
Step 1 — Outbox Table
CREATE TABLE outbox_messages (
    id         UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
    sent_at    TIMESTAMPTZ,
    type       TEXT NOT NULL,
    payload    JSONB NOT NULL
);
Step 2 — EF Core Interceptor
An EF Core SaveChangesInterceptor inspects the change tracker for aggregates that have domain events, serialises those events into OutboxMessage rows, and adds them to the context — all within the same SaveChangesAsync call.
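The interceptor below relies on an AggregateRoot base type that exposes DomainEvents and ClearDomainEvents(), which the lesson does not show. A minimal sketch of what such a base class might look like follows; the Raise method, the [NotMapped] attribute, and the DemoAggregate type are assumptions added for illustration.

```csharp
using System.Collections.Generic;
using System.ComponentModel.DataAnnotations.Schema;

// Marker interface for domain events (assumed; the lesson references it without defining it).
public interface IDomainEvent { }

// Hypothetical base class: owns the pending-event list so that every aggregate
// exposes the same surface to the interceptor.
public abstract class AggregateRoot
{
    private readonly List<IDomainEvent> _domainEvents = new();

    // Kept out of the EF Core model; pending events are not persisted with the entity.
    [NotMapped]
    public IReadOnlyList<IDomainEvent> DomainEvents => _domainEvents;

    protected void Raise(IDomainEvent domainEvent) => _domainEvents.Add(domainEvent);

    // Called once the pending events have been copied into outbox rows.
    public void ClearDomainEvents() => _domainEvents.Clear();
}

// Tiny example aggregate showing how a derived type raises an event.
public sealed class DemoAggregate : AggregateRoot
{
    public record SomethingHappened : IDomainEvent;

    public void DoSomething() => Raise(new SomethingHappened());
}
```

With the list owned by the base class, an aggregate such as Order would call Raise(...) instead of maintaining its own private list.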
using System.Text.Json;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Diagnostics;

public class OutboxInterceptor : SaveChangesInterceptor
{
    // Only the async path is intercepted here, so callers must use SaveChangesAsync.
    public override ValueTask<InterceptionResult<int>> SavingChangesAsync(
        DbContextEventData eventData,
        InterceptionResult<int> result,
        CancellationToken ct = default)
    {
        var ctx = eventData.Context;
        if (ctx is null) return ValueTask.FromResult(result);

        // Find all aggregate roots with pending domain events
        var aggregates = ctx.ChangeTracker
            .Entries<AggregateRoot>()
            .Where(e => e.Entity.DomainEvents.Any())
            .Select(e => e.Entity)
            .ToList();

        // Convert domain events to outbox messages
        foreach (var aggregate in aggregates)
        {
            foreach (var domainEvent in aggregate.DomainEvents)
            {
                var message = new OutboxMessage
                {
                    Id = Guid.NewGuid(),
                    CreatedAt = DateTimeOffset.UtcNow,
                    Type = domainEvent.GetType().AssemblyQualifiedName!,
                    Payload = JsonSerializer.Serialize(domainEvent,
                        domainEvent.GetType(),
                        JsonSerializerOptions.Default)
                };
                ctx.Set<OutboxMessage>().Add(message);
            }
            aggregate.ClearDomainEvents();
        }

        // No awaits are needed, so return a completed ValueTask instead of marking the method async
        return ValueTask.FromResult(result);
    }
}
Register the interceptor:
services.AddDbContext<AppDbContext>((sp, opts) =>
{
    opts.UseNpgsql(connectionString)
        .AddInterceptors(sp.GetRequiredService<OutboxInterceptor>());
});
services.AddSingleton<OutboxInterceptor>();
Now SaveChangesAsync atomically persists both the domain change and the outbox messages. If the transaction rolls back, neither is persisted.
Step 3 — OutboxProcessor Background Service
public class OutboxProcessor(IServiceScopeFactory scopeFactory, ILogger<OutboxProcessor> logger)
    : BackgroundService
{
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            try
            {
                await ProcessBatchAsync(stoppingToken);
            }
            catch (Exception ex) when (ex is not OperationCanceledException)
            {
                // A failed batch (e.g. database unreachable) must not stop the worker; retry on the next cycle
                logger.LogError(ex, "Outbox batch failed");
            }
            await Task.Delay(TimeSpan.FromSeconds(5), stoppingToken);
        }
    }

    private async Task ProcessBatchAsync(CancellationToken ct)
    {
        await using var scope = scopeFactory.CreateAsyncScope();
        var db = scope.ServiceProvider.GetRequiredService<AppDbContext>();
        var bus = scope.ServiceProvider.GetRequiredService<IPublishEndpoint>();

        // Fetch unsent messages, oldest first. Ordering is best-effort:
        // a message that fails is retried after later messages have been sent.
        var messages = await db.OutboxMessages
            .Where(m => m.SentAt == null)
            .OrderBy(m => m.CreatedAt)
            .Take(50)
            .ToListAsync(ct);

        foreach (var msg in messages)
        {
            try
            {
                var type = Type.GetType(msg.Type)!;
                var payload = JsonSerializer.Deserialize(msg.Payload, type)!;
                await bus.Publish(payload, type, ct);
                msg.SentAt = DateTimeOffset.UtcNow;
            }
            catch (Exception ex)
            {
                logger.LogError(ex, "Failed to publish outbox message {Id}", msg.Id);
                // Do not mark as sent; the message is retried on the next cycle
            }
        }

        await db.SaveChangesAsync(ct);
    }
}
This processor guarantees at-least-once delivery, so consumers must be idempotent (see below). MassTransit provides a transactional outbox for EF Core (registered with AddEntityFrameworkOutbox) that handles all of this automatically; prefer it over hand-rolling in production. Note that UseInMemoryOutbox only holds outgoing messages in memory until the consumer completes, so on its own it does not protect against a crash between the database commit and the publish.
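For comparison with the hand-rolled processor, registering MassTransit's EF Core outbox might look roughly like this. The sketch is based on the MassTransit v8 API as I understand it (AddEntityFrameworkOutbox, UsePostgres, UseBusOutbox, and the model-builder extensions); method names and defaults should be checked against the version in use. The MessagingRegistration class and the trimmed-down AppDbContext are hypothetical.

```csharp
using System;
using MassTransit;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;

public class AppDbContext(DbContextOptions<AppDbContext> options) : DbContext(options)
{
    protected override void OnModelCreating(ModelBuilder modelBuilder)
    {
        // Tables MassTransit uses to track inbox and outbox state
        modelBuilder.AddInboxStateEntity();
        modelBuilder.AddOutboxMessageEntity();
        modelBuilder.AddOutboxStateEntity();
    }
}

public static class MessagingRegistration
{
    public static IServiceCollection AddMessaging(this IServiceCollection services)
    {
        services.AddMassTransit(x =>
        {
            x.AddEntityFrameworkOutbox<AppDbContext>(o =>
            {
                o.UsePostgres();                         // PostgreSQL-specific locking
                o.UseBusOutbox();                        // route Publish/Send through the outbox table
                o.QueryDelay = TimeSpan.FromSeconds(5);  // how often the delivery service polls
            });

            x.UsingRabbitMq((ctx, cfg) => cfg.ConfigureEndpoints(ctx));
        });

        return services;
    }
}
```

With the bus outbox enabled, messages published through the scoped publish endpoint are stored by the same SaveChangesAsync call as the domain change and delivered afterwards by a background service.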
Event Sourcing with Marten
Traditional CRUD stores the current state of an aggregate. Event sourcing stores the sequence of events that produced that state. The current state is derived by replaying events.
Benefits:
- Complete audit trail — you can answer "what was the state of this order at 14:32 last Tuesday?"
- Projections — derive any read model from the same event stream, rebuild it at any time.
- Temporal decoupling — new services can subscribe to historical events.
Trade-offs:
- Querying is done through projections, not direct row queries.
- Schema evolution (changing event shapes) requires a versioning strategy.
- Eventual consistency between the write model and read projections.
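The idea that current state is a replay of the event stream, and that a historical state is a replay that stops early, can be shown without any event-store library. The types below (OrderEvent, OrderState, and the simplified event records) are hypothetical and exist only for this sketch.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public abstract record OrderEvent(DateTimeOffset At);
public record Placed(DateTimeOffset At) : OrderEvent(At);
public record LineAdded(DateTimeOffset At, decimal Price, int Qty) : OrderEvent(At);
public record Shipped(DateTimeOffset At) : OrderEvent(At);

public record OrderState(string Status, decimal Total)
{
    public static readonly OrderState Empty = new("None", 0m);

    // Pure transition function: previous state plus one event gives the next state.
    public OrderState Apply(OrderEvent e) => e switch
    {
        Placed => this with { Status = "Placed" },
        LineAdded l => this with { Total = Total + l.Price * l.Qty },
        Shipped => this with { Status = "Shipped" },
        _ => this
    };

    // Current state is a left fold over the stream; passing asOf stops the
    // replay at a point in time and yields the state as it was then.
    public static OrderState Replay(IEnumerable<OrderEvent> stream, DateTimeOffset? asOf = null)
        => stream.Where(e => asOf is null || e.At <= asOf)
                 .Aggregate(Empty, (state, e) => state.Apply(e));
}
```

Replaying the full stream answers "what is the order now?", while passing asOf answers the audit question from the benefits list using the same code path.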
Marten Event Store Setup
Marten is a PostgreSQL-backed document/event-store library for .NET. It handles event storage, optimistic concurrency, and projection rebuilding.
// Program.cs
builder.Services.AddMarten(opts =>
{
    opts.Connection(builder.Configuration.GetConnectionString("Postgres")!);

    // Register inline projections: updated synchronously on append
    opts.Projections.Add<OrderSummaryProjection>(ProjectionLifecycle.Inline);

    // Register async projections: updated by a background daemon
    opts.Projections.Add<OrderHistoryProjection>(ProjectionLifecycle.Async);
})
.UseLightweightSessions()
.AddAsyncDaemon(DaemonMode.HotCold); // One active daemon at a time via leader election
Defining Events and the Aggregate
// Events: immutable records
public record OrderPlaced(Guid OrderId, Guid CustomerId, DateTimeOffset At);
public record OrderLineAdded(Guid OrderId, Guid ProductId, int Qty, decimal Price);
public record OrderShipped(Guid OrderId, string TrackingCode, DateTimeOffset At);

// Aggregate: rebuilt from events by Marten
public class Order
{
    public Guid Id { get; private set; }
    public Guid CustomerId { get; private set; }
    public List<OrderLine> Lines { get; private set; } = [];
    public string? TrackingCode { get; private set; }
    public OrderStatus Status { get; private set; }

    // Marten calls these Apply methods when loading the aggregate
    public void Apply(OrderPlaced e)
    {
        Id = e.OrderId;
        CustomerId = e.CustomerId;
        Status = OrderStatus.Placed;
    }

    public void Apply(OrderLineAdded e)
        => Lines.Add(new OrderLine(e.ProductId, e.Qty, e.Price));

    public void Apply(OrderShipped e)
    {
        TrackingCode = e.TrackingCode;
        Status = OrderStatus.Shipped;
    }
}
Writing and Reading Events
public class OrderCommandHandler(IDocumentSession session)
{
    public async Task PlaceOrderAsync(PlaceOrderCommand cmd)
    {
        var placed = new OrderPlaced(cmd.OrderId, cmd.CustomerId, DateTimeOffset.UtcNow);
        var lines = cmd.Lines.Select(l => new OrderLineAdded(
            cmd.OrderId, l.ProductId, l.Quantity, l.Price));

        // Start a new stream; the stream ID is the aggregate ID
        session.Events.StartStream<Order>(cmd.OrderId, placed);
        foreach (var line in lines)
            session.Events.Append(cmd.OrderId, line);

        await session.SaveChangesAsync();
    }

    public async Task ShipOrderAsync(Guid orderId, string trackingCode)
    {
        // Load the current state and remember the stream version it was built from.
        // A plain Append(orderId, ...) performs no version check, so FetchForWriting is used instead.
        var stream = await session.Events.FetchForWriting<Order>(orderId);
        var order = stream.Aggregate
            ?? throw new KeyNotFoundException($"Order {orderId} not found");

        if (order.Status != OrderStatus.Placed)
            throw new InvalidOperationException("Cannot ship an order that is not in Placed status.");

        var shipped = new OrderShipped(orderId, trackingCode, DateTimeOffset.UtcNow);

        // Optimistic concurrency: SaveChangesAsync fails with a concurrency exception
        // if another writer appended to the stream after it was fetched
        stream.AppendOne(shipped);
        await session.SaveChangesAsync();
    }
}
Projection
public class OrderSummaryProjection : SingleStreamProjection<OrderSummary>
{
    public OrderSummary Create(OrderPlaced e)
        => new() { Id = e.OrderId, CustomerId = e.CustomerId, Status = "Placed" };

    public void Apply(OrderShipped e, OrderSummary summary)
    {
        summary.Status = "Shipped";
        summary.TrackingCode = e.TrackingCode;
        summary.ShippedAt = e.At;
    }
}

public class OrderSummary
{
    public Guid Id { get; set; }
    public Guid CustomerId { get; set; }
    public string Status { get; set; } = "";
    public string? TrackingCode { get; set; }
    public DateTimeOffset? ShippedAt { get; set; }
}
With ProjectionLifecycle.Inline, the OrderSummary document is updated in the same transaction as the event append. ProjectionLifecycle.Async updates it via the background daemon, which makes the read model eventually consistent but lowers write latency.
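On the read side, the projected OrderSummary documents are queried like any other Marten document. A minimal sketch, assuming the OrderSummary class above and Marten's IQuerySession (registered by AddMarten); the OrderSummaryQueries class is a hypothetical name introduced for this example.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Marten;

public class OrderSummaryQueries(IQuerySession session)
{
    // Load one read model by ID; returns null if no summary has been projected yet.
    public Task<OrderSummary?> GetAsync(Guid orderId, CancellationToken ct = default)
        => session.LoadAsync<OrderSummary>(orderId, ct);

    // Query read models with LINQ instead of replaying event streams.
    public Task<IReadOnlyList<OrderSummary>> ForCustomerAsync(Guid customerId, CancellationToken ct = default)
        => session.Query<OrderSummary>()
                  .Where(s => s.CustomerId == customerId)
                  .ToListAsync(ct);
}
```

With an async projection, a query issued immediately after an append may not yet see the change, so callers that need read-your-writes behaviour should prefer the inline lifecycle for that document.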
Idempotent Consumers — At-Least-Once Becomes Exactly-Once
Message brokers typically guarantee at-least-once delivery, not exactly-once. Network retries, consumer restarts, and broker redeliveries all mean a consumer may receive the same message more than once. If the handler charges a credit card or sends an email, duplicates cause real harm.
The solution: track which messages have been processed. Before processing, check if the message ID has been seen; if so, skip. After processing, record the message ID.
public class OrderConfirmedConsumer(AppDbContext db, IEmailService email)
    : IConsumer<OrderConfirmedV1>
{
    public async Task Consume(ConsumeContext<OrderConfirmedV1> context)
    {
        var messageId = context.MessageId
            ?? throw new InvalidOperationException("MessageId is required for idempotency.");

        // Fast path: skip messages that are already recorded as processed
        var alreadyProcessed = await db.ProcessedMessages
            .AnyAsync(m => m.MessageId == messageId, context.CancellationToken);
        if (alreadyProcessed)
        {
            // Duplicate delivery: safe to ignore
            return;
        }

        // Process the business logic
        await email.SendOrderConfirmationAsync(context.Message.OrderId);

        // Record the message ID. Any DB writes made by the handler should be saved
        // by this same SaveChangesAsync call so they commit together with the marker.
        db.ProcessedMessages.Add(new ProcessedMessage
        {
            MessageId = messageId,
            ProcessedAt = DateTimeOffset.UtcNow,
            ConsumerType = nameof(OrderConfirmedConsumer)
        });
        await db.SaveChangesAsync(context.CancellationToken);
    }
}
Register the consumer with MassTransit:
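services.AddMassTransit(x =>
{
    x.AddConsumer<OrderConfirmedConsumer>();
    x.UsingRabbitMq((ctx, cfg) =>
    {
        cfg.Host("rabbitmq://localhost");
        cfg.ConfigureEndpoints(ctx);
    });
});
Important: the check-and-insert above has a race condition between the AnyAsync and the Add if two consumer instances process the same message concurrently. Prevent this with a unique constraint on processed_messages(message_id) and catch the DbUpdateException raised on a unique violation. The constraint protects only the database writes that commit together with the marker row; an external side effect such as the email has already happened by the time the violation is detected, so calls like that need their own deduplication (for example, an idempotency key supported by the provider).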
try
{
    await db.SaveChangesAsync();
}
catch (DbUpdateException ex) when (IsUniqueViolation(ex))
{
    // Another consumer instance already recorded this message: idempotent, ignore
}

// 23505 is the PostgreSQL SQLSTATE code for unique_violation
static bool IsUniqueViolation(DbUpdateException ex)
    => ex.InnerException is Npgsql.PostgresException pg && pg.SqlState == "23505";
MassTransit's transactional outbox includes an inbox that automates this deduplication for consumers and is the preferred approach in production.
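The unique constraint on processed_messages(message_id) that the catch block depends on could be declared in the EF Core model roughly as follows. This is a sketch: the lesson does not show the ProcessedMessage entity or its mapping, so the property types, table name, and column name here are assumptions inferred from the consumer code.

```csharp
using System;
using Microsoft.EntityFrameworkCore;

public class ProcessedMessage
{
    public Guid MessageId { get; set; }
    public DateTimeOffset ProcessedAt { get; set; }
    public string ConsumerType { get; set; } = "";
}

public class AppDbContext(DbContextOptions<AppDbContext> options) : DbContext(options)
{
    public DbSet<ProcessedMessage> ProcessedMessages => Set<ProcessedMessage>();

    protected override void OnModelCreating(ModelBuilder modelBuilder)
    {
        modelBuilder.Entity<ProcessedMessage>(b =>
        {
            b.ToTable("processed_messages");

            // The primary key doubles as the unique constraint: a second insert of the
            // same message ID fails in the database, whichever instance gets there last.
            b.HasKey(m => m.MessageId);
            b.Property(m => m.MessageId).HasColumnName("message_id");
        });
    }
}
```

If several consumer types share the table and each must process the same message once, a composite key over MessageId and ConsumerType would be the natural variation, with the duplicate check filtering on both columns.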
Key Takeaways
- Domain events, integration events, and commands are distinct concepts. Domain events are internal facts; integration events cross service boundaries; commands are requests that may fail.
- Choreography gives loose coupling at the cost of implicit flow and harder debugging. Orchestration makes the flow explicit at the cost of a central coordinator. Choose based on the complexity of compensation logic.
- The dual-write problem (update DB + publish to broker without atomicity) is a fundamental correctness hazard. Solve it with the outbox pattern: write events to the same transaction as domain changes, then relay them with a background processor.
- Event sourcing stores the sequence of events rather than the current state. Marten makes this practical on PostgreSQL with optimistic concurrency, projection rebuilding, and an async daemon.
- Message brokers deliver at-least-once. Make consumers idempotent by tracking processed message IDs, backed by a unique database constraint to handle the race condition correctly.