gRPC Streaming Patterns in .NET 9

Production gRPC streaming in .NET 9: server streaming, client streaming, bidirectional streaming, flow control, deadlines, metadata, error handling, health checks, and when to use each pattern over REST.

Asma Hafeez Khan · May 26, 2026 · 12 min read
.NET, C#, gRPC, Streaming, Protobuf, Microservices, Performance, Real-Time

gRPC has four call types. Most tutorials cover Unary (request/response). The three streaming patterns — server streaming, client streaming, and bidirectional streaming — are where gRPC's advantages over REST become decisive: structured contracts, binary framing, multiplexed HTTP/2 connections, and built-in flow control.

What you'll build:

  • Server streaming: live log tailing, progress events
  • Client streaming: bulk data ingestion
  • Bidirectional streaming: real-time collaborative editing
  • Deadlines, cancellation, and error propagation
  • Metadata and authentication headers
  • Health checks and reflection

The Four gRPC Call Types

PROTOBUF
service DataService {
  // 1. Unary — single request, single response
  rpc GetRecord (GetRecordRequest) returns (RecordResponse);

  // 2. Server streaming — single request, stream of responses
  rpc TailLogs (TailLogsRequest) returns (stream LogEntry);

  // 3. Client streaming — stream of requests, single response
  rpc IngestEvents (stream EventRecord) returns (IngestSummary);

  // 4. Bidirectional streaming — stream of requests, stream of responses
  rpc Collaborate (stream CollaborationMessage) returns (stream CollaborationMessage);
}

All four call types run as streams over HTTP/2, and a single connection multiplexes many concurrent calls. HTTP/1.1, by contrast, handles one request at a time per connection, so parallel requests need additional connections.
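In .NET, multiplexing comes from sharing one GrpcChannel between clients. The sketch below assumes the Grpc.Net.Client package and the generated Logs.LogsClient from the log service proto defined later in this article; the address is a placeholder. Servers cap the number of concurrent streams per connection (Kestrel's default is 100), and every long-lived streaming call holds one of those slots, so the handler option shown here lets the client open further connections when the cap is reached.

```csharp
using System.Net.Http;
using Grpc.Net.Client;
using LogService.Grpc;

// One channel, created once and shared for the lifetime of the process.
// The address is a placeholder, not a real endpoint.
var channel = GrpcChannel.ForAddress("https://logservice:443", new GrpcChannelOptions
{
    HttpHandler = new SocketsHttpHandler
    {
        // Open additional HTTP/2 connections when one connection's
        // concurrent stream limit is reached.
        EnableMultipleHttp2Connections = true,
    },
});

// Clients are lightweight wrappers; both multiplex over the same channel.
var tailClient = new Logs.LogsClient(channel);
var metricsClient = new Logs.LogsClient(channel);
```

Creating a new channel per call would discard this benefit, because each channel sets up its own connection.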


1. Server Streaming — Live Log Tailing

The client sends one request. The server streams responses until the stream is complete or the client cancels.

Use cases: log tailing, progress reporting, live dashboards, event feeds.

Proto definition

PROTOBUF
// Protos/log_service.proto
syntax = "proto3";
option csharp_namespace = "LogService.Grpc";

package logservice;

service Logs {
  rpc TailLogs (TailLogsRequest) returns (stream LogEntry);
  rpc StreamMetrics (MetricsRequest) returns (stream MetricSnapshot);
}

message TailLogsRequest {
  string service_name = 1;
  string level_filter = 2;      // "ERROR", "WARN", "INFO", ""=all
  int32 tail_lines = 3;         // 0 = only new entries
}

message LogEntry {
  string id = 1;
  string level = 2;
  string message = 3;
  string service = 4;
  int64 timestamp_unix_ms = 5;
  map<string, string> attributes = 6;
}

message MetricsRequest {
  repeated string metric_names = 1;
  int32 interval_seconds = 2;
}

message MetricSnapshot {
  string name = 1;
  double value = 2;
  int64 timestamp_unix_ms = 3;
}

Server implementation

C#
// Services/LogsGrpcService.cs
public class LogsGrpcService : Logs.LogsBase
{
    private readonly ILogStore _logStore;
    private readonly ILogger<LogsGrpcService> _logger;

    public LogsGrpcService(ILogStore logStore, ILogger<LogsGrpcService> logger)
    {
        _logStore = logStore;
        _logger = logger;
    }

    public override async Task TailLogs(
        TailLogsRequest request,
        IServerStreamWriter<LogEntry> responseStream,
        ServerCallContext context)
    {
        _logger.LogInformation(
            "Starting log tail for {Service}, filter={Filter}",
            request.ServiceName, request.LevelFilter);

        // Optionally send historical tail first
        if (request.TailLines > 0)
        {
            var historical = await _logStore.GetTailAsync(
                request.ServiceName, request.TailLines, context.CancellationToken);

            foreach (var entry in historical)
            {
                await responseStream.WriteAsync(MapToProto(entry), context.CancellationToken);
            }
        }

        // Then stream live entries until client cancels
        await foreach (var entry in _logStore.StreamLiveAsync(
            request.ServiceName,
            request.LevelFilter,
            context.CancellationToken))
        {
            await responseStream.WriteAsync(MapToProto(entry), context.CancellationToken);
        }
    }

    public override async Task StreamMetrics(
        MetricsRequest request,
        IServerStreamWriter<MetricSnapshot> responseStream,
        ServerCallContext context)
    {
        var interval = TimeSpan.FromSeconds(Math.Max(1, request.IntervalSeconds));

        while (!context.CancellationToken.IsCancellationRequested)
        {
            foreach (var name in request.MetricNames)
            {
                var value = await _logStore.GetMetricValueAsync(name, context.CancellationToken);
                await responseStream.WriteAsync(new MetricSnapshot
                {
                    Name = name,
                    Value = value,
                    TimestampUnixMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(),
                }, context.CancellationToken);
            }

            await Task.Delay(interval, context.CancellationToken);
        }
    }

    private static LogEntry MapToProto(LogRecord record) => new()
    {
        Id = record.Id,
        Level = record.Level,
        Message = record.Message,
        Service = record.ServiceName,
        TimestampUnixMs = record.Timestamp.ToUnixTimeMilliseconds(),
        Attributes = { record.Attributes },
    };
}
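The service above relies on an ILogStore.StreamLiveAsync method that the article does not show. A minimal in-memory sketch of such a live feed, using only the base class library, might look like this. InMemoryLogFeed and the simplified LogRecord are hypothetical names introduced for illustration, and the single shared channel supports one subscriber only; a real store would fan out to one channel per subscriber.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical, simplified record; the article's LogRecord carries more fields.
public sealed record LogRecord(string ServiceName, string Level, string Message);

// Hypothetical in-memory feed: producers call Publish, the gRPC handler
// enumerates StreamLiveAsync.
public sealed class InMemoryLogFeed
{
    private readonly Channel<LogRecord> _channel =
        Channel.CreateUnbounded<LogRecord>();

    public bool Publish(LogRecord record) => _channel.Writer.TryWrite(record);

    // Ends the live stream; enumeration finishes once buffered items are read.
    public void Complete() => _channel.Writer.TryComplete();

    public async IAsyncEnumerable<LogRecord> StreamLiveAsync(
        string serviceName,
        string levelFilter,
        [EnumeratorCancellation] CancellationToken ct = default)
    {
        // ReadAllAsync throws OperationCanceledException when ct is cancelled,
        // which is how a client cancel ends the server-side loop.
        await foreach (var record in _channel.Reader.ReadAllAsync(ct))
        {
            if (record.ServiceName != serviceName) continue;

            // An empty filter means "all levels", matching the proto comment.
            if (levelFilter.Length > 0 && record.Level != levelFilter) continue;

            yield return record;
        }
    }
}
```

The [EnumeratorCancellation] attribute is what connects the token passed by the handler to the token seen inside the iterator.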

Client consumption

C#
// Consuming server streaming from another .NET service
public class LogTailConsumer
{
    private readonly Logs.LogsClient _client;

    public LogTailConsumer(Logs.LogsClient client) => _client = client;

    public async Task TailAsync(string serviceName, CancellationToken ct)
    {
        using var call = _client.TailLogs(new TailLogsRequest
        {
            ServiceName = serviceName,
            LevelFilter = "ERROR",
            TailLines = 100,
        }, cancellationToken: ct);

        await foreach (var entry in call.ResponseStream.ReadAllAsync(ct))
        {
            Console.WriteLine($"[{entry.Level}] {entry.Message}");
        }
    }
}

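Key point: The await foreach loop on ResponseStream.ReadAllAsync waits asynchronously for each message and ends when the server completes the stream. If the CancellationToken fires first, the call is cancelled and the loop throws (by default an RpcException with StatusCode.Cancelled). On the server, context.CancellationToken is then signalled, and any awaited operation observing it throws OperationCanceledException.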


2. Client Streaming — Bulk Data Ingestion

The client sends a stream of requests. The server accumulates them and returns a single response when the client signals it is done.

Use cases: file uploads in chunks, bulk record import, log aggregation, telemetry ingestion.

Proto definition

PROTOBUF
// Protos/ingestion.proto
syntax = "proto3";
option csharp_namespace = "IngestionService.Grpc";

package ingestion;

service Ingestion {
  rpc IngestEvents (stream EventRecord) returns (IngestSummary);
  rpc UploadFile (stream FileChunk) returns (UploadResult);
}

message EventRecord {
  string event_id = 1;
  string event_type = 2;
  string payload_json = 3;
  int64 occurred_at_unix_ms = 4;
  string source_service = 5;
}

message IngestSummary {
  int32 accepted = 1;
  int32 rejected = 2;
  repeated string rejected_ids = 3;
  int64 processing_duration_ms = 4;
}

message FileChunk {
  bytes data = 1;
  string file_name = 2;    // sent only in first chunk
  string content_type = 3; // sent only in first chunk
  int32 chunk_index = 4;
  bool is_last = 5;
}

message UploadResult {
  string file_id = 1;
  string url = 2;
  int64 size_bytes = 3;
}

Server implementation

C#
// Services/IngestionGrpcService.cs
public class IngestionGrpcService : Ingestion.IngestionBase
{
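    private readonly IEventRepository _events;
    private readonly IFileStorage _storage;

    public IngestionGrpcService(IEventRepository events, IFileStorage storage)
    {
        _events = events;
        _storage = storage;
    }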

    public override async Task<IngestSummary> IngestEvents(
        IAsyncStreamReader<EventRecord> requestStream,
        ServerCallContext context)
    {
        var sw = Stopwatch.StartNew();
        var accepted = 0;
        var rejected = new List<string>();
        var batch = new List<EventRecord>(capacity: 100);

        await foreach (var record in requestStream.ReadAllAsync(context.CancellationToken))
        {
            if (IsValid(record))
            {
                batch.Add(record);
                accepted++;

                // Flush in batches of 100 to avoid holding everything in memory
                if (batch.Count >= 100)
                {
                    await _events.BulkInsertAsync(batch, context.CancellationToken);
                    batch.Clear();
                }
            }
            else
            {
                rejected.Add(record.EventId);
            }
        }

        // Flush remaining
        if (batch.Count > 0)
            await _events.BulkInsertAsync(batch, context.CancellationToken);

        sw.Stop();

        return new IngestSummary
        {
            Accepted = accepted,
            Rejected = rejected.Count,
            RejectedIds = { rejected },
            ProcessingDurationMs = sw.ElapsedMilliseconds,
        };
    }

    public override async Task<UploadResult> UploadFile(
        IAsyncStreamReader<FileChunk> requestStream,
        ServerCallContext context)
    {
        string? fileName = null;
        string? contentType = null;
        long totalBytes = 0;
        var fileId = Guid.NewGuid().ToString();

        await using var writeStream = await _storage.OpenWriteAsync(fileId);

        await foreach (var chunk in requestStream.ReadAllAsync(context.CancellationToken))
        {
            if (chunk.ChunkIndex == 0)
            {
                fileName = chunk.FileName;
                contentType = chunk.ContentType;
            }

            await writeStream.WriteAsync(chunk.Data.Memory, context.CancellationToken);
            totalBytes += chunk.Data.Length;
        }

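        // Proto3 strings default to "", so a missing first chunk must be checked explicitly
        if (string.IsNullOrEmpty(fileName))
            throw new RpcException(new Status(
                StatusCode.InvalidArgument, "The first chunk (chunk_index 0) must carry file_name."));

        var url = await _storage.FinaliseAsync(fileId, fileName, contentType!);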

        return new UploadResult
        {
            FileId = fileId,
            Url = url,
            SizeBytes = totalBytes,
        };
    }

    private static bool IsValid(EventRecord record) =>
        !string.IsNullOrEmpty(record.EventId) &&
        !string.IsNullOrEmpty(record.EventType) &&
        record.OccurredAtUnixMs > 0;
}

Client sending a stream

C#
// Sending a bulk upload from a .NET client
public class EventIngestionClient
{
    private readonly Ingestion.IngestionClient _grpc;

    public EventIngestionClient(Ingestion.IngestionClient grpc) => _grpc = grpc;

    public async Task<IngestSummary> SendEventsAsync(
        IEnumerable<DomainEvent> events,
        CancellationToken ct = default)
    {
        // Set a deadline: the whole stream must complete within 30 seconds
        using var call = _grpc.IngestEvents(
            deadline: DateTime.UtcNow.AddSeconds(30),
            cancellationToken: ct);

        foreach (var evt in events)
        {
            await call.RequestStream.WriteAsync(new EventRecord
            {
                EventId = evt.Id,
                EventType = evt.GetType().Name,
                PayloadJson = JsonSerializer.Serialize(evt),
                OccurredAtUnixMs = evt.OccurredAt.ToUnixTimeMilliseconds(),
                SourceService = "orders-service",
            }, ct);
        }

        // Signal end of stream — server's ReadAllAsync completes
        await call.RequestStream.CompleteAsync();

        return await call.ResponseAsync;
    }
}
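The proto also defines UploadFile, but the article shows no client for it. The part that is easy to get wrong is chunking: is_last can only be set correctly if the sender knows no more data follows. A sketch of one way to do that with a one-chunk lookahead, using only the base class library, is below. FileChunker and Chunk are hypothetical names standing in for the generated FileChunk message, and the 64 KB default is an assumption, not a recommendation from the article.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for the generated FileChunk message.
public sealed record Chunk(int Index, byte[] Data, bool IsLast);

public static class FileChunker
{
    // Reads the stream in chunkSize pieces. One chunk is held back so that
    // IsLast can be set on the final piece. An empty stream yields no chunks.
    public static async IAsyncEnumerable<Chunk> ReadChunksAsync(
        Stream source,
        int chunkSize = 64 * 1024,
        [EnumeratorCancellation] CancellationToken ct = default)
    {
        if (chunkSize <= 0) throw new ArgumentOutOfRangeException(nameof(chunkSize));

        byte[]? pending = null;
        var index = 0;

        while (true)
        {
            var buffer = new byte[chunkSize];
            var filled = 0;

            // Stream.ReadAsync may return fewer bytes than requested, so keep
            // reading until the buffer is full or the stream ends.
            while (filled < chunkSize)
            {
                var read = await source.ReadAsync(
                    buffer.AsMemory(filled, chunkSize - filled), ct);
                if (read == 0) break;
                filled += read;
            }

            if (filled == 0) break; // end of stream

            // More data arrived, so the held-back chunk was not the last one.
            if (pending is not null)
                yield return new Chunk(index++, pending, IsLast: false);

            pending = filled == chunkSize ? buffer : buffer[..filled];
        }

        if (pending is not null)
            yield return new Chunk(index, pending, IsLast: true);
    }
}
```

Each Chunk would then be mapped to a FileChunk (for example with ByteString.CopyFrom(chunk.Data), setting FileName and ContentType only when Index is 0) and written to call.RequestStream, followed by CompleteAsync as in the events example. Chunks need to stay well below the receiver's maximum message size, which defaults to 4 MB in gRPC for .NET.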

3. Bidirectional Streaming — Collaborative Editing

Both sides send independent streams simultaneously. Neither waits for the other to finish before sending.

Use cases: real-time collaborative document editing, multiplayer game state sync, live auctions, trading systems.

Proto definition

PROTOBUF
// Protos/collaboration.proto
syntax = "proto3";
option csharp_namespace = "CollaborationService.Grpc";

package collab;

service Collaboration {
  rpc EditSession (stream ClientMessage) returns (stream ServerMessage);
}

message ClientMessage {
  oneof payload {
    JoinSession join = 1;
    DocumentPatch patch = 2;
    CursorPosition cursor = 3;
    PingMessage ping = 4;
  }
}

message JoinSession {
  string document_id = 1;
  string user_id = 2;
  string display_name = 3;
  int32 last_known_version = 4;
}

message DocumentPatch {
  string document_id = 1;
  string operation_json = 2;  // OT or CRDT operation
  int32 base_version = 3;
}

message CursorPosition {
  int32 line = 1;
  int32 column = 2;
}

message PingMessage {
  int64 client_time_unix_ms = 1;
}

message ServerMessage {
  oneof payload {
    SessionJoined session_joined = 1;
    PatchBroadcast patch = 2;
    CursorBroadcast cursor = 3;
    PongMessage pong = 4;
    ErrorMessage error = 5;
  }
}

message SessionJoined {
  int32 current_version = 1;
  string document_snapshot_json = 2;
  repeated ActiveUser active_users = 3;
}

message ActiveUser {
  string user_id = 1;
  string display_name = 2;
}

message PatchBroadcast {
  string author_user_id = 1;
  string operation_json = 2;
  int32 version = 3;
}

message CursorBroadcast {
  string user_id = 1;
  int32 line = 2;
  int32 column = 3;
}

message PongMessage {
  int64 client_time_unix_ms = 1;
  int64 server_time_unix_ms = 2;
}

message ErrorMessage {
  string code = 1;
  string message = 2;
}

Server implementation

C#
// Services/CollaborationGrpcService.cs
public class CollaborationGrpcService : Collaboration.CollaborationBase
{
    private readonly IDocumentSessionManager _sessions;
    private readonly ILogger<CollaborationGrpcService> _logger;

    public CollaborationGrpcService(
        IDocumentSessionManager sessions,
        ILogger<CollaborationGrpcService> logger)
    {
        _sessions = sessions;
        _logger = logger;
    }

    // Mutable per-call state. Async methods cannot take ref parameters,
    // so the read loop updates this object instead.
    private sealed class CallState
    {
        public string? UserId;
        public string? DocumentId;
        public IDocumentSession? Session;
    }

    public override async Task EditSession(
        IAsyncStreamReader<ClientMessage> requestStream,
        IServerStreamWriter<ServerMessage> responseStream,
        ServerCallContext context)
    {
        var state = new CallState();

        // Every outbound message goes through this channel: broadcasts from OTHER
        // clients as well as direct replies (SessionJoined, Pong). A response stream
        // allows only one write at a time, so a single loop drains the channel.
        var broadcastChannel = Channel.CreateBounded<ServerMessage>(
            new BoundedChannelOptions(50)
            {
                FullMode = BoundedChannelFullMode.DropOldest,
                SingleReader = true,
            });

        try
        {
            // Read from client and write to response concurrently
            var readTask = ReadClientMessagesAsync(
                requestStream, broadcastChannel, state, context);

            var writeTask = WriteBroadcastsAsync(responseStream, broadcastChannel, context);

            await Task.WhenAll(readTask, writeTask);
        }
        finally
        {
            if (state.Session is not null && state.UserId is not null)
                await state.Session.RemoveParticipantAsync(state.UserId, broadcastChannel);
        }
    }

    private async Task ReadClientMessagesAsync(
        IAsyncStreamReader<ClientMessage> reader,
        Channel<ServerMessage> broadcastChannel,
        CallState state,
        ServerCallContext context)
    {
        try
        {
            await foreach (var msg in reader.ReadAllAsync(context.CancellationToken))
            {
                switch (msg.PayloadCase)
                {
                    case ClientMessage.PayloadOneofCase.Join:
                        (state.UserId, state.DocumentId, state.Session) = await HandleJoinAsync(
                            msg.Join, broadcastChannel, context);
                        break;

                    case ClientMessage.PayloadOneofCase.Patch when state.Session is not null:
                        await HandlePatchAsync(msg.Patch, state.Session, state.UserId!, context);
                        break;

                    case ClientMessage.PayloadOneofCase.Cursor when state.Session is not null:
                        await state.Session.BroadcastCursorAsync(
                            state.UserId!, msg.Cursor.Line, msg.Cursor.Column);
                        break;

                    case ClientMessage.PayloadOneofCase.Ping:
                        // Queue the reply instead of writing to the response stream directly
                        await broadcastChannel.Writer.WriteAsync(new ServerMessage
                        {
                            Pong = new PongMessage
                            {
                                ClientTimeUnixMs = msg.Ping.ClientTimeUnixMs,
                                ServerTimeUnixMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(),
                            }
                        }, context.CancellationToken);
                        break;
                }
            }
        }
        finally
        {
            // The client finished sending (or the call failed): let the write loop
            // drain what is queued and end, so Task.WhenAll can complete.
            broadcastChannel.Writer.TryComplete();
        }
    }

    private async Task WriteBroadcastsAsync(
        IServerStreamWriter<ServerMessage> writer,
        Channel<ServerMessage> channel,
        ServerCallContext context)
    {
        await foreach (var msg in channel.Reader.ReadAllAsync(context.CancellationToken))
        {
            await writer.WriteAsync(msg, context.CancellationToken);
        }
    }

    private async Task<(string userId, string documentId, IDocumentSession session)> HandleJoinAsync(
        JoinSession join,
        Channel<ServerMessage> broadcastChannel,
        ServerCallContext context)
    {
        var session = await _sessions.GetOrCreateAsync(join.DocumentId);
        var snapshot = await session.AddParticipantAsync(join.UserId, broadcastChannel);

        await broadcastChannel.Writer.WriteAsync(new ServerMessage
        {
            SessionJoined = new SessionJoined
            {
                CurrentVersion = snapshot.Version,
                DocumentSnapshotJson = snapshot.ContentJson,
                ActiveUsers =
                {
                    snapshot.ActiveUsers.Select(u => new ActiveUser
                    {
                        UserId = u.Id,
                        DisplayName = u.DisplayName,
                    })
                },
            }
        }, context.CancellationToken);

        return (join.UserId, join.DocumentId, session);
    }

    private async Task HandlePatchAsync(
        DocumentPatch patch,
        IDocumentSession session,
        string authorId,
        ServerCallContext context)
    {
        var result = await session.ApplyPatchAsync(patch.OperationJson, patch.BaseVersion, authorId);

        if (!result.Success)
        {
            // Conflict — client needs to rebase; this would trigger client-side OT resolution
            _logger.LogWarning("Patch conflict for user {UserId}", authorId);
        }
    }
}
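The bounded channel in EditSession uses BoundedChannelFullMode.DropOldest, which means a slow client loses the oldest queued messages instead of stalling the session. The following sketch shows that behaviour in isolation, using only System.Threading.Channels; DropOldestDemo is a hypothetical name introduced for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;

public static class DropOldestDemo
{
    // Writes 1..count into a bounded channel without reading, then drains it.
    public static List<int> WriteThenDrain(int capacity, int count)
    {
        var channel = Channel.CreateBounded<int>(
            new BoundedChannelOptions(capacity)
            {
                FullMode = BoundedChannelFullMode.DropOldest,
            });

        for (var i = 1; i <= count; i++)
        {
            // With DropOldest, TryWrite succeeds even when the channel is full:
            // the oldest buffered item is discarded to make room.
            channel.Writer.TryWrite(i);
        }

        channel.Writer.Complete();

        // Items still buffered remain readable after the writer completes.
        var drained = new List<int>();
        while (channel.Reader.TryRead(out var item))
            drained.Add(item);

        return drained;
    }
}
```

Calling DropOldestDemo.WriteThenDrain(capacity: 3, count: 5) returns 3, 4, 5: the first two items were silently dropped. That is acceptable for cursor positions, where only the latest value matters, but a dropped document patch leaves the client behind the server's version. A design that keeps DropOldest would likely need a resync path (for example, re-sending a snapshot when the client's base_version falls behind), or a separate channel with BoundedChannelFullMode.Wait for patches.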

4. Deadlines and Cancellation

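Deadlines are set by the client and sent to the server with the call (as the grpc-timeout header), so both sides stop work when time runs out. They are the correct way to bound gRPC call duration, rather than Task.WhenAny with a timeout. Propagation across a call chain is not automatic in .NET: a service that makes further gRPC calls has to pass the deadline on itself, for example by enabling call context propagation on the gRPC client factory.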

C#
// Client: always set a deadline, and dispose the call when done
using var call = _client.TailLogs(request,
    deadline: DateTime.UtcNow.AddMinutes(5),
    cancellationToken: userCancellationToken);

C#
// Server: check CancellationToken on every iteration
await foreach (var entry in _store.StreamAsync(context.CancellationToken))
{
    // If client deadline exceeded or client cancelled, this throws OperationCanceledException
    await responseStream.WriteAsync(MapToProto(entry), context.CancellationToken);
}

Catching deadline exceeded on the client:

C#
var started = Stopwatch.StartNew(); // System.Diagnostics

try
{
    await foreach (var entry in call.ResponseStream.ReadAllAsync(ct))
    {
        Process(entry);
    }
}
catch (RpcException ex) when (ex.StatusCode == StatusCode.DeadlineExceeded)
{
    _logger.LogWarning("Log tail deadline exceeded after {Duration}", started.Elapsed);
}
catch (RpcException ex) when (ex.StatusCode == StatusCode.Cancelled)
{
    // Normal: the caller cancelled, for example because the user navigated away
}
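When a gRPC service calls another gRPC service, the incoming deadline has to be carried over to the outgoing call. A sketch of one way to do this with the gRPC client factory is below. It assumes the Grpc.AspNetCore and Grpc.AspNetCore.Server.ClientFactory packages, an ASP.NET Core project with implicit usings, and the generated Logs.LogsClient from the log service proto; the address is a placeholder.

```csharp
// Program.cs of a service that itself calls the log service.
using LogService.Grpc;

var builder = WebApplication.CreateBuilder(args);

builder.Services.AddGrpc();

builder.Services
    .AddGrpcClient<Logs.LogsClient>(options =>
    {
        // Placeholder address for the downstream service.
        options.Address = new Uri("https://logservice:443");
    })
    // Calls made with this client while handling a gRPC request inherit
    // that request's deadline and cancellation token.
    .EnableCallContextPropagation();

var app = builder.Build();

// Map this service's own gRPC endpoints here, for example:
// app.MapGrpcService<SomeGrpcService>();   // hypothetical service type

app.Run();
```

With propagation enabled, a client injected into a gRPC service does not need the deadline passed by hand on each downstream call.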

5. Metadata (Headers and Trailers)

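gRPC carries metadata in three places: request headers (sent by the client before the first message), response headers (sent by the server before its first message), and response trailers (sent after the last message, together with the call status). Use them for correlation IDs, rate limit info, and pagination cursors.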

C#
// Server: read request headers
public override async Task TailLogs(
    TailLogsRequest request,
    IServerStreamWriter<LogEntry> responseStream,
    ServerCallContext context)
{
    var correlationId = context.RequestHeaders
        .GetValue("x-correlation-id") ?? Guid.NewGuid().ToString();

    var totalSent = 0;

    // ... write entries to responseStream, incrementing totalSent ...

    // Trailers are sent after the last message, when the method returns
    context.ResponseTrailers.Add("x-total-entries", totalSent.ToString());
    context.ResponseTrailers.Add("x-stream-id", Guid.NewGuid().ToString());
}

C#
// Client: set request headers and read trailers
var headers = new Metadata
{
    { "x-correlation-id", correlationId },
    { "x-tenant-id", tenantId },
};

using var call = _client.TailLogs(request, headers, deadline, ct);

await foreach (var entry in call.ResponseStream.ReadAllAsync(ct)) { }

// Read trailers after stream completes
var trailers = call.GetTrailers();
var totalEntries = trailers.GetValue("x-total-entries");

6. Authentication

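gRPC calls go through the standard ASP.NET Core authentication pipeline. For service-to-service calls, use CallCredentials to attach a token. By default, call credentials are only sent over TLS-secured channels:

C#
// Client: attach a JWT to every call by composing CallCredentials into the channel credentials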
var credentials = CallCredentials.FromInterceptor(async (context, metadata) =>
{
    var token = await _tokenProvider.GetTokenAsync();
    metadata.Add("Authorization", $"Bearer {token}");
});

var channel = GrpcChannel.ForAddress("https://logservice:443",
    new GrpcChannelOptions
    {
        Credentials = ChannelCredentials.Create(new SslCredentials(), credentials),
    });

7. gRPC Health Checks and Reflection

Bash
dotnet add package Grpc.AspNetCore.HealthChecks
dotnet add package Grpc.AspNetCore.Server.Reflection

C#
// Program.cs
builder.Services.AddGrpcHealthChecks()
    .AddCheck<DatabaseHealthCheck>("database");
builder.Services.AddGrpcReflection(); // required by MapGrpcReflectionService

// ...

app.MapGrpcHealthChecksService();
app.MapGrpcReflectionService(); // enables grpcurl and Postman discovery

Bash
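# Test with grpcurl (-plaintext requires an endpoint without TLS)
grpcurl -plaintext localhost:5001 list
grpcurl -plaintext -d '{"service_name": "orders-service"}' localhost:5001 logservice.Logs/TailLogs
grpcurl -plaintext localhost:5001 grpc.health.v1.Health/Check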
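
The registration above references a DatabaseHealthCheck that the article does not define. A minimal sketch is below, built on the standard IHealthCheck interface from Microsoft.Extensions.Diagnostics.HealthChecks. IDatabaseProbe is a hypothetical abstraction introduced for illustration; in a real service it might run a trivial query against the database.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Diagnostics.HealthChecks;

// Hypothetical abstraction over "can the database be reached right now?".
public interface IDatabaseProbe
{
    Task<bool> CanConnectAsync(CancellationToken ct);
}

public sealed class DatabaseHealthCheck : IHealthCheck
{
    private readonly IDatabaseProbe _probe;

    public DatabaseHealthCheck(IDatabaseProbe probe) => _probe = probe;

    public async Task<HealthCheckResult> CheckHealthAsync(
        HealthCheckContext context,
        CancellationToken cancellationToken = default)
    {
        try
        {
            return await _probe.CanConnectAsync(cancellationToken)
                ? HealthCheckResult.Healthy("Database reachable")
                : HealthCheckResult.Unhealthy("Database unreachable");
        }
        catch (Exception ex) when (ex is not OperationCanceledException)
        {
            // Report probe failures as unhealthy instead of failing the check itself.
            return HealthCheckResult.Unhealthy("Database probe failed", ex);
        }
    }
}
```

The probe implementation would also need to be registered in the service container so that AddCheck<DatabaseHealthCheck> can resolve it.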

Choosing the Right Streaming Pattern

| Pattern | Client sends | Server sends | Use case |
|---|---|---|---|
| Unary | 1 request | 1 response | CRUD, queries |
| Server streaming | 1 request | N responses | Live feed, progress, export |
| Client streaming | N requests | 1 response | Bulk import, file upload |
| Bidirectional | N requests | N responses | Collaboration, games, trading |

Performance rule of thumb: If you find yourself polling a unary endpoint every second, replace it with server streaming. If you find yourself batching and sending large payloads, replace it with client streaming. If both sides need independent async communication, use bidirectional.

vs SignalR: Use gRPC streaming for service-to-service. Use SignalR for browser clients: browsers cannot make native gRPC calls over HTTP/2, so they need gRPC-Web, which requires either the gRPC-Web middleware in ASP.NET Core or a translating proxy such as Envoy, and which supports only unary and server streaming calls. For browser real-time, SignalR is simpler. For internal service-to-service streaming, gRPC is schema-enforced, compact on the wire, and easy to test with grpcurl.
