Learnixo
Back to blog
Backend Systemsadvanced

Redis Patterns in .NET — Pub/Sub, Streams, Sorted Sets, and Lua Scripts

Advanced Redis patterns in .NET with StackExchange.Redis: Pub/Sub messaging, Streams for event logs, Sorted Sets for leaderboards, Lua scripts for atomicity, and distributed lock patterns.

Asma Hafeez KhanMay 25, 20267 min read
.NETC#RedisStackExchange.Rediscachingpub/subdistributed systems
Share:𝕏

Redis Patterns in .NET — Pub/Sub, Streams, Sorted Sets, and Lua Scripts

Redis is far more than a cache. It's a data structure server with built-in support for pub/sub messaging, event streams, sorted sets for rankings, and atomic Lua scripts — all with sub-millisecond latency.


Setup — StackExchange.Redis

C#
// Program.cs
builder.Services.AddStackExchangeRedisCache(opts =>
    opts.Configuration = builder.Configuration.GetConnectionString("Redis"));

// Full client for advanced operations (pub/sub, streams, Lua scripts)
builder.Services.AddSingleton<IConnectionMultiplexer>(_ =>
    ConnectionMultiplexer.Connect(new ConfigurationOptions
    {
        EndPoints   = { "redis:6379" },
        Password    = "your-redis-password",
        Ssl         = true,
        AbortOnConnectFail = false,
        ConnectRetry       = 3,
        ReconnectRetryPolicy = new LinearRetry(5000),
    }));

builder.Services.AddScoped<IDatabase>(sp =>
    sp.GetRequiredService<IConnectionMultiplexer>().GetDatabase());

Pattern 1: Cache-Aside with Type Safety

C#
public class ProductCache(IDatabase redis, IProductRepository db)
{
    private static readonly TimeSpan Ttl = TimeSpan.FromMinutes(15);

    public async Task<Product?> GetAsync(int productId, CancellationToken ct)
    {
        var key   = CacheKey(productId);
        var value = await redis.StringGetAsync(key);

        if (value.HasValue)
            return JsonSerializer.Deserialize<Product>(value!);

        // Cache miss — load from database
        var product = await db.GetByIdAsync(productId, ct);
        if (product is null) return null;

        // Store with TTL — fire and forget (don't block on cache write)
        await redis.StringSetAsync(key,
            JsonSerializer.Serialize(product),
            Ttl,
            flags: CommandFlags.FireAndForget);

        return product;
    }

    public async Task InvalidateAsync(int productId)
        => await redis.KeyDeleteAsync(CacheKey(productId));

    private static RedisKey CacheKey(int productId) => $"product:{productId}";
}

Pattern 2: Pub/Sub Messaging

C#
// Publisher — broadcasts messages to all subscribers
public class OrderEventPublisher(IConnectionMultiplexer redis)
{
    private readonly ISubscriber _subscriber = redis.GetSubscriber();

    public async Task PublishOrderPlacedAsync(int orderId, int customerId)
    {
        var payload = JsonSerializer.Serialize(new OrderPlacedMessage(orderId, customerId));
        var recipientsCount = await _subscriber.PublishAsync(
            RedisChannel.Literal("orders:placed"),
            payload);

        Console.WriteLine($"Published to {recipientsCount} subscribers");
    }
}

// Subscriber — receives messages
public class OrderEventSubscriber(IConnectionMultiplexer redis, ILogger<OrderEventSubscriber> logger)
    : IHostedService
{
    private readonly ISubscriber _subscriber = redis.GetSubscriber();

    public async Task StartAsync(CancellationToken ct)
    {
        await _subscriber.SubscribeAsync(
            RedisChannel.Pattern("orders:*"),   // pattern subscribe — all order events
            (channel, message) =>
            {
                logger.LogInformation("Received on {Channel}: {Message}", channel, message);

                // Parse and handle
                var orderEvent = JsonSerializer.Deserialize<OrderPlacedMessage>(message!);
                // ... handle
            });
    }

    public async Task StopAsync(CancellationToken ct)
        => await _subscriber.UnsubscribeAllAsync();
}

public record OrderPlacedMessage(int OrderId, int CustomerId);

Pattern 3: Redis Streams (Persistent, Consumer Groups)

C#
// Streams are persistent pub/sub — messages survive subscriber downtime
// Consumer groups enable competing consumers with acknowledgement

public class OrderEventStream(IDatabase redis)
{
    private const string StreamKey = "orders:events";
    private const string GroupName = "order-processors";
    private const string ConsumerName = "processor-1";

