Kafka Streams: Real-Time Stream Processing in Java
Process event streams in real time with Kafka Streams — stateless transformations, stateful aggregations, windowed operations, and stream joins — and build a live call volume counter and an appointment funnel.
What Is Kafka Streams?
Kafka Streams is a Java library for stream processing that runs inside your application — no separate cluster needed. It reads from Kafka topics, processes the data, and writes results back to Kafka (or a state store).
Input topics → Kafka Streams topology → Output topics / State stores

Advantages over Spark/Flink:
- No separate processing cluster — runs as a library
- Exactly-once processing semantics
- Elastic scaling — just run more instances
- Native Kafka integration — no connectors needed
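To make the "just a library" point concrete, here is a minimal sketch of a complete Kafka Streams application (topic names and the filter predicate are illustrative):

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import java.util.Properties;

public class MinimalApp {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "minimal-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input-topic")           // source
               .filter((k, v) -> v != null)     // a trivial stateless step
               .to("output-topic");             // sink

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();                        // runs in this JVM, no extra cluster
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}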
Core Concepts
Topology
A topology is a DAG of processing nodes:
Source → Filter → Map → GroupBy → Aggregate → Sink

KStream vs KTable
- KStream — a stream of events (every record is an independent event)
- KTable — a changelog stream (each key has one current value; new records update the value)
- GlobalKTable — like KTable but fully replicated to every instance (for small lookup tables)
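In code, each abstraction is created from the StreamsBuilder (a sketch; topic names and value types are illustrative):

StreamsBuilder builder = new StreamsBuilder();

KStream<String, String> events = builder.stream("appointments");              // every record is an event
KTable<String, String> state = builder.table("clinic-state");                 // latest value per key
GlobalKTable<String, String> lookup = builder.globalTable("clinic-lookup");   // replicated to every instance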
KStream("appointments"):
CLN-001: {status: "created"}
CLN-001: {status: "confirmed"} ← independent events
CLN-001: {status: "completed"}
KTable("clinic-state"):
CLN-001: {name: "Sunrise Eye", active: true} ← only latest value per key
CLN-002: {name: "Valley Optometry", active: true}

Setup
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
</dependency>

@Configuration
@EnableKafkaStreams // lets Spring Kafka build and start the KafkaStreams instance
public class KafkaStreamsConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public KafkaStreamsConfiguration streamsConfig() {
        // config constants below are statically imported from StreamsConfig
        Map<String, Object> props = new HashMap<>();
props.put(APPLICATION_ID_CONFIG, "portal-streams");
props.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(DEFAULT_KEY_SERDE_CLASS_CONFIG,
Serdes.String().getClass().getName());
props.put(DEFAULT_VALUE_SERDE_CLASS_CONFIG,
Serdes.String().getClass().getName());
props.put(PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE_V2); // exactly-once
props.put(REPLICATION_FACTOR_CONFIG, 3); // for internal changelog/repartition topics
return new KafkaStreamsConfiguration(props);
}
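
    // The topologies below also need serdes for their event records. A sketch
    // using Spring Kafka's JsonSerde (an assumption; any Serde implementation works):
    @Bean
    public Serde<AppointmentEvent> appointmentSerde() {
        return new JsonSerde<>(AppointmentEvent.class);
    }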
}

Stateless Transformations
@Component
public class AppointmentStreamProcessor {

    private final Serde<AppointmentEvent> appointmentSerde;
    private final Serde<AppointmentEnriched> enrichedSerde;

    public AppointmentStreamProcessor(Serde<AppointmentEvent> appointmentSerde,
                                      Serde<AppointmentEnriched> enrichedSerde) {
        this.appointmentSerde = appointmentSerde;
        this.enrichedSerde = enrichedSerde;
    }

    @Autowired
    public void buildTopology(StreamsBuilder builder) {
// Source stream
KStream<String, AppointmentEvent> appointments = builder.stream(
"appointments.appointments.created",
Consumed.with(Serdes.String(), appointmentSerde)
);
// Filter — only routine exams
KStream<String, AppointmentEvent> routineExams = appointments
.filter((key, value) -> "routine".equals(value.type()));
// Map — transform to enriched event
KStream<String, AppointmentEnriched> enriched = appointments
.mapValues(event -> new AppointmentEnriched(
event.appointmentId(),
event.clinicId(),
event.patientId(),
event.scheduledFor(),
calculateDayOfWeek(event.scheduledFor()),
isBusinessHours(event.scheduledFor())
));
// FlatMap — one event → many events (e.g., fan-out notifications)
KStream<String, Notification> notifications = appointments
.flatMapValues(event -> List.of(
new Notification("EMAIL", event.patientId(), event.appointmentId()),
new Notification("SMS", event.patientId(), event.appointmentId())
));
// Branch — split one stream into multiple
Map<String, KStream<String, AppointmentEvent>> branches = appointments
    .split(Named.as("type-"))  // prefix for the keys in the returned map
    .branch((k, v) -> "emergency".equals(v.type()), Branched.as("emergency"))
    .branch((k, v) -> "routine".equals(v.type()), Branched.as("routine"))
    .defaultBranch(Branched.as("other"));
branches.get("type-emergency").to("appointments.emergency",
    Produced.with(Serdes.String(), appointmentSerde));
// Sink
enriched.to("appointments.enriched",
    Produced.with(Serdes.String(), enrichedSerde));
}
}

Stateful Aggregation — Live Call Volume Counter
Count calls per clinic in a tumbling 5-minute window:
@Autowired
public void buildCallCountTopology(StreamsBuilder builder) {
KStream<String, CallEvent> calls = builder.stream(
"calls.calls.started",
Consumed.with(Serdes.String(), callEventSerde)
);
// Re-key by clinic_id for grouping
KStream<String, CallEvent> byClinic = calls
.selectKey((k, v) -> v.clinicId());
// Tumbling window — count per clinic per 5 minutes
KTable<Windowed<String>, Long> callCounts = byClinic
.groupByKey(Grouped.with(Serdes.String(), callEventSerde))
.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
.count(Materialized.as("call-counts-store"));
// Write window counts to output topic
callCounts.toStream()
.map((windowedKey, count) -> KeyValue.pair(
windowedKey.key(),
new CallVolumeMetric(
windowedKey.key(),
count,
windowedKey.window().startTime(),
windowedKey.window().endTime()
)
))
.to("calls.metrics.volume-5min");
}

Query the State Store (for the dashboard)
@Service
public class CallMetricsService {

    // KafkaStreams is not a bean by default; obtain it from Spring Kafka's factory bean
    @Autowired
    private StreamsBuilderFactoryBean factoryBean;

    public Map<String, Long> getLiveCallCounts() {
        KafkaStreams kafkaStreams = factoryBean.getKafkaStreams();
        ReadOnlyWindowStore<String, Long> store = kafkaStreams.store(
            StoreQueryParameters.fromNameAndType(
                "call-counts-store",
                QueryableStoreTypes.windowStore()
            )
        );
Instant now = Instant.now();
Instant start = now.minus(Duration.ofMinutes(5));
Map<String, Long> counts = new HashMap<>();
try (KeyValueIterator<Windowed<String>, Long> it =
store.fetchAll(start, now)) {
while (it.hasNext()) {
KeyValue<Windowed<String>, Long> kv = it.next();
counts.merge(kv.key.key(), kv.value, Long::sum); // a key can span two adjacent windows, so sum rather than overwrite
}
}
return counts;
}
}

Joining Streams
Join appointments with patient data to enrich events:
@Autowired
public void buildEnrichmentTopology(StreamsBuilder builder) {
// Stream of new appointments
KStream<String, AppointmentCreated> appointments = builder.stream(
"appointments.appointments.created",
Consumed.with(Serdes.String(), appointmentCreatedSerde)
).selectKey((k, v) -> v.patientId()); // re-key by patient ID
// Table of patient records (compact topic — latest per patient)
KTable<String, Patient> patients = builder.table(
"patients.patients.state",
Consumed.with(Serdes.String(), patientSerde)
);
// Stream-Table join — enrich appointment with patient data
KStream<String, AppointmentEnriched> enriched = appointments.join(
patients,
(appointment, patient) -> new AppointmentEnriched(
appointment.appointmentId(),
appointment.clinicId(),
patient.id(),
patient.firstName() + " " + patient.lastName(),
patient.email(),
appointment.scheduledFor()
    ),
    Joined.with(Serdes.String(), appointmentCreatedSerde, patientSerde) // serdes for the re-keyed stream
);
enriched.to("appointments.appointments.enriched",
    Produced.with(Serdes.String(), enrichedSerde)); // Serde<AppointmentEnriched>, assumed available
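// Variant (a sketch): joining against a GlobalKTable needs no re-keying, because
// a key extractor pulls the lookup key out of each record:
//   GlobalKTable<String, Patient> patientsGlobal =
//       builder.globalTable("patients.patients.state");
//   appointments.join(patientsGlobal,
//       (key, appt) -> appt.patientId(),            // key extractor
//       (appt, patient) -> enrich(appt, patient));  // enrich() is hypothetical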
}

Stream-Stream Join (events within a time window)
// Join call.started with call.ended to compute duration
KStream<String, CallStarted> starts = builder.stream("calls.calls.started",
    Consumed.with(Serdes.String(), callStartedSerde));   // Serde<CallStarted>, assumed available
KStream<String, CallEnded> ends = builder.stream("calls.calls.ended",
    Consumed.with(Serdes.String(), callEndedSerde));     // Serde<CallEnded>, assumed available
// Re-key both by call ID
KStream<String, CallStarted> startsByCall = starts.selectKey((k, v) -> v.callId());
KStream<String, CallEnded> endsByCall = ends.selectKey((k, v) -> v.callId());
// Join within 60 minutes (call must end within 60min of starting)
KStream<String, CompletedCall> completedCalls = startsByCall.join(
endsByCall,
(start, end) -> new CompletedCall(
start.callId(),
start.clinicId(),
start.agentId(),
Duration.between(start.startTime(), end.endTime()).toSeconds()
),
    JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofHours(1)),
    StreamJoined.with(Serdes.String(), callStartedSerde, callEndedSerde) // serdes for the join state stores
);
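// A leftJoin here (a sketch) would also emit starts with no matching end inside
// the window (end == null), useful for flagging dropped or abandoned calls.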
completedCalls.to("calls.calls.completed");

Appointment Funnel — Aggregating Status Transitions
Track the conversion funnel: created → confirmed → completed:
@Autowired
public void buildFunnelTopology(StreamsBuilder builder) {
// Merge multiple event streams
KStream<String, AppointmentEvent> allEvents = builder
.<String, AppointmentCreated>stream("appointments.appointments.created")
.mapValues(e -> (AppointmentEvent) e)
.merge(
builder.<String, AppointmentEvent>stream("appointments.appointments.confirmed")
)
.merge(
builder.<String, AppointmentEvent>stream("appointments.appointments.completed")
)
.merge(
builder.<String, AppointmentEvent>stream("appointments.appointments.cancelled")
);
// Count per clinic per event type in daily window
allEvents
    .selectKey((k, v) -> v.clinicId() + "#" + v.getClass().getSimpleName())
    .groupByKey(Grouped.with(Serdes.String(), appointmentEventSerde)) // serdes for the repartition
    .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofDays(1)))
    .count(Materialized.as("daily-funnel-store"))
    .toStream()
    .map((windowedKey, count) -> KeyValue.pair(windowedKey.key(), count)) // unwrap the windowed key
    .to("appointments.metrics.daily-funnel", Produced.with(Serdes.String(), Serdes.Long()));
}

Error Handling
// Dead-letter handler for deserialization errors
props.put(
DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
LogAndContinueExceptionHandler.class // log error, skip bad message
);
// Or custom handler:
public class DltExceptionHandler implements DeserializationExceptionHandler {
    private Producer<byte[], byte[]> producer; // assumption: a byte[]/byte[] producer

    @Override
    public void configure(Map<String, ?> configs) { /* build the DLT producer here */ }

    @Override
    public DeserializationHandlerResponse handle(
            ProcessorContext ctx, ConsumerRecord<byte[], byte[]> record, Exception ex) {
        // send the raw bytes to a dead-letter topic, then keep processing
        producer.send(new ProducerRecord<>(
                "appointments.appointments.created.DLT", record.key(), record.value()));
        return DeserializationHandlerResponse.CONTINUE;
    }
}
}
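To activate the custom handler, register it the same way as the built-in one (a sketch; reuses the props map from the Setup section):

props.put(
    DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
    DltExceptionHandler.class
);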
Enjoyed this article?
Explore the Distributed Systems learning path for more.