Bidirectional Streaming RPC — Full-Duplex gRPC

Implement bidirectional streaming with gRPC on ASP.NET Core: reading and writing concurrently, chat-style protocols, real-time collaborative workflows, and the production patterns for managing concurrent streams.

Asma Hafeez Khan · May 16, 2026 · 4 min read
Tags: gRPC, Bidirectional Streaming, ASP.NET Core, .NET, Real-Time

Bidirectional Streaming Pattern

Both client and server can send messages independently and concurrently within a single call, which gRPC carries on one HTTP/2 stream. This is full-duplex communication.

Client: send PatientMonitorRequest  → Server processes
Server: send PatientStatusResponse  → Client processes
Client: send another request (any time)
Server: send another response (any time)

Both sides read and write concurrently
Call ends when: the server method returns (typically after the client
                completes its request stream), the deadline is exceeded,
                or either side cancels
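
The flow above can be sketched without any gRPC dependency by letting two in-memory channels stand in for the two directions of the stream. This is only an illustration under assumptions: `DuplexDemo`, the string messages, and the channel stand-ins are hypothetical names, and the point is simply that the client's read loop and its writes run independently until the client completes its write side.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class DuplexDemo
{
    // Simulates one full-duplex call: two channels stand in for the two stream directions.
    public static async Task<List<string>> RunAsync()
    {
        var requests  = Channel.CreateUnbounded<string>();  // client -> server
        var responses = Channel.CreateUnbounded<string>();  // server -> client
        var received  = new List<string>();

        // "Server": answers each request as it arrives, then completes its own write side.
        var server = Task.Run(async () =>
        {
            await foreach (var request in requests.Reader.ReadAllAsync())
                await responses.Writer.WriteAsync($"status for {request}");
            responses.Writer.Complete();
        });

        // "Client" read loop: runs concurrently with the client's writes below.
        var clientReader = Task.Run(async () =>
        {
            await foreach (var response in responses.Reader.ReadAllAsync())
                received.Add(response);
        });

        // "Client" writes: sent at any time, regardless of what has been received so far.
        await requests.Writer.WriteAsync("patient-1");
        await requests.Writer.WriteAsync("patient-2");
        requests.Writer.Complete();  // client is done sending; the server drains and ends the call

        await Task.WhenAll(server, clientReader);
        return received;
    }

    public static async Task Main()
    {
        foreach (var line in await RunAsync())
            Console.WriteLine(line);
    }
}
```

In a real call the two channels are replaced by the request and response streams that gRPC hands to each side; the shape of the loops stays the same.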

Proto Definition

PROTOBUF
syntax = "proto3";

service PatientMonitorService {
    // Both sides stream independently
    rpc MonitorPatients (stream PatientMonitorRequest)
        returns (stream PatientStatusResponse);
}

message PatientMonitorRequest {
    oneof action {
        SubscribeRequest   subscribe   = 1;
        UnsubscribeRequest unsubscribe = 2;
        AcknowledgeRequest acknowledge = 3;
    }
}

message SubscribeRequest   { string patient_id = 1; }
message UnsubscribeRequest { string patient_id = 1; }
message AcknowledgeRequest { string alert_id   = 1; }

message PatientStatusResponse {
    string patient_id  = 1;
    string status_type = 2;  // "vitals_updated", "alert_triggered", "order_changed"
    bytes  payload     = 3;  // serialized type-specific data
    int64  timestamp   = 4;
}
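
Because `payload` is opaque bytes whose shape depends on `status_type`, both sides need an agreed mapping from the type string to a concrete type. The sketch below assumes JSON payloads (the serializer the server code in this article uses) and two hypothetical records, `VitalsDto` and `AlertDto`, whose fields are invented for illustration; `PayloadCodec` is likewise a made-up helper name.

```csharp
using System;
using System.Text.Json;

// Hypothetical payload shapes; the article does not define these fields.
public sealed record VitalsDto(int HeartRate, double TemperatureC);
public sealed record AlertDto(string AlertId, string Message);

public static class PayloadCodec
{
    // Serializes a type-specific object into the bytes carried by `payload`.
    public static byte[] Encode<T>(T value) => JsonSerializer.SerializeToUtf8Bytes(value);

    // Picks the CLR type from status_type. Unknown types return null so that
    // a client can skip status types it does not know about.
    public static object? Decode(string statusType, byte[] payload) => statusType switch
    {
        "vitals_updated"  => JsonSerializer.Deserialize<VitalsDto>(payload),
        "alert_triggered" => JsonSerializer.Deserialize<AlertDto>(payload),
        _                 => null,
    };

    public static void Main()
    {
        var bytes   = Encode(new VitalsDto(72, 36.8));
        var decoded = Decode("vitals_updated", bytes);
        Console.WriteLine(decoded);
    }
}
```

A string discriminator plus `bytes` keeps the proto small but moves type checking to runtime; a `oneof` of typed messages in the response would be the stricter alternative if the set of status types is stable.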

Server Implementation

C#
using System.Collections.Concurrent;
using System.Text.Json;
using Google.Protobuf;
using Grpc.Core;

public sealed class PatientMonitorGrpcService
    : PatientMonitorService.PatientMonitorServiceBase
{
    private readonly IVitalsRepository _vitals;
    private readonly IAlertService     _alerts;

    public PatientMonitorGrpcService(IVitalsRepository vitals, IAlertService alerts)
    {
        _vitals = vitals;
        _alerts = alerts;
    }

    public override async Task MonitorPatients(
        IAsyncStreamReader<PatientMonitorRequest> requestStream,
        IServerStreamWriter<PatientStatusResponse> responseStream,
        ServerCallContext context)
    {
        // Linked source: lets the read loop stop the write loop once the client
        // has finished sending. Without it the write loop runs until the call is cancelled.
        using var cts = CancellationTokenSource.CreateLinkedTokenSource(context.CancellationToken);
        var ct = cts.Token;

        // The BCL has no ConcurrentHashSet; a ConcurrentDictionary with ignored
        // values serves as a thread-safe set.
        var subscribedIds = new ConcurrentDictionary<Guid, byte>();

        // Task A: process incoming requests (subscribe/unsubscribe/acknowledge)
        var readTask = Task.Run(async () =>
        {
            try
            {
                await foreach (var request in requestStream.ReadAllAsync(ct))
                {
                    switch (request.ActionCase)
                    {
                        case PatientMonitorRequest.ActionOneofCase.Subscribe:
                            if (Guid.TryParse(request.Subscribe.PatientId, out var subId))
                                subscribedIds.TryAdd(subId, 0);
                            break;

                        case PatientMonitorRequest.ActionOneofCase.Unsubscribe:
                            if (Guid.TryParse(request.Unsubscribe.PatientId, out var unsubId))
                                subscribedIds.TryRemove(unsubId, out _);
                            break;

                        case PatientMonitorRequest.ActionOneofCase.Acknowledge:
                            if (Guid.TryParse(request.Acknowledge.AlertId, out var alertId))
                                await _alerts.AcknowledgeAsync(alertId, ct);
                            break;
                    }
                }
            }
            finally
            {
                cts.Cancel(); // request stream ended or failed: stop the write loop
            }
        }, ct);

        // Task B: push status updates for subscribed patients.
        // This is the only task that writes to responseStream.
        var writeTask = Task.Run(async () =>
        {
            while (!ct.IsCancellationRequested)
            {
                foreach (var patientId in subscribedIds.Keys)
                {
                    var status = await _vitals.GetLatestStatusAsync(patientId, ct);
                    if (status is not null)
                    {
                        await responseStream.WriteAsync(new PatientStatusResponse
                        {
                            PatientId  = patientId.ToString(),
                            StatusType = "vitals_updated",
                            Payload    = ByteString.CopyFrom(
                                JsonSerializer.SerializeToUtf8Bytes(status)),
                            Timestamp  = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(),
                        }, ct);
                    }
                }

                // Poll interval between pushes
                await Task.Delay(TimeSpan.FromSeconds(15), ct);
            }
        }, ct);

        // Wait for both to complete; cancellation is the normal way this call ends
        try
        {
            await Task.WhenAll(readTask, writeTask);
        }
        catch (OperationCanceledException) when (ct.IsCancellationRequested) { }
    }
}
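
One thing to check in a handler shaped like this is how the write loop learns that the client has finished sending: a loop that exits only on cancellation needs something to cancel it, otherwise the call stays open after the request stream ends. A minimal sketch of one option follows, using a linked `CancellationTokenSource` that the read side cancels when its loop ends. In-memory stand-ins replace the gRPC streams, and `ReadThenStopWriter` and its parameters are hypothetical names.

```csharp
using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class ReadThenStopWriter
{
    // Runs a read loop and a periodic write loop; the write loop stops once the reader drains.
    // Returns how many simulated writes happened before the call ended.
    public static async Task<int> RunAsync(ChannelReader<string> requests, CancellationToken callToken)
    {
        using var cts = CancellationTokenSource.CreateLinkedTokenSource(callToken);
        var ct = cts.Token;
        var writes = 0;

        var readTask = Task.Run(async () =>
        {
            try
            {
                await foreach (var request in requests.ReadAllAsync(ct))
                {
                    // handle subscribe / unsubscribe / acknowledge here
                }
            }
            finally
            {
                cts.Cancel(); // request stream ended (or failed): tell the writer to stop
            }
        });

        var writeTask = Task.Run(async () =>
        {
            while (!ct.IsCancellationRequested)
            {
                writes++; // stand-in for responseStream.WriteAsync(...)
                await Task.Delay(TimeSpan.FromMilliseconds(10), ct);
            }
        });

        try { await Task.WhenAll(readTask, writeTask); }
        catch (OperationCanceledException) when (ct.IsCancellationRequested) { }
        return writes;
    }

    public static async Task Main()
    {
        var requests = Channel.CreateUnbounded<string>();
        await requests.Writer.WriteAsync("subscribe");
        requests.Writer.Complete(); // client is done sending
        var writes = await RunAsync(requests.Reader, CancellationToken.None);
        Console.WriteLine($"call ended after {writes} write(s)");
    }
}
```

Whether ending the call when the client completes its request stream is the right policy depends on the protocol; a service that should keep pushing updates after the client stops sending would leave the linked source alone and rely on the deadline or cancellation instead.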

Client Implementation

C#
using System.Text.Json;
using Grpc.Core;
using Grpc.Net.Client;

using var channel = GrpcChannel.ForAddress("https://monitor.systemforge.internal");
var client = new PatientMonitorService.PatientMonitorServiceClient(channel);

using var cts  = new CancellationTokenSource();
using var call = client.MonitorPatients(cancellationToken: cts.Token);

// Read responses concurrently
var readTask = Task.Run(async () =>
{
    await foreach (var status in call.ResponseStream.ReadAllAsync(cts.Token))
    {
        switch (status.StatusType)
        {
            case "vitals_updated":
                // ToByteArray() is ByteString's own copy method (no per-byte LINQ enumeration)
                var vitals = JsonSerializer.Deserialize<VitalsDto>(status.Payload.ToByteArray());
                UpdateVitalsDisplay(status.PatientId, vitals!);
                break;

            case "alert_triggered":
                var alert = JsonSerializer.Deserialize<AlertDto>(status.Payload.ToByteArray());
                ShowAlert(alert!);
                break;
        }
    }
}, cts.Token);

// Subscribe to a patient (patientId and alertId are Guid values assumed to be in scope)
await call.RequestStream.WriteAsync(new PatientMonitorRequest
{
    Subscribe = new SubscribeRequest { PatientId = patientId.ToString() }
});

// Acknowledge an alert
await call.RequestStream.WriteAsync(new PatientMonitorRequest
{
    Acknowledge = new AcknowledgeRequest { AlertId = alertId.ToString() }
});

// Unsubscribe when done
await call.RequestStream.WriteAsync(new PatientMonitorRequest
{
    Unsubscribe = new UnsubscribeRequest { PatientId = patientId.ToString() }
});

await call.RequestStream.CompleteAsync();
await readTask;

When to Use Bidirectional Streaming

Bidirectional streaming is the most complex gRPC pattern. Use it when:
  ✓ Client needs to send commands AND receive real-time updates over the same connection
  ✓ Collaborative workflows (multiple users editing the same record)
  ✓ Command-response over a long-lived connection (chat-style protocol)
  ✓ Interactive monitoring where the client controls what it monitors

Consider simpler alternatives:
  Server streaming: if the client only subscribes once and receives data
  SignalR: if you need browser WebSocket support with group broadcasting
  Unary + polling: if real-time is not critical and request patterns are simple

Concurrent Write Safety

C#
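// IServerStreamWriter<T> is NOT thread-safe: only one write may be in flight at a time.
// If several tasks produce responses, funnel them through a Channel<T>.

// Bounded, so a slow client applies backpressure instead of growing memory without limit.
// The capacity of 256 is an arbitrary illustrative value.
var responseChannel = Channel.CreateBounded<PatientStatusResponse>(
    new BoundedChannelOptions(256) { SingleReader = true });

// Producers write to the channel (one shown; GetVitalStatusAsync is a placeholder)
_ = Task.Run(async () =>
{
    try
    {
        while (!ct.IsCancellationRequested)
            await responseChannel.Writer.WriteAsync(await GetVitalStatusAsync(), ct);
    }
    finally
    {
        // With several producers, complete only after all of them have finished
        responseChannel.Writer.TryComplete();
    }
}, ct);

// Single consumer: the only code that touches responseStream, so writes never overlap
await foreach (var response in responseChannel.Reader.ReadAllAsync(ct))
    await responseStream.WriteAsync(response, ct);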

Production issue I've seen: a team's bidirectional streaming implementation had two concurrent tasks both calling responseStream.WriteAsync(). Under load, this surfaced as InvalidOperationException: Cannot write to a response that has already completed. The message pointed in the wrong direction: the underlying issue was concurrent writes to a writer that is not thread-safe. Moving to the Channel-based single-writer pattern fixed it.
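
To see the single-writer pattern with more than one producer, here is a sketch with no gRPC dependency. `SingleWriteStream` is a hypothetical stand-in for `IServerStreamWriter<T>` that throws if a write starts while another is still in flight, and `SingleWriterDemo`, the capacity of 64, and the message format are likewise assumptions made for illustration.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Stand-in for IServerStreamWriter<T>: rejects a write that overlaps another one.
public sealed class SingleWriteStream
{
    private int _inFlight;
    public int Written { get; private set; }

    public async Task WriteAsync(string message)
    {
        if (Interlocked.Exchange(ref _inFlight, 1) == 1)
            throw new InvalidOperationException("Concurrent write detected.");
        await Task.Yield();               // simulate asynchronous I/O
        Written++;
        Volatile.Write(ref _inFlight, 0);
    }
}

public static class SingleWriterDemo
{
    public static async Task<int> RunAsync(int producers, int messagesEach)
    {
        var stream  = new SingleWriteStream();
        // Bounded so producers wait when the consumer falls behind (capacity is arbitrary).
        var channel = Channel.CreateBounded<string>(
            new BoundedChannelOptions(64) { SingleReader = true });

        // Many producers enqueue concurrently; none of them touches the stream.
        var producerTasks = Enumerable.Range(0, producers).Select(p => Task.Run(async () =>
        {
            for (var i = 0; i < messagesEach; i++)
                await channel.Writer.WriteAsync($"producer {p}, message {i}");
        })).ToArray();

        // Complete the channel once every producer is done so the consumer loop can end.
        var completion = Task.Run(async () =>
        {
            try { await Task.WhenAll(producerTasks); }
            finally { channel.Writer.TryComplete(); }
        });

        // The single consumer is the only caller of WriteAsync, so writes never overlap.
        await foreach (var message in channel.Reader.ReadAllAsync())
            await stream.WriteAsync(message);

        await completion;
        return stream.Written;
    }

    public static async Task Main()
    {
        Console.WriteLine(await RunAsync(producers: 4, messagesEach: 25));
    }
}
```

Calling `stream.WriteAsync` directly from the four producer tasks instead would trip the overlap check, which is the same class of failure described in the production issue above.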


Key Takeaway

Bidirectional streaming is full-duplex: client and server send independently. Run read and write as concurrent tasks. IServerStreamWriter<T> is not thread-safe — use a Channel<T> buffer if multiple sources write to the stream. Use bidirectional streaming for interactive protocols where the client controls what it monitors. For simpler scenarios (subscribe-and-receive), server streaming or SignalR is more appropriate.
