Client Streaming RPC — Uploading Data Flows with gRPC
Implement client-streaming gRPC in ASP.NET Core: receiving streams from clients, reading observations in order, processing with back-pressure, and when client streaming fits your data ingestion patterns.
Client Streaming Pattern
The client sends a stream of messages; the server reads them and returns a single response when the stream is complete.
Client:
Opens the stream
Sends ObservationRequest (multiple times)
Closes the stream (signals completion)
Server:
Reads each request as it arrives
Processes each message
Returns one UploadResponse at the end
Use when:
✓ Batch uploads (observations, lab results, audit events)
✓ IoT data ingestion (medical device telemetry)
✓ Log streaming from a device or service
✓ File upload chunked into pieces
Proto Definition
// gRPC service with two client-streaming RPCs: each accepts a stream of
// request messages and returns a single response message.
service ObservationService {
// Client streams ObservationRequest messages; one UploadObservationsResponse comes back.
rpc UploadObservations (stream ObservationRequest)
returns (UploadObservationsResponse);
// Client streams DeviceTelemetryRequest messages; one IngestTelemetryResponse comes back.
// NOTE(review): DeviceTelemetryRequest and IngestTelemetryResponse are not defined
// in the visible part of this file — confirm they are declared elsewhere.
rpc IngestDeviceTelemetry (stream DeviceTelemetryRequest)
returns (IngestTelemetryResponse);
}
// One observation, sent as a single message on the UploadObservations request stream.
message ObservationRequest {
// Patient identifier carried as a string; the server snippet in this file parses it as a GUID.
string patient_id = 1;
string observation_type = 2; // "heart_rate", "spo2", "inr"
// Numeric reading; its unit is carried separately in `unit`.
double value = 3;
string unit = 4;
int64 recorded_at = 5; // Unix timestamp ms
// Identifier of the sending device (the client snippet in this file fills it from device.Id).
string device_id = 6;
}
message UploadObservationsResponse {
int32 total_received = 1;
int32 total_saved = 2;
int32 total_rejected = 3;
repeated string errors = 4; // rejected observation reasons
}
Server Implementation
public sealed class ObservationGrpcService
: ObservationService.ObservationServiceBase
{
private readonly IObservationRepository _repo;
private readonly IUnitOfWork _uow;
/// <summary>
/// Reads every <see cref="ObservationRequest"/> from the client stream, validates it,
/// and persists the accepted observations in fixed-size batches.
/// </summary>
/// <param name="requestStream">Stream of observations sent by the client.</param>
/// <param name="context">Call context; its cancellation token is passed to every read and save.</param>
/// <returns>
/// Counts of received, saved and rejected messages, plus one reason string per rejected message.
/// </returns>
public override async Task<UploadObservationsResponse> UploadObservations(
    IAsyncStreamReader<ObservationRequest> requestStream,
    ServerCallContext context)
{
    // Flush threshold: caps how many accepted observations are held in memory at once.
    const int batchSize = 100;

    // Range accepted by DateTimeOffset.FromUnixTimeMilliseconds; anything outside it throws.
    var minRecordedAtMs = DateTimeOffset.MinValue.ToUnixTimeMilliseconds();
    var maxRecordedAtMs = DateTimeOffset.MaxValue.ToUnixTimeMilliseconds();

    var ct = context.CancellationToken;
    var received = 0;
    var saved = 0;
    var rejected = 0;
    var errors = new List<string>();
    var batch = new List<Observation>(batchSize);

    // Persists the pending batch, counts it as saved, and empties it.
    async Task FlushBatchAsync()
    {
        await _repo.AddRangeAsync(batch, ct);
        await _uow.SaveChangesAsync(ct);
        saved += batch.Count;
        batch.Clear();
    }

    // Read all incoming messages
    await foreach (var request in requestStream.ReadAllAsync(ct))
    {
        received++;

        if (!Guid.TryParse(request.PatientId, out var patientId))
        {
            rejected++;
            errors.Add($"Message {received}: invalid patient ID '{request.PatientId}'");
            continue;
        }

        // recorded_at is client-supplied. Converting an out-of-range value would throw and
        // fail the whole call, so reject just this message instead.
        if (request.RecordedAt < minRecordedAtMs || request.RecordedAt > maxRecordedAtMs)
        {
            rejected++;
            errors.Add($"Message {received}: invalid recorded_at timestamp {request.RecordedAt}");
            continue;
        }

        var obs = Observation.Create(
            patientId,
            request.ObservationType,
            request.Value,
            request.Unit,
            DateTimeOffset.FromUnixTimeMilliseconds(request.RecordedAt));
        if (obs.IsFailure)
        {
            rejected++;
            errors.Add($"Message {received}: {obs.Error.Description}");
            continue;
        }

        batch.Add(obs.Value);

        // Save in batches to avoid memory pressure
        if (batch.Count >= batchSize)
        {
            await FlushBatchAsync();
        }
    }

    // Save whatever is left after the client completes the stream.
    if (batch.Count > 0)
    {
        await FlushBatchAsync();
    }

    return new UploadObservationsResponse
    {
        TotalReceived = received,
        TotalSaved = saved,
        TotalRejected = rejected,
        Errors = { errors },
    };
}
}
Client Implementation
// Upload a medical device's pending observations over one client-streaming call.
using var channel = GrpcChannel.ForAddress("https://observations.systemforge.internal");
var client = new ObservationService.ObservationServiceClient(channel);