    // Producer — append to stream
    public async Task AppendAsync(string eventType, int orderId)
    {
        var messageId = await redis.StreamAddAsync(StreamKey,
            new NameValueEntry[]
            {
                new("event_type", eventType),
                new("order_id",   orderId.ToString()),
                new("timestamp",  DateTimeOffset.UtcNow.ToUnixTimeMilliseconds().ToString()),
            });

        Console.WriteLine($"Appended {eventType} for order {orderId}: {messageId}");
    }

    // Consumer — read and acknowledge
    public async Task ConsumeAsync(CancellationToken ct)
    {
        // Create consumer group (idempotent)
        try
        {
            await redis.StreamCreateConsumerGroupAsync(StreamKey, GroupName, StreamPosition.NewMessages);
        }
        catch (RedisException ex) when (ex.Message.Contains("BUSYGROUP"))
        {
            // Group already exists — normal on restart
        }

        while (!ct.IsCancellationRequested)
        {
            var messages = await redis.StreamReadGroupAsync(
                StreamKey,
                GroupName,
                ConsumerName,
                count: 10,
                noAck: false);

            foreach (var message in messages)
            {
                try
                {
                    var eventType = (string)message["event_type"]!;
                    var orderId   = int.Parse((string)message["order_id"]!);

                    Console.WriteLine($"Processing {eventType} for order {orderId}");
                    // ... process

                    // Acknowledge — removes from pending entries
                    await redis.StreamAcknowledgeAsync(StreamKey, GroupName, message.Id);
                }
                catch (Exception ex)
                {
                    Console.WriteLine($"Failed to process {message.Id}: {ex.Message}");
                    // Message remains in pending — will be redelivered
                }
            }

            if (!messages.Any())
                await Task.Delay(100, ct);   // poll interval
        }
    }

    // Reclaim messages pending for too long (consumer crashed)
    public async Task ClaimStalePendingAsync(TimeSpan olderThan)
    {
        var pending = await redis.StreamPendingMessagesAsync(StreamKey, GroupName, 50, RedisValue.Null);

        var staleIds = pending
            .Where(p => p.IdleTimeInMilliseconds > (long)olderThan.TotalMilliseconds)
            .Select(p => p.MessageId)
            .ToArray();

        if (staleIds.Length > 0)
            await redis.StreamClaimAsync(StreamKey, GroupName, ConsumerName,
                (long)olderThan.TotalMilliseconds, staleIds);
    }
}

Pattern 4: Sorted Sets — Leaderboards and Rankings

C#
public class Leaderboard(IDatabase redis)
{
    private const string Key = "game:leaderboard";

    // Add or update a player's score
    public async Task SetScoreAsync(string playerId, double score)
        => await redis.SortedSetAddAsync(Key, playerId, score);

    // Increment score (atomic — no race condition)
    public async Task<double> AddToScoreAsync(string playerId, double increment)
        => await redis.SortedSetIncrementAsync(Key, playerId, increment);

    // Top N players (highest score first)
    public async Task<List<LeaderboardEntry>> GetTopNAsync(int count)
    {
        var entries = await redis.SortedSetRangeByRankWithScoresAsync(
            Key, 0, count - 1,
            order: Order.Descending);

        return entries.Select((e, i) => new LeaderboardEntry(
            Rank:     i + 1,
            PlayerId: e.Element!,
            Score:    e.Score)).ToList();
    }

    // A player's rank (0-based, ascending)
    public async Task<long?> GetRankAsync(string playerId)
        => await redis.SortedSetRankAsync(Key, playerId, Order.Descending);

    // Players within a score range (e.g. players eligible for a prize)
    public async Task<List<string>> GetByScoreRangeAsync(double minScore, double maxScore)
    {
        var entries = await redis.SortedSetRangeByScoreAsync(
            Key, minScore, maxScore,
            order: Order.Descending);

        return entries.Select(e => (string)e!).ToList();
    }
}

public record LeaderboardEntry(int Rank, string PlayerId, double Score);

Pattern 5: Lua Scripts for Atomicity

C#
// Lua scripts run atomically — no race conditions, no MULTI/EXEC needed
public class AtomicOperations(IDatabase redis)
{
    // Rate limiter using token bucket — atomic check-and-decrement
    private static readonly LuaScript RateLimitScript = LuaScript.Prepare("""
        local key       = KEYS[1]
        local capacity  = tonumber(ARGV[1])
        local refill    = tonumber(ARGV[2])
        local interval  = tonumber(ARGV[3])
        local now       = tonumber(ARGV[4])

        local data      = redis.call('HMGET', key, 'tokens', 'last_refill')
        local tokens    = tonumber(data[1]) or capacity
        local last      = tonumber(data[2]) or now

        -- Refill tokens based on elapsed time
        local elapsed   = now - last
        local refilled  = math.floor(elapsed / interval) * refill
        tokens          = math.min(capacity, tokens + refilled)

        if tokens < 1 then
            return 0   -- rate limit hit
        end

        tokens = tokens - 1
        redis.call('HMSET', key, 'tokens', tokens, 'last_refill', now)
        redis.call('EXPIRE', key, interval * 2)
        return 1   -- allowed
        """);

