System.Threading.Channels — Producer-Consumer with Backpressure
System.Threading.Channels is the standard .NET API for producer-consumer pipelines. It replaces BlockingCollection<T>, ConcurrentQueue<T> with manual signalling, and TPL Dataflow for most scenarios. Channels are async-first, allocation-efficient, and give you precise control over what happens when the channel is full.
What you'll learn:
- Unbounded vs bounded channels
- Backpressure: Wait, DropWrite, DropOldest, DropNewest
- Single-writer/single-reader vs multi-writer/multi-reader
- Async read loops with ReadAllAsync
- Composing pipeline stages
- Fan-out and fan-in patterns
- Graceful shutdown and cancellation
Why Channels Over Alternatives
| Approach | Problem |
|---|---|
| ConcurrentQueue<T> + SpinWait | Burns CPU spinning in the consumer loop |
| BlockingCollection<T> | Blocks a thread — not async-friendly |
| TPL Dataflow | Complex API, harder to reason about |
| Channels | Async, composable, backpressure built-in |
1. Unbounded Channel — Basic Producer-Consumer
An unbounded channel never blocks the writer. Items accumulate until the consumer processes them.
// Create once — usually injected as a singleton
var channel = Channel.CreateUnbounded<OrderEvent>(new UnboundedChannelOptions
{
SingleWriter = false, // multiple producers allowed
SingleReader = false, // multiple consumers allowed
AllowSynchronousContinuations = false, // safer: don't run callbacks inline
});
// Producer (runs anywhere — HTTP handler, background service, etc.)
await channel.Writer.WriteAsync(new OrderEvent(orderId, "placed"), ct);
// Consumer (background service)
await foreach (var evt in channel.Reader.ReadAllAsync(ct))
{
await ProcessEventAsync(evt);
}
SingleWriter = true / SingleReader = true enable internal optimisations; use them when you know there's only one writer or one reader.
Risk of unbounded channels: if the producer is faster than the consumer, the backlog and the memory it holds grow without limit. Prefer bounded channels in production unless the producer's rate is known to be capped.
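The risk is easy to see in a small sketch. The example below is illustrative only: it writes 10,000 integers into a default unbounded channel with no consumer attached and then inspects the backlog. The item count and variable names are assumptions chosen for the demonstration.

```csharp
using System;
using System.Threading.Channels;

// A default unbounded channel: writes never wait, so nothing limits the backlog.
var channel = Channel.CreateUnbounded<int>();

// Simulate a fast producer with no consumer attached.
var accepted = 0;
for (var i = 0; i < 10_000; i++)
{
    if (channel.Writer.TryWrite(i))   // succeeds until the writer is completed
        accepted++;
}

// Count is only meaningful when the reader reports CanCount; -1 marks "not supported".
var backlog = channel.Reader.CanCount ? channel.Reader.Count : -1;
Console.WriteLine($"accepted={accepted}, backlog={backlog}");
```

Every write is accepted and every item stays buffered, which is exactly the growth a bounded channel is meant to cap.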
2. Bounded Channel — Backpressure Control
var channel = Channel.CreateBounded<OrderEvent>(new BoundedChannelOptions(capacity: 1000)
{
FullMode = BoundedChannelFullMode.Wait, // producer awaits until space is available
SingleWriter = false,
SingleReader = false,
});
BoundedChannelFullMode options
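Wait: the producer's WriteAsync awaits until a slot is free. This is true backpressure: the producer slows down to match the consumer. Use this when you cannot lose any messages.
// Producer waits asynchronously (no thread is blocked) while the channel is full
await channel.Writer.WriteAsync(item, ct);
DropWrite: if the channel is full, the item being written is silently discarded. The write still reports success (TryWrite returns true and WriteAsync completes normally), so the return value cannot be used to count drops. To observe them, pass an itemDropped callback to Channel.CreateBounded. Use this mode for metrics or telemetry where losing a sample is acceptable.
Alternatively, keep FullMode = Wait and call TryWrite, which returns false when the channel is full and leaves the decision about the rejected item to the producer:
// Assumes FullMode = Wait; in the Drop* modes TryWrite returns true even when an item is discarded
if (!channel.Writer.TryWrite(metric))
    _droppedCounter.Increment(); // track drops for monitoring
DropOldest: if full, the oldest buffered item is evicted to make room. Use for live data feeds where stale data is worthless (live price ticks, sensor readings).
DropNewest: if full, the most recently buffered item is evicted to make room for the item being written. This differs from DropWrite, which discards the incoming item and leaves the buffer untouched. Use DropNewest when older buffered items must be kept but the latest value should always be admitted; use DropWrite to protect everything already waiting.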
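The four modes are easiest to compare side by side. The sketch below writes 1, 2, 3 into a channel of capacity 2 under each mode and reports what stays buffered and what is dropped. The helper name `Probe` and the tiny capacity are assumptions made for illustration; the itemDropped callback overload of Channel.CreateBounded requires .NET 6 or later.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;

// Writes 1, 2, 3 into a capacity-2 channel and reports what survives.
static (bool thirdAccepted, List<int> buffered, List<int> dropped) Probe(BoundedChannelFullMode mode)
{
    var dropped = new List<int>();
    var channel = Channel.CreateBounded<int>(
        new BoundedChannelOptions(2) { FullMode = mode },
        d => dropped.Add(d));                         // invoked for each discarded item

    channel.Writer.TryWrite(1);
    channel.Writer.TryWrite(2);
    var thirdAccepted = channel.Writer.TryWrite(3);   // the channel is full at this point

    var buffered = new List<int>();
    while (channel.Reader.TryRead(out var item))
        buffered.Add(item);

    return (thirdAccepted, buffered, dropped);
}

foreach (var mode in Enum.GetValues<BoundedChannelFullMode>())
{
    var (ok, kept, lost) = Probe(mode);
    Console.WriteLine(
        $"{mode,-10} TryWrite(3)={ok,-5} buffered=[{string.Join(",", kept)}] dropped=[{string.Join(",", lost)}]");
}
```

Only Wait rejects the third write. DropWrite keeps 1 and 2, DropOldest keeps 2 and 3, and DropNewest keeps 1 and 3, and in all three Drop modes TryWrite reports success.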
3. Graceful Shutdown
Always call Complete() on the writer when the producer is done. This signals to the consumer that no more items will arrive, allowing ReadAllAsync to finish naturally.
// Producer
try
{
    foreach (var item in source)
        await channel.Writer.WriteAsync(item, ct);
}
catch (Exception ex)
{
    // Signal the consumer that production failed.
    // TryComplete(ex) does not throw if the channel was already completed elsewhere.
    channel.Writer.TryComplete(ex);
    throw;
}
finally
{
    channel.Writer.TryComplete(); // no-op if already completed
}
// Consumer — ReadAllAsync exits when writer is completed
await foreach (var item in channel.Reader.ReadAllAsync(ct))
{
await ProcessAsync(item);
}
// After this line, channel is drained and writer is done
If the writer completes with an exception, that exception surfaces from the consumer's ReadAllAsync loop once the items already buffered have been read.
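A minimal sketch of that behaviour, assuming a producer that manages to write two items before failing. The exception type and message are invented for the demonstration.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;

var channel = Channel.CreateUnbounded<int>();

// Producer writes two items, then reports a failure through the channel.
channel.Writer.TryWrite(1);
channel.Writer.TryWrite(2);
channel.Writer.TryComplete(new InvalidOperationException("source went away"));

var received = new List<int>();
var observedError = "none";

try
{
    // Items buffered before completion are still delivered.
    await foreach (var item in channel.Reader.ReadAllAsync())
        received.Add(item);
}
catch (InvalidOperationException ex)
{
    // Reached only after the buffer has been drained.
    observedError = ex.Message;
}

Console.WriteLine($"received=[{string.Join(",", received)}], error={observedError}");
```

The consumer should see both items first and only then the failure, so a faulted producer does not cause already-queued work to be lost.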
4. Background Service with Channel
The canonical pattern: an HTTP handler writes to the channel, a BackgroundService reads from it.
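// Registration
builder.Services.AddSingleton(_ =>
    Channel.CreateBounded<OrderEvent>(new BoundedChannelOptions(500)
    {
        FullMode = BoundedChannelFullMode.Wait,
    }));
// Expose each side separately so the endpoint below can resolve ChannelWriter<OrderEvent>
builder.Services.AddSingleton(sp => sp.GetRequiredService<Channel<OrderEvent>>().Writer);
builder.Services.AddSingleton(sp => sp.GetRequiredService<Channel<OrderEvent>>().Reader);
builder.Services.AddHostedService<OrderEventProcessor>();
// HTTP endpoint: fast write, no processing time in the request path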
app.MapPost("/orders", async (
CreateOrderRequest req,
ChannelWriter<OrderEvent> writer,
CancellationToken ct) =>
{
var order = Order.Create(req);
await writer.WriteAsync(new OrderEvent(order.Id, "placed"), ct);
return Results.Created($"/orders/{order.Id}", order);
});
// Background service: reads at its own pace
public class OrderEventProcessor : BackgroundService
{
private readonly ChannelReader<OrderEvent> _reader;
private readonly IServiceScopeFactory _scopeFactory;
private readonly ILogger<OrderEventProcessor> _logger;
public OrderEventProcessor(
Channel<OrderEvent> channel,
IServiceScopeFactory scopeFactory,
ILogger<OrderEventProcessor> logger)
{
_reader = channel.Reader;
_scopeFactory = scopeFactory;
_logger = logger;
}
protected override async Task ExecuteAsync(CancellationToken ct)
{
await foreach (var evt in _reader.ReadAllAsync(ct))
{
try
{
await ProcessEventAsync(evt, ct);
}
catch (Exception ex) when (ex is not OperationCanceledException)
{
_logger.LogError(ex, "Failed to process event for order {OrderId}", evt.OrderId);
// Don't rethrow — keep the consumer loop running
}
}
}
private async Task ProcessEventAsync(OrderEvent evt, CancellationToken ct)
{
using var scope = _scopeFactory.CreateScope();
var handler = scope.ServiceProvider.GetRequiredService<IOrderEventHandler>();
await handler.HandleAsync(evt, ct);
}
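}
Important: inject Channel<T> (not ChannelWriter<T>) when you need both reader and writer in one place, or inject ChannelWriter<T> into producers and ChannelReader<T> into consumers to enforce direction at the type level. The second option only resolves if ChannelWriter<T> and ChannelReader<T> are themselves registered in the container.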
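Outside a DI container the same idea can be sketched with plain method parameters. In the example below the producer receives only the write side and the consumer only the read side; the method names and the values written are assumptions for illustration.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

var channel = Channel.CreateBounded<int>(8);

// The producer can only write and complete.
static async Task ProduceAsync(ChannelWriter<int> writer)
{
    for (var i = 1; i <= 5; i++)
        await writer.WriteAsync(i);
    writer.TryComplete();
}

// The consumer can only read.
static async Task<int> SumAsync(ChannelReader<int> reader)
{
    var sum = 0;
    await foreach (var value in reader.ReadAllAsync())
        sum += value;
    return sum;
}

var producing = ProduceAsync(channel.Writer);
var total = await SumAsync(channel.Reader);
await producing;

Console.WriteLine($"total={total}");
```

Because neither method sees the whole Channel<int>, a consumer cannot accidentally write to or complete the channel.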
5. Multiple Consumers — Parallel Processing
Channel readers are safe for multiple consumers: each item is delivered to exactly one consumer.
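public class ParallelOrderProcessor : BackgroundService
{
    private readonly ChannelReader<OrderEvent> _reader;
    private const int DegreeOfParallelism = 4;
    public ParallelOrderProcessor(Channel<OrderEvent> channel) => _reader = channel.Reader;
    protected override async Task ExecuteAsync(CancellationToken ct)
    {
        // Launch N concurrent consumers from the same channel
        var workers = Enumerable
            .Range(0, DegreeOfParallelism)
            .Select(_ => ConsumeAsync(ct))
            .ToArray(); // start every worker here rather than during enumeration
        await Task.WhenAll(workers);
    }
    private async Task ConsumeAsync(CancellationToken ct)
    {
        await foreach (var evt in _reader.ReadAllAsync(ct))
        {
            await ProcessEventAsync(evt, ct);
        }
    }
    // ProcessEventAsync: the same scoped-handler method as in OrderEventProcessor (omitted here)
}
Each worker reads from the same channel, and no item is processed twice. The channel must be created with SingleReader = false (the default) for concurrent readers to be safe.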
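The exactly-once claim can be checked with a self-contained sketch. It starts four workers on one reader, pushes 1,000 integers through, and compares the total number of processed items with the number of distinct ones. The item count, buffer size, and the Task.Yield stand-in for real work are assumptions for the demonstration.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;

const int itemCount = 1_000;
const int workerCount = 4;

var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(64)
{
    SingleReader = false,   // several workers read concurrently
});

var seen = new ConcurrentBag<int>();

// Each worker competes for items on the same reader.
async Task WorkerAsync()
{
    await foreach (var item in channel.Reader.ReadAllAsync())
    {
        await Task.Yield();   // stand-in for real asynchronous work
        seen.Add(item);
    }
}

var workers = Enumerable.Range(0, workerCount).Select(_ => WorkerAsync()).ToArray();

for (var i = 0; i < itemCount; i++)
    await channel.Writer.WriteAsync(i);
channel.Writer.Complete();   // lets every worker's loop end once the buffer is empty

await Task.WhenAll(workers);

Console.WriteLine($"processed={seen.Count}, distinct={seen.Distinct().Count()}");
```

If any item were delivered twice, the processed count would exceed the distinct count; if any were lost, both would fall short of 1,000.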
6. Fan-Out — One Channel, Multiple Destinations
For scenarios where every consumer should receive every item (broadcast):
public class Broadcaster<T>
{
private readonly List<Channel<T>> _subscribers = new();
private readonly Lock _lock = new(); // System.Threading.Lock needs .NET 9 / C# 13; use a plain object on earlier versions
public ChannelReader<T> Subscribe()
{
var channel = Channel.CreateBounded<T>(new BoundedChannelOptions(100)
{
FullMode = BoundedChannelFullMode.DropOldest,
});
lock (_lock) _subscribers.Add(channel);
return channel.Reader;
}
public async ValueTask BroadcastAsync(T item, CancellationToken ct = default)
{
List<Channel<T>> snapshot;
lock (_lock) snapshot = [.._subscribers];
foreach (var sub in snapshot)
await sub.Writer.WriteAsync(item, ct);
}
}
7. Pipeline Stages
Chain channels to build a data processing pipeline where each stage runs concurrently:
// Stage 1: Read raw lines from file
// Stage 2: Parse CSV lines into records
// Stage 3: Validate and enrich records
// Stage 4: Write to database
static async Task RunPipelineAsync(string filePath, CancellationToken ct)
{
var rawLines = Channel.CreateBounded<string>(200);
var parsed = Channel.CreateBounded<CsvRecord>(200);
var validated = Channel.CreateBounded<EnrichedRecord>(200);
// All stages run concurrently
await Task.WhenAll(
ReadFileAsync(filePath, rawLines.Writer, ct),
ParseStageAsync(rawLines.Reader, parsed.Writer, ct),
ValidateStageAsync(parsed.Reader, validated.Writer, ct),
WriteStageAsync(validated.Reader, ct)
);
}
static async Task ReadFileAsync(string path, ChannelWriter<string> writer, CancellationToken ct)
{
try
{
await foreach (var line in File.ReadLinesAsync(path, ct))
await writer.WriteAsync(line, ct);
}
finally
{
writer.TryComplete();
}
}
static async Task ParseStageAsync(
ChannelReader<string> reader,
ChannelWriter<CsvRecord> writer,
CancellationToken ct)
{
try
{
await foreach (var line in reader.ReadAllAsync(ct))
{
if (TryParseCsv(line, out var record))
await writer.WriteAsync(record, ct);
}
}
finally
{
writer.TryComplete();
}
}
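// ValidateStage and WriteStage follow the same pattern...
The TryComplete() in each stage's finally block ensures that downstream stages drain and exit when the upstream stage finishes, even if it fails. The reverse is not automatic: if a downstream stage fails, upstream writers keep waiting on a full channel until ct is cancelled, so cancel a shared token when any stage faults.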
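A scaled-down sketch of the same shape shows the upstream-failure case end to end. The three stages here (generate, square, sum) and the simulated fault are assumptions standing in for the file, parse, and database stages.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

var numbers = Channel.CreateBounded<int>(4);
var squares = Channel.CreateBounded<int>(4);
long total = 0;

// Stage 1: emits 1..5, then fails (simulated fault).
async Task GenerateAsync(ChannelWriter<int> writer)
{
    try
    {
        for (var i = 1; i <= 5; i++)
            await writer.WriteAsync(i);
        throw new InvalidOperationException("simulated source failure");
    }
    finally
    {
        writer.TryComplete();   // downstream still sees end-of-stream
    }
}

// Stage 2: squares each number and forwards it.
async Task SquareAsync(ChannelReader<int> reader, ChannelWriter<int> writer)
{
    try
    {
        await foreach (var n in reader.ReadAllAsync())
            await writer.WriteAsync(n * n);
    }
    finally
    {
        writer.TryComplete();
    }
}

// Stage 3: accumulates the results.
async Task SumAsync(ChannelReader<int> reader)
{
    await foreach (var sq in reader.ReadAllAsync())
        total += sq;
}

var failure = "none";
try
{
    await Task.WhenAll(
        GenerateAsync(numbers.Writer),
        SquareAsync(numbers.Reader, squares.Writer),
        SumAsync(squares.Reader));
}
catch (InvalidOperationException ex)
{
    failure = ex.Message;
}

Console.WriteLine($"total={total}, failure={failure}");
```

The later stages finish processing everything that was emitted before the fault, and Task.WhenAll then surfaces the first stage's exception to the caller.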
8. Monitoring Channel Health
Expose channel depth as a metric so you can alert when the consumer is falling behind:
// In a metrics background service
// Count is only supported when channel.Reader.CanCount is true; check it before relying on this
var depth = channel.Reader.Count;
_metrics.RecordGauge("channel.depth", depth, new KeyValuePair<string, object?>("name", "order-events"));
if (depth > 800) // 80% of capacity 1000
    _logger.LogWarning("Order event channel is {Depth}/1000; consumer may be slow", depth);
Channels vs Other Options
Use Channel<T> when:
- You have async producers and consumers
- You need backpressure
- You want a simple, lightweight queue within a single process
Use a message broker (RabbitMQ, Azure Service Bus) when:
- Producers and consumers are in different processes or services
- You need durability (survive process restart)
- You need guaranteed delivery or ordering across services
Use IAsyncEnumerable<T> (without a channel) when:
- You have a single producer and consumer and don't need a buffer
- A generator method (yield return) models the production naturally
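For comparison, a minimal async iterator with no channel involved might look like the sketch below. The `CountdownAsync` method and the delay are illustrative assumptions.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// An async iterator: the consumer pulls each value, so there is no buffer between the two sides.
static async IAsyncEnumerable<int> CountdownAsync(int from)
{
    for (var i = from; i > 0; i--)
    {
        await Task.Delay(10);   // stand-in for an asynchronous source
        yield return i;
    }
}

var received = new List<int>();
await foreach (var value in CountdownAsync(3))
    received.Add(value);

Console.WriteLine(string.Join(",", received));
```

The producer only advances when the consumer asks for the next value, which gives natural flow control but no decoupling: a slow consumer directly stalls production.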
Use BlockingCollection<T> when:
- You're forced to use synchronous APIs (rare in modern code)