The Event Store — Persisting Events as the Source of Truth
Build and use an event store in .NET: appending events, reading streams, optimistic concurrency with expected version, and why events are the source of truth instead of current state.
State vs Events as Source of Truth
Traditional persistence (state store):
→ Store the CURRENT state of an entity
→ Update the row when state changes
→ History is lost — you know where you are, not how you got there
prescriptions table: { id, status: "Approved", inr_value: 2.3, updated_at: ... }
Question: "Was this prescription ever suspended?" → Cannot answer.
Event sourcing (event store):
→ Store every EVENT that changed the entity's state
→ Current state is DERIVED by replaying events
→ Full history is preserved forever
prescription-stream for id abc:
PrescriptionCreated { medication: "Warfarin", dose: 5mg }
PrescriptionSuspended { reason: "INR above range", inr: 4.1 }
PrescriptionResumed { new_dose: 3mg, inr: 2.4 }
PrescriptionApproved { approved_by: "Dr Smith" }
Question: "Was this prescription ever suspended?" → Yes, with full context.
Question: "What was the INR when it was suspended?" → 4.1
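To make "current state is derived by replaying events" concrete, here is a minimal in-memory sketch. The record types, the PrescriptionHistory helper, and the status names ("Draft", "Active") are hypothetical, chosen to mirror the example stream above rather than any real schema.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical event types mirroring the example stream above.
public abstract record PrescriptionEvent;
public sealed record PrescriptionCreated(string Medication, string Dose) : PrescriptionEvent;
public sealed record PrescriptionSuspended(string Reason, decimal Inr) : PrescriptionEvent;
public sealed record PrescriptionResumed(string NewDose, decimal Inr) : PrescriptionEvent;
public sealed record PrescriptionApproved(string ApprovedBy) : PrescriptionEvent;

public static class PrescriptionHistory
{
    // Current status is a left fold over the stream:
    // each event maps the previous status to the next one.
    public static string CurrentStatus(IEnumerable<PrescriptionEvent> stream) =>
        stream.Aggregate("None", (status, e) => e switch
        {
            PrescriptionCreated => "Draft",       // assumed status name
            PrescriptionSuspended => "Suspended",
            PrescriptionResumed => "Active",      // assumed status name
            PrescriptionApproved => "Approved",
            _ => status                           // unknown events leave state unchanged
        });

    // History questions are plain queries over the same events.
    public static bool WasEverSuspended(IEnumerable<PrescriptionEvent> stream) =>
        stream.OfType<PrescriptionSuspended>().Any();

    public static decimal? InrAtFirstSuspension(IEnumerable<PrescriptionEvent> stream) =>
        stream.OfType<PrescriptionSuspended>()
              .Select(e => (decimal?)e.Inr)
              .FirstOrDefault();

    // The four events from the example stream, in order.
    public static IReadOnlyList<PrescriptionEvent> ExampleStream() => new PrescriptionEvent[]
    {
        new PrescriptionCreated("Warfarin", "5mg"),
        new PrescriptionSuspended("INR above range", 4.1m),
        new PrescriptionResumed("3mg", 2.4m),
        new PrescriptionApproved("Dr Smith"),
    };
}
```

For the example stream, CurrentStatus returns "Approved", WasEverSuspended returns true, and InrAtFirstSuspension returns 4.1. A state store holding only the final row could answer the first question but not the other two.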
This is essential for MHRA audit in a clinical system.
Event Store Schema
-- Minimal event store table in SQL Server
CREATE TABLE event_store.events (
    id             BIGINT IDENTITY(1,1) NOT NULL PRIMARY KEY,
    stream_id      NVARCHAR(500) NOT NULL,  -- e.g. "prescription-abc-123"
    stream_version INT           NOT NULL,  -- 0-based version within stream
    event_type     NVARCHAR(500) NOT NULL,  -- fully qualified type name
    event_data     NVARCHAR(MAX) NOT NULL,  -- JSON payload
    metadata       NVARCHAR(MAX) NULL,      -- correlation ID, user, timestamp
    created_at     DATETIME2     NOT NULL DEFAULT SYSUTCDATETIME(),
    -- Unique constraint enforces optimistic concurrency
    CONSTRAINT UQ_stream_version UNIQUE (stream_id, stream_version)
);
-- No separate index on (stream_id, stream_version) is needed: SQL Server backs the
-- UNIQUE constraint with a unique index on the same columns, which also serves stream reads.
Appending Events
public interface IEventStore
{
    Task AppendAsync(
        string streamId,
        IReadOnlyList<object> events,
        int expectedVersion,
        CancellationToken ct = default);

    Task<EventStream> ReadStreamAsync(
        string streamId,
        CancellationToken ct = default);
}
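// Requires the Dapper and Microsoft.Data.SqlClient packages, plus:
// using System.Text.Json; using Dapper; using Microsoft.Data.SqlClient;
public sealed class SqlEventStore : IEventStore
{
    private readonly string _connectionString;

    public SqlEventStore(string connectionString) => _connectionString = connectionString;

    public async Task AppendAsync(
        string streamId,
        IReadOnlyList<object> events,
        int expectedVersion,
        CancellationToken ct = default)
    {
        await using var conn = new SqlConnection(_connectionString);
        await conn.OpenAsync(ct);
        await using var tx = await conn.BeginTransactionAsync(ct);

        // Check current version (optimistic concurrency); MAX returns NULL for an empty stream
        var currentVersion = await conn.QuerySingleOrDefaultAsync<int?>(new CommandDefinition(
            "SELECT MAX(stream_version) FROM event_store.events WHERE stream_id = @streamId",
            new { streamId }, tx, cancellationToken: ct));
        var actualVersion = currentVersion ?? -1;
        if (actualVersion != expectedVersion)
            throw new OptimisticConcurrencyException(
                $"Expected version {expectedVersion} but stream is at {actualVersion}.");

        try
        {
            // Append each event
            var version = actualVersion + 1;
            foreach (var @event in events)
            {
                await conn.ExecuteAsync(new CommandDefinition("""
                    INSERT INTO event_store.events
                        (stream_id, stream_version, event_type, event_data, created_at)
                    VALUES (@streamId, @version, @eventType, @eventData, SYSUTCDATETIME())
                    """,
                    new
                    {
                        streamId,
                        version = version++,
                        eventType = @event.GetType().AssemblyQualifiedName,
                        eventData = JsonSerializer.Serialize(@event, @event.GetType())
                    }, tx, cancellationToken: ct));
            }
            await tx.CommitAsync(ct);
        }
        catch (SqlException ex) when (ex.Number is 2627 or 2601)
        {
            // Unique constraint/index violation: a concurrent writer passed the same version
            // check and inserted first. The uncommitted transaction rolls back on dispose.
            throw new OptimisticConcurrencyException(
                $"Stream {streamId} was modified concurrently; expected version {expectedVersion}.");
        }
    }

    // ReadStreamAsync is shown in the next section.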
}
Reading a Stream and Rehydrating an Aggregate
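// Member of SqlEventStore
public async Task<EventStream> ReadStreamAsync(string streamId, CancellationToken ct = default)
{
    // Dapper opens the connection on demand if it is still closed
    await using var conn = new SqlConnection(_connectionString);

    // Alias the snake_case columns so Dapper maps them onto EventRow's properties
    // (not needed if DefaultTypeMap.MatchNamesWithUnderscores is enabled)
    var rows = await conn.QueryAsync<EventRow>(new CommandDefinition("""
        SELECT stream_version AS StreamVersion,
               event_type     AS EventType,
               event_data     AS EventData,
               created_at     AS CreatedAt
        FROM event_store.events
        WHERE stream_id = @streamId
        ORDER BY stream_version ASC
        """, new { streamId }, cancellationToken: ct));

    var recordedEvents = rows.Select(r => new RecordedEvent(
        r.StreamVersion,
        r.EventType,
        r.EventData,
        r.CreatedAt)).ToList();

    return new EventStream(streamId, recordedEvents);
}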
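The code in this section refers to EventRow, RecordedEvent, EventStream, OptimisticConcurrencyException, and a Deserialize helper without defining them. One possible shape is sketched here. The member names are inferred from how the types are used in this article; the EventSerializer class name and the sample event record are assumptions for illustration only.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

// Row shape that a mapper such as Dapper can populate through property setters.
public sealed class EventRow
{
    public int StreamVersion { get; set; }
    public string EventType { get; set; } = "";
    public string EventData { get; set; } = "";
    public DateTime CreatedAt { get; set; }
}

// One persisted event: its position in the stream plus the raw JSON payload.
public sealed record RecordedEvent(int Version, string EventType, string Data, DateTime CreatedAt);

// An ordered, read-only view of a single stream.
public sealed record EventStream(string StreamId, IReadOnlyList<RecordedEvent> Events)
{
    // -1 for an empty stream, matching the expectedVersion convention of AppendAsync.
    public int Version => Events.Count == 0 ? -1 : Events[Events.Count - 1].Version;
}

public sealed class OptimisticConcurrencyException : Exception
{
    public OptimisticConcurrencyException(string message) : base(message) { }
}

// Hypothetical helper: turns a stored type name and JSON payload back into a typed event.
public static class EventSerializer
{
    public static object Deserialize(RecordedEvent recorded)
    {
        // event_type holds the assembly-qualified name written at append time.
        var type = Type.GetType(recorded.EventType, throwOnError: true)!;
        return JsonSerializer.Deserialize(recorded.Data, type)
            ?? throw new InvalidOperationException(
                $"Event at version {recorded.Version} has a null payload.");
    }
}

// Hypothetical event used only to exercise the round trip.
public sealed record PrescriptionSuspendedEvent(string Reason, decimal Inr);
```

Resolving types from stored assembly-qualified names ties the stream to class names and assembly versions, so renaming an event class can break old events. A lookup table from a stable event name to a type is a common alternative; which one fits depends on how the events are expected to evolve.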
// Aggregate rehydration: replay events to rebuild current state
public sealed class Prescription
{
    public PrescriptionId Id { get; private set; } = null!;
    public PrescriptionStatus Status { get; private set; }
    public DosageValue? Dose { get; private set; }
    private int _version = -1;

    // Version of the last applied event; pass this as expectedVersion when appending
    public int Version => _version;

    // Reconstitute from events
    public static Prescription Rehydrate(EventStream stream)
    {
        var prescription = new Prescription();
        foreach (var recorded in stream.Events)
        {
            // Deserialize (not shown) maps the stored type name and JSON back to a typed event
            var @event = Deserialize(recorded);
            prescription.Apply(@event);
            prescription._version = recorded.Version;
        }
        return prescription;
    }

    private void Apply(object @event)
    {
        switch (@event)
        {
            case PrescriptionCreatedEvent e:
                Id = PrescriptionId.Of(e.Id);
                Status = PrescriptionStatus.Draft;
                Dose = DosageValue.Of(e.DoseAmount, e.DoseUnit);
                break;
            case PrescriptionApprovedEvent:
                Status = PrescriptionStatus.Approved;
                break;
            case PrescriptionSuspendedEvent:
                Status = PrescriptionStatus.Suspended;
                break;
            // Other event types (for example, a resume event) get their own case in the same way
        }
    }
}
Optimistic Concurrency
The UNIQUE constraint on (stream_id, stream_version) enforces concurrency at the DB level.
Scenario: two concurrent writes to prescription stream at version 5
Writer A reads stream, sees version 5, tries to append version 6
Writer B reads stream, sees version 5, tries to append version 6
→ Only one INSERT can succeed
→ The second throws a unique constraint violation
→ Application catches and retries with fresh state
This prevents two users from simultaneously approving the same prescription
and both succeeding — only the first commit wins.
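The "catch and retry with fresh state" step can be sketched as follows. To keep the example self-contained it uses a synchronous in-memory store instead of SQL Server; InMemoryEventStore, RetryingAppender, and the decide callback are hypothetical names, not part of the event store above. The key point is that each attempt reloads the version and re-runs the decision rather than blindly re-sending the same events.

```csharp
using System;
using System.Collections.Generic;

public sealed class OptimisticConcurrencyException : Exception
{
    public OptimisticConcurrencyException(string message) : base(message) { }
}

// In-memory stand-in for the SQL event store, just enough to demonstrate the retry.
public sealed class InMemoryEventStore
{
    private readonly Dictionary<string, List<object>> _streams = new();
    private readonly object _gate = new();

    // -1 for a stream with no events, otherwise the 0-based version of the last event.
    public int CurrentVersion(string streamId)
    {
        lock (_gate)
            return _streams.TryGetValue(streamId, out var events) ? events.Count - 1 : -1;
    }

    public void Append(string streamId, IReadOnlyList<object> events, int expectedVersion)
    {
        lock (_gate)
        {
            if (!_streams.TryGetValue(streamId, out var stream))
                _streams[streamId] = stream = new List<object>();

            var actual = stream.Count - 1;
            if (actual != expectedVersion)
                throw new OptimisticConcurrencyException(
                    $"Expected version {expectedVersion} but stream is at {actual}.");

            stream.AddRange(events);
        }
    }
}

public static class RetryingAppender
{
    // decide receives the freshly loaded version and returns the events to append.
    // Returns the number of attempts it took; rethrows after maxAttempts conflicts.
    public static int AppendWithRetry(
        InMemoryEventStore store,
        string streamId,
        Func<int, IReadOnlyList<object>> decide,
        int maxAttempts = 3)
    {
        for (var attempt = 1; ; attempt++)
        {
            var version = store.CurrentVersion(streamId); // reload fresh state
            var events = decide(version);                 // re-run the business decision
            try
            {
                store.Append(streamId, events, version);
                return attempt;
            }
            catch (OptimisticConcurrencyException) when (attempt < maxAttempts)
            {
                // Another writer won this version; loop to reload and decide again.
            }
        }
    }
}
```

With the real store, the reload step would be ReadStreamAsync followed by Rehydrate, and the decision would be the aggregate's own rule. For an approval, that rule would likely see the prescription is already approved on the second attempt and produce no second approval event.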
In the event store, pass the last known version as expectedVersion:
await _eventStore.AppendAsync("prescription-abc", newEvents, expectedVersion: 5, ct);
// Throws OptimisticConcurrencyException if stream is not at version 5
Production issue I've seen: An event store implementation I reviewed did not enforce stream versioning; it just appended events with INSERT and no version check. Two concurrent requests both appended a PrescriptionApprovedEvent to the same stream. The aggregate now had two approval events. Projections that assumed single approval broke. The audit log showed double approvals. Retroactively cleaning up duplicate events required a custom migration that ran for hours. The unique constraint on (stream_id, stream_version) takes 30 seconds to add and prevents the entire class of concurrency bugs.
Key Takeaway
An event store appends immutable events and never updates or deletes rows. Current state is derived by replaying events, so the event stream is the source of truth. Enforce optimistic concurrency with a UNIQUE constraint on (stream_id, stream_version) and pass the expected version on every append. Each aggregate has its own stream, keyed by type and ID (e.g., prescription-abc-123). Rehydration replays the stream in order through Apply() methods to rebuild current state.