Kafka Streams: Real-Time Stream Processing in Java

Process event streams in real time with Kafka Streams: apply stateless transformations, stateful aggregations, and windowed operations, join streams, and build a live call volume counter and an appointment funnel.

Learnixo · April 16, 2026 · 5 min read
Tags: Kafka Streams, Kafka, Stream Processing, Java, Real-Time, Distributed Systems
What Is Kafka Streams?

Kafka Streams is a Java library for stream processing that runs inside your application — no separate cluster needed. It reads from Kafka topics, processes the data, and writes results back to Kafka (or a state store).

Input topics → Kafka Streams topology → Output topics / State stores

Advantages over Spark/Flink:

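  • No separate processing cluster: it runs as a library inside your application
  • Exactly-once processing semantics for Kafka-to-Kafka pipelines, built on Kafka transactions
  • Elastic scaling: run more instances, up to the number of input topic partitions
  • Native Kafka integration: no connectors needed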

Core Concepts

Topology

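A topology is a directed acyclic graph (DAG) of processor nodes. Source nodes read from topics, intermediate nodes transform records, and sink nodes write to topics: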

Source → Filter → Map → GroupBy → Aggregate → Sink

KStream vs KTable

  • KStream — a stream of events (every record is an independent event)
  • KTable — a changelog stream (each key has one current value; new records update the value)
  • GlobalKTable — like KTable but fully replicated to every instance (for small lookup tables)

KStream("appointments"):
  CLN-001: {status: "created"}
  CLN-001: {status: "confirmed"}    ← independent events
  CLN-001: {status: "completed"}

KTable("clinic-state"):
  CLN-001: {name: "Sunrise Eye", active: true}   ← only latest value per key
  CLN-002: {name: "Valley Optometry", active: true}
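
The distinction can be sketched without Kafka at all. The plain-Java illustration below reads the same records twice: once as a stream, where every record for a key is kept, and once as a table, where a later record replaces the earlier value. The `StreamVsTable` class and `Event` record are hypothetical names for this sketch and are not part of the Kafka Streams API.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class StreamVsTable {

    // Hypothetical record type used only for this illustration.
    record Event(String key, String value) {}

    // Stream view: every record for the key is an independent event.
    static List<String> eventsForKey(List<Event> records, String key) {
        List<String> values = new ArrayList<>();
        for (Event e : records) {
            if (e.key().equals(key)) {
                values.add(e.value());
            }
        }
        return values;
    }

    // Table view: a later record for a key overwrites the earlier value.
    static Map<String, String> latestPerKey(List<Event> records) {
        Map<String, String> latest = new LinkedHashMap<>();
        for (Event e : records) {
            latest.put(e.key(), e.value());
        }
        return latest;
    }

    public static void main(String[] args) {
        List<Event> records = List.of(
            new Event("CLN-001", "created"),
            new Event("CLN-001", "confirmed"),
            new Event("CLN-002", "created"),
            new Event("CLN-001", "completed"));
        System.out.println(eventsForKey(records, "CLN-001"));
        System.out.println(latestPerKey(records));
    }
}
```

The stream view keeps all three status events for CLN-001, while the table view holds a single entry per clinic.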

Setup

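XML
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>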
JAVA
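import static org.apache.kafka.streams.StreamsConfig.*;

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.serialization.Serdes;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafkaStreams;
import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
import org.springframework.kafka.config.KafkaStreamsConfiguration;

@Configuration
@EnableKafkaStreams  // registers the StreamsBuilder that the topologies below receive
public class KafkaStreamsConfig {

    // Assumption: the broker list lives under this property name
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public KafkaStreamsConfiguration streamsConfig() {
        Map<String, Object> props = new HashMap<>();
        props.put(APPLICATION_ID_CONFIG,   "portal-streams");
        props.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(DEFAULT_KEY_SERDE_CLASS_CONFIG,
                  Serdes.String().getClass().getName());
        props.put(DEFAULT_VALUE_SERDE_CLASS_CONFIG,
                  Serdes.String().getClass().getName());
        props.put(PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE_V2);  // needs brokers 2.5 or newer
        props.put(REPLICATION_FACTOR_CONFIG, 3);  // internal topics; needs at least 3 brokers
        return new KafkaStreamsConfiguration(props);
    }
}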
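
For reference, the constants above resolve to ordinary string keys. The sketch below builds the same settings with `java.util.Properties` and the literal key names, which is roughly what a non-Spring application would pass to `new KafkaStreams(topology, props)`. The class name `PlainStreamsProperties` and the `localhost:9092` address in `main` are placeholders chosen for this example.

```java
import java.util.Properties;

final class PlainStreamsProperties {

    // Same settings as the Spring configuration, written with literal config keys.
    static Properties build(String bootstrapServers) {
        Properties props = new Properties();
        props.put("application.id", "portal-streams");
        props.put("bootstrap.servers", bootstrapServers);
        props.put("default.key.serde",
                  "org.apache.kafka.common.serialization.Serdes$StringSerde");
        props.put("default.value.serde",
                  "org.apache.kafka.common.serialization.Serdes$StringSerde");
        props.put("processing.guarantee", "exactly_once_v2");
        props.put("replication.factor", "3");
        return props;
    }

    public static void main(String[] args) {
        // Placeholder broker address for local experiments.
        Properties props = build("localhost:9092");
        props.forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

The `application.id` doubles as the consumer group ID and as the prefix for internal topic names, so changing it starts the application from scratch.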

Stateless Transformations

JAVA
@Component
public class AppointmentStreamProcessor {

    private final Serde<AppointmentEvent> appointmentSerde;
    // Assumed to be defined elsewhere, like appointmentSerde
    private final Serde<AppointmentEnriched> enrichedSerde;

    // Constructor injection initializes the final serde fields
    public AppointmentStreamProcessor(Serde<AppointmentEvent> appointmentSerde,
                                      Serde<AppointmentEnriched> enrichedSerde) {
        this.appointmentSerde = appointmentSerde;
        this.enrichedSerde = enrichedSerde;
    }

    @Autowired
    public void buildTopology(StreamsBuilder builder) {
        // Source stream
        KStream<String, AppointmentEvent> appointments = builder.stream(
            "appointments.appointments.created",
            Consumed.with(Serdes.String(), appointmentSerde)
        );

        // Filter: only routine exams
        KStream<String, AppointmentEvent> routineExams = appointments
            .filter((key, value) -> "routine".equals(value.type()));

        // Map: transform to enriched event
        // (calculateDayOfWeek and isBusinessHours are helper methods not shown here)
        KStream<String, AppointmentEnriched> enriched = appointments
            .mapValues(event -> new AppointmentEnriched(
                event.appointmentId(),
                event.clinicId(),
                event.patientId(),
                event.scheduledFor(),
                calculateDayOfWeek(event.scheduledFor()),
                isBusinessHours(event.scheduledFor())
            ));

        // FlatMap: one event → many events (e.g., fan-out notifications)
        KStream<String, Notification> notifications = appointments
            .flatMapValues(event -> List.of(
                new Notification("EMAIL", event.patientId(), event.appointmentId()),
                new Notification("SMS",   event.patientId(), event.appointmentId())
            ));

        // Branch: split one stream into multiple.
        // branch() takes a Branched, not a Named; split() without a prefix
        // makes the map keys equal to the branch names.
        Map<String, KStream<String, AppointmentEvent>> branches = appointments.split()
            .branch((k, v) -> "emergency".equals(v.type()),    Branched.as("emergency"))
            .branch((k, v) -> "routine".equals(v.type()),      Branched.as("routine"))
            .defaultBranch(Branched.as("other"));

        branches.get("emergency").to("appointments.emergency",
            Produced.with(Serdes.String(), appointmentSerde));

        // Sink: the default String serde cannot write AppointmentEnriched values,
        // so pass the serdes explicitly
        enriched.to("appointments.enriched",
            Produced.with(Serdes.String(), enrichedSerde));
    }
}

Stateful Aggregation — Live Call Volume Counter

Count calls per clinic in a tumbling 5-minute window:

JAVA
// callEventSerde and callVolumeMetricSerde are assumed to be fields defined elsewhere
@Autowired
public void buildCallCountTopology(StreamsBuilder builder) {
    KStream<String, CallEvent> calls = builder.stream(
        "calls.calls.started",
        Consumed.with(Serdes.String(), callEventSerde)
    );

    // Re-key by clinic_id for grouping (this causes a repartition before the aggregation)
    KStream<String, CallEvent> byClinic = calls
        .selectKey((k, v) -> v.clinicId());

    // Tumbling window: count per clinic per 5 minutes
    KTable<Windowed<String>, Long> callCounts = byClinic
        .groupByKey(Grouped.with(Serdes.String(), callEventSerde))
        .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
        .count(Materialized.as("call-counts-store"));

    // Write window counts to output topic. Each update to a window emits a new
    // record, so consumers should treat the latest value per window as current.
    callCounts.toStream()
        .map((windowedKey, count) -> KeyValue.pair(
            windowedKey.key(),
            new CallVolumeMetric(
                windowedKey.key(),
                count,
                windowedKey.window().startTime(),
                windowedKey.window().endTime()
            )
        ))
        // Explicit serdes: the default String serde cannot write CallVolumeMetric
        .to("calls.metrics.volume-5min",
            Produced.with(Serdes.String(), callVolumeMetricSerde));
}
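
Tumbling windows in Kafka Streams are aligned to the epoch rather than to the first record that arrives, so a 5-minute window always starts on a multiple of five minutes and covers the half-open interval from start (inclusive) to end (exclusive). The following sketch reproduces that arithmetic in plain Java to show which window a given record timestamp falls into. `TumblingWindowMath` is a hypothetical helper for illustration, not a Kafka Streams class, and it assumes timestamps at or after the epoch.

```java
import java.time.Duration;
import java.time.Instant;

final class TumblingWindowMath {

    // Start of the epoch-aligned tumbling window containing the timestamp.
    static Instant windowStart(Instant timestamp, Duration size) {
        long sizeMs = size.toMillis();
        long ts = timestamp.toEpochMilli();
        return Instant.ofEpochMilli(ts - Math.floorMod(ts, sizeMs));
    }

    // Exclusive end of the same window.
    static Instant windowEnd(Instant timestamp, Duration size) {
        return windowStart(timestamp, size).plus(size);
    }

    public static void main(String[] args) {
        Instant callStarted = Instant.parse("2026-04-16T10:07:30Z");
        Duration size = Duration.ofMinutes(5);
        System.out.println(windowStart(callStarted, size) + " to " + windowEnd(callStarted, size));
    }
}
```

A call that starts at 10:07:30 UTC is counted in the 10:05 to 10:10 window, and a call that starts at exactly 10:10:00 opens the next one.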

Query the State Store (for the dashboard)

JAVA
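@Service
public class CallMetricsService {

    // Spring Kafka does not register KafkaStreams as a bean;
    // the running instance is obtained from the factory bean
    @Autowired
    private StreamsBuilderFactoryBean streamsFactory;

    public Map<String, Long> getLiveCallCounts() {
        KafkaStreams kafkaStreams = streamsFactory.getKafkaStreams();
        if (kafkaStreams == null) {
            throw new IllegalStateException("Kafka Streams has not started yet");
        }

        // Throws InvalidStateStoreException while the instance is rebalancing.
        // The store only holds the partitions assigned to this instance.
        ReadOnlyWindowStore<String, Long> store = kafkaStreams.store(
            StoreQueryParameters.fromNameAndType(
                "call-counts-store",
                QueryableStoreTypes.windowStore()
            )
        );

        Instant now   = Instant.now();
        Instant start = now.minus(Duration.ofMinutes(5));

        Map<String, Long> counts = new HashMap<>();
        try (KeyValueIterator<Windowed<String>, Long> it =
             store.fetchAll(start, now)) {
            while (it.hasNext()) {
                KeyValue<Windowed<String>, Long> kv = it.next();
                // The range can match two windows per clinic when it spans a
                // window boundary; the last one iterated wins here
                counts.put(kv.key.key(), kv.value);
            }
        }

        return counts;
    }
}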

Joining Streams

Join appointments with patient data to enrich events:

JAVA
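// appointmentCreatedSerde, patientSerde and appointmentEnrichedSerde are assumed
// to be fields defined elsewhere
@Autowired
public void buildEnrichmentTopology(StreamsBuilder builder) {
    // Stream of new appointments
    KStream<String, AppointmentCreated> appointments = builder.stream(
        "appointments.appointments.created",
        Consumed.with(Serdes.String(), appointmentCreatedSerde)
    ).selectKey((k, v) -> v.patientId());  // re-key by patient ID

    // Table of patient records (compacted topic: latest per patient)
    KTable<String, Patient> patients = builder.table(
        "patients.patients.state",
        Consumed.with(Serdes.String(), patientSerde)
    );

    // Stream-Table join: enrich appointment with patient data.
    // Inner join: appointments without a matching patient record are dropped.
    // Note: this AppointmentEnriched carries patient fields, unlike the one
    // built in the stateless example.
    KStream<String, AppointmentEnriched> enriched = appointments.join(
        patients,
        (appointment, patient) -> new AppointmentEnriched(
            appointment.appointmentId(),
            appointment.clinicId(),
            patient.id(),
            patient.firstName() + " " + patient.lastName(),
            patient.email(),
            appointment.scheduledFor()
        ),
        // Serdes for the repartition topic created by the re-keying above
        Joined.with(Serdes.String(), appointmentCreatedSerde, patientSerde)
    );

    enriched.to("appointments.appointments.enriched",
        Produced.with(Serdes.String(), appointmentEnrichedSerde));
}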

Stream-Stream Join (events within a time window)

JAVA
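// Join call.started with call.ended to compute duration.
// callStartedSerde, callEndedSerde and completedCallSerde are assumed to be defined elsewhere.
KStream<String, CallStarted> starts = builder.stream(
    "calls.calls.started", Consumed.with(Serdes.String(), callStartedSerde));
KStream<String, CallEnded>   ends   = builder.stream(
    "calls.calls.ended", Consumed.with(Serdes.String(), callEndedSerde));

// Re-key both by call ID
KStream<String, CallStarted> startsByCall = starts.selectKey((k, v) -> v.callId());
KStream<String, CallEnded>   endsByCall   = ends.selectKey((k, v) -> v.callId());

// Join a start and an end whose record timestamps are at most 60 minutes apart.
// The window is symmetric, so it does not require the end to come after the start.
KStream<String, CompletedCall> completedCalls = startsByCall.join(
    endsByCall,
    (start, end) -> new CompletedCall(
        start.callId(),
        start.clinicId(),
        start.agentId(),
        Duration.between(start.startTime(), end.endTime()).toSeconds()
    ),
    JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofHours(1)),
    // Serdes for the repartition topics and the join's window stores
    StreamJoined.with(Serdes.String(), callStartedSerde, callEndedSerde)
);

completedCalls.to("calls.calls.completed",
    Produced.with(Serdes.String(), completedCallSerde));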
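
The window passed to `JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofHours(1))` is symmetric: two records with the same key join when their timestamps differ by at most one hour in either direction, boundaries included. The sketch below expresses that matching rule in plain Java. `JoinWindowCheck` is a hypothetical helper written for this illustration and is not part of Kafka Streams.

```java
import java.time.Duration;
import java.time.Instant;

final class JoinWindowCheck {

    // True when the two record timestamps are within maxDifference of each
    // other, regardless of which one came first.
    static boolean joinable(Instant left, Instant right, Duration maxDifference) {
        Duration gap = Duration.between(left, right).abs();
        return gap.compareTo(maxDifference) <= 0;
    }

    public static void main(String[] args) {
        Instant started = Instant.parse("2026-04-16T10:00:00Z");
        Duration window = Duration.ofHours(1);
        System.out.println(joinable(started, Instant.parse("2026-04-16T10:45:00Z"), window));
        System.out.println(joinable(started, Instant.parse("2026-04-16T11:00:01Z"), window));
        System.out.println(joinable(started, Instant.parse("2026-04-16T09:30:00Z"), window));
    }
}
```

Because an end record timestamped before its start record would still match, a topology that must reject such pairs needs an extra check, for example a filter on the joined result.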

Appointment Funnel — Aggregating Status Transitions

Track the conversion funnel (created → confirmed → completed):

JAVA
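// appointmentCreatedSerde and appointmentSerde are assumed to be fields defined elsewhere
@Autowired
public void buildFunnelTopology(StreamsBuilder builder) {
    Consumed<String, AppointmentEvent> consumed =
        Consumed.with(Serdes.String(), appointmentSerde);

    // Merge multiple event streams
    KStream<String, AppointmentEvent> allEvents = builder
        .stream("appointments.appointments.created",
                Consumed.with(Serdes.String(), appointmentCreatedSerde))
        .mapValues(e -> (AppointmentEvent) e)
        .merge(builder.stream("appointments.appointments.confirmed", consumed))
        .merge(builder.stream("appointments.appointments.completed", consumed))
        .merge(builder.stream("appointments.appointments.cancelled", consumed));

    // Count per clinic per event type in a daily window (aligned to UTC days).
    // getSimpleName() only separates the stages if the serde deserializes each
    // topic into its own AppointmentEvent subclass.
    allEvents
        .selectKey((k, v) -> v.clinicId() + "#" + v.getClass().getSimpleName())
        .groupByKey(Grouped.with(Serdes.String(), appointmentSerde))
        .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofDays(1)))
        .count(Materialized.as("daily-funnel-store"))
        .toStream()
        // The key is Windowed<String> and the value is Long, so the default
        // String serdes cannot write them
        .to("appointments.metrics.daily-funnel",
            Produced.with(
                WindowedSerdes.timeWindowedSerdeFrom(String.class, Duration.ofDays(1).toMillis()),
                Serdes.Long()));
}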
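
Once the daily counts exist, the funnel itself is a ratio between consecutive stages. The sketch below shows one way a consumer of the counts might compute those ratios, assuming the counts for one day have been collected into a map keyed in the same `clinicId#EventType` format the topology produces. The `FunnelRates` class and the stage names `AppointmentConfirmed` and `AppointmentCompleted` are assumptions for this example; only `AppointmentCreated` appears in the code above.

```java
import java.util.Map;

final class FunnelRates {

    // Share of events in fromStage that reached toStage for one clinic.
    // Returns 0.0 when the clinic has no events in fromStage.
    static double conversionRate(Map<String, Long> dailyCounts,
                                 String clinicId,
                                 String fromStage,
                                 String toStage) {
        long from = dailyCounts.getOrDefault(clinicId + "#" + fromStage, 0L);
        long to = dailyCounts.getOrDefault(clinicId + "#" + toStage, 0L);
        return from == 0 ? 0.0 : (double) to / from;
    }

    public static void main(String[] args) {
        // Made-up counts for illustration only.
        Map<String, Long> counts = Map.of(
            "CLN-001#AppointmentCreated", 200L,
            "CLN-001#AppointmentConfirmed", 150L,
            "CLN-001#AppointmentCompleted", 120L);
        System.out.println(conversionRate(
            counts, "CLN-001", "AppointmentCreated", "AppointmentConfirmed"));
        System.out.println(conversionRate(
            counts, "CLN-001", "AppointmentConfirmed", "AppointmentCompleted"));
    }
}
```

Keeping the ratio calculation outside the topology leaves the stream job responsible only for counting, so the dashboard can change how it presents the funnel without a redeploy.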

Error Handling

JAVA
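// Built-in handler for deserialization errors: log the error, skip the bad record
props.put(
    DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
    LogAndContinueExceptionHandler.class
);

// Or a custom handler that forwards the raw bytes to a dead-letter topic (DLT).
// Newer Kafka Streams releases deprecate this ProcessorContext signature in
// favor of one that takes an ErrorHandlerContext.
public class DltExceptionHandler implements DeserializationExceptionHandler {

    private KafkaProducer<byte[], byte[]> producer;

    @Override
    public void configure(Map<String, ?> configs) {
        // Assumption: the handler receives the Streams config, including bootstrap.servers
        Map<String, Object> producerProps = new HashMap<>();
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
            configs.get(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG));
        producer = new KafkaProducer<>(
            producerProps, new ByteArraySerializer(), new ByteArraySerializer());
    }

    @Override
    public DeserializationHandlerResponse handle(
        ProcessorContext ctx, ConsumerRecord<byte[], byte[]> record, Exception ex) {
        // Send to DLT; KafkaProducer.send takes a ProducerRecord
        producer.send(new ProducerRecord<>(
            "appointments.appointments.created.DLT", record.key(), record.value()));
        return DeserializationHandlerResponse.CONTINUE;  // don't stop processing
    }
}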
