Learnixo
Back to blog
Backend Systemsadvanced

Event Sourcing in .NET with Marten — Store Events, Not State

Build event-sourced systems in .NET using the Marten library: append events, rebuild projections, snapshot aggregates, and query event streams with PostgreSQL as the event store.

Asma Hafeez KhanMay 25, 20267 min read
.NETC#Martenevent sourcingCQRSPostgreSQLprojections
Share:𝕏

Event Sourcing in .NET with Marten — Store Events, Not State

Event sourcing stores every state change as an immutable event. Instead of updating a row, you append an event. The current state is derived by replaying all events from the beginning. Marten turns PostgreSQL into a full-featured event store.


Why Event Sourcing?

Traditional (state-based):
  UPDATE orders SET status = 'Shipped', shipped_at = NOW() WHERE id = 42
  → You know the current state. You lose the history.

Event sourcing:
  INSERT event: OrderShipped { orderId: 42, shippedAt: ..., carrier: 'DHL' }
  → You have the full history. Current state is derived by replaying events.

Benefits:
  - Complete audit trail (every change, who, when, why)
  - Time travel — replay to any point in time
  - Event-driven integration — events ARE the integration contract
  - Debugging — reproduce any bug by replaying the exact events

Costs:
  - Eventual consistency on read models (projections)
  - Schema evolution requires event versioning strategy
  - Higher query complexity for current-state reads

Step 1: Install Marten

XML
<PackageReference Include="Marten" Version="7.*" />
<PackageReference Include="Weasel.Postgresql" Version="5.*" />
C#
// Program.cs
builder.Services.AddMarten(opts =>
{
    opts.Connection(builder.Configuration.GetConnectionString("Postgres")!);

    // Schema — Marten creates tables automatically
    opts.DatabaseSchemaName = "events";

    // Event store configuration
    opts.Events.StreamIdentity = StreamIdentity.AsGuid;   // Guid-based stream IDs

    // Register aggregates for inline projection
    opts.Projections.Snapshot<Order>(SnapshotLifecycle.Inline);

    // Register projections
    opts.Projections.Add<OrderSummaryProjection>(ProjectionLifecycle.Inline);
    opts.Projections.Add<CustomerOrderHistoryProjection>(ProjectionLifecycle.Async);
})
.UseLightweightSessions();   // no identity map overhead for event stores

Step 2: Define Events

C#
// Events are plain C# records — immutable by definition
public record OrderPlaced(
    Guid OrderId,
    Guid CustomerId,
    List<OrderLine> Lines,
    decimal Total,
    DateTimeOffset PlacedAt);

public record OrderPaid(
    Guid OrderId,
    string PaymentReference,
    decimal AmountPaid,
    DateTimeOffset PaidAt);

public record OrderShipped(
    Guid OrderId,
    string TrackingNumber,
    string Carrier,
    DateTimeOffset ShippedAt);

public record OrderDelivered(
    Guid OrderId,
    DateTimeOffset DeliveredAt);

public record OrderCancelled(
    Guid OrderId,
    string Reason,
    bool RefundIssued,
    DateTimeOffset CancelledAt);

public record OrderLine(Guid ProductId, string ProductName, int Quantity, decimal UnitPrice);

Step 3: Aggregate (Rebuilds State from Events)

C#
// The aggregate applies events one by one to rebuild current state
public class Order
{
    public Guid   Id        { get; private set; }
    public Guid   CustomerId { get; private set; }
    public string Status    { get; private set; } = "Pending";
    public decimal Total    { get; private set; }
    public List<OrderLine> Lines { get; private set; } = [];
    public string? TrackingNumber { get; private set; }
    public string? PaymentRef    { get; private set; }
    public int Version { get; private set; }

    // Marten calls Apply() methods in sequence to replay events
    public void Apply(OrderPlaced e)
    {
        Id         = e.OrderId;
        CustomerId = e.CustomerId;
        Lines      = e.Lines;
        Total      = e.Total;
        Status     = "Pending";
    }

    public void Apply(OrderPaid e)
    {
        PaymentRef = e.PaymentReference;
        Status     = "Paid";
    }