    public async Task<bool> IsAllowedAsync(string clientId, int capacity, int refillRate, int intervalMs)
    {
        var result = await redis.ScriptEvaluateAsync(RateLimitScript, new
        {
            key = (RedisKey)$"ratelimit:{clientId}",
        },
        new RedisValue[]
        {
            capacity, refillRate, intervalMs,
            DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(),
        });

        return (int)result == 1;
    }

    // Distributed lock — SET NX PX
    public async Task<bool> TryAcquireLockAsync(string resource, string lockValue, TimeSpan timeout)
        => await redis.StringSetAsync(
            $"lock:{resource}",
            lockValue,
            timeout,
            When.NotExists);   // SET NX — atomic

    // Safe release — only delete if we own the lock (Lua)
    private static readonly LuaScript ReleaseLockScript = LuaScript.Prepare("""
        if redis.call('GET', KEYS[1]) == ARGV[1] then
            return redis.call('DEL', KEYS[1])
        else
            return 0
        end
        """);

    public async Task<bool> ReleaseLockAsync(string resource, string lockValue)
    {
        var result = await redis.ScriptEvaluateAsync(ReleaseLockScript, new
        {
            key = (RedisKey)$"lock:{resource}",
        },
        new RedisValue[] { lockValue });

        return (int)result == 1;
    }
}

Pattern 6: Session Storage

C#
// Store session data with sliding expiry
builder.Services.AddSession(opts =>
{
    opts.IdleTimeout        = TimeSpan.FromMinutes(30);
    opts.Cookie.HttpOnly    = true;
    opts.Cookie.IsEssential = true;
    opts.Cookie.SecurePolicy = CookieSecurePolicy.Always;
});

// Use Redis as the session store (via IDistributedCache)
builder.Services.AddStackExchangeRedisCache(opts =>
    opts.Configuration = connectionString);

// In a controller
public class CartController(ISession session) : ControllerBase
{
    [HttpPost("add-to-cart")]
    public IActionResult AddToCart(int productId, int quantity)
    {
        var cart = HttpContext.Session.Get<List<CartItem>>("cart") ?? [];
        cart.Add(new CartItem(productId, quantity));
        HttpContext.Session.Set("cart", cart);
        return Ok();
    }
}

Pattern 7: Pipeline Batching

C#
// Send multiple commands in one network round trip — big performance win
public async Task<Dictionary<int, Product?>> GetManyAsync(IEnumerable<int> productIds)
{
    var ids     = productIds.ToList();
    var batch   = redis.CreateBatch();
    var tasks   = ids.ToDictionary(id => id, id =>
        batch.StringGetAsync($"product:{id}"));

    batch.Execute();   // sends all commands at once

    await Task.WhenAll(tasks.Values);

    return tasks.ToDictionary(
        kvp => kvp.Key,
        kvp => kvp.Value.Result.HasValue
            ? JsonSerializer.Deserialize<Product>(kvp.Value.Result!)
            : null);
}

Interview Answer

"Redis patterns in .NET beyond basic caching: Pub/Sub delivers messages to all current subscribers in real-time (fire-and-forget — messages are lost if no subscriber is connected). Streams solve that: each message is persisted with an ID, consumer groups enable competing consumers with acknowledgement — PEL (pending entries list) tracks unacked messages so they can be redelivered if a consumer crashes. Sorted Sets power leaderboards: SortedSetIncrementAsync atomically updates a player's score, SortedSetRangeByRankWithScoresAsync returns top N in O(log N + count), and GetRankAsync returns a player's position in O(log N). Lua scripts run atomically across multiple keys — the canonical use is the token bucket rate limiter and the distributed lock safe-release (check then delete without a race condition). For distributed locks: StringSetAsync with When.NotExists + TTL acquires the lock atomically; a Lua script for release ensures only the lock holder can delete it. Pipeline batching: CreateBatch + batch.Execute() sends all commands in one TCP round trip — critical for bulk reads."

Enjoyed this article?

Explore the Backend Systems learning path for more.

Found this helpful?

Share:𝕏

Leave a comment

Have a question, correction, or just found this helpful? Leave a note below.