Learnixo

Event Sourcing in .NET · Lesson 4 of 6

Snapshots — Optimizing Aggregate Reconstruction

The Event Replay Problem

In event sourcing, an aggregate's current state is rebuilt by replaying all of its events from the beginning of the stream.

For a new aggregate: 3 events. Replay takes microseconds.

For a long-lived aggregate:
  A Patient aggregate that has been in the system for 5 years:
  → 1,247 events: admissions, discharges, prescription changes, ward transfers
  → Every read of this patient's state replays 1,247 events
  → At 500 concurrent users, each issuing one read per second: 623,500 event deserializations per second
  → Response time for "get patient current state": 1-2 seconds instead of 5ms

Snapshots solve this:
  → Periodically capture the full current state as a snapshot
  → On next read: load the snapshot, then replay ONLY events after the snapshot
  → Instead of replaying 1,247 events, replay the last 47 (since the last snapshot)
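
The arithmetic above can be checked with a short sketch. It assumes one read per user per second and a snapshot every 50 events (so the latest snapshot of a 1,247-event stream sits at version 1,200). Both figures and the `ReplayCost` helper are illustrative assumptions, not measurements or part of the lesson's code.

```csharp
using System;

// Hypothetical helper for back-of-the-envelope replay cost estimates.
public static class ReplayCost
{
    // Events replayed per read when a snapshot exists at every multiple of
    // `snapshotInterval`. An interval of 0 (or less) means "no snapshots":
    // the whole stream is replayed.
    public static int EventsPerRead(int streamLength, int snapshotInterval) =>
        snapshotInterval <= 0 ? streamLength : streamLength % snapshotInterval;

    // Total event deserializations per second across all reads.
    public static long DeserializationsPerSecond(int readsPerSecond, int eventsPerRead) =>
        (long)readsPerSecond * eventsPerRead;
}
```

Under these assumptions, `EventsPerRead(1247, 0)` is 1,247 and `EventsPerRead(1247, 50)` is 47, so 500 reads per second drop from 623,500 to 23,500 event deserializations per second, plus one snapshot deserialization per read.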

Snapshot Storage Schema

SQL
CREATE TABLE event_store.snapshots (
    stream_id       NVARCHAR(500)   NOT NULL,
    stream_version  INT             NOT NULL,   -- version at which snapshot was taken
    aggregate_type  NVARCHAR(500)   NOT NULL,
    snapshot_data   NVARCHAR(MAX)   NOT NULL,   -- serialized state
    created_at      DATETIME2       NOT NULL DEFAULT SYSUTCDATETIME(),

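    -- Note: NVARCHAR(500) + INT can exceed SQL Server's 900-byte clustered index key limit.
    -- SQL Server warns when the table is created and rejects rows whose key is too long,
    -- so keep stream IDs short or narrow the column.
    CONSTRAINT PK_snapshots PRIMARY KEY (stream_id, stream_version)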
);

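-- Optional: the primary key index on (stream_id, stream_version) can be scanned backward,
-- so it already serves the query below. Keep this extra index only if measurements justify it.
CREATE INDEX IX_snapshots_stream_latest
    ON event_store.snapshots (stream_id, stream_version DESC);
-- Allows efficient: SELECT TOP 1 ... WHERE stream_id = @id ORDER BY stream_version DESC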

Taking a Snapshot

C#
using System.Data;
using System.Text.Json;
using Dapper;

public interface ISnapshotStore
{
    Task SaveAsync<T>(string streamId, int version, T state, CancellationToken ct = default);
    Task<SnapshotResult<T>?> LoadLatestAsync<T>(string streamId, CancellationToken ct = default);
}

public sealed record SnapshotResult<T>(T State, int Version);

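public sealed class SqlSnapshotStore : ISnapshotStore
{
    private readonly System.Data.IDbConnection _conn;

    public SqlSnapshotStore(System.Data.IDbConnection conn) => _conn = conn;
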
    public async Task SaveAsync<T>(
        string streamId, int version, T state, CancellationToken ct = default)
    {
        // Insert-only MERGE: if a snapshot already exists at this version, it is left untouched.
        const string sql = """
            MERGE event_store.snapshots AS target
            USING (VALUES (@streamId, @version, @aggregateType, @snapshotData))
                  AS source (stream_id, stream_version, aggregate_type, snapshot_data)
            ON target.stream_id = source.stream_id
               AND target.stream_version = source.stream_version
            WHEN NOT MATCHED THEN
                INSERT (stream_id, stream_version, aggregate_type, snapshot_data)
                VALUES (source.stream_id, source.stream_version,
                        source.aggregate_type, source.snapshot_data);
            """;

        var parameters = new
        {
            streamId,
            version,
            aggregateType = typeof(T).Name,
            snapshotData  = JsonSerializer.Serialize(state)
        };

        // Dapper takes the CancellationToken through CommandDefinition.
        await _conn.ExecuteAsync(new CommandDefinition(sql, parameters, cancellationToken: ct));
    }

    public async Task<SnapshotResult<T>?> LoadLatestAsync<T>(
        string streamId, CancellationToken ct = default)
    {
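        // Column aliases match the SnapshotRow property names so Dapper can map them.
        var row = await _conn.QuerySingleOrDefaultAsync<SnapshotRow>(new CommandDefinition("""
            SELECT TOP 1 stream_version AS StreamVersion, snapshot_data AS SnapshotData
            FROM   event_store.snapshots
            WHERE  stream_id = @streamId
            ORDER  BY stream_version DESC
            """, new { streamId }, cancellationToken: ct));

        if (row is null) return null;
        var state = JsonSerializer.Deserialize<T>(row.SnapshotData)!;
        return new SnapshotResult<T>(state, row.StreamVersion);
    }
}

// Row shape used only for reading a snapshot back from SQL.
internal sealed class SnapshotRow
{
    public int StreamVersion { get; set; }
    public string SnapshotData { get; set; } = "";
}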
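
For unit tests it can help to have a store that needs no database. The following is a minimal sketch of an in-memory implementation; `InMemorySnapshotStore` is a hypothetical class, and the interface and result record are repeated from the lesson only so the block compiles on its own.

```csharp
using System.Collections.Concurrent;
using System.Linq;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;

// Repeated from the lesson so the sketch is self-contained.
public sealed record SnapshotResult<T>(T State, int Version);

public interface ISnapshotStore
{
    Task SaveAsync<T>(string streamId, int version, T state, CancellationToken ct = default);
    Task<SnapshotResult<T>?> LoadLatestAsync<T>(string streamId, CancellationToken ct = default);
}

// Hypothetical in-memory store intended for tests only.
public sealed class InMemorySnapshotStore : ISnapshotStore
{
    // Key: (stream id, version). Value: serialized state, mirroring snapshot_data.
    private readonly ConcurrentDictionary<(string StreamId, int Version), string> _rows = new();

    public Task SaveAsync<T>(string streamId, int version, T state, CancellationToken ct = default)
    {
        // TryAdd keeps the first write for a given version, like the insert-only MERGE.
        _rows.TryAdd((streamId, version), JsonSerializer.Serialize(state));
        return Task.CompletedTask;
    }

    public Task<SnapshotResult<T>?> LoadLatestAsync<T>(string streamId, CancellationToken ct = default)
    {
        // Highest version for the stream wins; null when the stream has no snapshot.
        var latest = _rows
            .Where(kv => kv.Key.StreamId == streamId)
            .OrderByDescending(kv => kv.Key.Version)
            .Select(kv => new SnapshotResult<T>(
                JsonSerializer.Deserialize<T>(kv.Value)!, kv.Key.Version))
            .FirstOrDefault();
        return Task.FromResult(latest);
    }
}
```

Serializing on save, rather than holding object references, keeps the test double honest: state types that do not round-trip through JSON fail in tests instead of in production.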

Rehydrating with a Snapshot

C#
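public sealed class PrescriptionRepository
{
    private const int SnapshotThreshold = 50; // snapshot once 50+ events have accumulated since the last one

    // IEventStore is assumed to be the event store abstraction from the earlier lessons.
    private readonly IEventStore _eventStore;
    private readonly ISnapshotStore _snapshotStore;

    public PrescriptionRepository(IEventStore eventStore, ISnapshotStore snapshotStore)
    {
        _eventStore = eventStore;
        _snapshotStore = snapshotStore;
    }
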
    public async Task<Prescription?> GetByIdAsync(PrescriptionId id, CancellationToken ct)
    {
        var streamId = $"prescription-{id.Value}";

        // 1. Try to load the latest snapshot
        var snapshot = await _snapshotStore.LoadLatestAsync<PrescriptionState>(streamId, ct);

        int fromVersion;
        Prescription prescription;

        if (snapshot is not null)
        {
            // Restore state from snapshot
            prescription = Prescription.FromSnapshot(snapshot.State);
            fromVersion  = snapshot.Version + 1;
        }
        else
        {
            // No snapshot — start from version 0
            prescription = new Prescription();
            fromVersion  = 0;
        }

        // 2. Load only events AFTER the snapshot version
        var stream = await _eventStore.ReadStreamFromAsync(streamId, fromVersion, ct);

        if (!stream.Events.Any() && snapshot is null)
            return null;

        // 3. Apply remaining events
        foreach (var recorded in stream.Events)
            prescription.Apply(EventDeserializer.Deserialize(recorded));

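        // 4. Take a new snapshot once enough events have accumulated since the last one.
        //    Note: this writes on the read path to keep the example short; under load,
        //    hand the snapshot off to a background job instead.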
        var eventsSinceSnapshot = stream.Events.Count;
        if (eventsSinceSnapshot >= SnapshotThreshold)
            await TakeSnapshotAsync(prescription, streamId, ct);

        return prescription;
    }

    private async Task TakeSnapshotAsync(
        Prescription prescription, string streamId, CancellationToken ct)
    {
        var state = prescription.ToSnapshot();
        await _snapshotStore.SaveAsync(streamId, prescription.Version, state, ct);
    }
}
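
The repository relies on `Prescription.FromSnapshot`, `ToSnapshot`, `Apply`, and `Version`, none of which are shown here. A minimal sketch of the aggregate side might look like this. The event types and state fields are hypothetical, and `Version` is assumed to count the events applied so far.

```csharp
using System;

// Hypothetical events; the real aggregate will have its own.
public sealed record PrescriptionIssued(string Drug, int DoseMg);
public sealed record DoseChanged(int NewDoseMg);

// Plain serializable state. Version is included so FromSnapshot can restore it.
public sealed record PrescriptionState(string Drug, int DoseMg, int Version);

public sealed class Prescription
{
    public string Drug { get; private set; } = "";
    public int DoseMg { get; private set; }
    public int Version { get; private set; }   // assumed: number of events applied so far

    // Rebuilds the aggregate directly from captured state, with no event replay.
    public static Prescription FromSnapshot(PrescriptionState s) =>
        new() { Drug = s.Drug, DoseMg = s.DoseMg, Version = s.Version };

    // Captures the current state, including the version it corresponds to.
    public PrescriptionState ToSnapshot() => new(Drug, DoseMg, Version);

    public void Apply(object @event)
    {
        switch (@event)
        {
            case PrescriptionIssued e: Drug = e.Drug; DoseMg = e.DoseMg; break;
            case DoseChanged e:        DoseMg = e.NewDoseMg;             break;
            default:
                throw new ArgumentException($"Unknown event type {@event.GetType().Name}");
        }
        Version++;
    }
}
```

The property worth testing is equivalence: restoring from a snapshot and applying the remaining events must yield the same state as replaying the full stream.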

Snapshot Frequency Strategies

Strategy 1: Every N events (simple)
  → Take a snapshot every 50, 100, or 500 events
  → Worst case: replay N-1 events
  → Easy to implement, predictable cost
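
One way to apply the every-N rule on the write path is to check whether an append moved the stream across a multiple of N. This is a sketch under the assumption that versions are non-negative event counts; `SnapshotPolicy` is a hypothetical helper.

```csharp
using System;

public static class SnapshotPolicy
{
    // True when appending events moved the stream across a multiple of `interval`.
    // versionBefore and versionAfter are the stream versions around the append.
    public static bool CrossedInterval(int versionBefore, int versionAfter, int interval)
    {
        if (interval <= 0) throw new ArgumentOutOfRangeException(nameof(interval));
        return versionAfter / interval > versionBefore / interval;
    }
}
```

Comparing the integer quotients instead of testing `versionAfter % interval == 0` also catches appends that write several events at once and jump past the boundary, for example from version 49 to 52.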

Strategy 2: On aggregate save (aggressive)
  → Snapshot every time the aggregate is saved
  → Fastest rehydration (the latest snapshot is always current, so no events need replaying)
  → More snapshot writes; snapshot table grows fast for write-heavy aggregates
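
Because frequent snapshots make the table grow quickly, a pruning step is usually paired with this strategy. The sketch below only decides which snapshot versions of a single stream can be deleted; `SnapshotPruning` is a hypothetical helper, and issuing the actual DELETE is left to the store.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class SnapshotPruning
{
    // Returns the snapshot versions that can be deleted for one stream,
    // keeping the `keep` highest versions.
    public static IReadOnlyList<int> VersionsToDelete(IEnumerable<int> snapshotVersions, int keep)
    {
        if (keep < 1) throw new ArgumentOutOfRangeException(nameof(keep));
        return snapshotVersions
            .OrderByDescending(v => v)
            .Skip(keep)
            .ToList();
    }
}
```

Keeping more than one snapshot leaves a fallback if the newest one turns out to be unreadable after a schema change.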

Strategy 3: Time-based (background job)
  → A background job snapshots long streams every hour/day
  → No snapshot overhead on the write path
  → Rehydration may still be slow if a stream has grown between job runs
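
A background job needs to decide which streams are worth snapshotting on each run. The following sketch picks streams whose un-snapshotted tail has reached a threshold. `StreamSummary` is a hypothetical row shape, for example the result of a query that pairs each stream's current version with its latest snapshot version.

```csharp
using System.Collections.Generic;
using System.Linq;

// Hypothetical summary row: one per stream.
// LatestSnapshotVersion is null when the stream has never been snapshotted.
public sealed record StreamSummary(string StreamId, int CurrentVersion, int? LatestSnapshotVersion);

public static class SnapshotJob
{
    // Streams whose un-snapshotted tail has reached `threshold` events, longest tail first.
    public static IReadOnlyList<string> StreamsNeedingSnapshot(
        IEnumerable<StreamSummary> streams, int threshold) =>
        streams
            .Select(s => (s.StreamId, Tail: s.CurrentVersion - (s.LatestSnapshotVersion ?? 0)))
            .Where(x => x.Tail >= threshold)
            .OrderByDescending(x => x.Tail)
            .Select(x => x.StreamId)
            .ToList();
}
```

Ordering by tail length means that if a run is cut short, the streams that are slowest to rehydrate have already been handled.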

Recommendation for clinical systems:
  → Every 100 events for Prescription aggregates (infrequent writes)
  → Every 500 events for Patient aggregates (very frequent writes, long-lived)
  → Take snapshots in a background job, not in the request path

When NOT to Use Snapshots

You don't need snapshots if:
  → Streams are short — aggregates rarely exceed 100 events in their lifetime
  → Aggregates are short-lived — a shopping cart lives hours, not years
  → You have projections serving most read queries — aggregates are only loaded for writes

Snapshots add complexity:
  → Snapshot serialization format must be versioned alongside events
  → If you change the aggregate's structure, old snapshots may not deserialize correctly
  → You need a snapshot pruning strategy (for example, keep the last 3 snapshots per stream)
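
One common way to handle the versioning problem is to wrap the serialized state in an envelope that carries a schema version, and to ignore any snapshot whose version does not match the current code. This is a sketch of that idea, not the lesson's implementation; `SnapshotEnvelope` and `VersionedSnapshot` are hypothetical names.

```csharp
using System.Text.Json;

// Hypothetical envelope: SchemaVersion is bumped whenever the state shape changes.
public sealed record SnapshotEnvelope(int SchemaVersion, string Payload);

public static class VersionedSnapshot
{
    public static string Serialize<T>(T state, int schemaVersion) =>
        JsonSerializer.Serialize(
            new SnapshotEnvelope(schemaVersion, JsonSerializer.Serialize(state)));

    // Returns false when the snapshot was written with a different schema version
    // or cannot be parsed. The caller then ignores it and replays the full stream.
    public static bool TryDeserialize<T>(string json, int expectedSchemaVersion, out T? state)
    {
        state = default;
        try
        {
            var envelope = JsonSerializer.Deserialize<SnapshotEnvelope>(json);
            if (envelope is null || envelope.Payload is null) return false;
            if (envelope.SchemaVersion != expectedSchemaVersion) return false;

            state = JsonSerializer.Deserialize<T>(envelope.Payload);
            return state is not null;
        }
        catch (JsonException)
        {
            return false;
        }
    }
}
```

Because events remain the source of truth, discarding a stale snapshot is always safe: the cost is one slow rehydration, after which a fresh snapshot can be written.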

Measure first:
  → Add logging to track stream length on rehydration
  → If p95 stream length is under 200, snapshots are probably premature optimisation
  → If p95 stream length exceeds 500, snapshots are likely worth the added complexity
  → In between, decide based on measured rehydration latency
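
Once stream lengths are being logged, the p95 can be computed from the collected samples. This sketch uses the nearest-rank definition of a percentile, which is one of several conventions; `StreamLengthStats` is a hypothetical helper.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class StreamLengthStats
{
    // Nearest-rank percentile: the smallest sample such that at least
    // `percentile` percent of all samples are less than or equal to it.
    public static int Percentile(IEnumerable<int> streamLengths, double percentile)
    {
        if (percentile <= 0 || percentile > 100)
            throw new ArgumentOutOfRangeException(nameof(percentile));

        var sorted = streamLengths.OrderBy(x => x).ToArray();
        if (sorted.Length == 0)
            throw new ArgumentException("No samples.", nameof(streamLengths));

        var rank = (int)Math.Ceiling(percentile * sorted.Length / 100.0);
        return sorted[rank - 1];
    }
}
```

With few samples the p95 is dominated by the longest streams, so collect lengths over a representative period before comparing the result with the thresholds above.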

Production issue I've seen: A clinical platform using event sourcing started noticing patient record load times exceeding 3 seconds under moderate load. Investigation revealed some patient aggregates had over 4,000 events (5+ years of admissions, discharges, ward transfers). The team had not implemented snapshots because "streams seemed short during development." Production data accumulated 40x more events than test scenarios. Adding snapshots with a 200-event threshold and a nightly background snapshot job brought p95 load time from 3,200ms back to under 80ms.


Key Takeaway

Snapshots avoid replaying long event streams by storing the aggregate's state at a specific version and replaying only the events after that point. Store snapshots in a separate table keyed by (stream_id, stream_version). On rehydration, load the latest snapshot, then read only events from snapshot.version + 1 onward. Take snapshots every N events or in a background job rather than on every write. Measure stream lengths in production before adding snapshot complexity; in many systems, streams stay short enough that snapshots aren't needed.