Server Streaming RPC — Pushing Data Flows with gRPC
Implement server-streaming gRPC in ASP.NET Core: streaming multiple responses for one request, handling cancellation, real-time data feeds, and when server streaming beats repeated polling.
Server Streaming Pattern
One request from the client, multiple responses streamed back over time. The server writes responses using IServerStreamWriter<T>.
Client:
  Sends StreamVitalsRequest once
Server:
  Sends a VitalSignResponse every interval_seconds (30 in this example)
  Continues until the client cancels, the deadline is exceeded,
  or the server completes the stream by returning from the handler
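The same lifecycle can be modeled without any gRPC types. This is a minimal sketch that assumes only the .NET base class library. A System.Threading.Channels channel stands in for the HTTP/2 stream, and integers stand in for VitalSignResponse messages. The names StreamReadingsAsync and ConsumeAsync are hypothetical. Completing the channel plays the role of the server handler returning.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical "server handler": one request in, several responses out.
// The ChannelWriter stands in for IServerStreamWriter<T>.
async Task StreamReadingsAsync(int count, TimeSpan interval,
    ChannelWriter<int> responses, CancellationToken ct)
{
    try
    {
        for (int i = 1; i <= count && !ct.IsCancellationRequested; i++)
        {
            await responses.WriteAsync(i, ct);   // one response message
            await Task.Delay(interval, ct);      // wait before the next reading
        }
    }
    catch (OperationCanceledException) when (ct.IsCancellationRequested)
    {
        // Cancellation (client cancel or deadline) is a normal way to end.
    }
    finally
    {
        responses.Complete();                    // end of stream, like returning from the handler
    }
}

// Hypothetical "client": issue one request, then read until the stream ends.
async Task<List<int>> ConsumeAsync(int count, CancellationToken ct)
{
    var channel = Channel.CreateUnbounded<int>();
    var server = StreamReadingsAsync(count, TimeSpan.FromMilliseconds(10), channel.Writer, ct);
    var received = new List<int>();
    await foreach (var reading in channel.Reader.ReadAllAsync())
        received.Add(reading);
    await server;
    return received;
}

var completed = await ConsumeAsync(5, CancellationToken.None);
Console.WriteLine($"Stream completed by the server after {completed.Count} responses.");
```

Passing a token that fires early, such as one from new CancellationTokenSource(TimeSpan.FromMilliseconds(35)), ends the same stream before all readings are sent. This matches how a client cancellation or deadline ends a real server stream.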
Use when:
✓ Real-time data feeds (vitals, lab results, alerts)
✓ Large result sets (paginated download without multiple calls)
✓ Progress reporting for long-running operations
Proto Definition
syntax = "proto3";
service ClinicalMonitorService {
  rpc StreamPatientVitals (StreamVitalsRequest)
      returns (stream VitalSignResponse);
  rpc StreamWardAlerts (StreamAlertsRequest)
      returns (stream AlertResponse);
}
// StreamAlertsRequest and AlertResponse are not shown in this excerpt.
message StreamVitalsRequest {
  string patient_id = 1;        // UUID string
  int32 interval_seconds = 2;   // server-side sampling interval in seconds, e.g., 30
}
message VitalSignResponse {
  string patient_id = 1;
  double heart_rate = 2;
  double spo2 = 3;
  double temperature = 4;
  double systolic_bp = 5;
  double diastolic_bp = 6;
  int64 recorded_at = 7;        // Unix timestamp in milliseconds
}
Server Implementation
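One detail of the handler's request handling is worth isolating: turning interval_seconds into a delay. Proto3 delivers 0 when the client omits the field. Multiplying a very large int32 by 1000 also silently wraps around in C#'s default unchecked context, which would pass Task.Delay a negative value. The following is a minimal sketch of a bounded conversion. ToSampleInterval, NaiveIntervalMs, and the 3600-second ceiling are illustrative assumptions and are not part of the original service.

```csharp
using System;

// Illustrative bounds: the 5 s floor matches the handler; the 3600 s ceiling
// is an assumption, since the article does not specify one.
const int MinIntervalSeconds = 5;
const int MaxIntervalSeconds = 3600;

// Clamp in seconds first, then convert, so no int multiplication can overflow.
TimeSpan ToSampleInterval(int requestedSeconds) =>
    TimeSpan.FromSeconds(Math.Clamp(requestedSeconds, MinIntervalSeconds, MaxIntervalSeconds));

// Multiplying first wraps around for large inputs in C#'s default unchecked context.
int NaiveIntervalMs(int requestedSeconds) => unchecked(Math.Max(5, requestedSeconds) * 1000);

Console.WriteLine(ToSampleInterval(0));            // field omitted: proto3 default is 0
Console.WriteLine(ToSampleInterval(30));
Console.WriteLine(NaiveIntervalMs(int.MaxValue));  // negative, so Task.Delay would throw
```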
using Grpc.Core;
using Microsoft.Extensions.Logging;
public sealed class ClinicalMonitorGrpcService
    : ClinicalMonitorService.ClinicalMonitorServiceBase
{
    private readonly IVitalsRepository _vitals;
    private readonly ILogger<ClinicalMonitorGrpcService> _logger;
    public ClinicalMonitorGrpcService(
        IVitalsRepository vitals,
        ILogger<ClinicalMonitorGrpcService> logger)
    {
        _vitals = vitals;
        _logger = logger;
    }
    public override async Task StreamPatientVitals(
        StreamVitalsRequest request,
        IServerStreamWriter<VitalSignResponse> responseStream,
        ServerCallContext context)
    {
        if (!Guid.TryParse(request.PatientId, out var patientId))
            throw new RpcException(new Status(StatusCode.InvalidArgument,
                "PatientId must be a valid UUID."));
        // Clamp before converting: 0 (the proto3 default) becomes 5 s, and large
        // values cannot overflow. The 3600 s ceiling is an illustrative choice.
        var interval = TimeSpan.FromSeconds(Math.Clamp(request.IntervalSeconds, 5, 3600));
        // Fires when the client cancels, disconnects, or the deadline is exceeded
        var ct = context.CancellationToken;
        _logger.LogInformation("Starting vitals stream for patient {Id}", patientId);
        try
        {
            while (!ct.IsCancellationRequested)
            {
                var vital = await _vitals.GetLatestAsync(patientId, ct);
                if (vital is not null)
                {
                    await responseStream.WriteAsync(new VitalSignResponse
                    {
                        PatientId = patientId.ToString(),
                        HeartRate = vital.HeartRate,
                        Spo2 = vital.SpO2,
                        Temperature = vital.Temperature,
                        SystolicBp = vital.SystolicBP,
                        DiastolicBp = vital.DiastolicBP,
                        RecordedAt = vital.RecordedAt.ToUnixTimeMilliseconds(),
                    }, ct);
                }
                await Task.Delay(interval, ct);
            }
        }
        catch (OperationCanceledException) when (ct.IsCancellationRequested)
        {
            // Normal: client cancelled or deadline exceeded, so do not log as an error
            _logger.LogDebug("Vitals stream cancelled for patient {Id}", patientId);
        }
    }
}
Client Consumption
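A monitoring client that runs for a whole shift will eventually see its stream end, either from the deadline or from a transient network failure. The client code in this section leaves reconnection as a placeholder. One common approach is a reconnect loop with capped exponential backoff. This sketch uses only the base class library, and RunWithReconnectAsync, openAndReadStream, and isTransient are hypothetical names. In a real client, openAndReadStream would create a new call with a fresh deadline and drain ResponseStream.ReadAllAsync(). The isTransient predicate would match RpcException status codes such as DeadlineExceeded or Unavailable.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: runs a streaming call and reconnects after transient
// failures, doubling the wait each time up to maxDelay. Returns the number of
// attempts it took for the stream to end normally.
async Task<int> RunWithReconnectAsync(
    Func<CancellationToken, Task> openAndReadStream,
    Func<Exception, bool> isTransient,
    TimeSpan baseDelay, TimeSpan maxDelay, int maxAttempts,
    CancellationToken ct)
{
    var delay = baseDelay;
    for (int attempt = 1; ; attempt++)
    {
        try
        {
            await openAndReadStream(ct);
            return attempt;                      // stream ended normally
        }
        catch (Exception ex) when (isTransient(ex) && attempt < maxAttempts && !ct.IsCancellationRequested)
        {
            await Task.Delay(delay, ct);         // back off before reconnecting
            delay = TimeSpan.FromTicks(Math.Min(delay.Ticks * 2, maxDelay.Ticks));
        }
    }
}

// Fake stream for illustration: fails twice with a "transient" error, then completes.
int calls = 0;
Task FakeStream(CancellationToken _)
{
    calls++;
    return calls < 3
        ? Task.FromException(new TimeoutException("simulated deadline"))
        : Task.CompletedTask;
}

// Arguments: stream, transient check, base delay, max delay, max attempts, token.
var attempts = await RunWithReconnectAsync(
    FakeStream, ex => ex is TimeoutException,
    TimeSpan.FromMilliseconds(10), TimeSpan.FromMilliseconds(100), 5,
    CancellationToken.None);
Console.WriteLine($"Stream completed after {attempts} attempts.");
```

Non-transient errors and the final failed attempt are rethrown, so the caller still sees failures the loop cannot recover from.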
// .NET gRPC client
using Grpc.Core;
using Grpc.Net.Client;
// patientId and UpdateMonitorDisplay are assumed to be defined elsewhere (not shown).
using var channel = GrpcChannel.ForAddress("https://monitor.systemforge.internal");
var client = new ClinicalMonitorService.ClinicalMonitorServiceClient(channel);
var request = new StreamVitalsRequest
{
    PatientId = patientId.ToString(),
    IntervalSeconds = 30,
};
using var cts = new CancellationTokenSource();
using var stream = client.StreamPatientVitals(request,
    deadline: DateTime.UtcNow.AddHours(8), // 8-hour shift monitoring
    cancellationToken: cts.Token);
try
{
    await foreach (var vital in stream.ResponseStream.ReadAllAsync(cts.Token))
    {
        Console.WriteLine(
            $"[{DateTimeOffset.FromUnixTimeMilliseconds(vital.RecordedAt)}] " +
            $"HR={vital.HeartRate}, SpO2={vital.Spo2}%, Temp={vital.Temperature}°C");
        UpdateMonitorDisplay(vital);
    }
}
catch (RpcException ex) when (ex.StatusCode == StatusCode.Cancelled)
{
    Console.WriteLine("Stream cancelled.");
}
catch (RpcException ex) when (ex.StatusCode == StatusCode.DeadlineExceeded)
{
    Console.WriteLine("Stream deadline exceeded, reconnecting.");
    // Reconnect logic here
}
Large Result Set Streaming
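The batching loop in the handler does not depend on gRPC, so it can be tested on its own. This is a minimal sketch that uses hypothetical fetchBatch and write delegates in place of the repository call and IServerStreamWriter<T>.WriteAsync, with an in-memory list as the data source. It also shows an edge case of the short-page stop condition: when the row count is an exact multiple of the batch size, the loop makes one extra fetch that returns an empty page.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Mirrors the handler's loop: fetch a page, write each row, and stop when a
// short (or empty) page comes back. Returns how many fetches were made.
// fetchBatch and write are hypothetical stand-ins for the repository and stream writer.
async Task<int> StreamInBatchesAsync<T>(
    Func<int, int, CancellationToken, Task<IReadOnlyList<T>>> fetchBatch,
    Func<T, CancellationToken, Task> write,
    int batchSize, CancellationToken ct)
{
    var offset = 0;
    var fetches = 0;
    while (!ct.IsCancellationRequested)
    {
        var batch = await fetchBatch(offset, batchSize, ct);
        fetches++;
        foreach (var item in batch)
            await write(item, ct);
        if (batch.Count < batchSize) break;     // short page: no more rows
        offset += batch.Count;
    }
    return fetches;
}

// In-memory "table" of 250 rows; the loop holds only one page at a time.
var rows = Enumerable.Range(1, 250).ToList();
Task<IReadOnlyList<int>> FetchAsync(int offset, int size, CancellationToken _) =>
    Task.FromResult<IReadOnlyList<int>>(rows.Skip(offset).Take(size).ToList());

var written = new List<int>();
var fetchCount = await StreamInBatchesAsync<int>(
    FetchAsync, (row, _) => { written.Add(row); return Task.CompletedTask; },
    100, CancellationToken.None);
Console.WriteLine($"Wrote {written.Count} rows using {fetchCount} fetches.");
```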
// Stream all active patients instead of paginating
// (_repo is an injected repository, not shown)
public override async Task GetAllActivePatients(
    GetAllActivePatientsRequest request,
    IServerStreamWriter<PatientResponse> responseStream,
    ServerCallContext context)
{
    var ct = context.CancellationToken;
    // Process in chunks to avoid loading all records into memory.
    // Offset paging assumes a stable sort order: rows added or removed while the
    // stream is running can be skipped or repeated (keyset paging avoids this).
    const int batchSize = 100;
    var offset = 0;
    while (!ct.IsCancellationRequested)
    {
        var batch = await _repo.GetActiveBatchAsync(offset, batchSize, ct);
        foreach (var patient in batch)
            await responseStream.WriteAsync(patient.ToGrpcResponse(), ct);
        if (batch.Count < batchSize) break; // last batch
        offset += batch.Count;
    }
}
// Client: dispose the call once the stream has been read
using var call = client.GetAllActivePatients(request);
await foreach (var patient in call.ResponseStream.ReadAllAsync())
    ProcessPatient(patient);
Progress Reporting for Long Operations
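Progress arithmetic is easy to get wrong. If it is written as (i + 1.0 / total) * 100, C# evaluates 1.0 / total first, so every item after the first reports a value far above 100%. This minimal, self-contained sketch shows both the reporting cadence and the percentage calculation. ProgressUpdates and BuggyPercentage are illustrative helpers and are not part of the service.

```csharp
using System;
using System.Collections.Generic;

// Illustrative helper: the (Processed, Percentage) updates a batch job of
// `total` items would emit, one every `every` items plus one after the last.
List<(int Processed, int Percentage)> ProgressUpdates(int total, int every)
{
    var updates = new List<(int Processed, int Percentage)>();
    for (int i = 0; i < total; i++)
    {
        var processed = i + 1;
        if (processed % every == 0 || processed == total)
            updates.Add((processed, processed * 100 / total)); // integer math, 0..100
    }
    return updates;
}

// The precedence trap: 1.0 / total binds before the addition.
int BuggyPercentage(int i, int total) => (int)((i + 1.0 / total) * 100);

foreach (var (done, pct) in ProgressUpdates(25, 10))
    Console.WriteLine($"{done}/25 ({pct}%)");
Console.WriteLine($"Buggy value for item 10 of 25: {BuggyPercentage(9, 25)}%");
```

For 25 items and a cadence of 10, the correct version reports 10/25 (40%), 20/25 (80%), and 25/25 (100%).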
// Stream progress updates for a long-running batch operation
public override async Task RunNightlyReview(
    NightlyReviewRequest request,
    IServerStreamWriter<NightlyReviewProgress> responseStream,
    ServerCallContext context)
{
    var ct = context.CancellationToken;
    var patients = await _repo.GetAllActiveAsync(ct);
    var total = patients.Count;
    for (int i = 0; i < total && !ct.IsCancellationRequested; i++)
    {
        await ProcessPatientReviewAsync(patients[i], ct);
        var processed = i + 1;
        // Send a progress update every 10 patients and after the last one
        if (processed % 10 == 0 || processed == total)
        {
            await responseStream.WriteAsync(new NightlyReviewProgress
            {
                Processed = processed,
                Total = total,
                Percentage = processed * 100 / total, // parenthesize correctly: not (i + 1.0 / total) * 100
            }, ct);
        }
    }
}
WriteOptions and Flow Control
// Control write behavior
responseStream.WriteOptions = new WriteOptions(WriteFlags.BufferHint);
// BufferHint: buffer this write and send it together with a later write (batching)
responseStream.WriteOptions = WriteOptions.Default;
// Default: each write is flushed immediately
// For high-frequency small messages: buffer with BufferHint and flush every N messages.
// Runs inside a server-streaming handler; alerts is the list of pending alert messages.
const int flushEvery = 10;
var ct = context.CancellationToken;
for (int i = 0; i < alerts.Count; i++)
{
    var flush = (i + 1) % flushEvery == 0 || i == alerts.Count - 1;
    responseStream.WriteOptions = flush
        ? WriteOptions.Default                       // flush this batch (and always the last message)
        : new WriteOptions(WriteFlags.BufferHint);   // buffer intermediate writes
    await responseStream.WriteAsync(alerts[i], ct);
}
Production issue I've seen: a gRPC server stream for ward alerts wrote each alert as it was generated, producing 200 individual writes per minute under peak load. Each write triggered a flush over TLS, adding latency. Switching to BufferHint with a batch flush every 10 messages reduced TLS overhead and improved throughput by 3x.
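The flush decision behind that fix reduces to a small predicate. This minimal sketch uses a simulated transport that only records when buffered data would be pushed out. ShouldFlush and FlushPoints are illustrative names, and the comments note how each decision maps to WriteOptions.

```csharp
using System;
using System.Collections.Generic;

// Illustrative predicate: should write number i (0-based) of `count` messages flush?
// Flush every `flushEvery` messages, and always on the last one so nothing is left
// sitting in the buffer. In the handler, "flush" maps to WriteOptions.Default and
// "buffer" maps to new WriteOptions(WriteFlags.BufferHint).
bool ShouldFlush(int i, int count, int flushEvery) =>
    (i + 1) % flushEvery == 0 || i == count - 1;

// Simulated transport: records after which message buffered data is pushed out.
List<int> FlushPoints(int count, int flushEvery)
{
    var points = new List<int>();
    for (int i = 0; i < count; i++)
        if (ShouldFlush(i, count, flushEvery))
            points.Add(i + 1); // 1-based message number that triggers the flush
    return points;
}

Console.WriteLine($"Unbuffered: {FlushPoints(25, 1).Count} flushes");
Console.WriteLine($"Batches of 10: after messages {string.Join(", ", FlushPoints(25, 10))}");
```

For 25 messages, flushing every write costs 25 flushes. Batching by 10 costs 3 flushes, after messages 10, 20, and 25. Because the last message always flushes, the tail of a batch is never left waiting in the buffer.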
Key Takeaway
Server streaming sends multiple responses for a single request using IServerStreamWriter<T>. The CancellationToken from ServerCallContext fires when the client disconnects or the deadline is reached, so always check it in the streaming loop. Use ReadAllAsync() on the client side for clean async enumeration. Stream large datasets in chunks to avoid memory pressure. Set appropriate deadlines: open-ended streams without deadlines accumulate and exhaust resources.