Event Sourcing in .NET with Marten — Store Events, Not State
Build event-sourced systems in .NET using the Marten library: append events, rebuild projections, snapshot aggregates, and query event streams with PostgreSQL as the event store.
Event Sourcing in .NET with Marten — Store Events, Not State
Event sourcing stores every state change as an immutable event. Instead of updating a row, you append an event. The current state is derived by replaying all events from the beginning. Marten turns PostgreSQL into a full-featured event store.
Why Event Sourcing?
Traditional (state-based):
UPDATE orders SET status = 'Shipped', shipped_at = NOW() WHERE id = 42
→ You know the current state. You lose the history.
Event sourcing:
INSERT event: OrderShipped { orderId: 42, shippedAt: ..., carrier: 'DHL' }
→ You have the full history. Current state is derived by replaying events.
Benefits:
- Complete audit trail (every change, who, when, why)
- Time travel — replay to any point in time
- Event-driven integration — events ARE the integration contract
- Debugging — reproduce any bug by replaying the exact events
Costs:
- Eventual consistency on read models (projections)
- Schema evolution requires event versioning strategy
- Higher query complexity for current-state readsStep 1: Install Marten
<PackageReference Include="Marten" Version="7.*" />
<PackageReference Include="Weasel.Postgresql" Version="5.*" />// Program.cs
builder.Services.AddMarten(opts =>
{
opts.Connection(builder.Configuration.GetConnectionString("Postgres")!);
// Schema — Marten creates tables automatically
opts.DatabaseSchemaName = "events";
// Event store configuration
opts.Events.StreamIdentity = StreamIdentity.AsGuid; // Guid-based stream IDs
// Register aggregates for inline projection
opts.Projections.Snapshot<Order>(SnapshotLifecycle.Inline);
// Register projections
opts.Projections.Add<OrderSummaryProjection>(ProjectionLifecycle.Inline);
opts.Projections.Add<CustomerOrderHistoryProjection>(ProjectionLifecycle.Async);
})
.UseLightweightSessions(); // no identity map overhead for event storesStep 2: Define Events
// Events are plain C# records — immutable by definition
public record OrderPlaced(
Guid OrderId,
Guid CustomerId,
List<OrderLine> Lines,
decimal Total,
DateTimeOffset PlacedAt);
public record OrderPaid(
Guid OrderId,
string PaymentReference,
decimal AmountPaid,
DateTimeOffset PaidAt);
public record OrderShipped(
Guid OrderId,
string TrackingNumber,
string Carrier,
DateTimeOffset ShippedAt);
public record OrderDelivered(
Guid OrderId,
DateTimeOffset DeliveredAt);
public record OrderCancelled(
Guid OrderId,
string Reason,
bool RefundIssued,
DateTimeOffset CancelledAt);
public record OrderLine(Guid ProductId, string ProductName, int Quantity, decimal UnitPrice);Step 3: Aggregate (Rebuilds State from Events)
// The aggregate applies events one by one to rebuild current state
public class Order
{
public Guid Id { get; private set; }
public Guid CustomerId { get; private set; }
public string Status { get; private set; } = "Pending";
public decimal Total { get; private set; }
public List<OrderLine> Lines { get; private set; } = [];
public string? TrackingNumber { get; private set; }
public string? PaymentRef { get; private set; }
public int Version { get; private set; }
// Marten calls Apply() methods in sequence to replay events
public void Apply(OrderPlaced e)
{
Id = e.OrderId;
CustomerId = e.CustomerId;
Lines = e.Lines;
Total = e.Total;
Status = "Pending";
}
public void Apply(OrderPaid e)
{
PaymentRef = e.PaymentReference;
Status = "Paid";
}
public void Apply(OrderShipped e)
{
TrackingNumber = e.TrackingNumber;
Status = "Shipped";
}
public void Apply(OrderDelivered _) => Status = "Delivered";
public void Apply(OrderCancelled _) => Status = "Cancelled";
// Business rules on the aggregate
public OrderShipped Ship(string trackingNumber, string carrier)
{
if (Status != "Paid")
throw new InvalidOperationException($"Cannot ship order in {Status} status.");
return new OrderShipped(Id, trackingNumber, carrier, DateTimeOffset.UtcNow);
}
public OrderCancelled Cancel(string reason, bool refundIssued = false)
{
if (Status is "Shipped" or "Delivered")
throw new InvalidOperationException($"Cannot cancel order in {Status} status.");
return new OrderCancelled(Id, reason, refundIssued, DateTimeOffset.UtcNow);
}
}Step 4: Write Events (Command Side)
// Command handler — appends events to the stream
public class OrderCommandHandler(IDocumentSession session)
{
// Place a new order — starts a new event stream
public async Task<Guid> PlaceOrderAsync(PlaceOrderCommand cmd, CancellationToken ct)
{
var orderId = Guid.NewGuid();
var @event = new OrderPlaced(
OrderId: orderId,
CustomerId: cmd.CustomerId,
Lines: cmd.Lines.Select(l => new OrderLine(l.ProductId, l.ProductName, l.Quantity, l.UnitPrice)).ToList(),
Total: cmd.Lines.Sum(l => l.Quantity * l.UnitPrice),
PlacedAt: DateTimeOffset.UtcNow);
// StartStream creates a new event stream
session.Events.StartStream<Order>(orderId, @event);
await session.SaveChangesAsync(ct);
return orderId;
}
// Ship an order — load aggregate, validate, append new event
public async Task ShipOrderAsync(Guid orderId, string trackingNumber, string carrier, CancellationToken ct)
{
// Load current aggregate by replaying all events
var order = await session.Events.AggregateStreamAsync<Order>(orderId, token: ct)
?? throw new NotFoundException(nameof(Order), orderId);
// Business logic on the aggregate — returns the event to append
var @event = order.Ship(trackingNumber, carrier);
// Optimistic concurrency — reject if another process modified the stream
session.Events.Append(orderId, order.Version, @event);
await session.SaveChangesAsync(ct);
}
// Cancel an order
public async Task CancelOrderAsync(Guid orderId, string reason, CancellationToken ct)
{
var order = await session.Events.AggregateStreamAsync<Order>(orderId, token: ct)
?? throw new NotFoundException(nameof(Order), orderId);
var @event = order.Cancel(reason);
session.Events.Append(orderId, order.Version, @event);
await session.SaveChangesAsync(ct);
}
}Step 5: Projections (Read Models)
// Inline projection — updated synchronously when events are saved
public class OrderSummaryProjection : SingleStreamProjection<OrderSummary>
{
public OrderSummary Create(OrderPlaced e) => new()
{
Id = e.OrderId,
CustomerId = e.CustomerId,
Status = "Pending",
Total = e.Total,
PlacedAt = e.PlacedAt,
};
public void Apply(OrderPaid e, OrderSummary summary)
{
summary.Status = "Paid";
summary.PaymentRef = e.PaymentReference;
}
public void Apply(OrderShipped e, OrderSummary summary)
{
summary.Status = "Shipped";
summary.TrackingNumber = e.TrackingNumber;
summary.ShippedAt = e.ShippedAt;
}
public void Apply(OrderDelivered e, OrderSummary summary) => summary.Status = "Delivered";
public void Apply(OrderCancelled e, OrderSummary summary) => summary.Status = "Cancelled";
}
public class OrderSummary
{
public Guid Id { get; set; }
public Guid CustomerId { get; set; }
public string Status { get; set; } = "";
public decimal Total { get; set; }
public string? PaymentRef { get; set; }
public string? TrackingNumber { get; set; }
public DateTimeOffset PlacedAt { get; set; }
public DateTimeOffset? ShippedAt { get; set; }
}// Async projection — eventually consistent, runs in a background daemon
public class CustomerOrderHistoryProjection : MultiStreamProjection<CustomerOrderHistory, Guid>
{
public CustomerOrderHistoryProjection()
{
// Group events by CustomerId (not OrderId)
Identity<OrderPlaced>(e => e.CustomerId);
Identity<OrderCancelled>(e => e.OrderId); // need to look up CustomerId
}
public void Apply(OrderPlaced e, CustomerOrderHistory history)
{
history.Id = e.CustomerId;
history.Orders.Add(new OrderSummaryItem(e.OrderId, e.Total, "Pending", e.PlacedAt));
history.TotalOrders++;
history.TotalSpend += e.Total;
}
public void Apply(OrderCancelled e, CustomerOrderHistory history)
{
var order = history.Orders.FirstOrDefault(o => o.OrderId == e.OrderId);
if (order is not null) order.Status = "Cancelled";
}
}
public class CustomerOrderHistory
{
public Guid Id { get; set; }
public List<OrderSummaryItem> Orders { get; set; } = [];
public int TotalOrders { get; set; }
public decimal TotalSpend { get; set; }
}
public record OrderSummaryItem(Guid OrderId, decimal Total, string Status, DateTimeOffset PlacedAt)
{
public string Status { get; set; } = Status;
}Step 6: Query the Read Model
// Query against projections — fast, no event replay needed
public class OrderQueryHandler(IQuerySession session)
{
public Task<OrderSummary?> GetOrderAsync(Guid orderId, CancellationToken ct)
=> session.LoadAsync<OrderSummary>(orderId, ct);
public Task<List<OrderSummary>> GetActiveOrdersAsync(CancellationToken ct)
=> session.Query<OrderSummary>()
.Where(o => o.Status != "Delivered" && o.Status != "Cancelled")
.OrderByDescending(o => o.PlacedAt)
.ToListAsync(ct);
public Task<CustomerOrderHistory?> GetCustomerHistoryAsync(Guid customerId, CancellationToken ct)
=> session.LoadAsync<CustomerOrderHistory>(customerId, ct);
// Time travel — what was the state of this order on a specific date?
public async Task<Order?> GetOrderAtAsync(Guid orderId, DateTimeOffset asOf, CancellationToken ct)
=> await session.Events.AggregateStreamAsync<Order>(orderId, timestamp: asOf.UtcDateTime, token: ct);
}Step 7: Snapshots (Skip Full Replay)
// For aggregates with thousands of events, snapshots skip replaying old events
// Marten handles snapshots automatically when configured:
opts.Projections.Snapshot<Order>(SnapshotLifecycle.Inline);
// You can also create them on-demand for long-lived streams:
// session.Events.AppendExclusive(orderId) will snapshot automatically
// after every 100 events (configurable)Interview Answer
"Event sourcing persists state changes as an immutable sequence of events rather than updating rows. The current state is derived by replaying events — called an aggregate. In .NET, Marten uses PostgreSQL as the event store: StartStream creates a new stream, Append adds events, and AggregateStreamAsync replays all events to rebuild the current aggregate. Optimistic concurrency: pass the expected version to Append — Marten throws a ConcurrencyException if another process appended first. Projections are read models derived from events: inline projections are updated synchronously when events are saved (consistent but slower writes), async projections run in a background daemon (eventually consistent, faster writes). For long-running aggregates, snapshots skip replaying old events — Marten stores a point-in-time snapshot and replays only events after it. Time travel is built in: AggregateStreamAsync with a timestamp parameter rebuilds the state as of any past point. The main challenge is schema evolution — you can't change past events, so you need an upcaster strategy for old event formats."
Enjoyed this article?
Explore the Backend Systems learning path for more.
Found this helpful?
Leave a comment
Have a question, correction, or just found this helpful? Leave a note below.