    public void Apply(OrderShipped e)
    {
        TrackingNumber = e.TrackingNumber;
        Status         = "Shipped";
    }

    public void Apply(OrderDelivered _) => Status = "Delivered";

    public void Apply(OrderCancelled _) => Status = "Cancelled";

    // Business rules on the aggregate
    public OrderShipped Ship(string trackingNumber, string carrier)
    {
        if (Status != "Paid")
            throw new InvalidOperationException($"Cannot ship order in {Status} status.");

        return new OrderShipped(Id, trackingNumber, carrier, DateTimeOffset.UtcNow);
    }

    public OrderCancelled Cancel(string reason, bool refundIssued = false)
    {
        if (Status is "Shipped" or "Delivered")
            throw new InvalidOperationException($"Cannot cancel order in {Status} status.");

        return new OrderCancelled(Id, reason, refundIssued, DateTimeOffset.UtcNow);
    }
}

Step 4: Write Events (Command Side)

C#
// Command handler — appends events to the stream
public class OrderCommandHandler(IDocumentSession session)
{
    // Place a new order — starts a new event stream
    public async Task<Guid> PlaceOrderAsync(PlaceOrderCommand cmd, CancellationToken ct)
    {
        var orderId = Guid.NewGuid();

        var @event = new OrderPlaced(
            OrderId:    orderId,
            CustomerId: cmd.CustomerId,
            Lines:      cmd.Lines.Select(l => new OrderLine(l.ProductId, l.ProductName, l.Quantity, l.UnitPrice)).ToList(),
            Total:      cmd.Lines.Sum(l => l.Quantity * l.UnitPrice),
            PlacedAt:   DateTimeOffset.UtcNow);

        // StartStream creates a new event stream
        session.Events.StartStream<Order>(orderId, @event);
        await session.SaveChangesAsync(ct);

        return orderId;
    }

    // Ship an order — load aggregate, validate, append new event
    public async Task ShipOrderAsync(Guid orderId, string trackingNumber, string carrier, CancellationToken ct)
    {
        // Load current aggregate by replaying all events
        var order = await session.Events.AggregateStreamAsync<Order>(orderId, token: ct)
            ?? throw new NotFoundException(nameof(Order), orderId);

        // Business logic on the aggregate — returns the event to append
        var @event = order.Ship(trackingNumber, carrier);

        // Optimistic concurrency — reject if another process modified the stream
        session.Events.Append(orderId, order.Version, @event);
        await session.SaveChangesAsync(ct);
    }

    // Cancel an order
    public async Task CancelOrderAsync(Guid orderId, string reason, CancellationToken ct)
    {
        var order = await session.Events.AggregateStreamAsync<Order>(orderId, token: ct)
            ?? throw new NotFoundException(nameof(Order), orderId);

        var @event = order.Cancel(reason);
        session.Events.Append(orderId, order.Version, @event);
        await session.SaveChangesAsync(ct);
    }
}

Step 5: Projections (Read Models)

C#
// Inline projection — updated synchronously when events are saved
public class OrderSummaryProjection : SingleStreamProjection<OrderSummary>
{
    public OrderSummary Create(OrderPlaced e) => new()
    {
        Id         = e.OrderId,
        CustomerId = e.CustomerId,
        Status     = "Pending",
        Total      = e.Total,
        PlacedAt   = e.PlacedAt,
    };

    public void Apply(OrderPaid e, OrderSummary summary)
    {
        summary.Status    = "Paid";
        summary.PaymentRef = e.PaymentReference;
    }

    public void Apply(OrderShipped e, OrderSummary summary)
    {
        summary.Status        = "Shipped";
        summary.TrackingNumber = e.TrackingNumber;
        summary.ShippedAt     = e.ShippedAt;
    }

    public void Apply(OrderDelivered e, OrderSummary summary) => summary.Status = "Delivered";
    public void Apply(OrderCancelled e, OrderSummary summary) => summary.Status = "Cancelled";
}

