Redis Patterns in .NET — Pub/Sub, Streams, Sorted Sets, and Lua Scripts
Advanced Redis patterns in .NET with StackExchange.Redis: Pub/Sub messaging, Streams for event logs, Sorted Sets for leaderboards, Lua scripts for atomicity, and distributed lock patterns.
Redis Patterns in .NET — Pub/Sub, Streams, Sorted Sets, and Lua Scripts
Redis is far more than a cache. It's a data structure server with built-in support for pub/sub messaging, event streams, sorted sets for rankings, and atomic Lua scripts — all with sub-millisecond latency.
Setup — StackExchange.Redis
// Program.cs
builder.Services.AddStackExchangeRedisCache(opts =>
opts.Configuration = builder.Configuration.GetConnectionString("Redis"));
// Full client for advanced operations (pub/sub, streams, Lua scripts)
builder.Services.AddSingleton<IConnectionMultiplexer>(_ =>
ConnectionMultiplexer.Connect(new ConfigurationOptions
{
EndPoints = { "redis:6379" },
Password = "your-redis-password",
Ssl = true,
AbortOnConnectFail = false,
ConnectRetry = 3,
ReconnectRetryPolicy = new LinearRetry(5000),
}));
builder.Services.AddScoped<IDatabase>(sp =>
sp.GetRequiredService<IConnectionMultiplexer>().GetDatabase());Pattern 1: Cache-Aside with Type Safety
public class ProductCache(IDatabase redis, IProductRepository db)
{
private static readonly TimeSpan Ttl = TimeSpan.FromMinutes(15);
public async Task<Product?> GetAsync(int productId, CancellationToken ct)
{
var key = CacheKey(productId);
var value = await redis.StringGetAsync(key);
if (value.HasValue)
return JsonSerializer.Deserialize<Product>(value!);
// Cache miss — load from database
var product = await db.GetByIdAsync(productId, ct);
if (product is null) return null;
// Store with TTL — fire and forget (don't block on cache write)
await redis.StringSetAsync(key,
JsonSerializer.Serialize(product),
Ttl,
flags: CommandFlags.FireAndForget);
return product;
}
public async Task InvalidateAsync(int productId)
=> await redis.KeyDeleteAsync(CacheKey(productId));
private static RedisKey CacheKey(int productId) => $"product:{productId}";
}Pattern 2: Pub/Sub Messaging
// Publisher — broadcasts messages to all subscribers
public class OrderEventPublisher(IConnectionMultiplexer redis)
{
private readonly ISubscriber _subscriber = redis.GetSubscriber();
public async Task PublishOrderPlacedAsync(int orderId, int customerId)
{
var payload = JsonSerializer.Serialize(new OrderPlacedMessage(orderId, customerId));
var recipientsCount = await _subscriber.PublishAsync(
RedisChannel.Literal("orders:placed"),
payload);
Console.WriteLine($"Published to {recipientsCount} subscribers");
}
}
// Subscriber — receives messages
public class OrderEventSubscriber(IConnectionMultiplexer redis, ILogger<OrderEventSubscriber> logger)
: IHostedService
{
private readonly ISubscriber _subscriber = redis.GetSubscriber();
public async Task StartAsync(CancellationToken ct)
{
await _subscriber.SubscribeAsync(
RedisChannel.Pattern("orders:*"), // pattern subscribe — all order events
(channel, message) =>
{
logger.LogInformation("Received on {Channel}: {Message}", channel, message);
// Parse and handle
var orderEvent = JsonSerializer.Deserialize<OrderPlacedMessage>(message!);
// ... handle
});
}
public async Task StopAsync(CancellationToken ct)
=> await _subscriber.UnsubscribeAllAsync();
}
public record OrderPlacedMessage(int OrderId, int CustomerId);Pattern 3: Redis Streams (Persistent, Consumer Groups)
// Streams are persistent pub/sub — messages survive subscriber downtime
// Consumer groups enable competing consumers with acknowledgement
public class OrderEventStream(IDatabase redis)
{
private const string StreamKey = "orders:events";
private const string GroupName = "order-processors";
private const string ConsumerName = "processor-1";
// Producer — append to stream
public async Task AppendAsync(string eventType, int orderId)
{
var messageId = await redis.StreamAddAsync(StreamKey,
new NameValueEntry[]
{
new("event_type", eventType),
new("order_id", orderId.ToString()),
new("timestamp", DateTimeOffset.UtcNow.ToUnixTimeMilliseconds().ToString()),
});
Console.WriteLine($"Appended {eventType} for order {orderId}: {messageId}");
}
// Consumer — read and acknowledge
public async Task ConsumeAsync(CancellationToken ct)
{
// Create consumer group (idempotent)
try
{
await redis.StreamCreateConsumerGroupAsync(StreamKey, GroupName, StreamPosition.NewMessages);
}
catch (RedisException ex) when (ex.Message.Contains("BUSYGROUP"))
{
// Group already exists — normal on restart
}
while (!ct.IsCancellationRequested)
{
var messages = await redis.StreamReadGroupAsync(
StreamKey,
GroupName,
ConsumerName,
count: 10,
noAck: false);
foreach (var message in messages)
{
try
{
var eventType = (string)message["event_type"]!;
var orderId = int.Parse((string)message["order_id"]!);
Console.WriteLine($"Processing {eventType} for order {orderId}");
// ... process
// Acknowledge — removes from pending entries
await redis.StreamAcknowledgeAsync(StreamKey, GroupName, message.Id);
}
catch (Exception ex)
{
Console.WriteLine($"Failed to process {message.Id}: {ex.Message}");
// Message remains in pending — will be redelivered
}
}
if (!messages.Any())
await Task.Delay(100, ct); // poll interval
}
}
// Reclaim messages pending for too long (consumer crashed)
public async Task ClaimStalePendingAsync(TimeSpan olderThan)
{
var pending = await redis.StreamPendingMessagesAsync(StreamKey, GroupName, 50, RedisValue.Null);
var staleIds = pending
.Where(p => p.IdleTimeInMilliseconds > (long)olderThan.TotalMilliseconds)
.Select(p => p.MessageId)
.ToArray();
if (staleIds.Length > 0)
await redis.StreamClaimAsync(StreamKey, GroupName, ConsumerName,
(long)olderThan.TotalMilliseconds, staleIds);
}
}Pattern 4: Sorted Sets — Leaderboards and Rankings
public class Leaderboard(IDatabase redis)
{
private const string Key = "game:leaderboard";
// Add or update a player's score
public async Task SetScoreAsync(string playerId, double score)
=> await redis.SortedSetAddAsync(Key, playerId, score);
// Increment score (atomic — no race condition)
public async Task<double> AddToScoreAsync(string playerId, double increment)
=> await redis.SortedSetIncrementAsync(Key, playerId, increment);
// Top N players (highest score first)
public async Task<List<LeaderboardEntry>> GetTopNAsync(int count)
{
var entries = await redis.SortedSetRangeByRankWithScoresAsync(
Key, 0, count - 1,
order: Order.Descending);
return entries.Select((e, i) => new LeaderboardEntry(
Rank: i + 1,
PlayerId: e.Element!,
Score: e.Score)).ToList();
}
// A player's rank (0-based, ascending)
public async Task<long?> GetRankAsync(string playerId)
=> await redis.SortedSetRankAsync(Key, playerId, Order.Descending);
// Players within a score range (e.g. players eligible for a prize)
public async Task<List<string>> GetByScoreRangeAsync(double minScore, double maxScore)
{
var entries = await redis.SortedSetRangeByScoreAsync(
Key, minScore, maxScore,
order: Order.Descending);
return entries.Select(e => (string)e!).ToList();
}
}
public record LeaderboardEntry(int Rank, string PlayerId, double Score);Pattern 5: Lua Scripts for Atomicity
// Lua scripts run atomically — no race conditions, no MULTI/EXEC needed
public class AtomicOperations(IDatabase redis)
{
// Rate limiter using token bucket — atomic check-and-decrement
private static readonly LuaScript RateLimitScript = LuaScript.Prepare("""
local key = KEYS[1]
local capacity = tonumber(ARGV[1])
local refill = tonumber(ARGV[2])
local interval = tonumber(ARGV[3])
local now = tonumber(ARGV[4])
local data = redis.call('HMGET', key, 'tokens', 'last_refill')
local tokens = tonumber(data[1]) or capacity
local last = tonumber(data[2]) or now
-- Refill tokens based on elapsed time
local elapsed = now - last
local refilled = math.floor(elapsed / interval) * refill
tokens = math.min(capacity, tokens + refilled)
if tokens < 1 then
return 0 -- rate limit hit
end
tokens = tokens - 1
redis.call('HMSET', key, 'tokens', tokens, 'last_refill', now)
redis.call('EXPIRE', key, interval * 2)
return 1 -- allowed
""");
public async Task<bool> IsAllowedAsync(string clientId, int capacity, int refillRate, int intervalMs)
{
var result = await redis.ScriptEvaluateAsync(RateLimitScript, new
{
key = (RedisKey)$"ratelimit:{clientId}",
},
new RedisValue[]
{
capacity, refillRate, intervalMs,
DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(),
});
return (int)result == 1;
}
// Distributed lock — SET NX PX
public async Task<bool> TryAcquireLockAsync(string resource, string lockValue, TimeSpan timeout)
=> await redis.StringSetAsync(
$"lock:{resource}",
lockValue,
timeout,
When.NotExists); // SET NX — atomic
// Safe release — only delete if we own the lock (Lua)
private static readonly LuaScript ReleaseLockScript = LuaScript.Prepare("""
if redis.call('GET', KEYS[1]) == ARGV[1] then
return redis.call('DEL', KEYS[1])
else
return 0
end
""");
public async Task<bool> ReleaseLockAsync(string resource, string lockValue)
{
var result = await redis.ScriptEvaluateAsync(ReleaseLockScript, new
{
key = (RedisKey)$"lock:{resource}",
},
new RedisValue[] { lockValue });
return (int)result == 1;
}
}Pattern 6: Session Storage
// Store session data with sliding expiry
builder.Services.AddSession(opts =>
{
opts.IdleTimeout = TimeSpan.FromMinutes(30);
opts.Cookie.HttpOnly = true;
opts.Cookie.IsEssential = true;
opts.Cookie.SecurePolicy = CookieSecurePolicy.Always;
});
// Use Redis as the session store (via IDistributedCache)
builder.Services.AddStackExchangeRedisCache(opts =>
opts.Configuration = connectionString);
// In a controller
public class CartController(ISession session) : ControllerBase
{
[HttpPost("add-to-cart")]
public IActionResult AddToCart(int productId, int quantity)
{
var cart = HttpContext.Session.Get<List<CartItem>>("cart") ?? [];
cart.Add(new CartItem(productId, quantity));
HttpContext.Session.Set("cart", cart);
return Ok();
}
}Pattern 7: Pipeline Batching
// Send multiple commands in one network round trip — big performance win
public async Task<Dictionary<int, Product?>> GetManyAsync(IEnumerable<int> productIds)
{
var ids = productIds.ToList();
var batch = redis.CreateBatch();
var tasks = ids.ToDictionary(id => id, id =>
batch.StringGetAsync($"product:{id}"));
batch.Execute(); // sends all commands at once
await Task.WhenAll(tasks.Values);
return tasks.ToDictionary(
kvp => kvp.Key,
kvp => kvp.Value.Result.HasValue
? JsonSerializer.Deserialize<Product>(kvp.Value.Result!)
: null);
}Interview Answer
"Redis patterns in .NET beyond basic caching: Pub/Sub delivers messages to all current subscribers in real-time (fire-and-forget — messages are lost if no subscriber is connected). Streams solve that: each message is persisted with an ID, consumer groups enable competing consumers with acknowledgement — PEL (pending entries list) tracks unacked messages so they can be redelivered if a consumer crashes. Sorted Sets power leaderboards: SortedSetIncrementAsync atomically updates a player's score, SortedSetRangeByRankWithScoresAsync returns top N in O(log N + count), and GetRankAsync returns a player's position in O(log N). Lua scripts run atomically across multiple keys — the canonical use is the token bucket rate limiter and the distributed lock safe-release (check then delete without a race condition). For distributed locks: StringSetAsync with When.NotExists + TTL acquires the lock atomically; a Lua script for release ensures only the lock holder can delete it. Pipeline batching: CreateBatch + batch.Execute() sends all commands in one TCP round trip — critical for bulk reads."
Enjoyed this article?
Explore the Backend Systems learning path for more.
Found this helpful?
Leave a comment
Have a question, correction, or just found this helpful? Leave a note below.