Event Sourcing in .NET · Lesson 5 of 6

CQRS + Event Sourcing — Command and Query Separation

Why Event Sourcing and CQRS Belong Together

Event sourcing without CQRS:
  → Write: append events to event store
  → Read: replay all events to answer a query
  → Querying by replaying is slow and inflexible

CQRS without event sourcing:
  → Write: update the database row
  → Read: query from a separate read model
  → No history — you cannot audit past state changes

Together:
  → Write: append events (event sourcing provides the history)
  → Read: query projections (CQRS provides the optimised read model)
  → Each side is optimised independently
  → Audit trail is free — the event store is the audit log

Separation:
  Commands: go to the aggregate → produce events → events stored → projections updated
  Queries:  go directly to the read model (projection table) — never touch the event store

Command Side — Appending Events

C#
using MediatR; // IRequest<T> / IRequestHandler<,> (assumed from the interface shapes used here)

// ApprovePrescriptionCommand: write side
public sealed record ApprovePrescriptionCommand(
    Guid PrescriptionId,
    Guid ApprovedBy) : IRequest<Result>;

public sealed class ApprovePrescriptionHandler
    : IRequestHandler<ApprovePrescriptionCommand, Result>
{
    private readonly IPrescriptionRepository _repository;

    public ApprovePrescriptionHandler(IPrescriptionRepository repository)
        => _repository = repository;

    public async Task<Result> Handle(
        ApprovePrescriptionCommand command, CancellationToken ct)
    {
        // 1. Load aggregate by replaying events (or snapshot + delta)
        var prescription = await _repository.GetByIdAsync(
            PrescriptionId.Of(command.PrescriptionId), ct);

        if (prescription is null)
            return Result.Failure(Error.NotFound("Prescription", command.PrescriptionId));

        // 2. Execute domain behaviour — produces domain events internally
        var result = prescription.Approve(command.ApprovedBy, DateTime.UtcNow);
        if (result.IsFailure) return result;

        // 3. Persist by appending events to the event store
        await _repository.SaveAsync(prescription, ct);
        // SaveAsync extracts uncommitted events from the aggregate and appends them

        return Result.Success();
    }
}

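// Repository saves uncommitted domain events as event store records
// (GetByIdAsync omitted here)
public sealed class PrescriptionRepository : IPrescriptionRepository
{
    // IEventStore is an assumed name for the event store abstraction
    private readonly IEventStore _eventStore;

    public PrescriptionRepository(IEventStore eventStore) => _eventStore = eventStore;

    public async Task SaveAsync(Prescription prescription, CancellationToken ct)
    {
        var streamId = $"prescription-{prescription.Id.Value}";
        var events   = prescription.UncommittedEvents; // events raised since rehydration

        // prescription.Version acts as the expected stream version (the version the
        // aggregate was loaded at), so a concurrent writer makes this append fail
        await _eventStore.AppendAsync(streamId, events, prescription.Version, ct);
        prescription.ClearUncommittedEvents();
    }
}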
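
The repository above depends on two things the lesson does not show: an aggregate that tracks `UncommittedEvents` and `Version`, and an event store that rejects an append when the expected version is stale. The following is a minimal in-memory sketch of both, not the lesson's actual implementation. `AggregateRoot`, `PrescriptionSketch`, `InMemoryEventStore` and `ConcurrencyException` are hypothetical names, and the sketch assumes `Version` counts persisted events only.

```csharp
using System;
using System.Collections.Generic;

public sealed class ConcurrencyException : Exception
{
    public ConcurrencyException(string message) : base(message) { }
}

// Hypothetical base class: tracks persisted version and events raised since load.
public abstract class AggregateRoot
{
    private readonly List<object> _uncommitted = new();

    // Number of events already persisted for this aggregate.
    public int Version { get; private set; }

    public IReadOnlyList<object> UncommittedEvents => _uncommitted;

    protected void Raise(object @event)
    {
        Apply(@event);            // update in-memory state
        _uncommitted.Add(@event); // remember it for SaveAsync
    }

    // Rebuilds state from history; each stored event advances Version.
    public void LoadFrom(IEnumerable<object> history)
    {
        foreach (var e in history) { Apply(e); Version++; }
    }

    public void ClearUncommittedEvents()
    {
        Version += _uncommitted.Count;
        _uncommitted.Clear();
    }

    protected abstract void Apply(object @event);
}

public sealed record PrescriptionApproved(Guid ApprovedBy);

public sealed class PrescriptionSketch : AggregateRoot
{
    public bool IsApproved { get; private set; }

    public void Approve(Guid approvedBy)
    {
        if (IsApproved) throw new InvalidOperationException("Already approved.");
        Raise(new PrescriptionApproved(approvedBy));
    }

    protected override void Apply(object @event)
    {
        if (@event is PrescriptionApproved) IsApproved = true;
    }
}

public sealed class InMemoryEventStore
{
    private readonly Dictionary<string, List<object>> _streams = new();

    // Appends only if the stream still holds exactly expectedVersion events.
    public void Append(string streamId, IReadOnlyList<object> events, int expectedVersion)
    {
        if (!_streams.TryGetValue(streamId, out var stream))
            _streams[streamId] = stream = new List<object>();

        if (stream.Count != expectedVersion)
            throw new ConcurrencyException(
                $"Stream {streamId} is at version {stream.Count}, expected {expectedVersion}.");

        stream.AddRange(events);
    }

    public IReadOnlyList<object> Read(string streamId) =>
        _streams.TryGetValue(streamId, out var stream) ? stream.ToArray() : Array.Empty<object>();
}
```

Because `Version` only moves in `LoadFrom` and `ClearUncommittedEvents`, it is still the load-time version when it is passed to `Append`. A second writer that loaded the same stream earlier gets a `ConcurrencyException` and can reload and retry.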

Query Side — Reading from Projections

C#
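using System.Data; // IDbConnection
using Dapper;      // QuerySingleAsync / QueryAsync extension methods

// GetActivePrescriptionsQuery: read side — never touches the event store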
public sealed record GetActivePrescriptionsQuery(
    Guid WardId,
    int  Page,
    int  PageSize) : IRequest<PagedList<PrescriptionSummaryDto>>;

public sealed class GetActivePrescriptionsHandler
    : IRequestHandler<GetActivePrescriptionsQuery, PagedList<PrescriptionSummaryDto>>
{
    private readonly IDbConnection _db;

    public GetActivePrescriptionsHandler(IDbConnection db) => _db = db;

    public async Task<PagedList<PrescriptionSummaryDto>> Handle(
        GetActivePrescriptionsQuery query, CancellationToken ct)
    {
        // Query the projection table — NOT the event store
        // This table is maintained by the PrescriptionSummaryProjection
        var totalCount = await _db.QuerySingleAsync<int>("""
            SELECT COUNT(*) FROM prescriptions.prescription_summaries
            WHERE  ward_id = @wardId AND status = 'Approved'
            """, new { wardId = query.WardId });

        var items = await _db.QueryAsync<PrescriptionSummaryDto>("""
            SELECT id, medication_name, patient_name, dose_display,
                   approved_at, approved_by_name
            FROM   prescriptions.prescription_summaries
            WHERE  ward_id = @wardId AND status = 'Approved'
            ORDER  BY approved_at DESC
            OFFSET @offset ROWS FETCH NEXT @pageSize ROWS ONLY
            """,
            new
            {
                wardId   = query.WardId,
                offset   = (query.Page - 1) * query.PageSize,
                pageSize = query.PageSize
            });

        return new PagedList<PrescriptionSummaryDto>(
            items.ToList(), query.Page, query.PageSize, totalCount);
    }
}
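
The query handler reads a table that is maintained by `PrescriptionSummaryProjection`, which the lesson names but does not show. Here is a rough sketch of what such a projection might look like, with a dictionary standing in for the `prescription_summaries` table. The event shapes (`PrescriptionCreated`, `PrescriptionApproved`), the row type and the `position` parameter are assumptions made for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical events, trimmed to the fields the read model needs.
public sealed record PrescriptionCreated(Guid PrescriptionId, Guid WardId, string MedicationName);
public sealed record PrescriptionApproved(Guid PrescriptionId, DateTime ApprovedAt);

// One row of the read model.
public sealed class PrescriptionSummaryRow
{
    public Guid Id { get; init; }
    public Guid WardId { get; init; }
    public string MedicationName { get; init; } = "";
    public string Status { get; set; } = "Draft";
    public DateTime? ApprovedAt { get; set; }
}

public sealed class PrescriptionSummaryProjection
{
    private readonly Dictionary<Guid, PrescriptionSummaryRow> _rows = new();

    // Position of the last event applied; redelivered events are skipped.
    public long Checkpoint { get; private set; } = -1;

    public void Apply(long position, object @event)
    {
        if (position <= Checkpoint) return; // already applied

        switch (@event)
        {
            case PrescriptionCreated e:
                _rows[e.PrescriptionId] = new PrescriptionSummaryRow
                {
                    Id = e.PrescriptionId,
                    WardId = e.WardId,
                    MedicationName = e.MedicationName
                };
                break;

            case PrescriptionApproved e when _rows.TryGetValue(e.PrescriptionId, out var row):
                row.Status = "Approved";
                row.ApprovedAt = e.ApprovedAt;
                break;
        }

        Checkpoint = position;
    }

    // In-memory equivalent of the paged SQL query above.
    public IReadOnlyList<PrescriptionSummaryRow> GetApproved(Guid wardId, int page, int pageSize) =>
        _rows.Values
            .Where(r => r.WardId == wardId && r.Status == "Approved")
            .OrderByDescending(r => r.ApprovedAt)
            .Skip((page - 1) * pageSize)
            .Take(pageSize)
            .ToList();
}
```

The checkpoint makes `Apply` idempotent, which matters for async projections because a background worker may redeliver an event after a crash or retry.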

Eventual Consistency Between Write and Read

After ApprovePrescriptionCommand succeeds:
  → Event appended to event store (synchronous, atomic)
  → Projection update:
      Option A (inline): happens in the same transaction, so reads are immediately consistent
      Option B (async):  a background worker updates the projection shortly afterwards (typically milliseconds, longer under load), so reads are eventually consistent

Client impact with async projection:
  → Command: "Approve prescription X" → 200 OK
  → Immediate GET: prescription may still show "Draft" (projection not yet updated)
  → Once the projection catches up (for example ~100 ms later): GET shows "Approved"

Handling this in the UI:
  → Show optimistic UI update immediately (don't wait for projection)
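  → Or: use "read your own writes": the command returns the new stream version, and
    the client's next read waits until the projection has reached that version (or is
    served from a short-lived cache of the just-written state). It must not fall back
    to reading the event store.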

When inline projection is required:
  → Safety-critical displays (medication dose on a clinical screen must be current)
  → Use inline projection (same transaction) for those specific projections
  → Accept async for non-safety-critical projections (audit log, reporting)
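
For the async case, read-your-own-writes can stay entirely on the projection side. One possible approach is sketched below: the command returns the position of the event it appended, and the query side waits, with a timeout, until the projection's checkpoint has reached that position. `ProjectionCheckpoint`, `Advance` and `WaitForAsync` are hypothetical names, and polling is used only to keep the sketch short.

```csharp
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical holder for the projection's progress through the event log.
public sealed class ProjectionCheckpoint
{
    private long _position = -1;

    public long Position => Interlocked.Read(ref _position);

    // Called by the projection worker after it has applied the event at 'position'.
    // Assumes the worker applies events in order, so positions only increase.
    public void Advance(long position) => Interlocked.Exchange(ref _position, position);

    // Polls until the projection has applied the event at requiredPosition.
    // Returns false if the timeout elapses first.
    public async Task<bool> WaitForAsync(
        long requiredPosition, TimeSpan timeout, CancellationToken ct = default)
    {
        var elapsed = Stopwatch.StartNew();
        while (Position < requiredPosition)
        {
            if (elapsed.Elapsed >= timeout) return false;
            await Task.Delay(TimeSpan.FromMilliseconds(10), ct);
        }
        return true;
    }
}
```

A query handler would call `WaitForAsync` with the position the client received from its command, then read the projection table as usual. On timeout it can return the possibly stale data with a flag rather than querying the event store.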

Event Schema Evolution

C#
// Events are immutable historical records: you cannot change them.
// When a field is added, make it nullable with a default.
// The declarations below are successive versions of one type; only the latest exists in code.

// Version 1 (original)
public sealed record PrescriptionApprovedEvent(
    Guid     PrescriptionId,
    Guid     ApprovedBy,
    DateTime ApprovedAt);

// Version 2 (added WardId — backward compatible: nullable, default null)
public sealed record PrescriptionApprovedEvent(
    Guid     PrescriptionId,
    Guid     ApprovedBy,
    DateTime ApprovedAt,
    Guid?    WardId = null);  // old events deserialize with WardId = null

// Version 3 (added InrValueAtApproval — nullable)
public sealed record PrescriptionApprovedEvent(
    Guid     PrescriptionId,
    Guid     ApprovedBy,
    DateTime ApprovedAt,
    Guid?    WardId            = null,
    decimal? InrValueAtApproval = null);

// What to NEVER do:
//   Remove a field (old events still contain it; most serializers silently drop the data)
//   Rename a field (old events no longer populate it; the value comes back null/default)
//   Change a field's type (old events fail to deserialize or are misread)
// If you need a breaking change: create a new event type and an upcaster
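
An upcaster converts an old event into the current shape when it is read, leaving the stored record untouched. The sketch below assumes a hypothetical breaking change in which `ApprovedBy` turns from a bare `Guid` into a structured `Approver` value. `PrescriptionApprovedV2`, `Approver` and `PrescriptionApprovedUpcaster` are illustrative names, and the sketch works on already-deserialized objects to stay serializer-agnostic.

```csharp
using System;

// Old shape, still present in the event store.
public sealed record PrescriptionApprovedEvent(
    Guid PrescriptionId, Guid ApprovedBy, DateTime ApprovedAt);

// New event type carrying the breaking change (hypothetical).
public sealed record Approver(Guid Id, string Role);

public sealed record PrescriptionApprovedV2(
    Guid PrescriptionId, Approver ApprovedBy, DateTime ApprovedAt);

public static class PrescriptionApprovedUpcaster
{
    // Runs on the read path, after deserialization and before the aggregate
    // or projection sees the event. Stored events are never rewritten.
    public static object Upcast(object @event) => @event switch
    {
        PrescriptionApprovedEvent old => new PrescriptionApprovedV2(
            old.PrescriptionId,
            new Approver(old.ApprovedBy, Role: "Unknown"), // role was not recorded in V1
            old.ApprovedAt),

        _ => @event // already current: pass through unchanged
    };
}
```

With this in place, aggregates and projections only need to handle `PrescriptionApprovedV2`. The placeholder role makes it explicit that the old events never captured that information.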

Production issue I've seen: A CQRS + event sourcing system had read-side queries that were hitting the event store directly — "because the projection wasn't updated yet." The developer justified this as a temporary workaround during testing. It shipped to production. Under load, the event store was being scanned for read queries (250+ times per second) while also receiving write appends. Event store I/O saturated. The rule is absolute: queries go to projections, never to the event store. If you need more consistent reads, invest in inline projections or a "read your own writes" cache — not direct event store reads.


Key Takeaway

Event sourcing and CQRS are natural partners: commands drive aggregates that produce events (stored in the event store), while queries read from projections (pre-built, indexed read models). The two sides evolve independently. Queries must never read directly from the event store — always from a projection. Handle eventual consistency explicitly: inline projections for safety-critical displays, async projections for reporting and non-critical reads. Event schema changes must be backward-compatible; use nullable fields for additions, upcasters for structural changes.