public class OrderSummary
{
    public Guid   Id            { get; set; }
    public Guid   CustomerId    { get; set; }
    public string Status        { get; set; } = "";
    public decimal Total        { get; set; }
    public string? PaymentRef   { get; set; }
    public string? TrackingNumber { get; set; }
    public DateTimeOffset PlacedAt  { get; set; }
    public DateTimeOffset? ShippedAt { get; set; }
}
C#
// Async projection — eventually consistent, runs in a background daemon
public class CustomerOrderHistoryProjection : MultiStreamProjection<CustomerOrderHistory, Guid>
{
    public CustomerOrderHistoryProjection()
    {
        // Group events by CustomerId (not OrderId)
        Identity<OrderPlaced>(e => e.CustomerId);
        Identity<OrderCancelled>(e => e.OrderId);   // need to look up CustomerId
    }

    public void Apply(OrderPlaced e, CustomerOrderHistory history)
    {
        history.Id = e.CustomerId;
        history.Orders.Add(new OrderSummaryItem(e.OrderId, e.Total, "Pending", e.PlacedAt));
        history.TotalOrders++;
        history.TotalSpend += e.Total;
    }

    public void Apply(OrderCancelled e, CustomerOrderHistory history)
    {
        var order = history.Orders.FirstOrDefault(o => o.OrderId == e.OrderId);
        if (order is not null) order.Status = "Cancelled";
    }
}

public class CustomerOrderHistory
{
    public Guid Id { get; set; }
    public List<OrderSummaryItem> Orders { get; set; } = [];
    public int     TotalOrders { get; set; }
    public decimal TotalSpend  { get; set; }
}

public record OrderSummaryItem(Guid OrderId, decimal Total, string Status, DateTimeOffset PlacedAt)
{
    public string Status { get; set; } = Status;
}

Step 6: Query the Read Model

C#
// Query against projections — fast, no event replay needed
public class OrderQueryHandler(IQuerySession session)
{
    public Task<OrderSummary?> GetOrderAsync(Guid orderId, CancellationToken ct)
        => session.LoadAsync<OrderSummary>(orderId, ct);

    public Task<List<OrderSummary>> GetActiveOrdersAsync(CancellationToken ct)
        => session.Query<OrderSummary>()
            .Where(o => o.Status != "Delivered" && o.Status != "Cancelled")
            .OrderByDescending(o => o.PlacedAt)
            .ToListAsync(ct);

    public Task<CustomerOrderHistory?> GetCustomerHistoryAsync(Guid customerId, CancellationToken ct)
        => session.LoadAsync<CustomerOrderHistory>(customerId, ct);

    // Time travel — what was the state of this order on a specific date?
    public async Task<Order?> GetOrderAtAsync(Guid orderId, DateTimeOffset asOf, CancellationToken ct)
        => await session.Events.AggregateStreamAsync<Order>(orderId, timestamp: asOf.UtcDateTime, token: ct);
}

Step 7: Snapshots (Skip Full Replay)

C#
// For aggregates with thousands of events, snapshots skip replaying old events
// Marten handles snapshots automatically when configured:

opts.Projections.Snapshot<Order>(SnapshotLifecycle.Inline);

// You can also create them on-demand for long-lived streams:
// session.Events.AppendExclusive(orderId) will snapshot automatically
// after every 100 events (configurable)

Interview Answer

"Event sourcing persists state changes as an immutable sequence of events rather than updating rows. The current state is derived by replaying events — called an aggregate. In .NET, Marten uses PostgreSQL as the event store: StartStream creates a new stream, Append adds events, and AggregateStreamAsync replays all events to rebuild the current aggregate. Optimistic concurrency: pass the expected version to Append — Marten throws a ConcurrencyException if another process appended first. Projections are read models derived from events: inline projections are updated synchronously when events are saved (consistent but slower writes), async projections run in a background daemon (eventually consistent, faster writes). For long-running aggregates, snapshots skip replaying old events — Marten stores a point-in-time snapshot and replays only events after it. Time travel is built in: AggregateStreamAsync with a timestamp parameter rebuilds the state as of any past point. The main challenge is schema evolution — you can't change past events, so you need an upcaster strategy for old event formats."

Enjoyed this article?

Explore the Backend Systems learning path for more.

Found this helpful?

Share:𝕏

Leave a comment

Have a question, correction, or just found this helpful? Leave a note below.