Marten — Event Sourcing and Document Storage on PostgreSQL

Use Marten for event sourcing in .NET: configuring the document store, appending events, loading aggregates with live aggregation and snapshots, building projections, and integrating with ASP.NET Core.

Asma Hafeez Khan · May 16, 2026 · 5 min read
Tags: Event Sourcing, Marten, PostgreSQL, .NET, Architecture

What Marten Provides

Marten is a .NET library that turns PostgreSQL into:
  → An event store (with stream versioning, optimistic concurrency)
  → A document database (JSON documents stored in PostgreSQL tables)

Why Marten over rolling your own event store:
  → Append events with version checking — built in
  → Live aggregation: rehydrate an aggregate from events without extra code
  → Projections: synchronous (inline), async (daemon), or live
  → Snapshot support: built-in snapshot store
  → Tested in production — no need to implement and debug your own event store

Tradeoff:
  → Requires PostgreSQL (not SQL Server)
  → Adds a library dependency with its own upgrade cycle
  → Some advanced scenarios require understanding Marten internals

Setup and Configuration

C#
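// NuGet: Marten, Marten.AspNetCore, Npgsql
// Namespaces as of Marten 7.x (assumption; they have moved between major versions)
using Marten;
using Marten.Events.Daemon.Resiliency;  // DaemonMode
using Marten.Events.Projections;        // SnapshotLifecycle, ProjectionLifecycle
using Weasel.Core;                      // AutoCreate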

builder.Services.AddMarten(options =>
{
    options.Connection(builder.Configuration.GetConnectionString("Clinical")!);

    // Schema for event store tables
    options.Events.DatabaseSchemaName = "event_store";

    // Keep a PrescriptionState snapshot document, updated inline
    // (in the same transaction as the appended events)
    options.Projections.Snapshot<PrescriptionState>(SnapshotLifecycle.Inline);

    // Register async projections
    options.Projections.Add<PrescriptionSummaryProjection>(ProjectionLifecycle.Async);

    // Auto-create schema in development
    if (builder.Environment.IsDevelopment())
        options.AutoCreateSchemaObjects = AutoCreate.All;
})
.UseLightweightSessions()  // Sessions without identity map or dirty tracking
.AddAsyncDaemon(DaemonMode.HotCold);  // Background projection daemon

Appending Events

C#
public sealed class ApprovePrescriptionHandler
    : IRequestHandler<ApprovePrescriptionCommand, Result>
{
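    private readonly IDocumentSession _session;

    // The session is injected by the DI container (registered by AddMarten)
    public ApprovePrescriptionHandler(IDocumentSession session) => _session = session;
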
    public async Task<Result> Handle(
        ApprovePrescriptionCommand command, CancellationToken ct)
    {
        // Load current aggregate state (Marten replays events automatically)
        var prescription = await _session.Events
            .AggregateStreamAsync<PrescriptionState>(command.PrescriptionId, ct);

        if (prescription is null)
            return Result.Failure(Error.NotFound("Prescription", command.PrescriptionId));

        if (prescription.Status != "Draft")
            return Result.Failure(Error.Validation("Status",
                "Only Draft prescriptions can be approved."));

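        // Append with an optimistic concurrency check. Marten's expectedVersion is
        // the stream version AFTER the append, so one new event means Version + 1.
        // On a mismatch, SaveChangesAsync throws and nothing is written.
        _session.Events.Append(
            command.PrescriptionId,
            prescription.Version + 1,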
            new PrescriptionApprovedEvent(
                PrescriptionId: command.PrescriptionId,
                ApprovedBy:     command.ApprovedBy,
                ApprovedAt:     DateTime.UtcNow,
                WardId:         prescription.WardId));

        await _session.SaveChangesAsync(ct);
        return Result.Success();
    }
}
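
The expected-version check can be illustrated without a database. The following is a minimal in-memory sketch, not Marten's implementation: `InMemoryStream` and `VersionConflictException` are hypothetical names invented for this example. It assumes the semantics Marten documents for `Append(streamId, expectedVersion, events)`, where the expected version is the version the stream should have after the append.

```csharp
using System;
using System.Collections.Generic;

// Two writers both read the stream at version 1, then each tries to append one event.
var stream = new InMemoryStream();
stream.Append(1, "PrescriptionCreated");

int seenByA = stream.Version; // 1
int seenByB = stream.Version; // 1

stream.Append(seenByA + 1, "PrescriptionApproved"); // succeeds, stream is now at 2

bool conflict = false;
try
{
    // Writer B's view is stale: appending would take the stream to 3, not 2.
    stream.Append(seenByB + 1, "PrescriptionSuspended");
}
catch (VersionConflictException)
{
    conflict = true; // a real handler would reload the aggregate and retry or reject
}

Console.WriteLine($"version={stream.Version}, conflict={conflict}");
// prints "version=2, conflict=True"

// Hypothetical in-memory stream, for illustration only.
public sealed class InMemoryStream
{
    private readonly List<object> _events = new();

    public int Version => _events.Count;

    // expectedVersion is the version the stream must have AFTER the append.
    public void Append(int expectedVersion, params object[] events)
    {
        int versionAfterAppend = _events.Count + events.Length;
        if (versionAfterAppend != expectedVersion)
            throw new VersionConflictException(expectedVersion, versionAfterAppend);

        _events.AddRange(events);
    }
}

public sealed class VersionConflictException : Exception
{
    public VersionConflictException(int expected, int actual)
        : base($"Expected stream version {expected} after append, but it would be {actual}.") { }
}
```

The losing writer learns about the conflict at write time instead of silently overwriting a decision made on newer state, which is the property the handler above relies on.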

Aggregate (Live Aggregation)

C#
// Marten applies events to this class automatically via Apply() methods
// PrescriptionState is the "aggregate projection" — it represents current state

public sealed class PrescriptionState
{
    public Guid     Id         { get; set; }
    public string   Status     { get; set; } = "Draft";
    public string   Medication { get; set; } = string.Empty;
    public decimal  DoseAmount { get; set; }
    public string   DoseUnit   { get; set; } = string.Empty;
    public Guid?    WardId     { get; set; }
    // By convention Marten sets a public int/long member named Version to the
    // stream version, so Apply methods should not increment it themselves.
    public int      Version    { get; set; }

    // Marten calls Apply for each event in order
    public void Apply(PrescriptionCreatedEvent e)
    {
        Id         = e.PrescriptionId;
        Medication = e.MedicationName;
        DoseAmount = e.DoseAmount;
        DoseUnit   = e.DoseUnit;
        WardId     = e.WardId;
        Status     = "Draft";
    }

    public void Apply(PrescriptionApprovedEvent e)
    {
        Status  = "Approved";
        WardId  = e.WardId ?? WardId;
    }

    public void Apply(PrescriptionSuspendedEvent e) => Status = "Suspended";
    public void Apply(PrescriptionExpiredEvent e)   => Status = "Expired";
}

// Registration (in AddMarten configuration):
// options.Projections.Snapshot<PrescriptionState>(SnapshotLifecycle.Inline);
// Marten then stores the aggregate state as a document, updated in the same
// transaction as the appended events. Read it with
// session.LoadAsync<PrescriptionState>(id) to skip the replay;
// AggregateStreamAsync<T> always rebuilds the state from the events.
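
Conceptually, live aggregation is a left fold over the stream. The sketch below shows that idea with simplified, hypothetical event types (`Created`, `Approved`, `Suspended`) and a hand-written dispatcher named `Replay`. Marten discovers the `Apply` methods by convention and handles the dispatch itself, so none of this plumbing is needed in real code.

```csharp
using System;
using System.Collections.Generic;

var events = new object[]
{
    new Created(Guid.NewGuid(), "Warfarin"),
    new Approved("Dr. Smith"),
    new Suspended("INR above range"),
};

var state = Replay.Fold(events);
Console.WriteLine($"{state.Medication}: {state.Status} (version {state.Version})");
// prints "Warfarin: Suspended (version 3)"

// Simplified, hypothetical event types for this sketch.
public sealed record Created(Guid Id, string Medication);
public sealed record Approved(string ApprovedBy);
public sealed record Suspended(string Reason);

public sealed class State
{
    public Guid   Id         { get; private set; }
    public string Medication { get; private set; } = string.Empty;
    public string Status     { get; private set; } = "Draft";
    public int    Version    { get; set; }

    public void Apply(Created e)   { Id = e.Id; Medication = e.Medication; Status = "Draft"; }
    public void Apply(Approved e)  => Status = "Approved";
    public void Apply(Suspended e) => Status = "Suspended";
}

public static class Replay
{
    // Start from an empty state and apply each event in stream order.
    public static State Fold(IEnumerable<object> events)
    {
        var state = new State();
        foreach (var e in events)
        {
            switch (e)
            {
                case Created c:   state.Apply(c); break;
                case Approved a:  state.Apply(a); break;
                case Suspended s: state.Apply(s); break;
            }
            state.Version++; // the version counts every event in the stream
        }
        return state;
    }
}
```

The version is tracked by the replay loop, not by the `Apply` methods, which mirrors why the aggregate should leave `Version` to the library.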

Async Projection with Marten Daemon

C#
// Multi-stream projection: listens to all prescription streams
public sealed class PrescriptionSummaryProjection
    : MultiStreamProjection<PrescriptionSummary, Guid>
{
    public PrescriptionSummaryProjection()
    {
        // Tell Marten which events this projection handles
        Identity<PrescriptionCreatedEvent>(e => e.PrescriptionId);
        Identity<PrescriptionApprovedEvent>(e => e.PrescriptionId);
        Identity<PrescriptionSuspendedEvent>(e => e.PrescriptionId);
    }

    public PrescriptionSummary Create(PrescriptionCreatedEvent e) =>
        new()
        {
            Id             = e.PrescriptionId,
            MedicationName = e.MedicationName,
            DoseDisplay    = $"{e.DoseAmount} {e.DoseUnit}",
            Status         = "Draft",
            WardId         = e.WardId,
            CreatedAt      = e.CreatedAt
        };

    public void Apply(PrescriptionSummary summary, PrescriptionApprovedEvent e)
    {
        summary.Status     = "Approved";
        summary.ApprovedAt = e.ApprovedAt;
    }

    public void Apply(PrescriptionSummary summary, PrescriptionSuspendedEvent e)
    {
        summary.Status = "Suspended";
    }
}

// Querying the projection (Marten stores it as a document):
public async Task<IReadOnlyList<PrescriptionSummary>> GetByWardAsync(
    Guid wardId, IQuerySession session, CancellationToken ct)
{
    return await session.Query<PrescriptionSummary>()
        .Where(s => s.WardId == wardId && s.Status == "Approved")
        .OrderByDescending(s => s.ApprovedAt)
        .ToListAsync(token: ct);
}
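
What the projection does per event can be sketched in memory: route the event to a document by its identity, create the document on the first event, and mutate it on later ones. The types `RxCreated`, `RxApproved`, `Summary`, and `SummaryProjector` are hypothetical stand-ins, and a dictionary plays the role of the document table. This is an illustration of the idea, not how the daemon is implemented.

```csharp
using System;
using System.Collections.Generic;

var id1 = Guid.NewGuid();
var id2 = Guid.NewGuid();

var events = new object[]
{
    new RxCreated(id1, "Warfarin"),
    new RxCreated(id2, "Heparin"),
    new RxApproved(id1, new DateTime(2026, 3, 5, 10, 6, 0, DateTimeKind.Utc)),
};

// The dictionary stands in for the PrescriptionSummary document table.
var store = new Dictionary<Guid, Summary>();
foreach (var e in events)
    SummaryProjector.Project(store, e);

Console.WriteLine($"{store[id1].Medication}: {store[id1].Status}"); // prints "Warfarin: Approved"
Console.WriteLine($"{store[id2].Medication}: {store[id2].Status}"); // prints "Heparin: Draft"

public sealed record RxCreated(Guid PrescriptionId, string Medication);
public sealed record RxApproved(Guid PrescriptionId, DateTime ApprovedAt);

public sealed class Summary
{
    public Guid      Id         { get; set; }
    public string    Medication { get; set; } = string.Empty;
    public string    Status     { get; set; } = "Draft";
    public DateTime? ApprovedAt { get; set; }
}

public static class SummaryProjector
{
    public static void Project(IDictionary<Guid, Summary> store, object e)
    {
        switch (e)
        {
            // Equivalent of Create(): the first event for an identity makes the document.
            case RxCreated c:
                store[c.PrescriptionId] = new Summary { Id = c.PrescriptionId, Medication = c.Medication };
                break;

            // Equivalent of Apply(): later events mutate the existing document.
            case RxApproved a when store.TryGetValue(a.PrescriptionId, out var summary):
                summary.Status     = "Approved";
                summary.ApprovedAt = a.ApprovedAt;
                break;
        }
    }
}
```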

Reading Event History for Audit

C#
// Full prescription history for MHRA audit
// Requires: using System.Text.Json;
public async Task<IReadOnlyList<AuditEventDto>> GetAuditHistoryAsync(
    Guid prescriptionId, IQuerySession session, CancellationToken ct)
{
    var stream = await session.Events
        .FetchStreamAsync(prescriptionId, token: ct);

    return stream.Select(e => new AuditEventDto(
        EventType:  e.EventTypeName,
        OccurredAt: e.Timestamp.UtcDateTime,
        // Serialize the payload; ToString() on a plain class only yields the type name
        Data:       JsonSerializer.Serialize(e.Data, e.Data.GetType())
    )).ToList();
}
}

// Illustrative output for a Warfarin prescription (event names and payloads simplified):
// PrescriptionCreated  | 2026-03-01 09:00 | {medication: Warfarin, dose: 5mg}
// PrescriptionSuspended| 2026-03-03 14:22 | {reason: INR above range, inr: 4.1}
// PrescriptionResumed  | 2026-03-05 10:05 | {new_dose: 3mg, inr: 2.4}
// PrescriptionApproved | 2026-03-05 10:06 | {approved_by: Dr. Smith}
// This is your MHRA audit trail; no additional audit table is needed.
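
If the audit payload should come out as JSON, as in the sample output above, one option is to serialize the event object with System.Text.Json using its runtime type. This is a minimal sketch with a hypothetical `PrescriptionSuspended` record; the real event types and any custom serializer settings are not shown in this article.

```csharp
using System;
using System.Text.Json;

// An event payload as it might come back typed as object from an event wrapper.
object data = new PrescriptionSuspended("INR above range", 4.1m);

// Pass the runtime type so the event's own properties are written.
string json = JsonSerializer.Serialize(data, data.GetType());
Console.WriteLine(json);
// prints {"Reason":"INR above range","Inr":4.1}

// Hypothetical event type for this sketch.
public sealed record PrescriptionSuspended(string Reason, decimal Inr);
```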

Production issue I've seen: A team ran Marten's Async Daemon in a multi-instance deployment without configuring it for more than one node. Two instances of the API ran simultaneously, and each started its own daemon with no coordination between them, which is how DaemonMode.Solo behaves. Both daemons processed the same events at the same time and created duplicate projection rows. Switching to DaemonMode.HotCold resolved the duplication: in that mode Marten uses a PostgreSQL advisory lock (built in, no extra setup) so that only one node runs the projections while the others stand by. DaemonMode.Solo is fine for a single instance, and the default DaemonMode.Disabled runs no daemon at all; always configure HotCold for load-balanced deployments.
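
One way to make the daemon mode an explicit decision is to choose it from the hosting environment. The fragment below is a sketch under assumptions: it targets Marten 7.x namespaces, relies on the implicit usings of an ASP.NET Core web project, and assumes development runs as a single local process while every other environment may be load-balanced.

```csharp
using Marten;
using Marten.Events.Daemon.Resiliency;

var builder = WebApplication.CreateBuilder(args);

// Assumption: one local process in development, several instances elsewhere.
var daemonMode = builder.Environment.IsDevelopment()
    ? DaemonMode.Solo      // single node, no leader election
    : DaemonMode.HotCold;  // one active node, the others on standby

builder.Services
    .AddMarten(options =>
    {
        options.Connection(builder.Configuration.GetConnectionString("Clinical")!);
    })
    .AddAsyncDaemon(daemonMode);

var app = builder.Build();
app.Run();
```

Keeping the choice in one place makes it harder for a new deployment target to inherit a single-node setting by accident.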


Key Takeaway

Marten turns PostgreSQL into a production-grade event store with built-in live aggregation, snapshots, and async projections. Configure aggregates with Apply() methods; Marten calls them in stream order to reconstitute state. Use SnapshotLifecycle.Inline for aggregates with long streams that are loaded often: it adds a document write to each append in exchange for fast reads. Use MultiStreamProjection with the Async Daemon for denormalised read models. The event stream is always the audit trail: FetchStreamAsync retrieves the full history of any aggregate.
