System Design: Event Journal and Audit Trail in .NET — Append-Only Storage, Retention, and Point-in-Time Queries
Design a high-throughput audit journal in .NET: append-only ledger with PostgreSQL, write batching, retention policies with table partitioning, point-in-time entity reconstruction, hash-chain tamper detection, and the event sourcing vs audit log trade-off.
Every production system eventually gets this requirement: "We need to know who changed what, and when." Sometimes it comes from compliance. Sometimes it comes after an incident where no one can explain how an order ended up in a corrupted state. Sometimes a regulator asks for a 7-year history of financial transactions.
The naive response is to add CreatedBy, ModifiedBy, and ModifiedAt columns to every table. That tells you the last change. It tells you nothing about the 47 changes before it.
A proper audit journal is a different beast. It is an append-only ledger of every state transition in your system. You never update or delete journal entries — not for performance, not for correctness, not for any reason. The journal is the forensic record. It is also the foundation for point-in-time queries, tamper detection, and regulatory retention.
This article walks through a production-grade design for a high-throughput audit journal in .NET 9 with PostgreSQL, covering the data model, hash-chain integrity, write batching at 10K events per second, table partitioning for retention, and point-in-time entity reconstruction.
The Fundamental Rule: Immutability Is Not Optional
Before writing a single line of code, internalize this constraint. Journal entries are never updated or deleted. Not by the application. Not by a migration script. Not by a hotfix at 2 AM.
The reason is forensic integrity. The moment you allow updates, you lose the ability to prove what happened. A tampered audit log is worse than no audit log, because it creates false confidence. Courts, regulators, and incident post-mortems all depend on the log being the ground truth.
In practice this means:
- No UPDATE statements touch the journal table, enforced in code and, if needed, in PostgreSQL itself (revoked UPDATE/DELETE privileges or row-level security)
- No DELETE statements touch the journal table; retention is handled by dropping whole partitions, not deleting individual rows
- The EF Core model for JournalEntry has its saves intercepted to block any mutation
- Schema migrations never alter existing rows
The hash chain (covered below) makes any modification after insertion detectable.
Data Model
Every state change in the system maps to a JournalEntry. The fields are chosen deliberately:
using System.Security.Cryptography;
using System.Text;
public sealed class JournalEntry
{
public long Id { get; private set; } // auto-increment sequence, never a GUID
public Guid TenantId { get; private set; }
public string EntityType { get; private set; } // "Order", "Invoice", "User"
public Guid EntityId { get; private set; }
public string EventType { get; private set; } // "OrderPlaced", "OrderCancelled"
public Guid ActorId { get; private set; } // user or service account that triggered it
public string ActorKind { get; private set; } // "User", "ServiceAccount", "System"
public string PayloadJson { get; private set; } // serialized diff or full snapshot
public string? SourceIp { get; private set; }
public string? CorrelationId { get; private set; }
public DateTime OccurredAt { get; private set; } // UTC, immutable
public long SequenceNumber { get; private set; } // monotonic per entity
public string? PreviousHash { get; private set; } // hash of the previous entry for this entity
public string Hash { get; private set; } // SHA-256 over PreviousHash, entity identity, sequence, timestamp and payload
// Private constructor — created only through the factory
private JournalEntry() { }
public static JournalEntry Create(
Guid tenantId,
string entityType,
Guid entityId,
string eventType,
Guid actorId,
string actorKind,
string payloadJson,
string? sourceIp,
string? correlationId,
DateTime occurredAt,
long sequenceNumber,
string? previousHash)
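{
// Npgsql requires UTC for timestamptz, and the "O" format used in ComputeHash
// only round-trips if the Kind survives the database trip unchanged
if (occurredAt.Kind != DateTimeKind.Utc)
throw new ArgumentException("occurredAt must be a UTC DateTime.", nameof(occurredAt));
var entry = new JournalEntry
{
TenantId = tenantId,
EntityType = entityType,
EntityId = entityId,
EventType = eventType,
ActorId = actorId,
ActorKind = actorKind,
PayloadJson = payloadJson,
SourceIp = sourceIp,
CorrelationId = correlationId,
// timestamptz keeps microseconds (10 ticks); trim now so the value read
// back later formats identically and the stored hash still verifies
OccurredAt = new DateTime(occurredAt.Ticks - occurredAt.Ticks % 10, DateTimeKind.Utc),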
SequenceNumber = sequenceNumber,
PreviousHash = previousHash,
};
entry.Hash = entry.ComputeHash();
return entry;
}
private string ComputeHash()
{
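// The hash covers the chain link (previous hash), entity type and id, event type,
// sequence number, timestamp and payload; changing any of these in a past entry
// breaks the chain from that point forward. TenantId, ActorId, ActorKind, SourceIp
// and CorrelationId are not part of the input, so edits to them go undetected.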
var input = $"{PreviousHash ?? "GENESIS"}|{EntityType}|{EntityId}|{EventType}" +
$"|{SequenceNumber}|{OccurredAt:O}|{PayloadJson}";
var bytes = SHA256.HashData(Encoding.UTF8.GetBytes(input));
return Convert.ToHexString(bytes).ToLowerInvariant();
}
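}
The Id is a long, not a Guid. At 10K inserts per second, a sequential integer keeps B-tree inserts at the right-hand edge of the index instead of scattering random GUIDs across it, which is significantly cheaper for PostgreSQL. Use an identity column (as configured below) or BIGSERIAL, defined once on the parent table so a single sequence is shared across all partitions.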
SequenceNumber is per entity, not global. It lets you detect gaps in an entity's history without scanning the entire table.
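Because the sequence is per entity, a gap check only needs one entity's rows, not the whole table. A minimal sketch, assuming numbering starts at 1 and the input is already sorted ascending; SequenceGaps is a hypothetical name, not part of the design above:

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper: finds missing per-entity sequence numbers.
public static class SequenceGaps
{
    // Input: one entity's SequenceNumbers sorted ascending, first entry numbered 1 (assumed).
    // Returns each missing run as an inclusive (FirstMissing, LastMissing) range.
    public static List<(long FirstMissing, long LastMissing)> Find(IEnumerable<long> sortedSequenceNumbers)
    {
        var gaps = new List<(long FirstMissing, long LastMissing)>();
        long expected = 1;
        foreach (var seq in sortedSequenceNumbers)
        {
            // A repeat or a step backwards means a forked or mis-ordered chain, not a gap
            if (seq < expected)
                throw new ArgumentException($"Duplicate or out-of-order sequence number {seq}");
            if (seq > expected)
                gaps.Add((expected, seq - 1));
            expected = seq + 1;
        }
        return gaps;
    }
}
```

Entries deleted from the end of a history leave no gap, so a check like this cannot detect truncation on its own.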
EF Core Configuration: Enforcing Append-Only
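EF Core will happily issue UPDATE and DELETE for any entity you track. You must block this at the interceptor level so a future developer does not accidentally break the invariant. The interceptor only sees change-tracked saves, though: ExecuteUpdate, ExecuteDelete, and raw SQL bypass it, which is why database-level enforcement still matters.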
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Metadata.Builders;
public sealed class JournalEntryConfiguration : IEntityTypeConfiguration<JournalEntry>
{
public void Configure(EntityTypeBuilder<JournalEntry> builder)
{
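builder.ToTable("journal_entries");
// EF needs a key; the partitioned table itself has no primary key, because
// PostgreSQL would require one to include the partition key (occurred_at)
builder.HasKey(e => e.Id);
// Explicit snake_case column names to match the DDL and the COPY column list
builder.Property(e => e.Id).HasColumnName("id")
.UseIdentityAlwaysColumn(); // PostgreSQL GENERATED ALWAYS AS IDENTITY
builder.Property(e => e.TenantId).HasColumnName("tenant_id").IsRequired();
builder.Property(e => e.EntityType).HasColumnName("entity_type").HasMaxLength(100).IsRequired();
builder.Property(e => e.EntityId).HasColumnName("entity_id").IsRequired();
builder.Property(e => e.EventType).HasColumnName("event_type").HasMaxLength(100).IsRequired();
builder.Property(e => e.ActorId).HasColumnName("actor_id").IsRequired();
builder.Property(e => e.ActorKind).HasColumnName("actor_kind").HasMaxLength(50).IsRequired();
builder.Property(e => e.PayloadJson).HasColumnName("payload_json").HasColumnType("jsonb").IsRequired();
builder.Property(e => e.SourceIp).HasColumnName("source_ip").HasMaxLength(45);
builder.Property(e => e.CorrelationId).HasColumnName("correlation_id").HasMaxLength(100);
builder.Property(e => e.OccurredAt).HasColumnName("occurred_at").IsRequired();
builder.Property(e => e.SequenceNumber).HasColumnName("sequence_number").IsRequired();
builder.Property(e => e.PreviousHash).HasColumnName("previous_hash").HasMaxLength(64);
builder.Property(e => e.Hash).HasColumnName("hash").HasMaxLength(64).IsRequired();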
// Composite index for entity history queries
builder.HasIndex(e => new { e.EntityType, e.EntityId, e.OccurredAt });
// Index for tenant-scoped queries
builder.HasIndex(e => new { e.TenantId, e.OccurredAt });
}
}
Now the interceptor that blocks mutations:
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Diagnostics;
public sealed class AppendOnlyJournalInterceptor : SaveChangesInterceptor
{
public override InterceptionResult<int> SavingChanges(
DbContextEventData eventData,
InterceptionResult<int> result)
{
ThrowIfJournalMutated(eventData.Context);
return base.SavingChanges(eventData, result);
}
public override ValueTask<InterceptionResult<int>> SavingChangesAsync(
DbContextEventData eventData,
InterceptionResult<int> result,
CancellationToken cancellationToken = default)
{
ThrowIfJournalMutated(eventData.Context);
return base.SavingChangesAsync(eventData, result, cancellationToken);
}
private static void ThrowIfJournalMutated(DbContext? context)
{
if (context is null) return;
var illegal = context.ChangeTracker
.Entries<JournalEntry>()
.Where(e => e.State is EntityState.Modified or EntityState.Deleted)
.ToList();
if (illegal.Count > 0)
{
throw new InvalidOperationException(
$"Journal entries are immutable. Attempted to {illegal[0].State} " +
$"entry with Id={illegal[0].Entity.Id}. " +
"Audit journal entries may only be inserted, never modified or deleted.");
}
}
}
Register the interceptor when configuring EF Core:
services.AddDbContext<JournalDbContext>((sp, options) =>
{
options.UseNpgsql(connectionString)
.AddInterceptors(sp.GetRequiredService<AppendOnlyJournalInterceptor>());
});
services.AddSingleton<AppendOnlyJournalInterceptor>();
The Hash Chain: Tamper Detection Without a Blockchain
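Each entry's Hash covers the previous entry's hash plus the entry's own entity identity, event type, sequence number, timestamp, and payload. If anyone edits entry 500 of an entity's chain directly in the database, its recomputed hash no longer matches the stored Hash; if they also overwrite that stored Hash, entry 501's PreviousHash no longer matches. Either way, the chain is broken from that point forward. The guarantee is relative to the newest hash you trust: someone able to rewrite every later entry could recompute every later hash as well, so if that threat matters, periodically copy each chain's latest hash somewhere the database's writers cannot modify.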
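A self-contained sketch of that property: it chains three payloads with SHA-256, then edits the middle entry in two ways. To stay short it hashes only the previous hash and the payload, a simplification of ComputeHash; MiniChain and ChainLink are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Security.Cryptography;
using System.Text;

public sealed record ChainLink(string? PreviousHash, string Payload, string Hash);

public static class MiniChain
{
    // Simplified link hash: previous hash (or "GENESIS") joined with the payload.
    public static string HashOf(string? previousHash, string payload) =>
        Convert.ToHexString(SHA256.HashData(Encoding.UTF8.GetBytes($"{previousHash ?? "GENESIS"}|{payload}")))
            .ToLowerInvariant();

    public static List<ChainLink> Build(IEnumerable<string> payloads)
    {
        var links = new List<ChainLink>();
        string? previous = null;
        foreach (var payload in payloads)
        {
            var hash = HashOf(previous, payload);
            links.Add(new ChainLink(previous, payload, hash));
            previous = hash;
        }
        return links;
    }

    // Index of the first link whose back-pointer or own hash fails to verify, or -1.
    public static int FirstBreak(IReadOnlyList<ChainLink> links)
    {
        string? expectedPrevious = null;
        for (var i = 0; i < links.Count; i++)
        {
            var link = links[i];
            if (link.PreviousHash != expectedPrevious || HashOf(link.PreviousHash, link.Payload) != link.Hash)
                return i;
            expectedPrevious = link.Hash;
        }
        return -1;
    }
}
```

Editing only the payload breaks the edited entry itself; editing the payload and rehashing it merely moves the break to the next entry, which is exactly what the PreviousHash link is for.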
Validation is straightforward and can run as an offline audit job:
using System.Security.Cryptography;
using System.Text;
using Microsoft.Extensions.Logging;
public sealed class HashChainValidator
{
private readonly ILogger<HashChainValidator> _logger;
public HashChainValidator(ILogger<HashChainValidator> logger)
{
_logger = logger;
}
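// Expects one entity's entries ordered by SequenceNumber, starting at its first entry;
// starting mid-chain reports a false break on the first entry checked
public async Task<HashChainValidationResult> ValidateEntityChainAsync(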
IAsyncEnumerable<JournalEntry> entriesInOrder,
CancellationToken ct)
{
string? expectedPreviousHash = null;
long entryCount = 0;
long? firstBreakAtSequence = null;
await foreach (var entry in entriesInOrder.WithCancellation(ct))
{
entryCount++;
// Check the link to the previous entry
if (entry.PreviousHash != expectedPreviousHash)
{
firstBreakAtSequence ??= entry.SequenceNumber;
_logger.LogWarning(
"Hash chain break detected. Entity={EntityType}/{EntityId} " +
"Sequence={Sequence}. Expected PreviousHash={Expected} but found {Actual}",
entry.EntityType, entry.EntityId, entry.SequenceNumber,
expectedPreviousHash, entry.PreviousHash);
}
// Recompute the hash to verify the entry itself was not tampered with
var recomputed = RecomputeHash(entry);
if (recomputed != entry.Hash)
{
firstBreakAtSequence ??= entry.SequenceNumber;
_logger.LogWarning(
"Hash mismatch for entry. Entity={EntityType}/{EntityId} " +
"Sequence={Sequence}. Stored={Stored} Computed={Computed}",
entry.EntityType, entry.EntityId, entry.SequenceNumber,
entry.Hash, recomputed);
}
expectedPreviousHash = entry.Hash;
}
return new HashChainValidationResult(
IsValid: firstBreakAtSequence is null,
EntriesChecked: entryCount,
FirstBreakAtSequence: firstBreakAtSequence);
}
// Must build exactly the same input string as JournalEntry.ComputeHash
private static string RecomputeHash(JournalEntry entry)
{
var input = $"{entry.PreviousHash ?? "GENESIS"}|{entry.EntityType}|{entry.EntityId}" +
$"|{entry.EventType}|{entry.SequenceNumber}|{entry.OccurredAt:O}|{entry.PayloadJson}";
var bytes = SHA256.HashData(Encoding.UTF8.GetBytes(input));
return Convert.ToHexString(bytes).ToLowerInvariant();
}
}
public sealed record HashChainValidationResult(
bool IsValid,
long EntriesChecked,
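long? FirstBreakAtSequence);
The validation job can run nightly against a random sample of entities or against partitions that are about to be archived. A broken chain is strong forensic evidence of tampering, or of a bug in the write path. Two storage details decide whether stored rows can validate at all. First, jsonb normalizes whitespace and key order, so the PayloadJson read back is not the exact text that was hashed; if you validate from the database, store the payload as json or text, or keep the exact hashed text alongside it. Second, timestamptz keeps only microseconds, so OccurredAt has to be truncated to that precision before it is hashed.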
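ComputeHash and RecomputeHash build the same input string in two places, and neither covers TenantId, ActorId, or ActorKind, so changing who performed an action would not break the chain. A sketch of one shared helper that both the factory and the validator could call, assuming you want the tenant and actor fields under the hash; JournalHash, its field order, and its UTC and microsecond handling are assumptions, not the design above:

```csharp
using System;
using System.Globalization;
using System.Security.Cryptography;
using System.Text;

// Hypothetical single source of truth for the hash input.
public static class JournalHash
{
    // Trims a UTC timestamp to microseconds, the precision PostgreSQL timestamptz keeps,
    // so the value read back from the database formats identically.
    public static DateTime ToStoredPrecision(DateTime utc)
    {
        if (utc.Kind != DateTimeKind.Utc)
            throw new ArgumentException("OccurredAt must be UTC.", nameof(utc));
        return new DateTime(utc.Ticks - utc.Ticks % 10, DateTimeKind.Utc); // 10 ticks = 1 microsecond
    }

    // Canonical input shared by writer and validator; invariant formatting throughout.
    public static string Compute(
        string? previousHash, Guid tenantId, string entityType, Guid entityId, string eventType,
        Guid actorId, string actorKind, long sequenceNumber, DateTime occurredAtUtc, string payloadJson)
    {
        var input = string.Join('|',
            previousHash ?? "GENESIS",
            tenantId.ToString("D"),
            entityType,
            entityId.ToString("D"),
            eventType,
            actorId.ToString("D"),
            actorKind,
            sequenceNumber.ToString(CultureInfo.InvariantCulture),
            ToStoredPrecision(occurredAtUtc).ToString("O", CultureInfo.InvariantCulture),
            payloadJson);
        return Convert.ToHexString(SHA256.HashData(Encoding.UTF8.GetBytes(input))).ToLowerInvariant();
    }
}
```

Changing the hash input changes every hash, so rows already written under the old input would need a versioned cut-over rather than an in-place switch.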
Write Path: High-Frequency Batching with Channel
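At 10,000 events per second, inserting individual rows is not viable. A single PostgreSQL INSERT round trip typically takes 1–5 ms, so 10,000 of them would need 10 to 50 seconds of round-trip time for every second of traffic on one connection. Batching amortizes that cost across hundreds or thousands of rows.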
The design uses Channel<JournalEntry> as a bounded in-memory queue. Business logic writes to the channel without blocking. A background writer drains the channel and flushes to PostgreSQL using the COPY protocol via Npgsql, which is generally the fastest way to bulk-load rows into PostgreSQL.
using System.Threading.Channels;
using Microsoft.Extensions.Logging;
public interface IJournalWriter
{
ValueTask WriteAsync(JournalEntry entry, CancellationToken ct = default);
}
public sealed class JournalWriter : IJournalWriter, IAsyncDisposable
{
private readonly Channel<JournalEntry> _channel;
private readonly ILogger<JournalWriter> _logger;
private readonly Task _flushLoop;
private readonly CancellationTokenSource _cts = new();
// DLQ callback — called when the channel is full and entries are dropped
private readonly Action<JournalEntry>? _deadLetterCallback;
public JournalWriter(
ILogger<JournalWriter> logger,
IJournalRepository repository,
Action<JournalEntry>? deadLetterCallback = null)
{
_logger = logger;
_deadLetterCallback = deadLetterCallback;
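// Bounded channel: 10,000 entries in memory max.
// FullMode.Wait only makes ChannelWriter.WriteAsync wait; TryWrite never blocks and
// returns false when the buffer is full, which lets WriteAsync route the entry to the DLQ.
// (With DropWrite, TryWrite returns true and the entry is silently discarded.)
_channel = Channel.CreateBounded<JournalEntry>(new BoundedChannelOptions(10_000)
{
FullMode = BoundedChannelFullMode.Wait,
SingleWriter = false,
SingleReader = true
});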
_flushLoop = RunFlushLoopAsync(repository, _cts.Token);
}
public ValueTask WriteAsync(JournalEntry entry, CancellationToken ct = default)
{
if (!_channel.Writer.TryWrite(entry))
{
// Channel is full — business logic must not be blocked.
// Route to DLQ so the event is not silently lost.
_logger.LogError(
"Journal channel full — dropping entry. EntityType={EntityType} EntityId={EntityId} " +
"EventType={EventType}. Configure alerting on this log event.",
entry.EntityType, entry.EntityId, entry.EventType);
_deadLetterCallback?.Invoke(entry);
}
return ValueTask.CompletedTask;
}
private async Task RunFlushLoopAsync(
IJournalRepository repository,
CancellationToken ct)
{
const int maxBatchSize = 1_000;
var maxLinger = TimeSpan.FromMilliseconds(100);
var reader = _channel.Reader;
var batch = new List<JournalEntry>(capacity: maxBatchSize);
try
{
// WaitToReadAsync returns false once the writer is completed and the channel is empty
while (await reader.WaitToReadAsync(ct))
{
// Collect up to 1,000 entries, waiting at most 100 ms for more to arrive
using var linger = CancellationTokenSource.CreateLinkedTokenSource(ct);
linger.CancelAfter(maxLinger);
try
{
while (batch.Count < maxBatchSize)
{
if (reader.TryRead(out var next)) { batch.Add(next); continue; }
if (!await reader.WaitToReadAsync(linger.Token)) break; // writer completed
}
}
catch (OperationCanceledException) when (!ct.IsCancellationRequested)
{
// Linger window elapsed: flush whatever we have
}
await FlushBatchAsync(repository, batch, CancellationToken.None);
batch.Clear();
}
}
catch (OperationCanceledException) when (ct.IsCancellationRequested)
{
// Hard stop requested: fall through and flush what is still buffered
}
// Drain remaining entries so a shutdown does not lose them
while (reader.TryRead(out var remaining))
{
batch.Add(remaining);
}
if (batch.Count > 0)
{
await FlushBatchAsync(repository, batch, CancellationToken.None);
}
}
private async Task FlushBatchAsync(
IJournalRepository repository,
List<JournalEntry> batch,
CancellationToken ct)
{
try
{
await repository.BulkInsertAsync(batch, ct);
_logger.LogDebug("Flushed {Count} journal entries", batch.Count);
}
catch (Exception ex)
{
_logger.LogError(ex, "Failed to flush journal batch of {Count} entries", batch.Count);
// Hand the batch to the DLQ so it is not silently dropped. A real system might
// retry with backoff first, or publish to a dead-letter topic.
foreach (var entry in batch)
{
_deadLetterCallback?.Invoke(entry);
}
}
}
public async ValueTask DisposeAsync()
{
_channel.Writer.Complete();
await _cts.CancelAsync();
await _flushLoop;
_cts.Dispose();
}
}
The Npgsql COPY-based bulk insert:
using Npgsql;
using NpgsqlTypes;
public sealed class PostgresJournalRepository : IJournalRepository
{
private readonly string _connectionString;
public PostgresJournalRepository(string connectionString)
{
_connectionString = connectionString;
}
public async Task BulkInsertAsync(IReadOnlyList<JournalEntry> entries, CancellationToken ct)
{
await using var conn = new NpgsqlConnection(_connectionString);
await conn.OpenAsync(ct);
// COPY is typically much faster than batched INSERTs for large volumes
await using var writer = await conn.BeginBinaryImportAsync(
"COPY journal_entries " +
"(tenant_id, entity_type, entity_id, event_type, actor_id, actor_kind, " +
"payload_json, source_ip, correlation_id, occurred_at, sequence_number, " +
"previous_hash, hash) " +
"FROM STDIN (FORMAT BINARY)",
ct);
foreach (var e in entries)
{
await writer.StartRowAsync(ct);
await writer.WriteAsync(e.TenantId, NpgsqlDbType.Uuid, ct);
await writer.WriteAsync(e.EntityType, NpgsqlDbType.Varchar, ct);
await writer.WriteAsync(e.EntityId, NpgsqlDbType.Uuid, ct);
await writer.WriteAsync(e.EventType, NpgsqlDbType.Varchar, ct);
await writer.WriteAsync(e.ActorId, NpgsqlDbType.Uuid, ct);
await writer.WriteAsync(e.ActorKind, NpgsqlDbType.Varchar, ct);
await writer.WriteAsync(e.PayloadJson, NpgsqlDbType.Jsonb, ct);
await writer.WriteAsync(e.SourceIp ?? (object)DBNull.Value, NpgsqlDbType.Varchar, ct);
await writer.WriteAsync(e.CorrelationId ?? (object)DBNull.Value, NpgsqlDbType.Varchar, ct);
await writer.WriteAsync(e.OccurredAt, NpgsqlDbType.TimestampTz, ct);
await writer.WriteAsync(e.SequenceNumber, NpgsqlDbType.Bigint, ct);
await writer.WriteAsync(e.PreviousHash ?? (object)DBNull.Value, NpgsqlDbType.Varchar, ct);
await writer.WriteAsync(e.Hash, NpgsqlDbType.Varchar, ct);
}
await writer.CompleteAsync(ct);
}
}
Journal writes must never block business logic
This is the critical operational constraint. The JournalWriter.WriteAsync call is fire-and-forget at the channel level. Business logic registers the journal event and moves on. If the channel is full — because the flush loop is behind — the entry is routed to a dead-letter callback rather than blocking the HTTP request.
Configure an alert on the LogError in the full-channel path. If that fires in production, your flush loop is too slow and you need to scale the writer or increase the batch size.
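Whether that branch can ever run depends on the channel's FullMode. With BoundedChannelFullMode.Wait, TryWrite never blocks: it returns false when the buffer is full. With DropWrite, TryWrite reports success and discards the item, so a check on its return value never fires; in that mode only the itemDropped callback overload of Channel.CreateBounded (available since .NET 6) sees the lost entry. A small sketch demonstrating both with a capacity of 1; FullModeDemo is a hypothetical name:

```csharp
using System.Collections.Generic;
using System.Threading.Channels;

public static class FullModeDemo
{
    // Wait mode: TryWrite is still non-blocking and returns false once the buffer is full.
    public static (bool First, bool Second) TryWriteTwiceWithWait()
    {
        var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(1)
        {
            FullMode = BoundedChannelFullMode.Wait
        });
        return (channel.Writer.TryWrite(1), channel.Writer.TryWrite(2));
    }

    // DropWrite mode: TryWrite returns true for the discarded item; only the callback sees it.
    public static (bool Second, List<int> Dropped) TryWriteTwiceWithDropWrite()
    {
        var dropped = new List<int>();
        var channel = Channel.CreateBounded<int>(
            new BoundedChannelOptions(1) { FullMode = BoundedChannelFullMode.DropWrite },
            dropped.Add);
        channel.Writer.TryWrite(1);
        var second = channel.Writer.TryWrite(2);
        return (second, dropped);
    }
}
```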
The DLQ callback can write to a local file, a Redis list, or a message broker topic. A reasonable minimum is:
var dlqLock = new object();
Action<JournalEntry> deadLetter = entry =>
{
// Write to a local overflow file so entries survive a restart.
// WriteAsync runs on many request threads at once, so serialize the appends.
var line = JsonSerializer.Serialize(entry);
lock (dlqLock)
{
File.AppendAllText("/var/log/journal-dlq.jsonl", line + "\n");
}
};
A separate recovery job replays the DLQ into the journal once the flush loop catches up.
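One way to write that recovery job, sketched under assumptions: the DLQ holds one JSON object per line, the job renames the file before reading so concurrent appends start a fresh file, and anything that still fails is appended back for the next run. DlqReplayer and the tryResubmit delegate are hypothetical; the sketch is generic over a DTO type because deserializing JournalEntry itself would need a constructor or setters that System.Text.Json can use.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Text.Json;

// Hypothetical recovery helper for a JSONL dead-letter file.
public static class DlqReplayer
{
    // Resubmits every line through tryResubmit and returns how many succeeded.
    // Lines that fail to parse or resubmit are appended back to the DLQ for the next run.
    public static int Replay<T>(string dlqPath, Func<T, bool> tryResubmit)
    {
        if (!File.Exists(dlqPath)) return 0;

        // Claim the file by renaming it; appends that happen afterwards start a fresh DLQ file.
        // Throws if an earlier crashed run left a claimed file behind (handle that first).
        var claimed = dlqPath + ".replaying";
        File.Move(dlqPath, claimed, false);

        var failed = new List<string>();
        var replayed = 0;
        foreach (var line in File.ReadLines(claimed))
        {
            if (string.IsNullOrWhiteSpace(line)) continue;
            T? item;
            try { item = JsonSerializer.Deserialize<T>(line); }
            catch (JsonException) { item = default; }

            if (item is not null && tryResubmit(item)) replayed++;
            else failed.Add(line);
        }

        if (failed.Count > 0) File.AppendAllLines(dlqPath, failed);
        File.Delete(claimed);
        return replayed;
    }
}
```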
Table Partitioning for Retention
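A journal table accumulates data indefinitely. At 10K events/second that is 864 million rows per day, roughly 26 billion rows per month. Without partitioning, time-bounded queries and retention deletes against a table that size become impractical.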
CREATE TABLE journal_entries (
id BIGINT GENERATED ALWAYS AS IDENTITY,
tenant_id UUID NOT NULL,
entity_type VARCHAR(100) NOT NULL,
entity_id UUID NOT NULL,
event_type VARCHAR(100) NOT NULL,
actor_id UUID NOT NULL,
actor_kind VARCHAR(50) NOT NULL,
payload_json JSONB NOT NULL,
source_ip VARCHAR(45),
correlation_id VARCHAR(100),
occurred_at TIMESTAMPTZ NOT NULL,
sequence_number BIGINT NOT NULL,
previous_hash VARCHAR(64),
hash VARCHAR(64) NOT NULL
) PARTITION BY RANGE (occurred_at);
-- Create partitions monthly
CREATE TABLE journal_entries_2025_01
PARTITION OF journal_entries
FOR VALUES FROM ('2025-01-01') TO ('2025-02-01');
CREATE TABLE journal_entries_2025_02
PARTITION OF journal_entries
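FOR VALUES FROM ('2025-02-01') TO ('2025-03-01');
PostgreSQL's partition pruning means a query with WHERE occurred_at >= '2025-01-01' AND occurred_at < '2025-02-01' only scans the January partition, so time-bounded queries never touch partitions outside their range. Prefer half-open ranges like this one: BETWEEN '2025-01-01' AND '2025-01-31' stops at midnight on January 31 and misses almost all of that day.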
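Partition names and bounds both derive from the month, so one small helper can keep hand-written DDL and the maintenance job in agreement. A sketch, assuming the journal_entries_yyyy_MM naming used here, half-open UTC month ranges, and bounds written with an explicit +00 offset so the session TimeZone cannot shift them; MonthlyPartition is a hypothetical name:

```csharp
using System;
using System.Globalization;

// Hypothetical helper describing one monthly partition: [FromUtc, ToUtc).
public sealed record MonthlyPartition(string Name, DateTime FromUtc, DateTime ToUtc)
{
    // The partition that holds a given UTC timestamp.
    public static MonthlyPartition For(DateTime utc)
    {
        var from = new DateTime(utc.Year, utc.Month, 1, 0, 0, 0, DateTimeKind.Utc);
        var name = "journal_entries_" + from.ToString("yyyy_MM", CultureInfo.InvariantCulture);
        return new MonthlyPartition(name, from, from.AddMonths(1));
    }

    // Half-open, matching PostgreSQL RANGE partition bounds (upper bound exclusive).
    public bool Contains(DateTime utc) => utc >= FromUtc && utc < ToUtc;

    // The values come from DateTime formatting, never from user input.
    public string ToCreateSql() =>
        $"CREATE TABLE IF NOT EXISTS {Name} PARTITION OF journal_entries " +
        $"FOR VALUES FROM ('{Bound(FromUtc)}') TO ('{Bound(ToUtc)}');";

    private static string Bound(DateTime utc) =>
        utc.ToString("yyyy-MM-dd HH:mm:ss", CultureInfo.InvariantCulture) + "+00";
}
```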
The PartitionMaintenanceWorker creates upcoming monthly partitions and archives, then drops, expired ones:
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Npgsql;
public sealed class PartitionMaintenanceWorker : BackgroundService
{
private readonly ILogger<PartitionMaintenanceWorker> _logger;
private readonly NpgsqlDataSource _dataSource;
private readonly RetentionConfiguration _retention;
private readonly IBlobArchiver _archiver;
public PartitionMaintenanceWorker(
ILogger<PartitionMaintenanceWorker> logger,
NpgsqlDataSource dataSource,
RetentionConfiguration retention,
IBlobArchiver archiver)
{
_logger = logger;
_dataSource = dataSource;
_retention = retention;
_archiver = archiver;
}
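protected override async Task ExecuteAsync(CancellationToken ct)
{
// Run once at startup, so upcoming partitions exist before the first insert,
// then once a day at 02:00 UTC
while (!ct.IsCancellationRequested)
{
try
{
await CreateNextMonthPartitionAsync(ct);
await ArchiveAndDropExpiredPartitionsAsync(ct);
}
catch (Exception ex) when (ex is not OperationCanceledException)
{
// An unhandled exception would stop the BackgroundService (and, by default, the host)
_logger.LogError(ex, "Partition maintenance failed; retrying at the next scheduled run");
}
var now = DateTime.UtcNow;
var nextRun = now.Date.AddHours(2);
if (nextRun <= now) nextRun = nextRun.AddDays(1);
await Task.Delay(nextRun - now, ct);
}
}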
private async Task CreateNextMonthPartitionAsync(CancellationToken ct)
{
// Ensure this month and the next two exist; IF NOT EXISTS makes reruns harmless
var inv = System.Globalization.CultureInfo.InvariantCulture;
var now = DateTime.UtcNow;
var thisMonth = new DateTime(now.Year, now.Month, 1, 0, 0, 0, DateTimeKind.Utc);
await using var conn = await _dataSource.OpenConnectionAsync(ct);
for (var i = 0; i <= 2; i++)
{
var target = thisMonth.AddMonths(i);
var partitionName = $"journal_entries_{target.ToString("yyyy_MM", inv)}";
// Explicit +00 offset: bare dates would be read in the session's TimeZone
var from = target.ToString("yyyy-MM-dd", inv) + " 00:00:00+00";
var to = target.AddMonths(1).ToString("yyyy-MM-dd", inv) + " 00:00:00+00";
await using var cmd = conn.CreateCommand();
cmd.CommandText = $"""
CREATE TABLE IF NOT EXISTS {partitionName}
PARTITION OF journal_entries
FOR VALUES FROM ('{from}') TO ('{to}');
""";
await cmd.ExecuteNonQueryAsync(ct);
_logger.LogInformation("Ensured partition {Name} exists", partitionName);
}
}
private async Task ArchiveAndDropExpiredPartitionsAsync(CancellationToken ct)
{
// Drop a partition once its whole range is older than the shortest retention
// window (SafeDropAge); the archiver must first copy out rows whose retention
// is longer, such as 7-year financial records
var cutoff = DateTime.UtcNow - _retention.SafeDropAge;
// List all partitions of the journal table
await using var conn = await _dataSource.OpenConnectionAsync(ct);
var partitions = new List<string>();
await using (var listCmd = conn.CreateCommand())
{
listCmd.CommandText = """
SELECT child.relname
FROM pg_inherits
JOIN pg_class parent ON pg_inherits.inhparent = parent.oid
JOIN pg_class child ON pg_inherits.inhrelid = child.oid
WHERE parent.relname = 'journal_entries'
ORDER BY child.relname;
""";
// The reader must be closed before this connection can run another command
await using var reader = await listCmd.ExecuteReaderAsync(ct);
while (await reader.ReadAsync(ct))
{
partitions.Add(reader.GetString(0));
}
}
foreach (var partition in partitions)
{
// Parse partition month from name, e.g. journal_entries_2023_01
if (!TryParsePartitionDate(partition, out var partitionMonth)) continue;
// The partition holds [month, next month); keep it until all of that range is past the cutoff
if (partitionMonth.AddMonths(1) > cutoff) continue;
_logger.LogInformation("Archiving expired partition {Partition}", partition);
await _archiver.ArchivePartitionAsync(partition, ct);
await using var dropCmd = conn.CreateCommand();
dropCmd.CommandText = $"DROP TABLE {partition};";
await dropCmd.ExecuteNonQueryAsync(ct);
_logger.LogInformation("Dropped expired partition {Partition}", partition);
}
}
private static bool TryParsePartitionDate(string partitionName, out DateTime date)
{
// journal_entries_2023_01 → 2023-01-01
var parts = partitionName.Split('_');
if (parts.Length >= 4
&& int.TryParse(parts[^2], out var year)
&& int.TryParse(parts[^1], out var month)
&& month is >= 1 and <= 12)
{
date = new DateTime(year, month, 1, 0, 0, 0, DateTimeKind.Utc);
return true;
}
date = default;
return false;
}
}
Point-in-Time Queries: What Was the State at a Given Timestamp?
"What was the state of Order abc-123 at 2025-03-15 14:32:00 UTC?" is a question no ModifiedAt column can answer. The journal can.
The naive approach replays all events from the beginning of time up to the target timestamp. For entities with thousands of events this is slow. The solution is a snapshot table: periodically checkpoint the entity's state at a point in time, then replay only forward from the nearest snapshot.
public sealed class PointInTimeReconstructor<TEntity>
where TEntity : class, IReconstructable<TEntity>
{
private readonly IJournalQueryRepository _journal;
private readonly ISnapshotRepository<TEntity> _snapshots;
public PointInTimeReconstructor(
IJournalQueryRepository journal,
ISnapshotRepository<TEntity> snapshots)
{
_journal = journal;
_snapshots = snapshots;
}
public async Task<TEntity?> ReconstructAtAsync(
string entityType,
Guid entityId,
DateTime pointInTime,
CancellationToken ct)
{
// 1. Find the most recent snapshot at or before the target time
var snapshot = await _snapshots.GetLatestBeforeAsync(entityType, entityId, pointInTime, ct);
DateTime replayFrom;
TEntity? state;
if (snapshot is not null)
{
state = snapshot.State;
replayFrom = snapshot.SnapshotAt;
}
else
{
// No snapshot — replay from the very beginning
state = null;
replayFrom = DateTime.MinValue;
}
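// 2. Replay journal entries after the snapshot up to pointInTime. 'from' must be
// exclusive, or the event captured by the snapshot is applied twice; recording the
// snapshot's last SequenceNumber and replaying strictly after it is more robust
// than a timestamp when several events share the same OccurredAt.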
var events = _journal.GetEventsAsync(
entityType, entityId,
from: replayFrom,
to: pointInTime,
ct: ct);
await foreach (var entry in events.WithCancellation(ct))
{
state = TEntity.Apply(state, entry);
}
return state;
}
}
// The entity type implements this interface to define how events are applied
public interface IReconstructable<TSelf> where TSelf : class
{
static abstract TSelf? Apply(TSelf? current, JournalEntry entry);
}
An example reconstruction for an Order:
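using System.Text.Json;
// A record rather than a class: the `with` expressions below only compile for records
public sealed record Order : IReconstructable<Order>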
{
public Guid Id { get; private set; }
public string Status { get; private set; } = string.Empty;
public decimal Total { get; private set; }
public List<string> Notes { get; private set; } = [];
public static Order? Apply(Order? current, JournalEntry entry)
{
return entry.EventType switch
{
"OrderPlaced" => ApplyOrderPlaced(entry),
"OrderConfirmed" => ApplyOrderConfirmed(current, entry),
"OrderCancelled" => ApplyOrderCancelled(current, entry),
"NoteAdded" => ApplyNoteAdded(current, entry),
_ => current // unknown events are skipped, not thrown
};
}
private static Order ApplyOrderPlaced(JournalEntry entry)
{
var payload = JsonSerializer.Deserialize<OrderPlacedPayload>(entry.PayloadJson)!;
return new Order
{
Id = entry.EntityId,
Status = "Placed",
Total = payload.Total
};
}
private static Order? ApplyOrderConfirmed(Order? current, JournalEntry entry)
{
if (current is null) return null;
return current with { Status = "Confirmed" };
}
private static Order? ApplyOrderCancelled(Order? current, JournalEntry entry)
{
if (current is null) return null;
return current with { Status = "Cancelled" };
}
private static Order? ApplyNoteAdded(Order? current, JournalEntry entry)
{
if (current is null) return null;
var payload = JsonSerializer.Deserialize<NoteAddedPayload>(entry.PayloadJson)!;
var notes = new List<string>(current.Notes) { payload.Note };
return current with { Notes = notes };
}
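}
// Payload shapes assumed by this example
public sealed record OrderPlacedPayload(decimal Total);
public sealed record NoteAddedPayload(string Note);
The snapshot table is written by a background job every N events or every M minutes, whichever comes first. The snapshot caps the worst-case replay window.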
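Two details decide whether this stays correct: when to snapshot, and where replay resumes. A sketch under stated assumptions: each snapshot records the SequenceNumber of the last event it includes, replay resumes strictly after that number so no event is applied twice even when timestamps tie, and OccurredAt does not decrease with SequenceNumber. SnapshotPolicy and SnapshotReplay are hypothetical names, and events are reduced to (sequence, timestamp, payload) tuples to keep it self-contained.

```csharp
using System;
using System.Collections.Generic;

public static class SnapshotPolicy
{
    // Snapshot every N events or every interval, whichever comes first,
    // but never when nothing has changed since the last snapshot.
    public static bool ShouldSnapshot(int eventsSinceLast, TimeSpan sinceLast, int everyNEvents, TimeSpan everyInterval)
        => eventsSinceLast >= everyNEvents || (eventsSinceLast > 0 && sinceLast >= everyInterval);
}

public static class SnapshotReplay
{
    // Folds the events that follow the snapshot, up to and including pointInTime.
    // Events must be ordered by Sequence; OccurredAt is assumed non-decreasing with Sequence.
    public static TState Replay<TState>(
        TState snapshotState,
        long snapshotSequence,
        IEnumerable<(long Sequence, DateTime OccurredAt, string Payload)> events,
        DateTime pointInTime,
        Func<TState, string, TState> apply)
    {
        var state = snapshotState;
        foreach (var e in events)
        {
            if (e.Sequence <= snapshotSequence) continue; // already folded into the snapshot
            if (e.OccurredAt > pointInTime) break;        // past the requested instant
            state = apply(state, e.Payload);
        }
        return state;
    }
}
```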
Retention Policies Per Entity Type
Not all data has the same retention requirement. Financial transactions stay for 7 years. Session events stay for 90 days. Temporary diagnostic events stay for 30 days.
The retention policy is applied at the partition level, not the row level. Rows are never deleted individually. Entire monthly partitions are dropped, and any rows that must outlive their partition are archived first.
public sealed class RetentionConfiguration
{
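public Dictionary<string, TimeSpan> PolicyByEntityType { get; set; } = new()
{
// 365 * 7 days ignores leap days (one or two short of 7 calendar years); pad it if the rule is exact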
["Invoice"] = TimeSpan.FromDays(365 * 7),
["Order"] = TimeSpan.FromDays(365 * 7),
["Payment"] = TimeSpan.FromDays(365 * 7),
["UserSession"] = TimeSpan.FromDays(90),
["DiagEvent"] = TimeSpan.FromDays(30),
};
public TimeSpan DefaultRetentionYears
=> TimeSpan.FromDays(365); // 1 year for anything not explicitly configured
public TimeSpan GetRetention(string entityType)
=> PolicyByEntityType.TryGetValue(entityType, out var policy)
? policy
: DefaultRetentionYears;
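// Dropping a partition is only safe once every row in it has either exceeded
// its retention or been archived. Comparing partition age against the minimum
// retention (below) therefore requires archiving longer-retained types first;
// comparing against the maximum would avoid archiving but keep everything online.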
public TimeSpan SafeDropAge
=> PolicyByEntityType.Values
.Append(DefaultRetentionYears)
.Min();
}
The partition drop only happens when the partition's entire time range is beyond the safe drop age. A January 2023 partition becomes eligible once the shortest retention window (30 days for DiagEvent) has passed since the end of January, but you still need to archive the financial rows first. The archiver extracts financial rows to Azure Blob Storage in JSONL format before the partition is dropped.
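The whole-range rule reduces to one comparison: a monthly partition starting at from may be dropped only when from plus one month is at or before now minus the safe drop age, and only after rows whose own retention reaches past the partition's end have been archived. A sketch with hypothetical names, taking the shortest retention window as the safe drop age as the configuration above does:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class PartitionRetention
{
    // True when every timestamp the partition can hold, [from, from + 1 month), is past the cutoff.
    public static bool CanDrop(DateTime partitionFromUtc, DateTime nowUtc, TimeSpan safeDropAge)
        => partitionFromUtc.AddMonths(1) <= nowUtc - safeDropAge;

    // Entity types whose retention still covers part of the partition, so their rows
    // must be archived before the partition is dropped. Conservative: uses the partition end.
    public static IReadOnlyList<string> MustArchive(
        DateTime partitionFromUtc, DateTime nowUtc, IReadOnlyDictionary<string, TimeSpan> retentionByType)
    {
        var partitionEnd = partitionFromUtc.AddMonths(1);
        return retentionByType
            .Where(kv => partitionEnd > nowUtc - kv.Value)
            .Select(kv => kv.Key)
            .OrderBy(k => k, StringComparer.Ordinal)
            .ToList();
    }
}
```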
Event Sourcing vs Audit Log: When to Use Each
These two patterns look similar but solve different problems.
An audit log records changes to state that is managed somewhere else. The state lives in the orders table. The audit log records who changed it and what they changed. The audit log is a secondary record. The primary source of truth is the table.
Event sourcing makes events the primary source of truth. There is no authoritative orders table; if one exists at all, it is a read model projected from the events. You reconstruct order state by replaying events. The event store is the only authoritative record of order data.
The choice has significant consequences:
Audit log is appropriate when you have an existing relational data model you cannot or will not replace, when you need forensic history but not full state reconstruction for every read, and when your team is not ready for the operational complexity of a pure event-sourced system.
Event sourcing is appropriate when you need temporal queries as a first-class feature, when the domain models for reading and writing are genuinely different (CQRS), and when you need to rebuild read models from scratch because requirements change.
The design in this article is an audit log, not event sourcing. The order state lives in the orders table. The journal records every mutation to that table. You can reconstruct past state by replaying the journal, but that is an audit capability, not the primary data access pattern.
If you are building a greenfield system and the domain is inherently event-driven (e.g. financial transactions, healthcare FHIR events, supply chain movements), consider event sourcing. If you are adding audit capability to an existing system, build an append-only journal.
Wiring It Together
// Program.cs
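builder.Services.AddSingleton<RetentionConfiguration>();
builder.Services.AddSingleton<IBlobArchiver, AzureBlobArchiver>();
// "Journal" is an assumed connection string name
var journalConnectionString = builder.Configuration.GetConnectionString("Journal")!;
// NpgsqlDataSource for PartitionMaintenanceWorker (Npgsql.DependencyInjection package)
builder.Services.AddNpgsqlDataSource(journalConnectionString);
// The repository takes a plain string, which DI cannot resolve on its own
builder.Services.AddSingleton<IJournalRepository>(
_ => new PostgresJournalRepository(journalConnectionString));
builder.Services.AddSingleton<IJournalWriter, JournalWriter>(sp =>
{
var logger = sp.GetRequiredService<ILogger<JournalWriter>>();
var repo = sp.GetRequiredService<IJournalRepository>();
var dlqLock = new object();
return new JournalWriter(logger, repo, deadLetterCallback: entry =>
{
// DLQ: write to local disk, picked up by the recovery worker.
// Appends come from many threads, so serialize them.
var json = JsonSerializer.Serialize(entry);
lock (dlqLock)
{
File.AppendAllText("/var/log/journal-dlq.jsonl", json + "\n");
}
logger.LogError("Journal DLQ: EntityType={T} EntityId={Id}", entry.EntityType, entry.EntityId);
});
});
builder.Services.AddHostedService<PartitionMaintenanceWorker>();
// Also needs IJournalQueryRepository and ISnapshotRepository<> registrations (not shown)
builder.Services.AddScoped(typeof(PointInTimeReconstructor<>));
builder.Services.AddSingleton<HashChainValidator>();
Usage in a command handler: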
public sealed class CancelOrderHandler : IRequestHandler<CancelOrderCommand>
{
private readonly AppDbContext _db;
private readonly IJournalWriter _journal;
private readonly IHttpContextAccessor _http;
public CancelOrderHandler(AppDbContext db, IJournalWriter journal, IHttpContextAccessor http)
{
_db = db;
_journal = journal;
_http = http;
}
public async Task Handle(CancelOrderCommand request, CancellationToken ct)
{
var order = await _db.Orders.FindAsync([request.OrderId], ct)
?? throw new OrderNotFoundException(request.OrderId);
// Capture the "before" picture before mutating the order
var previousStatus = order.Status;
var previousState = JsonSerializer.Serialize(order);
order.Cancel(request.Reason);
// Build the entry before saving, so the order row's chain head advances in the
// same transaction. JournalSequence/LastJournalHash are assumed to hold the last
// recorded values; with a concurrency token on Order, two concurrent writers
// cannot both extend the chain from the same head.
var entry = JournalEntry.Create(
tenantId: request.TenantId,
entityType: "Order",
entityId: order.Id,
eventType: "OrderCancelled",
actorId: request.ActorId,
actorKind: "User",
payloadJson: JsonSerializer.Serialize(new
{
Reason = request.Reason,
PreviousStatus = previousStatus,
PreviousState = previousState
}),
sourceIp: _http.HttpContext?.Connection.RemoteIpAddress?.ToString(),
correlationId: _http.HttpContext?.TraceIdentifier,
occurredAt: DateTime.UtcNow,
sequenceNumber: order.JournalSequence + 1,
previousHash: order.LastJournalHash);
order.RecordJournalHead(entry.SequenceNumber, entry.Hash); // hypothetical method on Order
await _db.SaveChangesAsync(ct);
// Journal write is fire-and-forget at the channel level:
// it will not block or fail the HTTP response
await _journal.WriteAsync(entry, ct);
}
}
Summary
A production audit journal is not a ModifiedAt column. It is an append-only ledger with forensic integrity enforced at multiple layers: application interceptors that block mutations, hash chains that detect tampering, and PostgreSQL partitioning that makes retention a matter of dropping a table rather than running long-running deletes.
The write path uses Channel<T> to decouple business logic from the flush loop, and Npgsql's COPY protocol to sustain high insert throughput. Point-in-time queries combine snapshots with forward replay so that "what was the state of X at time T" is always answerable without a full table scan.
The key decisions are: use sequential IDs for the journal, never allow updates or deletes at any layer, partition by month for retention, and never let the journal write block a business transaction.