Learnixo
Back to blog
AI Systemsintermediate

Server Streaming RPC — Pushing Data Flows with gRPC

Implement server-streaming gRPC in ASP.NET Core: streaming multiple responses for one request, handling cancellation, real-time data feeds, and when server streaming beats repeated polling.

Asma Hafeez KhanMay 16, 20264 min read
gRPCServer StreamingASP.NET Core.NETReal-Time
Share:𝕏

Server Streaming Pattern

One request from the client, multiple responses streamed back over time. The server writes responses using IServerStreamWriter<T>.

Client:
  Send StreamVitalsRequest once

Server:
  Sends VitalSignResponse every 30 seconds
  Continues until: client cancels, deadline exceeded,
                   or stream explicitly completed

Use when:
  ✓ Real-time data feeds (vitals, lab results, alerts)
  ✓ Large result sets (paginated download without multiple calls)
  ✓ Progress reporting for long-running operations

Proto Definition

PROTOBUF
service ClinicalMonitorService {
    rpc StreamPatientVitals (StreamVitalsRequest)
        returns (stream VitalSignResponse);

    rpc StreamWardAlerts (StreamAlertsRequest)
        returns (stream AlertResponse);
}

message StreamVitalsRequest {
    string patient_id         = 1;
    int32  interval_seconds   = 2;  // polling interval, e.g., 30
}

message VitalSignResponse {
    string patient_id   = 1;
    double heart_rate   = 2;
    double spo2         = 3;
    double temperature  = 4;
    double systolic_bp  = 5;
    double diastolic_bp = 6;
    int64  recorded_at  = 7;  // Unix timestamp ms
}

Server Implementation

C#
public sealed class ClinicalMonitorGrpcService
    : ClinicalMonitorService.ClinicalMonitorServiceBase
{
    private readonly IVitalsRepository _vitals;
    private readonly ILogger<ClinicalMonitorGrpcService> _logger;

    public override async Task StreamPatientVitals(
        StreamVitalsRequest request,
        IServerStreamWriter<VitalSignResponse> responseStream,
        ServerCallContext context)
    {
        if (!Guid.TryParse(request.PatientId, out var patientId))
            throw new RpcException(new Status(StatusCode.InvalidArgument,
                "PatientId must be a valid UUID."));

        var intervalMs = Math.Max(5, request.IntervalSeconds) * 1000;
        var ct         = context.CancellationToken;

        _logger.LogInformation("Starting vitals stream for patient {Id}", patientId);

        try
        {
            while (!ct.IsCancellationRequested)
            {
                var vital = await _vitals.GetLatestAsync(patientId, ct);

                if (vital is not null)
                {
                    await responseStream.WriteAsync(new VitalSignResponse
                    {
                        PatientId   = patientId.ToString(),
                        HeartRate   = vital.HeartRate,
                        Spo2        = vital.SpO2,
                        Temperature = vital.Temperature,
                        SystolicBp  = vital.SystolicBP,
                        DiastolicBp = vital.DiastolicBP,
                        RecordedAt  = vital.RecordedAt.ToUnixTimeMilliseconds(),
                    }, ct);
                }

                await Task.Delay(intervalMs, ct);
            }
        }
        catch (OperationCanceledException) when (ct.IsCancellationRequested)
        {
            // Normal: client cancelled or deadline exceeded — do not log as error
            _logger.LogDebug("Vitals stream cancelled for patient {Id}", patientId);
        }
    }
}

Client Consumption

C#
// .NET gRPC client
using var channel = GrpcChannel.ForAddress("https://monitor.systemforge.internal");
var client = new ClinicalMonitorService.ClinicalMonitorServiceClient(channel);

var request = new StreamVitalsRequest
{
    PatientId       = patientId.ToString(),
    IntervalSeconds = 30,
};

using var cts    = new CancellationTokenSource();
using var stream = client.StreamPatientVitals(request,
    deadline: DateTime.UtcNow.AddHours(8),  // 8-hour shift monitoring
    cancellationToken: cts.Token);

