Client Streaming RPC — Uploading Data Flows with gRPC

Implement client-streaming gRPC in ASP.NET Core: receiving streams from clients, reading observations in order, processing with back-pressure, and when client streaming fits your data ingestion patterns.

Asma Hafeez Khan · May 16, 2026 · 5 min read
Tags: gRPC, Client Streaming, ASP.NET Core, .NET, Data Ingestion

Client Streaming Pattern

The client sends a stream of messages; the server reads them and returns a single response when the stream is complete.

Client:
  Opens the stream
  Sends ObservationRequest (multiple times)
  Closes the stream (signals completion)

Server:
  Reads each request as it arrives
  Processes each message
  Returns one UploadResponse at the end

Use when:
  ✓ Batch uploads (observations, lab results, audit events)
  ✓ IoT data ingestion (medical device telemetry)
  ✓ Log streaming from a device or service
  ✓ File upload chunked into pieces

Proto Definition

PROTOBUF
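syntax = "proto3";

service ObservationService {
    rpc UploadObservations (stream ObservationRequest)
        returns (UploadObservationsResponse);

    // DeviceTelemetryRequest and IngestTelemetryResponse are not shown in this article
    rpc IngestDeviceTelemetry (stream DeviceTelemetryRequest)
        returns (IngestTelemetryResponse);

    // Called by the chunked file upload example; its messages are defined in that section
    rpc UploadDocument (stream FileChunkRequest)
        returns (FileUploadResponse);
}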

message ObservationRequest {
    string  patient_id     = 1;
    string  observation_type = 2;  // "heart_rate", "spo2", "inr"
    double  value          = 3;
    string  unit           = 4;
    int64   recorded_at    = 5;   // Unix timestamp ms
    string  device_id      = 6;
}

message UploadObservationsResponse {
    int32  total_received  = 1;
    int32  total_saved     = 2;
    int32  total_rejected  = 3;
    repeated string errors = 4;   // rejected observation reasons
}

Server Implementation

C#
public sealed class ObservationGrpcService
    : ObservationService.ObservationServiceBase
{
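    private readonly IObservationRepository _repo;
    private readonly IUnitOfWork            _uow;

    // Dependencies are supplied by ASP.NET Core dependency injection
    public ObservationGrpcService(IObservationRepository repo, IUnitOfWork uow)
    {
        _repo = repo;
        _uow  = uow;
    }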

    public override async Task<UploadObservationsResponse> UploadObservations(
        IAsyncStreamReader<ObservationRequest> requestStream,
        ServerCallContext context)
    {
        var ct          = context.CancellationToken;
        var received    = 0;
        var saved       = 0;
        var rejected    = 0;
        var errors      = new List<string>();  // grows with every rejection; consider capping it
        var batch       = new List<Observation>();

        // Read messages one at a time, in the order the client sent them
        await foreach (var request in requestStream.ReadAllAsync(ct))
        {
            received++;

            if (!Guid.TryParse(request.PatientId, out var patientId))
            {
                rejected++;
                errors.Add($"Message {received}: invalid patient ID '{request.PatientId}'");
                continue;
            }

            var obs = Observation.Create(
                patientId,
                request.ObservationType,
                request.Value,
                request.Unit,
                DateTimeOffset.FromUnixTimeMilliseconds(request.RecordedAt));

            if (obs.IsFailure)
            {
                rejected++;
                errors.Add($"Message {received}: {obs.Error.Description}");
                continue;
            }

            batch.Add(obs.Value);

            // Save in batches of 100 so the list never holds the whole upload.
            // Batches already saved stay committed if a later batch or the call fails.
            if (batch.Count >= 100)
            {
                await _repo.AddRangeAsync(batch, ct);
                await _uow.SaveChangesAsync(ct);
                saved += batch.Count;
                batch.Clear();
            }
        }

        // Save remaining batch
        if (batch.Count > 0)
        {
            await _repo.AddRangeAsync(batch, ct);
            await _uow.SaveChangesAsync(ct);
            saved += batch.Count;
        }

        return new UploadObservationsResponse
        {
            TotalReceived = received,
            TotalSaved    = saved,
            TotalRejected = rejected,
            Errors        = { errors },
        };
    }
}
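The batch-and-flush logic in the handler above can also be pulled into a small reusable helper. The following is a minimal sketch, not the article's implementation: `BatchAsync` is a hypothetical extension method (it is not part of gRPC or the .NET base class library) over `IAsyncEnumerable<T>`, which is the type `ReadAllAsync` returns. A local async iterator, `Simulate`, stands in for the request stream so the sketch runs without a gRPC server.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public static class AsyncBatching
{
    // Hypothetical helper: groups an async stream into lists of at most `size` items.
    public static async IAsyncEnumerable<List<T>> BatchAsync<T>(
        this IAsyncEnumerable<T> source,
        int size,
        [EnumeratorCancellation] CancellationToken ct = default)
    {
        if (size <= 0) throw new ArgumentOutOfRangeException(nameof(size));

        var batch = new List<T>(size);
        await foreach (var item in source.WithCancellation(ct))
        {
            batch.Add(item);
            if (batch.Count == size)
            {
                yield return batch;
                batch = new List<T>(size);  // fresh list, so the consumer can keep the old one
            }
        }

        // Flush whatever is left when the stream completes
        if (batch.Count > 0) yield return batch;
    }

    // Stand-in for requestStream.ReadAllAsync(ct): yields `count` integers asynchronously.
    public static async IAsyncEnumerable<int> Simulate(int count)
    {
        for (var i = 0; i < count; i++)
        {
            await Task.Yield();
            yield return i;
        }
    }

    // Collects the size of each batch so the grouping is easy to inspect.
    public static async Task<List<int>> BatchSizesAsync(int count, int size)
    {
        var sizes = new List<int>();
        await foreach (var batch in Simulate(count).BatchAsync(size))
            sizes.Add(batch.Count);
        return sizes;
    }
}
```

With a helper of this shape, the save logic lives in one place instead of being repeated inside and after the read loop. At most one batch is held in memory at a time, which is the property the handler relies on.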

Client Implementation

C#
// Send a stream of observations from a medical device
using var channel = GrpcChannel.ForAddress("https://observations.systemforge.internal");
var client = new ObservationService.ObservationServiceClient(channel);

using var call = client.UploadObservations(
    deadline: DateTime.UtcNow.AddMinutes(5));  // 5-minute upload window

// Send observations one at a time
var observations = await device.ReadPendingObservationsAsync();
foreach (var obs in observations)
{
    await call.RequestStream.WriteAsync(new ObservationRequest
    {
        PatientId       = obs.PatientId.ToString(),
        ObservationType = obs.Type,
        Value           = obs.Value,
        Unit            = obs.Unit,
        RecordedAt      = obs.RecordedAt.ToUnixTimeMilliseconds(),
        DeviceId        = device.Id,
    });
}

// Signal end of stream
await call.RequestStream.CompleteAsync();

// Wait for server response
var response = await call;
Console.WriteLine(
    $"Uploaded: {response.TotalSaved} saved, {response.TotalRejected} rejected");

Chunked File Upload

PROTOBUF
message FileChunkRequest {
    string file_name  = 1;
    int32  chunk_index = 2;
    int32  total_chunks = 3;
    bytes  data       = 4;   // chunk bytes
    string checksum   = 5;   // MD5 of this chunk
}

message FileUploadResponse {
    string file_id    = 1;
    int64  total_bytes = 2;
    bool   verified   = 3;   // checksum verified
}

C#
// Upload a clinical document in chunks
using var call = client.UploadDocument();
// Reading the whole file is fine for small documents; stream from disk for large ones
var fileBytes  = await File.ReadAllBytesAsync("patient-report.pdf");
const int chunkSize = 64 * 1024;  // 64 KB chunks
var totalChunks = (fileBytes.Length + chunkSize - 1) / chunkSize;  // integer ceiling

for (int i = 0; i < totalChunks; i++)
{
    var offset = i * chunkSize;
    var length = Math.Min(chunkSize, fileBytes.Length - offset);

    // Pass offset/length instead of holding a Span<byte> local:
    // before C# 13, ref struct locals are not allowed in async methods.
    await call.RequestStream.WriteAsync(new FileChunkRequest
    {
        FileName    = "patient-report.pdf",
        ChunkIndex  = i,
        TotalChunks = totalChunks,
        Data        = ByteString.CopyFrom(fileBytes, offset, length),
        Checksum    = ComputeMD5(fileBytes, offset, length),  // helper not shown
    });
}

// Signal end of stream, then wait for the single response
await call.RequestStream.CompleteAsync();
var result = await call.ResponseAsync;
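The upload snippet calls a `ComputeMD5` helper that the article does not define. One possible shape is sketched below, using `MD5.HashData` and `Convert.ToHexString` from the .NET base class library (.NET 5 or later). The lowercase hex encoding, the `(buffer, offset, count)` signature, and the `ChunkRanges` helper are assumptions made for illustration; whatever encoding the client picks, the server has to compute its checksum the same way.

```csharp
using System;
using System.Collections.Generic;
using System.Security.Cryptography;
using System.Text;

public static class Chunking
{
    // Assumed helper: MD5 of a slice of the buffer, as a lowercase hex string.
    // MD5 is adequate for detecting accidental corruption, not for security.
    public static string ComputeMD5(byte[] buffer, int offset, int count)
    {
        byte[] hash = MD5.HashData(buffer.AsSpan(offset, count));
        return Convert.ToHexString(hash).ToLowerInvariant();
    }

    // Assumed helper: yields the (offset, length) of each chunk, last chunk possibly shorter.
    public static IEnumerable<(int Offset, int Length)> ChunkRanges(int totalBytes, int chunkSize)
    {
        if (chunkSize <= 0) throw new ArgumentOutOfRangeException(nameof(chunkSize));

        for (var offset = 0; offset < totalBytes; offset += chunkSize)
            yield return (offset, Math.Min(chunkSize, totalBytes - offset));
    }
}
```

For a 150,000-byte file and 64 KB chunks, `ChunkRanges` yields three ranges, the last one covering the remaining 18,928 bytes.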

Back-Pressure Handling

C#
// In client streaming, the server applies back-pressure simply by reading slowly.
// HTTP/2 flow control lets the transport buffers fill, and the client's WriteAsync
// then takes longer to complete until the server catches up.

// The server returns only one response, so it cannot acknowledge individual
// messages. If the client must adapt to server capacity mid-stream,
// bidirectional streaming is the better fit.

// Simple approach: pause the client after every 100 messages
// (observations is assumed to be a list of ObservationRequest messages)
const int batchPause = 500;  // ms pause between batches
for (int i = 0; i < observations.Count; i++)
{
    await call.RequestStream.WriteAsync(observations[i]);
    if (i % 100 == 99)
        await Task.Delay(batchPause);  // rate-limit the client
}
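The flow-control effect is easier to see in isolation. The sketch below uses a bounded channel from `System.Threading.Channels` as a stand-in for the transport buffer. It is an analogy for the behaviour, not how gRPC is implemented: once the buffer is full, the producer's `WriteAsync` stays pending until the consumer reads something. The class name and the capacity of 2 are arbitrary choices for the demo.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class BackPressureDemo
{
    public static async Task<(bool WasPending, int Buffered)> RunAsync()
    {
        // Capacity 2 plays the role of a small transport buffer
        var channel = Channel.CreateBounded<int>(
            new BoundedChannelOptions(2) { FullMode = BoundedChannelFullMode.Wait });

        // The first two writes fit in the buffer and complete immediately
        await channel.Writer.WriteAsync(1);
        await channel.Writer.WriteAsync(2);

        // The third write cannot complete: the buffer is full and nobody has read yet
        ValueTask pendingWrite = channel.Writer.WriteAsync(3);
        bool wasPending = !pendingWrite.IsCompleted;

        // The consumer reads one item, which frees a slot for the waiting writer
        _ = await channel.Reader.ReadAsync();
        await pendingWrite;

        return (wasPending, channel.Reader.Count);
    }
}
```

A slow reader therefore slows the writer without any explicit signalling, which is why the fixed pause above is only needed when the client should be throttled below what flow control alone would allow.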

Error Handling During Stream

C#
// Client: handle errors during streaming
try
{
    foreach (var obs in observations)
        await call.RequestStream.WriteAsync(obs);

    await call.RequestStream.CompleteAsync();
    var result = await call.ResponseAsync;
}
catch (RpcException ex) when (ex.StatusCode == StatusCode.ResourceExhausted)
{
    // Server is rate-limiting: wait before retrying
    await Task.Delay(TimeSpan.FromSeconds(30));
    // A failed call cannot be resumed. Retrying means opening a new call and
    // resending from the last known position, which the client must track.
}
catch (RpcException ex) when (ex.StatusCode == StatusCode.DeadlineExceeded)
{
    // Upload timed out — resume from checkpoint
    await ResumeUploadFromCheckpointAsync();
}
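A client-streaming call returns a single response, so the client has no per-message acknowledgement to checkpoint against. One way to make "resume from checkpoint" concrete is to split the upload into segments, send each segment as its own call, and advance the checkpoint only after that call succeeds. The sketch below works under those assumptions: `sendSegmentAsync` is a hypothetical delegate wrapping one complete client-streaming call, and a plain `Exception` filter stands in for the `RpcException` filters shown above.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class CheckpointedUpload
{
    // Returns the number of items confirmed sent. Throws if a segment keeps failing.
    public static async Task<int> UploadAsync<T>(
        IReadOnlyList<T> items,
        int segmentSize,
        int maxAttemptsPerSegment,
        TimeSpan retryDelay,
        Func<IReadOnlyList<T>, Task> sendSegmentAsync)
    {
        if (segmentSize <= 0) throw new ArgumentOutOfRangeException(nameof(segmentSize));

        var checkpoint = 0;  // index of the first item not yet confirmed
        while (checkpoint < items.Count)
        {
            var segment = items.Skip(checkpoint).Take(segmentSize).ToList();

            for (var attempt = 1; ; attempt++)
            {
                try
                {
                    await sendSegmentAsync(segment);  // one full call per segment
                    break;
                }
                catch (Exception) when (attempt < maxAttemptsPerSegment)
                {
                    // Transient failure: wait, then resend the same segment.
                    // On the final attempt the filter is false and the exception propagates.
                    await Task.Delay(retryDelay);
                }
            }

            checkpoint += segment.Count;  // advance only after the segment succeeded
        }
        return checkpoint;
    }
}
```

This gives at-least-once delivery: if a call fails after the server has already saved part of a segment, the retry sends those items again, so the server would need some way to deduplicate them.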

Production issue I've seen: A medical device sent 10,000 observations in a single client-streaming RPC, and the server did no batching. Its UploadObservations method loaded all 10,000 messages into a list before saving anything. With several devices uploading concurrently, the process used 8 GB of RAM. Saving in batches of 100 on the server brought peak memory to under 50 MB.


Key Takeaway

Client streaming sends multiple messages in one RPC and gets a single response when the stream completes. The server reads via IAsyncStreamReader<T> with ReadAllAsync(). Process in batches server-side to avoid memory accumulation. The client calls CompleteAsync() to signal end of stream. Use for: bulk data upload, device telemetry ingestion, log batching. Always set a deadline: an open-ended client stream with a slow sender can hold server resources indefinitely.
