Bidirectional Streaming RPC — Full-Duplex gRPC
Implement bidirectional streaming in gRPC ASP.NET Core: reading and writing concurrently, chat-style protocols, real-time collaborative workflows, and the production patterns for managing concurrent streams.
Bidirectional Streaming Pattern
Both client and server can send messages independently and concurrently over a single connection. Full-duplex communication.
Client: send PatientMonitorRequest → Server processes
Server: send PatientStatusResponse → Client processes
Client: send another request (any time)
Server: send another response (any time)
Both sides read and write concurrently
Stream ends when: either side completes their write stream,
deadline is exceeded, or either side cancelsProto Definition
service PatientMonitorService {
// Both sides stream independently
rpc MonitorPatients (stream PatientMonitorRequest)
returns (stream PatientStatusResponse);
}
message PatientMonitorRequest {
oneof action {
SubscribeRequest subscribe = 1;
UnsubscribeRequest unsubscribe = 2;
AcknowledgeRequest acknowledge = 3;
}
}
message SubscribeRequest { string patient_id = 1; }
message UnsubscribeRequest { string patient_id = 1; }
message AcknowledgeRequest { string alert_id = 1; }
message PatientStatusResponse {
string patient_id = 1;
string status_type = 2; // "vitals_updated", "alert_triggered", "order_changed"
bytes payload = 3; // serialized type-specific data
int64 timestamp = 4;
}Server Implementation
public sealed class PatientMonitorGrpcService
: PatientMonitorService.PatientMonitorServiceBase
{
private readonly IVitalsRepository _vitals;
private readonly IAlertService _alerts;
public override async Task MonitorPatients(
IAsyncStreamReader<PatientMonitorRequest> requestStream,
IServerStreamWriter<PatientStatusResponse> responseStream,
ServerCallContext context)
{
var ct = context.CancellationToken;
var subscribedIds = new ConcurrentHashSet<Guid>();
// Read client messages concurrently with writing responses
// Task A: process incoming requests (subscribe/unsubscribe)
var readTask = Task.Run(async () =>
{
await foreach (var request in requestStream.ReadAllAsync(ct))
{
switch (request.ActionCase)
{
case PatientMonitorRequest.ActionOneofCase.Subscribe:
if (Guid.TryParse(request.Subscribe.PatientId, out var subId))
subscribedIds.Add(subId);
break;
case PatientMonitorRequest.ActionOneofCase.Unsubscribe:
if (Guid.TryParse(request.Unsubscribe.PatientId, out var unsubId))
subscribedIds.TryRemove(unsubId);
break;
case PatientMonitorRequest.ActionOneofCase.Acknowledge:
if (Guid.TryParse(request.Acknowledge.AlertId, out var alertId))
await _alerts.AcknowledgeAsync(alertId, ct);
break;
}
}
}, ct);
// Task B: push status updates for subscribed patients
var writeTask = Task.Run(async () =>
{
while (!ct.IsCancellationRequested)
{
foreach (var patientId in subscribedIds)
{
var status = await _vitals.GetLatestStatusAsync(patientId, ct);
if (status is not null)
{
await responseStream.WriteAsync(new PatientStatusResponse
{
PatientId = patientId.ToString(),
StatusType = "vitals_updated",
Payload = ByteString.CopyFrom(
JsonSerializer.SerializeToUtf8Bytes(status)),
Timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(),
}, ct);
}
}
await Task.Delay(TimeSpan.FromSeconds(15), ct);
}
}, ct);
// Wait for both to complete
try
{
await Task.WhenAll(readTask, writeTask);
}
catch (OperationCanceledException) when (ct.IsCancellationRequested) { }
}
}Client Implementation
using var channel = GrpcChannel.ForAddress("https://monitor.systemforge.internal");
var client = new PatientMonitorService.PatientMonitorServiceClient(channel);
using var cts = new CancellationTokenSource();
using var call = client.MonitorPatients(cancellationToken: cts.Token);
// Read responses concurrently
var readTask = Task.Run(async () =>
{
await foreach (var status in call.ResponseStream.ReadAllAsync(cts.Token))
{
switch (status.StatusType)
{
case "vitals_updated":
var vitals = JsonSerializer.Deserialize<VitalsDto>(status.Payload.ToArray());
UpdateVitalsDisplay(status.PatientId, vitals!);
break;
case "alert_triggered":
var alert = JsonSerializer.Deserialize<AlertDto>(status.Payload.ToArray());
ShowAlert(alert!);
break;
}
}
}, cts.Token);
// Subscribe to patients
await call.RequestStream.WriteAsync(new PatientMonitorRequest
{
Subscribe = new SubscribeRequest { PatientId = patientId.ToString() }
});
// Acknowledge an alert
await call.RequestStream.WriteAsync(new PatientMonitorRequest
{
Acknowledge = new AcknowledgeRequest { AlertId = alertId.ToString() }
});
// Unsubscribe when done
await call.RequestStream.WriteAsync(new PatientMonitorRequest
{
Unsubscribe = new UnsubscribeRequest { PatientId = patientId.ToString() }
});
await call.RequestStream.CompleteAsync();
await readTask;When to Use Bidirectional Streaming
Bidirectional streaming is the most complex gRPC pattern. Use it when:
✓ Client needs to send commands AND receive real-time updates over the same connection
✓ Collaborative workflows (multiple users editing the same record)
✓ Command-response over long-lived connection (chat-style protocol)
✓ Interactive monitoring where the client controls what it monitors
Consider simpler alternatives:
Server streaming: if the client only subscribes once and receives data
SignalR: if you need browser WebSocket support with group broadcasting
Unary + polling: if real-time is not critical and request patterns are simpleConcurrent Write Safety
// IServerStreamWriter is NOT thread-safe for concurrent writes
// If both tasks write concurrently, use a Channel as a buffer
var responseChannel = Channel.CreateUnbounded<PatientStatusResponse>();
// Multiple sources write to the channel
_ = Task.Run(async () =>
{
while (true)
await responseChannel.Writer.WriteAsync(await GetVitalStatusAsync());
});
// Single consumer writes to the gRPC stream (thread-safe single writer)
await foreach (var response in responseChannel.Reader.ReadAllAsync(ct))
await responseStream.WriteAsync(response, ct);Production issue I've seen: A team's bidirectional streaming implementation had two concurrent tasks both calling
responseStream.WriteAsync(). Under load, this causedInvalidOperationException: Cannot write to a response that has already completed— but the underlying issue was concurrent writes to a non-thread-safe writer. The Channel-based single writer pattern fixed it.
Key Takeaway
Bidirectional streaming is full-duplex: client and server send independently. Run read and write as concurrent tasks.
IServerStreamWriter<T>is not thread-safe — use aChannel<T>buffer if multiple sources write to the stream. Use bidirectional streaming for interactive protocols where the client controls what it monitors. For simpler scenarios (subscribe-and-receive), server streaming or SignalR is more appropriate.
gRPC Knowledge Check
5 questions · Test what you just learned · Instant explanations
Found this helpful?
Leave a comment
Have a question, correction, or just found this helpful? Leave a note below.