Learnixo

Event Sourcing in .NET · Lesson 3 of 6

Projections and Read Models — Building Query Views

What Projections Do

In event sourcing, the event stream is the write model.
You cannot query it efficiently for display purposes.

"Show all active Warfarin prescriptions for Ward 4B" requires:
  → Scanning every event stream for every prescription
  → Replaying all events to find current status
  → Filtering by medication name and ward

This is impossibly slow for read queries.

Projections solve this:
  → Listen to events as they are appended
  → Maintain a denormalised, queryable read model (a table, a document)
  → Update the read model incrementally as events arrive

Result:
  Write: append events to the event store (fast, append-only)
  Read:  query the projection table (fast, pre-computed, indexed)

The projection table is a derived view — it can always be rebuilt
by replaying all events through the projection logic.

Inline Projection (Same Transaction as Write)

C#
// Simplest approach: update the read model in the same DB transaction as the event append
// Works when event store and read model are in the same database

public sealed class PrescriptionApprovedProjection
{
    private readonly DbConnection _conn;

    public async Task HandleAsync(
        PrescriptionApprovedEvent @event,
        SqlTransaction transaction,
        CancellationToken ct)
    {
        await _conn.ExecuteAsync("""
            UPDATE prescriptions.prescription_summaries
            SET    status          = 'Approved',
                   approved_at     = @approvedAt,
                   approved_by     = @approvedBy
            WHERE  id = @prescriptionId
            """,
            new
            {
                prescriptionId = @event.PrescriptionId,
                approvedAt     = @event.ApprovedAt,
                approvedBy     = @event.ApprovedBy
            },
            transaction);
    }
}

// Called immediately after appending the event, within the same transaction:
await _eventStore.AppendAsync(streamId, new[] { approvedEvent }, expectedVersion, ct);
await _projection.HandleAsync(approvedEvent, currentTransaction, ct);
await transaction.CommitAsync(ct);
// Both event and read model update are atomic.

Asynchronous Projection (Background Worker)

C#
// For projections that can be eventually consistent:
// A background worker reads new events and updates the read model

public sealed class ProjectionWorker : BackgroundService
{
    private readonly IEventStore           _eventStore;
    private readonly PrescriptionProjection _projection;
    private readonly IProjectionCheckpoint  _checkpoint;

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            var lastProcessed = await _checkpoint.GetAsync("prescription-projection");

            var newEvents = await _eventStore.ReadAllFromAsync(
                fromPosition: lastProcessed + 1, stoppingToken);

            foreach (var recorded in newEvents)
            {
                await _projection.HandleAsync(recorded, stoppingToken);
                await _checkpoint.SaveAsync("prescription-projection", recorded.GlobalPosition);
            }

            // Poll for new events every 500ms
            await Task.Delay(500, stoppingToken);
        }
    }
}

// Checkpoint table: stores the last processed global event position per projection
CREATE TABLE projections.checkpoints (
    projection_name  NVARCHAR(200) NOT NULL PRIMARY KEY,
    last_position    BIGINT        NOT NULL DEFAULT -1,
    updated_at       DATETIME2     NOT NULL DEFAULT SYSUTCDATETIME()
);

Projection Definition

C#
// A projection handles multiple event types and maintains one read table

public sealed class PrescriptionSummaryProjection
{
    private readonly DbConnection _conn;

    public Task HandleAsync(RecordedEvent recorded, CancellationToken ct)
    {
        var @event = EventDeserializer.Deserialize(recorded);

        return @event switch
        {
            PrescriptionCreatedEvent e   => OnCreated(e, ct),
            PrescriptionApprovedEvent e  => OnApproved(e, ct),
            PrescriptionSuspendedEvent e => OnSuspended(e, ct),
            PrescriptionExpiredEvent e   => OnExpired(e, ct),
            _                           => Task.CompletedTask
        };
    }

    private Task OnCreated(PrescriptionCreatedEvent e, CancellationToken ct) =>
        _conn.ExecuteAsync("""
            INSERT INTO prescriptions.prescription_summaries
                (id, patient_id, medication_name, dose_amount, dose_unit, status, created_at)
            VALUES
                (@id, @patientId, @medicationName, @doseAmount, @doseUnit, 'Draft', @createdAt)
            """,
            new { e.Id, e.PatientId, e.MedicationName, e.DoseAmount, e.DoseUnit, e.CreatedAt });

    private Task OnApproved(PrescriptionApprovedEvent e, CancellationToken ct) =>
        _conn.ExecuteAsync("""
            UPDATE prescriptions.prescription_summaries
            SET    status = 'Approved', approved_at = @approvedAt
            WHERE  id = @id
            """,
            new { e.Id, e.ApprovedAt });
}

Rebuilding a Projection

C#
// When projection logic changes, rebuild from scratch by replaying all events
// Safe because the event store is immutable — events never change

public sealed class ProjectionRebuilder
{
    public async Task RebuildAsync(
        string                projectionName,
        IProjectionHandler    projection,
        CancellationToken     ct)
    {
        // 1. Truncate the read model table
        await _conn.ExecuteAsync(
            $"TRUNCATE TABLE prescriptions.prescription_summaries");

        // 2. Reset checkpoint
        await _checkpoint.ResetAsync(projectionName);

        // 3. Read all events from position 0
        long position = 0;
        await foreach (var batch in _eventStore.ReadAllInBatchesAsync(batchSize: 1000, ct))
        {
            foreach (var recorded in batch)
                await projection.HandleAsync(recorded, ct);

            await _checkpoint.SaveAsync(projectionName, batch.Last().GlobalPosition);
        }
    }
}

// Projection rebuild can be run:
//   → On startup in a migration-style command
//   → As a CLI command during deployment
//   → In a feature flag-gated background job while the old projection stays live

Multiple Projections from the Same Events

The same events can feed multiple independent projections:

PrescriptionCreatedEvent feeds:
  → PrescriptionSummaryProjection    (for the prescription list UI)
  → PatientPrescriptionHistoryProjection  (for the patient timeline UI)
  → PharmacyDispenseQueueProjection  (for the pharmacy dashboard)
  → MhraAuditProjection              (for regulatory reporting)

Each projection is an independent read model, optimised for its consumer.
Adding a new projection requires no changes to the write side.
The event store replays the same events through a new projection.

This is the key benefit: the write model is append-only and never changes.
New requirements only add new projections, not new columns or schema changes.

Production issue I've seen: A team's projection background worker had no checkpoint persistence — it tracked position in memory only. When the worker restarted (deploy, crash, scale event), it replayed all events from position 0. A projection that normally updated 50 rows per minute was suddenly processing 200,000 historic events. The database CPU spiked to 100%, taking the read model offline for 45 minutes. Adding a persistent checkpoint table (5 rows, one per projection) and saving position every batch fixed the restart behaviour permanently.


Key Takeaway

Projections are read models derived from events — they make event-sourced data queryable. Inline projections update in the same transaction as the event append (consistent, simpler). Async projections run in a background worker with a persistent checkpoint (eventually consistent, more scalable). One event stream can feed many projections. Projections are always rebuildable by replaying the event store — never lose data, never fear schema changes in the read model.