Learnixo

Event Sourcing in .NET · Lesson 2 of 6

Event Store Design — Appending and Reading Events

State vs Events as Source of Truth

Traditional persistence (state store):
  → Store the CURRENT state of an entity
  → Update the row when state changes
  → History is lost — you know where you are, not how you got there

  prescriptions table: { id, status: "Approved", inr_value: 2.3, updated_at: ... }
  Question: "Was this prescription ever suspended?" → Cannot answer.

Event sourcing (event store):
  → Store every EVENT that changed the entity's state
  → Current state is DERIVED by replaying events
  → Full history is preserved forever

  prescription-stream for id abc:
    PrescriptionCreated    { medication: "Warfarin", dose: 5mg }
    PrescriptionSuspended  { reason: "INR above range", inr: 4.1 }
    PrescriptionResumed    { new_dose: 3mg, inr: 2.4 }
    PrescriptionApproved   { approved_by: "Dr Smith" }

  Question: "Was this prescription ever suspended?" → Yes, with full context.
  Question: "What was the INR when it was suspended?" → 4.1
  This is essential for an MHRA (UK Medicines and Healthcare products Regulatory Agency) audit in a clinical system.
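
To make the contrast concrete, the two audit questions above can be answered with plain LINQ over the event list. This is a minimal in-memory sketch: the record types and their property names (for example PrescriptionSuspended with Reason and Inr) are assumptions modeled on the sample stream, not types defined elsewhere in this lesson.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical event records mirroring the sample stream above.
public abstract record PrescriptionEvent;
public sealed record PrescriptionCreated(string Medication, string Dose) : PrescriptionEvent;
public sealed record PrescriptionSuspended(string Reason, decimal Inr) : PrescriptionEvent;
public sealed record PrescriptionResumed(string NewDose, decimal Inr) : PrescriptionEvent;
public sealed record PrescriptionApproved(string ApprovedBy) : PrescriptionEvent;

public static class PrescriptionHistory
{
    // The sample stream for prescription "abc", in append order.
    public static IReadOnlyList<PrescriptionEvent> ExampleStream() => new PrescriptionEvent[]
    {
        new PrescriptionCreated("Warfarin", "5mg"),
        new PrescriptionSuspended("INR above range", 4.1m),
        new PrescriptionResumed("3mg", 2.4m),
        new PrescriptionApproved("Dr Smith"),
    };

    // "Was this prescription ever suspended?"
    public static bool WasEverSuspended(IEnumerable<PrescriptionEvent> stream) =>
        stream.OfType<PrescriptionSuspended>().Any();

    // "What was the INR when it was suspended?" Returns null if it never was.
    public static decimal? InrAtFirstSuspension(IEnumerable<PrescriptionEvent> stream) =>
        stream.OfType<PrescriptionSuspended>()
              .Select(e => (decimal?)e.Inr)
              .FirstOrDefault();
}
```

A state-store row holding only status "Approved" has nothing equivalent to query; here the answers come straight from the recorded history.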

Event Store Schema

SQL
-- Minimal event store table in SQL Server (assumes the event_store schema already exists)
CREATE TABLE event_store.events (
    id              BIGINT           IDENTITY(1,1) NOT NULL PRIMARY KEY,
    stream_id       NVARCHAR(500)    NOT NULL,   -- e.g. "prescription-abc-123"
    stream_version  INT              NOT NULL,   -- 0-based version within stream
    event_type      NVARCHAR(500)    NOT NULL,   -- fully qualified type name
    event_data      NVARCHAR(MAX)    NOT NULL,   -- JSON payload
    metadata        NVARCHAR(MAX)    NULL,       -- correlation ID, user, timestamp
    created_at      DATETIME2        NOT NULL DEFAULT SYSUTCDATETIME(),

    CONSTRAINT UQ_stream_version UNIQUE (stream_id, stream_version)
    -- Unique constraint enforces optimistic concurrency
);

-- No separate index on (stream_id, stream_version) is needed: SQL Server backs the
-- UNIQUE constraint with a unique index on those columns, which also serves stream reads.

Appending Events

C#
public interface IEventStore
{
    Task AppendAsync(
        string            streamId,
        IReadOnlyList<object> events,
        int               expectedVersion,
        CancellationToken ct = default);

    Task<EventStream> ReadStreamAsync(
        string            streamId,
        CancellationToken ct = default);
}

public sealed class SqlEventStore : IEventStore
{
    private readonly string _connectionString;

    public SqlEventStore(string connectionString) => _connectionString = connectionString;

    public async Task AppendAsync(
        string streamId,
        IReadOnlyList<object> events,
        int expectedVersion,
        CancellationToken ct = default)
    {
        await using var conn = new SqlConnection(_connectionString);
        await conn.OpenAsync(ct);
        await using var tx = await conn.BeginTransactionAsync(ct);

        // Check current version (optimistic concurrency)
        var currentVersion = await conn.QuerySingleOrDefaultAsync<int?>(
            "SELECT MAX(stream_version) FROM event_store.events WHERE stream_id = @streamId",
            new { streamId }, tx);

        var actualVersion = currentVersion ?? -1;
        if (actualVersion != expectedVersion)
            throw new OptimisticConcurrencyException(
                $"Expected version {expectedVersion} but stream is at {actualVersion}.");

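        // Append each event. The version check above is only a fast path: a concurrent
        // writer can still commit between that SELECT and these INSERTs, in which case
        // the UNIQUE constraint rejects the duplicate stream_version.
        var version = actualVersion + 1;
        try
        {
            foreach (var @event in events)
            {
                await conn.ExecuteAsync("""
                    INSERT INTO event_store.events
                        (stream_id, stream_version, event_type, event_data, created_at)
                    VALUES (@streamId, @version, @eventType, @eventData, SYSUTCDATETIME())
                    """,
                    new
                    {
                        streamId,
                        version = version++,
                        eventType = @event.GetType().AssemblyQualifiedName,
                        eventData = JsonSerializer.Serialize(@event, @event.GetType())
                    }, tx);
            }

            await tx.CommitAsync(ct);
        }
        catch (SqlException ex) when (ex.Number is 2627 or 2601)
        {
            // 2627 / 2601: unique constraint or unique index violation.
            // Disposing the uncommitted transaction rolls it back.
            throw new OptimisticConcurrencyException(
                $"Stream {streamId} changed concurrently; expected version {expectedVersion}.");
        }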
    }
}
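
The expectedVersion contract is easier to see without the SQL plumbing. The sketch below is an in-memory stand-in that follows the same rules (an empty stream is at version -1, and an append succeeds only when the caller's expected version matches). The shapes of RecordedEvent, EventStream and OptimisticConcurrencyException are assumptions, since the lesson uses these types without defining them, and InMemoryEventStore itself is a hypothetical helper intended for unit tests rather than production use.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;

// Assumed shapes for types the lesson references but does not define.
public sealed record RecordedEvent(int Version, string EventType, string EventData, DateTime CreatedAt);
public sealed record EventStream(string StreamId, IReadOnlyList<RecordedEvent> Events);

public sealed class OptimisticConcurrencyException : Exception
{
    public OptimisticConcurrencyException(string message) : base(message) { }
}

// Hypothetical in-memory store with the same append/read semantics as the SQL store.
public sealed class InMemoryEventStore
{
    private readonly object _gate = new();
    private readonly Dictionary<string, List<RecordedEvent>> _streams = new();

    public Task AppendAsync(
        string streamId,
        IReadOnlyList<object> events,
        int expectedVersion,
        CancellationToken ct = default)
    {
        ct.ThrowIfCancellationRequested();

        // The lock makes "check version, then append" atomic, which is the job the
        // transaction plus UNIQUE constraint does in the SQL version.
        lock (_gate)
        {
            if (!_streams.TryGetValue(streamId, out var stream))
                _streams[streamId] = stream = new List<RecordedEvent>();

            var actualVersion = stream.Count - 1; // -1 means the stream has no events yet
            if (actualVersion != expectedVersion)
                throw new OptimisticConcurrencyException(
                    $"Expected version {expectedVersion} but stream is at {actualVersion}.");

            foreach (var @event in events)
            {
                stream.Add(new RecordedEvent(
                    stream.Count, // next 0-based version
                    @event.GetType().AssemblyQualifiedName!,
                    JsonSerializer.Serialize(@event, @event.GetType()),
                    DateTime.UtcNow));
            }
        }

        return Task.CompletedTask;
    }

    public Task<EventStream> ReadStreamAsync(string streamId, CancellationToken ct = default)
    {
        ct.ThrowIfCancellationRequested();

        lock (_gate)
        {
            // Copy the list so callers never observe later appends.
            var events = _streams.TryGetValue(streamId, out var stream)
                ? stream.ToList()
                : new List<RecordedEvent>();
            return Task.FromResult(new EventStream(streamId, events));
        }
    }
}
```

Appending to a new stream with expectedVersion -1 succeeds; repeating the same call fails because the stream is then at version 0.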

Reading a Stream and Rehydrating an Aggregate

C#
public async Task<EventStream> ReadStreamAsync(string streamId, CancellationToken ct = default)
{
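    await using var conn = new SqlConnection(_connectionString);
    await conn.OpenAsync(ct);

    // Column aliases match EventRow's property names; Dapper does not map
    // snake_case columns to PascalCase properties by default.
    var rows = await conn.QueryAsync<EventRow>(new CommandDefinition("""
        SELECT stream_version AS StreamVersion,
               event_type     AS EventType,
               event_data     AS EventData,
               created_at     AS CreatedAt
        FROM   event_store.events
        WHERE  stream_id = @streamId
        ORDER  BY stream_version ASC
        """, new { streamId }, cancellationToken: ct));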

    var recordedEvents = rows.Select(r => new RecordedEvent(
        r.StreamVersion,
        r.EventType,
        r.EventData,
        r.CreatedAt)).ToList();

    return new EventStream(streamId, recordedEvents);
}

// Aggregate rehydration — replay events to rebuild current state
public sealed class Prescription
{
    public PrescriptionId     Id       { get; private set; } = null!;
    public PrescriptionStatus Status   { get; private set; }
    public DosageValue?       Dose     { get; private set; }
    private int               _version = -1;

    // Reconstitute from events
    public static Prescription Rehydrate(EventStream stream)
    {
        var prescription = new Prescription();
        foreach (var recorded in stream.Events)
        {
            var @event = Deserialize(recorded);
            prescription.Apply(@event);
            prescription._version = recorded.Version;
        }
        return prescription;
    }

    private void Apply(object @event)
    {
        switch (@event)
        {
            case PrescriptionCreatedEvent e:
                Id     = PrescriptionId.Of(e.Id);
                Status = PrescriptionStatus.Draft;
                Dose   = DosageValue.Of(e.DoseAmount, e.DoseUnit);
                break;
            case PrescriptionApprovedEvent e:
                Status = PrescriptionStatus.Approved;
                break;
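            case PrescriptionSuspendedEvent e:
                Status = PrescriptionStatus.Suspended;
                break;
            // Any event type without a case (for example, the resume event in the
            // sample stream) is ignored here and leaves state unchanged.
        }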
    }
}
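
Rehydrate calls a Deserialize helper that the lesson does not show. One possible shape, sketched under assumptions, is an explicit allow-list from the stored event_type string to a CLR type, followed by System.Text.Json deserialization. EventSerializer, the RecordedEvent record, and the single-property PrescriptionApprovedEvent below are hypothetical and exist only to make the example self-contained.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

// Assumed shape of a row read back from the store.
public sealed record RecordedEvent(int Version, string EventType, string EventData, DateTime CreatedAt);

// Hypothetical event type used only to exercise the helper.
public sealed record PrescriptionApprovedEvent(string ApprovedBy);

public static class EventSerializer
{
    // Allow-list of stored type names. Resolving only known names avoids
    // instantiating arbitrary types named in the database.
    private static readonly Dictionary<string, Type> KnownTypes = new()
    {
        [typeof(PrescriptionApprovedEvent).AssemblyQualifiedName!] = typeof(PrescriptionApprovedEvent),
    };

    public static object Deserialize(RecordedEvent recorded)
    {
        if (!KnownTypes.TryGetValue(recorded.EventType, out var type))
            throw new InvalidOperationException($"Unknown event type '{recorded.EventType}'.");

        return JsonSerializer.Deserialize(recorded.EventData, type)
            ?? throw new InvalidOperationException(
                $"Event at version {recorded.Version} deserialized to null.");
    }
}
```

Because the append code stores AssemblyQualifiedName, which includes the assembly version, an allow-list like this is also a natural place to map old stored names to current types after a rename or version bump.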

Optimistic Concurrency

The UNIQUE constraint on (stream_id, stream_version) enforces optimistic concurrency at the database level: two rows in the same stream can never share a version number.

Scenario: two concurrent writes to prescription stream at version 5
  Writer A reads stream, sees version 5, tries to append version 6
  Writer B reads stream, sees version 5, tries to append version 6
  → Only one INSERT can succeed
  → The second throws a unique constraint violation
  → Application catches and retries with fresh state

This prevents two users from simultaneously approving the same prescription
and both succeeding — only the first commit wins.

In the event store, pass the last known version as expectedVersion:
  await _eventStore.AppendAsync("prescription-abc", newEvents, expectedVersion: 5, ct);
  // Throws OptimisticConcurrencyException if stream is not at version 5
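
The "catch and retry with fresh state" step from the scenario can be sketched as a small retry helper. ConcurrencyRetry and its maxAttempts parameter are hypothetical names, and OptimisticConcurrencyException is declared here only so the example compiles on its own. The delegate passed in is assumed to do the whole cycle each time: re-read the stream, rehydrate, decide, and append with the freshly read version.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Assumed shape; the lesson throws this type but does not define it.
public sealed class OptimisticConcurrencyException : Exception
{
    public OptimisticConcurrencyException(string message) : base(message) { }
}

public static class ConcurrencyRetry
{
    // Runs `attempt` until it succeeds or maxAttempts is reached.
    // The final failure is rethrown to the caller unchanged.
    public static async Task ExecuteAsync(
        Func<CancellationToken, Task> attempt,
        int maxAttempts = 3,
        CancellationToken ct = default)
    {
        for (var i = 1; ; i++)
        {
            try
            {
                await attempt(ct);
                return;
            }
            catch (OptimisticConcurrencyException) when (i < maxAttempts)
            {
                // Another writer committed first. Loop so the next attempt
                // reloads the stream and re-evaluates the command.
            }
        }
    }
}
```

Retrying only makes sense if the command is re-validated against the reloaded aggregate. In the approval example, the second writer should discover on retry that the prescription is already approved and stop, rather than append a second approval.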

Production issue I've seen: An event store implementation I reviewed did not enforce stream versioning: it appended events with a plain INSERT and no version check. Two concurrent requests both appended a PrescriptionApprovedEvent to the same stream, so the aggregate ended up with two approval events. Projections that assumed a single approval broke, and the audit log showed double approvals. Retroactively cleaning up the duplicate events required a custom migration that ran for hours. The unique constraint on (stream_id, stream_version) takes 30 seconds to add and prevents this entire class of duplicate-append bugs.


Key Takeaway

An event store appends immutable events and never updates or deletes rows. Current state is derived by replaying events — the event stream is the source of truth. Enforce optimistic concurrency with a UNIQUE constraint on (stream_id, stream_version) and pass the expected version on every append. Each aggregate has its own stream, keyed by type and ID (e.g., prescription-abc-123). Rehydration replays the stream in order through Apply() methods to rebuild current state.