Learnixo
System Design · advanced

System Design: Event Journal and Audit Trail in .NET — Append-Only Storage, Retention, and Point-in-Time Queries

Design a high-throughput audit journal in .NET: append-only ledger with PostgreSQL, write batching, retention policies with table partitioning, point-in-time entity reconstruction, hash-chain tamper detection, and the event sourcing vs audit log trade-off.

Asma Hafeez Khan · May 26, 2026 · 20 min read
Tags: C#, .NET, Audit, Event Sourcing, Journal, Append-Only, PostgreSQL, System Design, Case Study

Event Journal and Audit Trail in .NET

Every production system eventually gets this requirement: "We need to know who changed what, and when." Sometimes it comes from compliance. Sometimes it comes after an incident where no one can explain how an order ended up in a corrupted state. Sometimes a regulator asks for a 7-year history of financial transactions.

The naive response is to add CreatedBy, ModifiedBy, and ModifiedAt columns to every table. That tells you the last change. It tells you nothing about the 47 changes before it.

A proper audit journal is a different beast. It is an append-only ledger of every state transition in your system. You never update or delete journal entries — not for performance, not for correctness, not for any reason. The journal is the forensic record. It is also the foundation for point-in-time queries, tamper detection, and regulatory retention.

This article walks through a production-grade design for a high-throughput audit journal in .NET 9 with PostgreSQL, covering the data model, hash-chain integrity, write batching at 10K events per second, table partitioning for retention, and point-in-time entity reconstruction.


The Fundamental Rule: Immutability Is Not Optional

Before writing a single line of code, internalize this constraint. Journal entries are never updated or deleted. Not by the application. Not by a migration script. Not by a hotfix at 2 AM.

The reason is forensic integrity. The moment you allow updates, you lose the ability to prove what happened. A tampered audit log is worse than no audit log, because it creates false confidence. Courts, regulators, and incident post-mortems all depend on the log being the ground truth.

In practice this means:

  • No UPDATE statements touch the journal table. This is enforced in code and, at the database level, by revoking UPDATE and DELETE privileges from the application's role (or by a trigger that rejects them)
  • No DELETE statements touch the journal table: retention is handled by dropping whole partitions, not by deleting individual rows
  • EF Core saves are intercepted, so any attempt to modify or delete a JournalEntry throws before it reaches the database
  • Schema migrations never alter existing rows

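The hash chain (covered below) makes tampering detectable: editing a row after insertion breaks the chain from that row onward. Against someone who can rewrite the whole table it is evidence, not proof, because they could recompute every later hash. Periodically recording the latest hash outside the database (another system or a write-once store) closes that gap.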


Data Model

Every state change in the system maps to a JournalEntry. The fields are chosen deliberately:

C#
using System.Security.Cryptography; // SHA256
using System.Text;                  // Encoding

public sealed class JournalEntry
{
    public long Id { get; private set; }             // auto-increment sequence, never a GUID
    public Guid TenantId { get; private set; }
    public string EntityType { get; private set; }   // "Order", "Invoice", "User"
    public Guid EntityId { get; private set; }
    public string EventType { get; private set; }    // "OrderPlaced", "OrderCancelled"
    public Guid ActorId { get; private set; }        // user or service account that triggered it
    public string ActorKind { get; private set; }    // "User", "ServiceAccount", "System"
    public string PayloadJson { get; private set; }  // serialized diff or full snapshot
    public string? SourceIp { get; private set; }
    public string? CorrelationId { get; private set; }
    public DateTime OccurredAt { get; private set; } // UTC, immutable
    public long SequenceNumber { get; private set; } // monotonic per entity
    public string? PreviousHash { get; private set; } // hash of the previous entry for this entity
    public string Hash { get; private set; }         // SHA-256 over PreviousHash and the fields listed in ComputeHash

    // Private constructor — created only through the factory
    private JournalEntry() { }

    public static JournalEntry Create(
        Guid tenantId,
        string entityType,
        Guid entityId,
        string eventType,
        Guid actorId,
        string actorKind,
        string payloadJson,
        string? sourceIp,
        string? correlationId,
        DateTime occurredAt,
        long sequenceNumber,
        string? previousHash)
    {
        var entry = new JournalEntry
        {
            TenantId = tenantId,
            EntityType = entityType,
            EntityId = entityId,
            EventType = eventType,
            ActorId = actorId,
            ActorKind = actorKind,
            PayloadJson = payloadJson,
            SourceIp = sourceIp,
            CorrelationId = correlationId,
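            // Require UTC and truncate to microseconds (timestamptz precision) so the hashed
            // value survives the round trip through PostgreSQL unchanged.
            OccurredAt = occurredAt.Kind == DateTimeKind.Utc
                ? new DateTime(occurredAt.Ticks - occurredAt.Ticks % TimeSpan.TicksPerMicrosecond, DateTimeKind.Utc)
                : throw new ArgumentException("occurredAt must be a UTC DateTime.", nameof(occurredAt)),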
            SequenceNumber = sequenceNumber,
            PreviousHash = previousHash,
        };

        entry.Hash = entry.ComputeHash();
        return entry;
    }

    private string ComputeHash()
    {
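        // The hash covers the chain link (previous hash), the entity identity, event type,
        // sequence number, timestamp, and payload. Changing any of these in a past entry breaks
        // the chain from that point forward. TenantId, ActorId, ActorKind, SourceIp, and
        // CorrelationId are NOT covered; if "who did it" must be tamper-evident, add them here
        // and to the validator's RecomputeHash.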
        var input = $"{PreviousHash ?? "GENESIS"}|{EntityType}|{EntityId}|{EventType}" +
                    $"|{SequenceNumber}|{OccurredAt:O}|{PayloadJson}";

        var bytes = SHA256.HashData(Encoding.UTF8.GetBytes(input));
        return Convert.ToHexString(bytes).ToLowerInvariant();
    }
}

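The Id is a long, not a Guid. At 10K inserts per second a sequential integer is significantly cheaper for PostgreSQL B-tree indexes: new keys land at the right edge of the index, whereas random GUIDs scatter inserts across the whole index, causing page splits and poor cache locality. Use a BIGSERIAL or a sequence shared across partitions.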

SequenceNumber is per entity, not global. It lets you detect gaps in an entity's history without scanning the entire table.
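Gap detection is then a single pass over one entity's sequence numbers. A minimal sketch, assuming the numbers arrive sorted ascending and start at 1; the `SequenceGaps.Find` helper is hypothetical and not part of the design above:

```csharp
using System;
using System.Collections.Generic;

public static class SequenceGaps
{
    // Returns every missing sequence number for ONE entity. Assumes the input is sorted
    // ascending and that numbering starts at 1 (both are assumptions of this sketch).
    public static List<long> Find(IEnumerable<long> sortedSequenceNumbers)
    {
        var missing = new List<long>();
        long expected = 1;

        foreach (var seq in sortedSequenceNumbers)
        {
            // Every number between the expected value and the observed one is a gap.
            for (var gap = expected; gap < seq; gap++)
            {
                missing.Add(gap);
            }

            // Math.Max keeps a duplicate sequence number from moving the expectation backwards.
            expected = Math.Max(expected, seq + 1);
        }

        return missing;
    }
}
```

A gap check cannot see entries removed from the end of an entity's history, because nothing after them is missing; only an externally recorded latest sequence number or hash catches that.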


EF Core Configuration: Enforcing Append-Only

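EF Core will happily issue UPDATE and DELETE for any entity you track. Block this with a SaveChanges interceptor so a future developer cannot accidentally break the invariant. The interceptor only sees changes made through the change tracker: ExecuteUpdate, ExecuteDelete, and raw SQL bypass it, which is why the database-level privileges still matter.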

C#
public sealed class JournalEntryConfiguration : IEntityTypeConfiguration<JournalEntry>
{
    public void Configure(EntityTypeBuilder<JournalEntry> builder)
    {
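        // Raw SQL and COPY below use snake_case column names; this mapping assumes a
        // snake_case naming convention (e.g. UseSnakeCaseNamingConvention() from the
        // EFCore.NamingConventions package) so EF Core queries hit the same columns.
        builder.ToTable("journal_entries");

        // PostgreSQL requires unique constraints (including the primary key) on a
        // partitioned table to include the partition key, here occurred_at.
        builder.HasKey(e => new { e.Id, e.OccurredAt });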

        builder.Property(e => e.Id)
            .UseIdentityAlwaysColumn(); // PostgreSQL GENERATED ALWAYS AS IDENTITY

        builder.Property(e => e.TenantId).IsRequired();
        builder.Property(e => e.EntityType).HasMaxLength(100).IsRequired();
        builder.Property(e => e.EntityId).IsRequired();
        builder.Property(e => e.EventType).HasMaxLength(100).IsRequired();
        builder.Property(e => e.ActorId).IsRequired();
        builder.Property(e => e.ActorKind).HasMaxLength(50).IsRequired();
        builder.Property(e => e.PayloadJson).HasColumnType("jsonb").IsRequired();
        builder.Property(e => e.SourceIp).HasMaxLength(45);
        builder.Property(e => e.CorrelationId).HasMaxLength(100);
        builder.Property(e => e.OccurredAt).IsRequired();
        builder.Property(e => e.SequenceNumber).IsRequired();
        builder.Property(e => e.PreviousHash).HasMaxLength(64);
        builder.Property(e => e.Hash).HasMaxLength(64).IsRequired();

        // Composite index for entity history queries
        builder.HasIndex(e => new { e.EntityType, e.EntityId, e.OccurredAt });

        // Index for tenant-scoped queries
        builder.HasIndex(e => new { e.TenantId, e.OccurredAt });
    }
}

Now the interceptor that blocks mutations:

C#
public sealed class AppendOnlyJournalInterceptor : SaveChangesInterceptor
{
    public override InterceptionResult<int> SavingChanges(
        DbContextEventData eventData,
        InterceptionResult<int> result)
    {
        ThrowIfJournalMutated(eventData.Context);
        return base.SavingChanges(eventData, result);
    }

    public override ValueTask<InterceptionResult<int>> SavingChangesAsync(
        DbContextEventData eventData,
        InterceptionResult<int> result,
        CancellationToken cancellationToken = default)
    {
        ThrowIfJournalMutated(eventData.Context);
        return base.SavingChangesAsync(eventData, result, cancellationToken);
    }

    private static void ThrowIfJournalMutated(DbContext? context)
    {
        if (context is null) return;

        var illegal = context.ChangeTracker
            .Entries<JournalEntry>()
            .Where(e => e.State is EntityState.Modified or EntityState.Deleted)
            .ToList();

        if (illegal.Count > 0)
        {
            throw new InvalidOperationException(
                $"Journal entries are immutable. Attempted to {illegal[0].State} " +
                $"entry with Id={illegal[0].Entity.Id}. " +
                "Audit journal entries may only be inserted, never modified or deleted.");
        }
    }
}

Register the interceptor when configuring EF Core:

C#
services.AddDbContext<JournalDbContext>((sp, options) =>
{
    options.UseNpgsql(connectionString)
           .AddInterceptors(sp.GetRequiredService<AppendOnlyJournalInterceptor>());
});

services.AddSingleton<AppendOnlyJournalInterceptor>();

The Hash Chain: Tamper Detection Without a Blockchain

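Each entry's Hash covers the previous entry's hash for the same entity plus the entry's own fields and payload. If someone edits entry N of an entity directly in the database, the recomputed hash of entry N no longer matches its stored value; if they also rewrite that stored hash, entry N+1's PreviousHash no longer matches. Either way, the chain is broken from that point forward.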

Validation is straightforward and can run as an offline audit job:

C#
public sealed class HashChainValidator
{
    private readonly ILogger<HashChainValidator> _logger;

    public HashChainValidator(ILogger<HashChainValidator> logger)
    {
        _logger = logger;
    }

    public async Task<HashChainValidationResult> ValidateEntityChainAsync(
        IAsyncEnumerable<JournalEntry> entriesInOrder,
        CancellationToken ct)
    {
        string? expectedPreviousHash = null;
        long entryCount = 0;
        long? firstBreakAtSequence = null;

        await foreach (var entry in entriesInOrder.WithCancellation(ct))
        {
            entryCount++;

            // Check the link to the previous entry
            if (entry.PreviousHash != expectedPreviousHash)
            {
                firstBreakAtSequence ??= entry.SequenceNumber;
                _logger.LogWarning(
                    "Hash chain break detected. Entity={EntityType}/{EntityId} " +
                    "Sequence={Sequence}. Expected PreviousHash={Expected} but found {Actual}",
                    entry.EntityType, entry.EntityId, entry.SequenceNumber,
                    expectedPreviousHash, entry.PreviousHash);
            }

            // Recompute the hash to verify the entry itself was not tampered with
            var recomputed = RecomputeHash(entry);
            if (recomputed != entry.Hash)
            {
                firstBreakAtSequence ??= entry.SequenceNumber;
                _logger.LogWarning(
                    "Hash mismatch for entry. Entity={EntityType}/{EntityId} " +
                    "Sequence={Sequence}. Stored={Stored} Computed={Computed}",
                    entry.EntityType, entry.EntityId, entry.SequenceNumber,
                    entry.Hash, recomputed);
            }

            expectedPreviousHash = entry.Hash;
        }

        return new HashChainValidationResult(
            IsValid: firstBreakAtSequence is null,
            EntriesChecked: entryCount,
            FirstBreakAtSequence: firstBreakAtSequence);
    }

    private static string RecomputeHash(JournalEntry entry)
    {
        var input = $"{entry.PreviousHash ?? "GENESIS"}|{entry.EntityType}|{entry.EntityId}" +
                    $"|{entry.EventType}|{entry.SequenceNumber}|{entry.OccurredAt:O}|{entry.PayloadJson}";

        var bytes = SHA256.HashData(Encoding.UTF8.GetBytes(input));
        return Convert.ToHexString(bytes).ToLowerInvariant();
    }
}

public sealed record HashChainValidationResult(
    bool IsValid,
    long EntriesChecked,
    long? FirstBreakAtSequence);

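The validation job can run nightly against a random sample of entities or against partitions that are about to be archived. A broken chain is strong evidence that a row changed after insertion, provided the hash inputs are stable across the database round trip. Two details in this schema break that by default. PostgreSQL timestamptz stores microseconds while DateTime carries 100 ns ticks, and jsonb does not preserve the original JSON text (it drops insignificant whitespace, reorders keys, and keeps only the last duplicate key). Truncate OccurredAt to microseconds and keep it UTC before hashing, and either hash a canonical serialization of the payload or keep the exact hashed text in a json or text column; otherwise the validator reports false breaks.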
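To see why an edit is detectable, here is a self-contained sketch of the same chaining scheme with a hypothetical `ChainLink` record standing in for JournalEntry and the hash input reduced to sequence number and payload (an assumption for brevity; the real ComputeHash covers more fields):

```csharp
using System;
using System.Collections.Generic;
using System.Security.Cryptography;
using System.Text;

// Hypothetical minimal stand-in for JournalEntry: only the fields needed to show chaining.
public sealed record ChainLink(long Sequence, string? PreviousHash, string Payload, string Hash);

public static class MiniChain
{
    // Same shape as ComputeHash, reduced to previous hash + sequence + payload.
    public static string HashOf(string? previousHash, long sequence, string payload)
    {
        var input = $"{previousHash ?? "GENESIS"}|{sequence}|{payload}";
        return Convert.ToHexString(SHA256.HashData(Encoding.UTF8.GetBytes(input))).ToLowerInvariant();
    }

    // Builds a chain for one entity, numbering entries from 1.
    public static List<ChainLink> Build(IEnumerable<string> payloads)
    {
        var links = new List<ChainLink>();
        string? previous = null;
        long sequence = 1;

        foreach (var payload in payloads)
        {
            var hash = HashOf(previous, sequence, payload);
            links.Add(new ChainLink(sequence, previous, payload, hash));
            previous = hash;
            sequence++;
        }

        return links;
    }

    // Returns the sequence number of the first broken entry, or null if the chain verifies.
    public static long? FirstBreak(IReadOnlyList<ChainLink> links)
    {
        string? expectedPrevious = null;

        foreach (var link in links)
        {
            var linkOk = link.PreviousHash == expectedPrevious;
            var hashOk = HashOf(link.PreviousHash, link.Sequence, link.Payload) == link.Hash;
            if (!linkOk || !hashOk) return link.Sequence;
            expectedPrevious = link.Hash;
        }

        return null;
    }
}
```

Editing the payload of entry 2 breaks the chain at entry 2. Rewriting entry 2's stored hash as well only moves the break to entry 3, so an attacker has to rewrite every later entry to hide the change.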


Write Path: High-Frequency Batching with Channel

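At 10,000 events per second, inserting rows one at a time is not viable. A single PostgreSQL INSERT round trip takes 1 to 5 ms, so one connection issuing sequential inserts tops out at 200 to 1,000 rows per second; sustaining 10K/s that way would keep 10 to 50 connections busy full-time. Batching amortizes each round trip across hundreds of rows.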

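The design uses Channel<JournalEntry> as a bounded in-memory queue. Business logic writes to the channel without blocking. A background flush loop drains it in batches of up to 1,000 entries or 100 ms, whichever comes first (at 10K/s the two limits coincide: 10,000 × 0.1 s = 1,000 entries), and writes each batch to PostgreSQL using the binary COPY protocol via Npgsql, generally the fastest bulk-load path PostgreSQL offers.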

C#
public interface IJournalWriter
{
    ValueTask WriteAsync(JournalEntry entry, CancellationToken ct = default);
}

public sealed class JournalWriter : IJournalWriter, IAsyncDisposable
{
    private readonly Channel<JournalEntry> _channel;
    private readonly ILogger<JournalWriter> _logger;
    private readonly Task _flushLoop;
    private readonly CancellationTokenSource _cts = new();

    // DLQ callback — called when the channel is full and entries are dropped
    private readonly Action<JournalEntry>? _deadLetterCallback;

    public JournalWriter(
        ILogger<JournalWriter> logger,
        IJournalRepository repository,
        Action<JournalEntry>? deadLetterCallback = null)
    {
        _logger = logger;
        _deadLetterCallback = deadLetterCallback;

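        // Bounded channel: at most 10,000 entries in memory.
        // FullMode = Wait makes TryWrite return false when the channel is full (TryWrite itself
        // never waits), so WriteAsync can route the entry to the dead-letter callback.
        // With DropWrite, TryWrite would report success and silently discard the entry.
        _channel = Channel.CreateBounded<JournalEntry>(new BoundedChannelOptions(10_000)
        {
            FullMode = BoundedChannelFullMode.Wait,
            SingleWriter = false,
            SingleReader = true
        });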

        _flushLoop = RunFlushLoopAsync(repository, _cts.Token);
    }

    public ValueTask WriteAsync(JournalEntry entry, CancellationToken ct = default)
    {
        if (!_channel.Writer.TryWrite(entry))
        {
            // Channel is full — business logic must not be blocked.
            // Route to DLQ so the event is not silently lost.
            _logger.LogError(
                "Journal channel full — dropping entry. EntityType={EntityType} EntityId={EntityId} " +
                "EventType={EventType}. Configure alerting on this log event.",
                entry.EntityType, entry.EntityId, entry.EventType);

            _deadLetterCallback?.Invoke(entry);
        }

        return ValueTask.CompletedTask;
    }

    private async Task RunFlushLoopAsync(
        IJournalRepository repository,
        CancellationToken ct)
    {
        const int maxBatchSize = 1_000;
        var maxBatchDelay = TimeSpan.FromMilliseconds(100);
        var batch = new List<JournalEntry>(capacity: maxBatchSize);
        var reader = _channel.Reader;

        try
        {
            // Wait for at least one entry. Returns false once the writer is completed
            // and every buffered entry has been read.
            while (await reader.WaitToReadAsync(ct))
            {
                // Collect up to 1,000 entries or for 100 ms, whichever comes first
                using var window = CancellationTokenSource.CreateLinkedTokenSource(ct);
                window.CancelAfter(maxBatchDelay);

                try
                {
                    while (batch.Count < maxBatchSize)
                    {
                        if (reader.TryRead(out var next))
                        {
                            batch.Add(next);
                        }
                        else if (!await reader.WaitToReadAsync(window.Token))
                        {
                            break; // writer completed and channel empty
                        }
                    }
                }
                catch (OperationCanceledException) when (!ct.IsCancellationRequested)
                {
                    // Batch window expired: flush whatever we have
                }

                if (batch.Count > 0)
                {
                    // Not cancellable: the entries are already out of the channel,
                    // so abandoning the flush would lose them
                    await FlushBatchAsync(repository, batch, CancellationToken.None);
                    batch.Clear();
                }
            }
        }
        catch (OperationCanceledException) when (ct.IsCancellationRequested)
        {
            // Shutdown requested: fall through and drain what is left
        }

        // Drain remaining entries on shutdown (batch may still hold a partial window)
        while (reader.TryRead(out var remaining))
        {
            batch.Add(remaining);
        }

        if (batch.Count > 0)
        {
            await FlushBatchAsync(repository, batch, CancellationToken.None);
        }
    }

    private async Task FlushBatchAsync(
        IJournalRepository repository,
        List<JournalEntry> batch,
        CancellationToken ct)
    {
        try
        {
            await repository.BulkInsertAsync(batch, ct);
            _logger.LogDebug("Flushed {Count} journal entries", batch.Count);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Failed to flush journal batch of {Count} entries", batch.Count);
            // In a real system: write to a local file DLQ, retry with backoff,
            // or publish to a dead-letter topic. Do not silently drop.
        }
    }

    public async ValueTask DisposeAsync()
    {
        _channel.Writer.Complete();
        await _cts.CancelAsync();
        await _flushLoop;
        _cts.Dispose();
    }
}

The Npgsql COPY-based bulk insert:

C#
public sealed class PostgresJournalRepository : IJournalRepository
{
    private readonly string _connectionString;

    public PostgresJournalRepository(string connectionString)
    {
        _connectionString = connectionString;
    }

    public async Task BulkInsertAsync(IReadOnlyList<JournalEntry> entries, CancellationToken ct)
    {
        await using var conn = new NpgsqlConnection(_connectionString);
        await conn.OpenAsync(ct);

        // Binary COPY is typically much faster than batched INSERTs for large volumes
        await using var writer = await conn.BeginBinaryImportAsync(
            "COPY journal_entries " +
            "(tenant_id, entity_type, entity_id, event_type, actor_id, actor_kind, " +
            "payload_json, source_ip, correlation_id, occurred_at, sequence_number, " +
            "previous_hash, hash) " +
            "FROM STDIN (FORMAT BINARY)",
            ct);

        foreach (var e in entries)
        {
            await writer.StartRowAsync(ct);
            await writer.WriteAsync(e.TenantId, NpgsqlDbType.Uuid, ct);
            await writer.WriteAsync(e.EntityType, NpgsqlDbType.Varchar, ct);
            await writer.WriteAsync(e.EntityId, NpgsqlDbType.Uuid, ct);
            await writer.WriteAsync(e.EventType, NpgsqlDbType.Varchar, ct);
            await writer.WriteAsync(e.ActorId, NpgsqlDbType.Uuid, ct);
            await writer.WriteAsync(e.ActorKind, NpgsqlDbType.Varchar, ct);
            await writer.WriteAsync(e.PayloadJson, NpgsqlDbType.Jsonb, ct);
            await writer.WriteAsync(e.SourceIp ?? (object)DBNull.Value, NpgsqlDbType.Varchar, ct);
            await writer.WriteAsync(e.CorrelationId ?? (object)DBNull.Value, NpgsqlDbType.Varchar, ct);
            await writer.WriteAsync(e.OccurredAt, NpgsqlDbType.TimestampTz, ct);
            await writer.WriteAsync(e.SequenceNumber, NpgsqlDbType.Bigint, ct);
            await writer.WriteAsync(e.PreviousHash ?? (object)DBNull.Value, NpgsqlDbType.Varchar, ct);
            await writer.WriteAsync(e.Hash, NpgsqlDbType.Varchar, ct);
        }

        await writer.CompleteAsync(ct);
    }
}

Journal writes must never block business logic

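This is the critical operational constraint. JournalWriter.WriteAsync is fire-and-forget at the channel level: business logic registers the journal event and moves on. If the channel is full because the flush loop is behind, the entry is routed to a dead-letter callback rather than blocking the HTTP request. The price is durability: entries still buffered in memory are lost if the process crashes before the next flush. If every state change must be provably journaled, write the journal row in the same database transaction as the change instead.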

Configure an alert on the LogError in the full-channel path. If that fires in production, your flush loop is too slow and you need to scale the writer or increase the batch size.
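The full-channel branch only runs if TryWrite returns false. With System.Threading.Channels that is the behavior of BoundedChannelFullMode.Wait; under DropWrite, TryWrite reports success and discards the item, so neither the alert nor the dead-letter callback would ever fire. A small check of both modes (the `FullModeDemo` name is illustrative):

```csharp
using System.Threading.Channels;

public static class FullModeDemo
{
    // Fills a bounded channel to capacity, then reports what TryWrite returns
    // for one extra item and how many items the reader can actually see.
    public static (bool ExtraAccepted, int Buffered) Overfill(BoundedChannelFullMode mode, int capacity)
    {
        var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(capacity) { FullMode = mode });

        for (var i = 0; i < capacity; i++)
        {
            channel.Writer.TryWrite(i);
        }

        var extraAccepted = channel.Writer.TryWrite(capacity); // one more than fits

        var buffered = 0;
        while (channel.Reader.TryRead(out _))
        {
            buffered++;
        }

        return (extraAccepted, buffered);
    }
}
```

If you prefer one of the Drop modes, the Channel.CreateBounded overload that takes an itemDropped callback (added in .NET 6) is the place to hook the dead-letter path.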

The DLQ callback can write to a local file, a Redis list, or a message broker topic. A reasonable minimum is:

C#
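var dlqLock = new object();

Action<JournalEntry> deadLetter = entry =>
{
    // Write to a local overflow file so entries survive a restart.
    // WriteAsync runs on many request threads at once, and uncoordinated File.AppendAllText
    // calls on one file can fail with a sharing violation or interleave, so serialize appends.
    var line = JsonSerializer.Serialize(entry);
    lock (dlqLock)
    {
        File.AppendAllText("/var/log/journal-dlq.jsonl", line + "\n");
    }
};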

A separate recovery job replays the DLQ into the journal once the flush loop catches up.


Table Partitioning for Retention

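A journal table accumulates data indefinitely. At 10K events/second that is 864 million rows per day (10,000 × 86,400 s). Indexes keep entity lookups fast, but expiring old data from a single table of that size would mean enormous DELETEs, and the journal forbids DELETE anyway. Range partitioning by occurred_at turns retention into dropping whole partitions and lets time-bounded queries skip every month they do not cover.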

SQL
CREATE TABLE journal_entries (
    id              BIGINT GENERATED ALWAYS AS IDENTITY,
    tenant_id       UUID NOT NULL,
    entity_type     VARCHAR(100) NOT NULL,
    entity_id       UUID NOT NULL,
    event_type      VARCHAR(100) NOT NULL,
    actor_id        UUID NOT NULL,
    actor_kind      VARCHAR(50) NOT NULL,
    payload_json    JSONB NOT NULL,
    source_ip       VARCHAR(45),
    correlation_id  VARCHAR(100),
    occurred_at     TIMESTAMPTZ NOT NULL,
    sequence_number BIGINT NOT NULL,
    previous_hash   VARCHAR(64),
    hash            VARCHAR(64) NOT NULL
) PARTITION BY RANGE (occurred_at);

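-- Create partitions monthly. Bounds carry an explicit UTC offset because a bare
-- date literal is interpreted in the session's TimeZone setting.
CREATE TABLE journal_entries_2025_01
    PARTITION OF journal_entries
    FOR VALUES FROM ('2025-01-01 00:00:00+00') TO ('2025-02-01 00:00:00+00');

CREATE TABLE journal_entries_2025_02
    PARTITION OF journal_entries
    FOR VALUES FROM ('2025-02-01 00:00:00+00') TO ('2025-03-01 00:00:00+00');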

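PostgreSQL's partition pruning means a query with WHERE occurred_at >= '2025-01-01' AND occurred_at < '2025-02-01' only touches the January partition. Use half-open ranges: BETWEEN '2025-01-01' AND '2025-01-31' stops at midnight on January 31 and misses almost the whole last day. Time-bounded queries never scan partitions outside their range; within a partition, the indexes do the rest.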
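Whatever creates these partitions has to compute names and bounds, and that is easy to get subtly wrong (culture-sensitive formatting, local time, inclusive upper bounds). A minimal sketch of a helper that maps a UTC instant to its monthly partition name and half-open [From, To) range; `MonthlyPartition` is a hypothetical name, and the naming scheme follows the DDL above:

```csharp
using System;
using System.Globalization;

public static class MonthlyPartition
{
    // Maps a UTC timestamp to its partition name and half-open [From, To) range.
    public static (string Name, DateTime From, DateTime To) For(DateTime utc)
    {
        if (utc.Kind != DateTimeKind.Utc)
            throw new ArgumentException("Expected a UTC DateTime.", nameof(utc));

        var from = new DateTime(utc.Year, utc.Month, 1, 0, 0, 0, DateTimeKind.Utc);
        var to = from.AddMonths(1);

        // Invariant culture so a non-Gregorian default calendar cannot change the name.
        var name = "journal_entries_" + from.ToString("yyyy_MM", CultureInfo.InvariantCulture);
        return (name, from, to);
    }

    // Renders the FOR VALUES clause with explicit UTC offsets, independent of session TimeZone.
    public static string ForValuesClause(DateTime from, DateTime to) =>
        string.Format(CultureInfo.InvariantCulture,
            "FOR VALUES FROM ('{0:yyyy-MM-dd} 00:00:00+00') TO ('{1:yyyy-MM-dd} 00:00:00+00')",
            from, to);
}
```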

The PartitionMaintenanceWorker creates next month's partition and drops or archives old ones:

C#
public sealed class PartitionMaintenanceWorker : BackgroundService
{
    private readonly ILogger<PartitionMaintenanceWorker> _logger;
    private readonly NpgsqlDataSource _dataSource;
    private readonly RetentionConfiguration _retention;
    private readonly IBlobArchiver _archiver;

    public PartitionMaintenanceWorker(
        ILogger<PartitionMaintenanceWorker> logger,
        NpgsqlDataSource dataSource,
        RetentionConfiguration retention,
        IBlobArchiver archiver)
    {
        _logger = logger;
        _dataSource = dataSource;
        _retention = retention;
        _archiver = archiver;
    }

    protected override async Task ExecuteAsync(CancellationToken ct)
    {
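        // Run once at startup (so partitions exist before the first insert),
        // then once a day at 02:00 UTC.
        while (!ct.IsCancellationRequested)
        {
            try
            {
                await CreateNextMonthPartitionAsync(ct);
                await ArchiveAndDropExpiredPartitionsAsync(ct);
            }
            catch (Exception ex) when (ex is not OperationCanceledException)
            {
                // An unhandled exception would stop this BackgroundService (and, by default
                // since .NET 6, the host). Log it and try again at the next run.
                _logger.LogError(ex, "Partition maintenance failed");
            }

            // Next 02:00 UTC: today's if it is still ahead, otherwise tomorrow's
            var now = DateTime.UtcNow;
            var nextRun = now.Date.AddHours(2);
            if (nextRun <= now) nextRun = nextRun.AddDays(1);
            await Task.Delay(nextRun - now, ct);
        }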
    }

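    private async Task CreateNextMonthPartitionAsync(CancellationToken ct)
    {
        // Keep the current month and the next two months in place, so a missed run
        // or a fresh deployment never leaves inserts without a target partition.
        var now = DateTime.UtcNow;
        var currentMonth = new DateTime(now.Year, now.Month, 1, 0, 0, 0, DateTimeKind.Utc);
        var invariant = System.Globalization.CultureInfo.InvariantCulture;

        await using var conn = await _dataSource.OpenConnectionAsync(ct);

        for (var offset = 0; offset <= 2; offset++)
        {
            var target = currentMonth.AddMonths(offset);
            var partitionName = "journal_entries_" + target.ToString("yyyy_MM", invariant);
            var from = target.ToString("yyyy-MM-dd", invariant);
            var to = target.AddMonths(1).ToString("yyyy-MM-dd", invariant);

            // Explicit +00 offsets keep the bounds independent of the session TimeZone
            await using var cmd = conn.CreateCommand();
            cmd.CommandText = $"""
                CREATE TABLE IF NOT EXISTS {partitionName}
                    PARTITION OF journal_entries
                    FOR VALUES FROM ('{from} 00:00:00+00') TO ('{to} 00:00:00+00');
                """;

            await cmd.ExecuteNonQueryAsync(ct);
            _logger.LogInformation("Ensured partition {Name} exists", partitionName);
        }
    }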

    private async Task ArchiveAndDropExpiredPartitionsAsync(CancellationToken ct)
    {
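        // Partitions leave the database once older than the default retention (1 year).
        // Rows with longer retention (financial types, 7 years) are archived before the drop.
        // DefaultRetentionYears is a TimeSpan, so subtract it instead of calling AddYears.
        var cutoff = DateTime.UtcNow - _retention.DefaultRetentionYears;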

        // List all partitions of journal_entries
        await using var conn = await _dataSource.OpenConnectionAsync(ct);
        var partitions = new List<string>();

        // Scope the reader: Npgsql runs one command at a time per connection, so the reader
        // must be disposed before the DROP TABLE commands below reuse the connection.
        await using (var listCmd = conn.CreateCommand())
        {
            listCmd.CommandText = """
                SELECT child.relname
                FROM pg_inherits
                JOIN pg_class parent ON pg_inherits.inhparent = parent.oid
                JOIN pg_class child  ON pg_inherits.inhrelid  = child.oid
                WHERE parent.relname = 'journal_entries'
                ORDER BY child.relname;
                """;

            await using var reader = await listCmd.ExecuteReaderAsync(ct);
            while (await reader.ReadAsync(ct))
            {
                partitions.Add(reader.GetString(0));
            }
        }

        foreach (var partition in partitions)
        {
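            // Parse the partition month from its name, e.g. journal_entries_2023_01
            if (!TryParsePartitionDate(partition, out var partitionMonth)) continue;

            // Drop only when the partition's whole range (up to the first day of the
            // next month) is older than the cutoff, not just its first day
            if (partitionMonth.AddMonths(1) > cutoff) continue;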

            _logger.LogInformation("Archiving expired partition {Partition}", partition);
            await _archiver.ArchivePartitionAsync(partition, ct);

            await using var dropCmd = conn.CreateCommand();
            dropCmd.CommandText = $"DROP TABLE {partition};";
            await dropCmd.ExecuteNonQueryAsync(ct);

            _logger.LogInformation("Dropped expired partition {Partition}", partition);
        }
    }

    private static bool TryParsePartitionDate(string partitionName, out DateTime date)
    {
        // journal_entries_2023_01 → 2023-01-01
        var parts = partitionName.Split('_');
        if (parts.Length >= 4
            && int.TryParse(parts[^2], out var year)
            && int.TryParse(parts[^1], out var month))
        {
            date = new DateTime(year, month, 1, 0, 0, 0, DateTimeKind.Utc);
            return true;
        }

        date = default;
        return false;
    }
}

Point-in-Time Queries: What Was the State at a Given Timestamp?

"What was the state of Order abc-123 at 2025-03-15 14:32:00 UTC?" is a question no ModifiedAt column can answer. The journal can.

The naive approach replays all events from the beginning of time up to the target timestamp. For entities with thousands of events this is slow. The solution is a snapshot table: periodically checkpoint the entity's state at a point in time, then replay only forward from the nearest snapshot.

C#
public sealed class PointInTimeReconstructor<TEntity>
    where TEntity : class, IReconstructable<TEntity>
{
    private readonly IJournalQueryRepository _journal;
    private readonly ISnapshotRepository<TEntity> _snapshots;

    public PointInTimeReconstructor(
        IJournalQueryRepository journal,
        ISnapshotRepository<TEntity> snapshots)
    {
        _journal = journal;
        _snapshots = snapshots;
    }

    public async Task<TEntity?> ReconstructAtAsync(
        string entityType,
        Guid entityId,
        DateTime pointInTime,
        CancellationToken ct)
    {
        // 1. Find the most recent snapshot at or before the target time
        var snapshot = await _snapshots.GetLatestBeforeAsync(entityType, entityId, pointInTime, ct);

        DateTime replayFrom;
        TEntity? state;

        if (snapshot is not null)
        {
            state = snapshot.State;
            replayFrom = snapshot.SnapshotAt;
        }
        else
        {
            // No snapshot — replay from the very beginning
            state = null;
            replayFrom = DateTime.MinValue;
        }

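        // 2. Replay journal entries from the snapshot forward to pointInTime.
        //    `from` must be exclusive (or compare sequence numbers instead): an event
        //    stamped exactly at SnapshotAt is already folded into the snapshot.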
        var events = _journal.GetEventsAsync(
            entityType, entityId,
            from: replayFrom,
            to: pointInTime,
            ct: ct);

        await foreach (var entry in events.WithCancellation(ct))
        {
            state = TEntity.Apply(state, entry);
        }

        return state;
    }
}

// The entity type implements this interface to define how events are applied
public interface IReconstructable<TSelf> where TSelf : class
{
    static abstract TSelf? Apply(TSelf? current, JournalEntry entry);
}

An example reconstruction for an Order:

C#
public sealed record Order : IReconstructable<Order> // record: the Apply methods use `with` expressions
{
    public Guid Id { get; private set; }
    public string Status { get; private set; } = string.Empty;
    public decimal Total { get; private set; }
    public List<string> Notes { get; private set; } = [];

    public static Order? Apply(Order? current, JournalEntry entry)
    {
        return entry.EventType switch
        {
            "OrderPlaced" => ApplyOrderPlaced(entry),
            "OrderConfirmed" => ApplyOrderConfirmed(current, entry),
            "OrderCancelled" => ApplyOrderCancelled(current, entry),
            "NoteAdded" => ApplyNoteAdded(current, entry),
            _ => current // unknown events are skipped, not thrown
        };
    }

    private static Order ApplyOrderPlaced(JournalEntry entry)
    {
        var payload = JsonSerializer.Deserialize<OrderPlacedPayload>(entry.PayloadJson)!;
        return new Order
        {
            Id = entry.EntityId,
            Status = "Placed",
            Total = payload.Total
        };
    }

    private static Order? ApplyOrderConfirmed(Order? current, JournalEntry entry)
    {
        if (current is null) return null;
        return current with { Status = "Confirmed" };
    }

    private static Order? ApplyOrderCancelled(Order? current, JournalEntry entry)
    {
        if (current is null) return null;
        return current with { Status = "Cancelled" };
    }

    private static Order? ApplyNoteAdded(Order? current, JournalEntry entry)
    {
        if (current is null) return null;
        var payload = JsonSerializer.Deserialize<NoteAddedPayload>(entry.PayloadJson)!;
        var notes = new List<string>(current.Notes) { payload.Note };
        return current with { Notes = notes };
    }
}

The snapshot table is written by a background job every N events or every M minutes, whichever comes first. The snapshot caps the worst-case replay window.
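Stripped of repositories and JSON, snapshot-plus-replay is a left fold that starts from the snapshot's state instead of from nothing. A self-contained sketch using a hypothetical running-balance entity; it keys the replay on sequence numbers rather than timestamps, which avoids double-applying an event that shares the snapshot's timestamp, and it assumes the caller already picked a snapshot at or before the target time:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical, minimal event and snapshot shapes for illustration only.
public sealed record BalanceEvent(long Sequence, DateTime OccurredAt, decimal Delta);
public sealed record BalanceSnapshot(long UpToSequence, decimal Balance);

public static class Replay
{
    // State at `pointInTime`: start from the snapshot (if any), then apply only events
    // after the snapshot's sequence and at or before the target time, in sequence order.
    public static decimal BalanceAt(
        IEnumerable<BalanceEvent> events,
        BalanceSnapshot? snapshot,
        DateTime pointInTime)
    {
        var state = snapshot?.Balance ?? 0m;
        var after = snapshot?.UpToSequence ?? 0;

        return events
            .Where(e => e.Sequence > after && e.OccurredAt <= pointInTime)
            .OrderBy(e => e.Sequence)
            .Aggregate(state, (balance, e) => balance + e.Delta);
    }
}
```

With or without a snapshot the answer is the same; the snapshot only shrinks how many events are folded.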


Retention Policies Per Entity Type

Not all data has the same retention requirement. Financial transactions stay for 7 years. Session events stay for 90 days. Temporary diagnostic events stay for 30 days.

The retention policy is applied at the partition level, not the row level. Rows are never deleted individually. Entire monthly partitions are dropped, and any rows in them that are still inside their entity type's retention window are archived to cold storage first.

C#
public sealed class RetentionConfiguration
{
    public Dictionary<string, TimeSpan> PolicyByEntityType { get; set; } = new()
    {
        ["Invoice"]     = TimeSpan.FromDays(365 * 7),
        ["Order"]       = TimeSpan.FromDays(365 * 7),
        ["Payment"]     = TimeSpan.FromDays(365 * 7),
        ["UserSession"] = TimeSpan.FromDays(90),
        ["DiagEvent"]   = TimeSpan.FromDays(30),
    };

    public TimeSpan DefaultRetentionYears
        => TimeSpan.FromDays(365); // 1 year for anything not explicitly configured

    public TimeSpan GetRetention(string entityType)
        => PolicyByEntityType.TryGetValue(entityType, out var policy)
            ? policy
            : DefaultRetentionYears;

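    // The shortest configured retention: no partition may leave the database before it
    // is this old. A partition past this age still holds rows with longer retention
    // windows, so those rows must be archived before the drop; only a partition older
    // than the LONGEST retention could be dropped without archiving.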
    public TimeSpan SafeDropAge
        => PolicyByEntityType.Values
            .Append(DefaultRetentionYears)
            .Min();
}

A partition is dropped only when its entire time range is older than the drop cutoff. Whenever that cutoff is shorter than the longest retention window (7 years here), the partition still holds rows that must be kept. For example, a January 2023 partition is past both the 30-day DiagEvent window and the 1-year default, but its Invoice, Order, and Payment rows must be kept until early 2030. The archiver extracts those financial rows to Azure Blob Storage in JSONL format before the partition is dropped.
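The archive-before-drop decision can be computed per entity type: for a partition whose range ends at `partitionEnd`, any type whose retention still reaches past "now" must be copied out first. A sketch assuming the retentions are the TimeSpan values from RetentionConfiguration; `ArchivePlanner` is a hypothetical name:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class ArchivePlanner
{
    // Entity types whose rows in a partition ending at `partitionEndUtc` are still inside
    // their retention window at `nowUtc`, and therefore must be archived before a drop.
    public static IReadOnlyList<string> TypesToArchive(
        IReadOnlyDictionary<string, TimeSpan> retentionByType,
        DateTime partitionEndUtc,
        DateTime nowUtc)
    {
        // The newest row sits just before partitionEndUtc, so every row of a type has
        // expired only once partitionEndUtc + retention <= nowUtc.
        return retentionByType
            .Where(kv => partitionEndUtc + kv.Value > nowUtc)
            .Select(kv => kv.Key)
            .OrderBy(name => name, StringComparer.Ordinal)
            .ToList();
    }
}
```

An empty result means the partition can be dropped without archiving anything.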


Event Sourcing vs Audit Log: When to Use Each

These two patterns look similar but solve different problems.

An audit log records changes to state that is managed somewhere else. The state lives in the orders table. The audit log records who changed it and what they changed. The audit log is a secondary record. The primary source of truth is the table.

Event sourcing makes events the primary source of truth. The orders table does not exist. You reconstruct order state by replaying events. The event store is the only place where order data lives.
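In an event-sourced design, by contrast, current state is a left fold over the entity's events. "State as of time T" falls out of the same fold by stopping at T. The sketch below uses hypothetical `OrderEvent` types; a real event store would also carry stream versions and snapshots.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical events for one order stream.
public abstract record OrderEvent(DateTime OccurredAtUtc);
public sealed record OrderPlaced(DateTime OccurredAtUtc, decimal Total) : OrderEvent(OccurredAtUtc);
public sealed record OrderLineAdded(DateTime OccurredAtUtc, decimal Amount) : OrderEvent(OccurredAtUtc);
public sealed record OrderCancelled(DateTime OccurredAtUtc, string Reason) : OrderEvent(OccurredAtUtc);

public sealed record OrderState(string Status, decimal Total, string? CancelReason)
{
    public static readonly OrderState Empty = new("None", 0m, null);
}

public static class OrderProjection
{
    // Pure transition function: (state, event) -> next state. No I/O.
    public static OrderState Apply(OrderState s, OrderEvent e) => e switch
    {
        OrderPlaced p    => s with { Status = "Placed", Total = p.Total },
        OrderLineAdded a => s with { Total = s.Total + a.Amount },
        OrderCancelled c => s with { Status = "Cancelled", CancelReason = c.Reason },
        _ => throw new InvalidOperationException($"Unknown event {e.GetType().Name}")
    };

    // Events must be supplied in stream (append) order. If asOfUtc is given,
    // events after that instant are ignored, which yields the historical state.
    public static OrderState Replay(IEnumerable<OrderEvent> events, DateTime? asOfUtc = null) =>
        events
            .Where(e => asOfUtc is null || e.OccurredAtUtc <= asOfUtc)
            .Aggregate(OrderState.Empty, (state, e) => Apply(state, e));
}
```

Here there is no orders table to query: every read, current or historical, is a replay (or a projection kept up to date by one).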

The choice has significant consequences:

Audit log is appropriate when you have an existing relational data model you cannot or will not replace, when you need forensic history but not full state reconstruction for every read, and when your team is not ready for the operational complexity of a pure event-sourced system.

Event sourcing is appropriate when you need temporal queries as a first-class feature, when the domain models for reading and writing are genuinely different (CQRS), and when you need to rebuild read models from scratch because requirements change.

The design in this article is an audit log, not event sourcing. The order state lives in the orders table. The journal records every mutation to that table. You can reconstruct past state by replaying the journal, but that is an audit capability, not the primary data access pattern.

If you are building a greenfield system and the domain is inherently event-driven (e.g. financial transactions, healthcare FHIR events, supply chain movements), consider event sourcing. If you are adding audit capability to an existing system, build an append-only journal.


Wiring It Together

C#
// Program.cs
using System.Text.Json;

var builder = WebApplication.CreateBuilder(args);

builder.Services.AddSingleton<RetentionConfiguration>();
builder.Services.AddSingleton<IBlobArchiver, AzureBlobArchiver>();
builder.Services.AddSingleton<IJournalRepository, PostgresJournalRepository>();
builder.Services.AddSingleton<IJournalWriter, JournalWriter>(sp =>
{
    var logger = sp.GetRequiredService<ILogger<JournalWriter>>();
    var repo = sp.GetRequiredService<IJournalRepository>();
    var dlqLock = new object();

    return new JournalWriter(logger, repo, deadLetter: entry =>
    {
        // DLQ: append one JSON line per failed entry to local disk;
        // a recovery worker picks the file up later.
        var json = JsonSerializer.Serialize(entry);
        lock (dlqLock) // overlapping appends to one file can fail with IOException
        {
            File.AppendAllText("/var/log/journal-dlq.jsonl", json + "\n");
        }
        logger.LogError("Journal DLQ: EntityType={EntityType} EntityId={EntityId}",
            entry.EntityType, entry.EntityId);
    });
});

builder.Services.AddHostedService<PartitionMaintenanceWorker>();
builder.Services.AddScoped(typeof(PointInTimeReconstructor<>));
builder.Services.AddSingleton<HashChainValidator>();
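The dead-letter callback above leaves recovery to a separate worker. The following is a minimal sketch of that recovery pass, written against the JSONL file format the callback produces. It assumes the entry type round-trips through System.Text.Json. It also assumes nothing is appending to the file while the pass runs, for example because it executes at startup before the journal writer begins. `DeadLetterReplayer` and the `replay` delegate are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Text.Json;
using System.Threading.Tasks;

// Hypothetical DLQ recovery pass. Assumes no concurrent writers to dlqPath.
public static class DeadLetterReplayer
{
    public static async Task<(int Replayed, int Failed)> ReplayAsync<T>(
        string dlqPath, Func<T, Task> replay)
    {
        if (!File.Exists(dlqPath)) return (0, 0);

        var stillFailing = new List<string>();
        var replayed = 0;

        foreach (var line in await File.ReadAllLinesAsync(dlqPath))
        {
            if (string.IsNullOrWhiteSpace(line)) continue;
            try
            {
                var entry = JsonSerializer.Deserialize<T>(line)
                    ?? throw new JsonException("DLQ line deserialized to null.");
                // Should be idempotent (e.g. keyed on entry ID): a crash before the
                // rewrite below means these lines are replayed again next time.
                await replay(entry);
                replayed++;
            }
            catch (Exception)
            {
                stillFailing.Add(line); // keep the original line for the next pass
            }
        }

        // Write survivors to a temp file, then swap it in, so a crash mid-write
        // cannot truncate the DLQ.
        var tmpPath = dlqPath + ".tmp";
        await File.WriteAllLinesAsync(tmpPath, stillFailing);
        File.Move(tmpPath, dlqPath, overwrite: true);
        return (replayed, stillFailing.Count);
    }
}
```

Malformed lines are kept rather than discarded, so a poison entry stays visible for manual inspection instead of silently vanishing from the audit trail.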

Usage in a command handler:

C#
using System.Text.Json;
using MediatR;

public sealed class CancelOrderHandler : IRequestHandler<CancelOrderCommand>
{
    private readonly AppDbContext _db;
    private readonly IJournalWriter _journal;
    private readonly IHttpContextAccessor _http;

    public CancelOrderHandler(AppDbContext db, IJournalWriter journal, IHttpContextAccessor http)
    {
        _db = db;
        _journal = journal;
        _http = http;
    }

    public async Task Handle(CancelOrderCommand request, CancellationToken ct)
    {
        var order = await _db.Orders.FindAsync([request.OrderId], ct)
            ?? throw new OrderNotFoundException(request.OrderId);

        // Capture the "before" values BEFORE mutating the entity: reading
        // order.Status after Cancel() would record the new status, not the old one.
        // (If Order has navigation properties, serialize a DTO to avoid reference cycles.)
        var previousStatus = order.Status;
        var previousState = JsonSerializer.Serialize(order);

        order.Cancel(request.Reason);
        await _db.SaveChangesAsync(ct);

        var entry = JournalEntry.Create(
            tenantId: request.TenantId,
            entityType: "Order",
            entityId: order.Id,
            eventType: "OrderCancelled",
            actorId: request.ActorId,
            actorKind: "User",
            payloadJson: JsonSerializer.Serialize(new
            {
                Reason = request.Reason,
                PreviousStatus = previousStatus,
                PreviousState = previousState
            }),
            sourceIp: _http.HttpContext?.Connection.RemoteIpAddress?.ToString(),
            correlationId: _http.HttpContext?.TraceIdentifier,
            occurredAt: DateTime.UtcNow,
            sequenceNumber: order.JournalSequence,
            previousHash: order.LastJournalHash);

        // WriteAsync only enqueues the entry onto the in-memory channel; the database
        // insert happens later in the batched flush loop, so a slow or failing journal
        // insert does not fail this request. The order change is already committed, so
        // the request token is deliberately not passed: a client disconnecting now must
        // not drop the audit record. (If the channel is bounded and full, this waits for space.)
        await _journal.WriteAsync(entry, CancellationToken.None);
    }
}
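The handler passes `sequenceNumber` and `previousHash` so that each entry is chained to the previous entry for the same entity. The exact hash input is whatever `JournalEntry.Create` defines. As a sketch under stated assumptions, a link can be computed as SHA-256 over the previous hash, the sequence number, and the payload. A chain then verifies only if every stored hash recomputes and every `PreviousHash` matches its predecessor. `ChainLink`, `HashChain`, the empty genesis hash, and the field encoding are all hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Security.Cryptography;
using System.Text;

// Hypothetical minimal link: just the fields that participate in the hash.
public sealed record ChainLink(long Sequence, string PayloadJson, string PreviousHash, string Hash);

public static class HashChain
{
    public const string Genesis = ""; // assumed previous hash of an entity's first entry

    public static string ComputeHash(string previousHash, long sequence, string payloadJson)
    {
        // Length-prefix the variable fields so different splits cannot produce the same input.
        var input = $"{previousHash.Length}:{previousHash}|{sequence}|{payloadJson.Length}:{payloadJson}";
        return Convert.ToHexString(SHA256.HashData(Encoding.UTF8.GetBytes(input)));
    }

    public static ChainLink Append(ChainLink? last, string payloadJson)
    {
        var previousHash = last?.Hash ?? Genesis;
        var sequence = (last?.Sequence ?? 0) + 1;
        return new ChainLink(sequence, payloadJson, previousHash,
            ComputeHash(previousHash, sequence, payloadJson));
    }

    // Returns the sequence number of the first broken link, or null if the chain is intact.
    public static long? FindFirstBreak(IReadOnlyList<ChainLink> chain)
    {
        var expectedPrevious = Genesis;
        for (var i = 0; i < chain.Count; i++)
        {
            var link = chain[i];
            if (link.Sequence != i + 1
                || link.PreviousHash != expectedPrevious
                || link.Hash != ComputeHash(link.PreviousHash, link.Sequence, link.PayloadJson))
                return link.Sequence;
            expectedPrevious = link.Hash;
        }
        return null;
    }
}
```

For the chain to continue across commands, the newly computed hash and sequence must also end up stored back on the entity. These are the `LastJournalHash` and `JournalSequence` values the handler reads. Without that, the next entry would link to a stale predecessor.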

Summary

A production audit journal is not a ModifiedAt column. It is an append-only ledger with forensic integrity enforced at multiple layers: application interceptors that block mutations, hash chains that detect tampering, and PostgreSQL partitioning that turns retention into dropping a partition instead of running slow, long-running DELETE statements.

The write path uses Channel<T> to decouple business logic from the flush loop, and Npgsql's COPY protocol to sustain high insert throughput. Point-in-time queries combine snapshots with forward replay so that "what was the state of X at time T" is always answerable without a full table scan.

The key decisions are: use sequential IDs for the journal, never update or delete individual entries at any layer (retention removes whole partitions, after archiving), partition by month for retention, and never let the journal write block a business transaction.