// The whole upload has a 5-minute window.
var uploadDeadline = DateTime.UtcNow.AddMinutes(5);
using var call = client.UploadObservations(deadline: uploadDeadline);

// Write one request message per pending observation.
var observations = await device.ReadPendingObservationsAsync();
var requestStream = call.RequestStream;
foreach (var pending in observations)
{
    var request = new ObservationRequest
    {
        PatientId = pending.PatientId.ToString(),
        ObservationType = pending.Type,
        Value = pending.Value,
        Unit = pending.Unit,
        RecordedAt = pending.RecordedAt.ToUnixTimeMilliseconds(),
        DeviceId = device.Id,
    };
    await requestStream.WriteAsync(request);
}

// Completing the request stream signals the end of the upload.
await requestStream.CompleteAsync();

// Await the single response from the server.
var response = await call;
Console.WriteLine(
$"Uploaded: {response.TotalSaved} saved, {response.TotalRejected} rejected");
Chunked File Upload
// One chunk of a file, sent as a single message on a client-streaming upload.
message FileChunkRequest {
string file_name = 1;
// Position of this chunk within the file.
// NOTE(review): the client snippet in this file sends 0-based indexes — confirm the server expects the same.
int32 chunk_index = 2;
// Number of chunks the client intends to send for this file.
int32 total_chunks = 3;
bytes data = 4; // chunk bytes
string checksum = 5; // MD5 of this chunk
}
// Response message for the chunked file upload.
message FileUploadResponse {
string file_id = 1;
int64 total_bytes = 2;
bool verified = 3; // checksum verified
}
// Upload a clinical document in chunks
const string fileName = "patient-report.pdf";

// Read the file before opening the call so disk I/O does not eat into the deadline.
var fileBytes = await File.ReadAllBytesAsync(fileName);
var chunkSize = 64 * 1024; // 64KB chunks
var totalChunks = (int)Math.Ceiling((double)fileBytes.Length / chunkSize);

// Always bound a client stream: without a deadline a stalled upload can hold
// server resources indefinitely.
using var call = client.UploadDocument(
    deadline: DateTime.UtcNow.AddMinutes(5));

for (int i = 0; i < totalChunks; i++)
{
    var offset = i * chunkSize;
    // The final chunk may be shorter than chunkSize.
    var length = Math.Min(chunkSize, fileBytes.Length - offset);

    await call.RequestStream.WriteAsync(new FileChunkRequest
    {
        FileName = fileName,
        ChunkIndex = i,
        TotalChunks = totalChunks,
        // Slices are taken inline rather than stored in a Span<byte> local:
        // a ref-struct local in async code is compile error CS4012 before C# 13.
        Data = ByteString.CopyFrom(fileBytes, offset, length),
        Checksum = ComputeMD5(fileBytes.AsSpan(offset, length)),
    });
}
await call.RequestStream.CompleteAsync();
var result = await call.ResponseAsync;
Back-Pressure Handling
// Server can signal back-pressure via WriteOptions or delays.
// When the server is slow to process, gRPC flow control fills the transport
// buffers and the client's Write slows down on its own.
// Explicit back-pressure requires tracking server metrics:
//   client — check response metadata between writes
//   server — attach metadata that acknowledges processing capacity
// Simple approach shown here: the client pauses after every fixed-size batch.
const int batchPause = 500; // ms pause between batches
const int batchSize = 100;
for (var sent = 0; sent < observations.Count; sent++)
{
    await call.RequestStream.WriteAsync(observations[sent]);

    // Rate-limit the client: pause each time a full batch has been written.
    var batchComplete = (sent + 1) % batchSize == 0;
    if (batchComplete)
    {
        await Task.Delay(batchPause);
    }
}
Error Handling During Stream
// Client side: react to failures raised while the upload is in flight.
try
{
    foreach (var item in observations)
    {
        await call.RequestStream.WriteAsync(item);
    }

    await call.RequestStream.CompleteAsync();
    var result = await call.ResponseAsync;
}
catch (RpcException rpcError) when (rpcError.StatusCode == StatusCode.ResourceExhausted)
{
    // The server is rate-limiting: back off before any retry.
    var backoff = TimeSpan.FromSeconds(30);
    await Task.Delay(backoff);
    // A retry would resume from the last known position, which requires tracking it.
}
catch (RpcException rpcError) when (rpcError.StatusCode == StatusCode.DeadlineExceeded)
{
    // The upload timed out: continue from the last checkpoint.
    await ResumeUploadFromCheckpointAsync();
}
Production issue I've seen: A medical device sent 10,000 observations in a single client-streaming RPC with no batching on the server side. The server's
UploadObservations method loaded all 10,000 into a list before saving. Under concurrent device uploads, the process used 8 GB of RAM. Adding server-side batch saves every 100 items brought peak memory to under 50 MB.
Key Takeaway
Client streaming sends multiple messages in one RPC, getting one response when done. The server reads via
IAsyncStreamReader<T> with ReadAllAsync(). Process in batches server-side to avoid memory accumulation. The client calls CompleteAsync() to signal end of stream. Use for: bulk data upload, device telemetry ingestion, log batching. Always set a deadline — an open-ended client stream with a slow sender can hold server resources indefinitely.
gRPC Knowledge Check
5 questions · Test what you just learned · Instant explanations
Found this helpful?
Leave a comment
Have a question, correction, or just found this helpful? Leave a note below.