try
{
    await foreach (var vital in stream.ResponseStream.ReadAllAsync(cts.Token))
    {
        Console.WriteLine(
            $"[{DateTimeOffset.FromUnixTimeMilliseconds(vital.RecordedAt)}] " +
            $"HR={vital.HeartRate}, SpO2={vital.Spo2}%, Temp={vital.Temperature}°C");
        UpdateMonitorDisplay(vital);
    }
}
catch (RpcException ex) when (ex.StatusCode == StatusCode.Cancelled)
{
    Console.WriteLine("Stream cancelled.");
}
catch (RpcException ex) when (ex.StatusCode == StatusCode.DeadlineExceeded)
{
    Console.WriteLine("Stream deadline exceeded — reconnecting.");
    // Reconnect logic here
}

Large Result Set Streaming

C#
// Stream all active patients instead of paginating
public override async Task GetAllActivePatients(
    GetAllActivePatientsRequest request,
    IServerStreamWriter<PatientResponse> responseStream,
    ServerCallContext context)
{
    var ct = context.CancellationToken;

    // Process in chunks to avoid loading all records into memory
    var batchSize = 100;
    var offset    = 0;

    while (!ct.IsCancellationRequested)
    {
        var batch = await _repo.GetActiveBatchAsync(offset, batchSize, ct);

        foreach (var patient in batch)
            await responseStream.WriteAsync(patient.ToGrpcResponse(), ct);

        if (batch.Count < batchSize) break;  // last batch
        offset += batch.Count;
    }
}

// Client:
await foreach (var patient in client.GetAllActivePatients(request).ResponseStream.ReadAllAsync())
    ProcessPatient(patient);

Progress Reporting for Long Operations

C#
// Stream progress updates for a long-running batch operation
public override async Task RunNightlyReview(
    NightlyReviewRequest request,
    IServerStreamWriter<NightlyReviewProgress> responseStream,
    ServerCallContext context)
{
    var ct = context.CancellationToken;
    var patients = await _repo.GetAllActiveAsync(ct);
    var total    = patients.Count;

    for (int i = 0; i < total && !ct.IsCancellationRequested; i++)
    {
        await ProcessPatientReviewAsync(patients[i], ct);

        // Send progress update every 10 patients
        if (i % 10 == 0 || i == total - 1)
        {
            await responseStream.WriteAsync(new NightlyReviewProgress
            {
                Processed = i + 1,
                Total     = total,
                Percentage = (int)((i + 1.0 / total) * 100),
            }, ct);
        }
    }
}

WriteOptions and Flow Control

C#
// Control write behavior
responseStream.WriteOptions = new WriteOptions(WriteFlags.BufferHint);
// BufferHint: buffer this write, send with next write (batching)

// Flush immediately after write
responseStream.WriteOptions = WriteOptions.Default;
// Default: each write is flushed immediately

// For high-frequency small messages: use BufferHint, flush every N messages
for (int i = 0; i < alerts.Count; i++)
{
    responseStream.WriteOptions = i < alerts.Count - 1
        ? new WriteOptions(WriteFlags.BufferHint)  // buffer intermediate
        : WriteOptions.Default;                     // flush on last
    await responseStream.WriteAsync(alerts[i], ct);
}

Production issue I've seen: A gRPC server stream for ward alerts wrote each alert as it was generated — 200 individual writes per minute under peak load. Each write triggered a flush over TLS, adding latency. Switching to BufferHint with a batch flush every 10 messages reduced TLS overhead and improved throughput by 3x.


Key Takeaway

Server streaming sends multiple responses for a single request using IServerStreamWriter<T>. The CancellationToken from ServerCallContext fires when the client disconnects or the deadline is reached — always check it in the streaming loop. Use ReadAllAsync() on the client side for clean async enumeration. Stream large datasets in chunks to avoid memory pressure. Set appropriate deadlines — open-ended streams without deadlines accumulate and exhaust resources.

gRPC Knowledge Check

5 questions · Test what you just learned · Instant explanations

Enjoyed this article?

Explore the AI Systems learning path for more.

Found this helpful?

Share:𝕏

Leave a comment

Have a question, correction, or just found this helpful? Leave a note below.