Kafka & Event-Driven Architecture · Lesson 2 of 7

Producers & Consumers in Java and Python

Java — Spring Kafka

Dependency

XML
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

Configuration

YAML
# application.yml
spring:
  kafka:
    bootstrap-servers: ${KAFKA_BROKERS:localhost:9092}
    producer:
      key-serializer:   org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      acks: all
      retries: 2147483647
      properties:
        enable.idempotence: true
        compression.type:   snappy
        linger.ms:          5
    consumer:
      key-deserializer:   org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      auto-offset-reset:  earliest
      enable-auto-commit: false
      properties:
        spring.json.trusted.packages: "com.clinic.*"
    listener:
      ack-mode: MANUAL_IMMEDIATE
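
Spring Boot wires the KafkaTemplate and listener containers from this configuration, but it does not create the topics themselves. One option is to declare NewTopic beans, which the auto-configured KafkaAdmin applies at startup. A minimal sketch; the partition and replica counts are placeholders, not recommendations:

Java
@Configuration
public class TopicConfig {

    // KafkaAdmin (auto-configured by Spring Boot) creates any NewTopic
    // beans that don't already exist on the broker at startup.
    @Bean
    public NewTopic appointmentsCreated() {
        return TopicBuilder.name("appointments.appointments.created")
            .partitions(3)   // placeholder; align with listener concurrency
            .replicas(3)     // placeholder; depends on cluster size
            .build();
    }
}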

Events (Domain Events as Records)

Java
// src/main/java/com/clinic/events/AppointmentEvent.java
// (the file is named after its one public type, the sealed interface)

import java.time.Instant;
import java.time.LocalDateTime;

public sealed interface AppointmentEvent permits
    AppointmentCreated, AppointmentCancelled, AppointmentCompleted {
    String appointmentId();
    String clinicId();
    Instant occurredAt();
}

public record AppointmentCreated(
    String appointmentId,
    String clinicId,
    String patientId,
    LocalDateTime scheduledFor,
    String type,
    Instant occurredAt
) implements AppointmentEvent {}

public record AppointmentCancelled(
    String appointmentId,
    String clinicId,
    String cancelledBy,
    String reason,
    Instant occurredAt
) implements AppointmentEvent {}

// Declared so the sealed hierarchy compiles; carries only the shared fields.
public record AppointmentCompleted(
    String appointmentId,
    String clinicId,
    Instant occurredAt
) implements AppointmentEvent {}

Producer

Java
@Service
@RequiredArgsConstructor
public class AppointmentEventPublisher {

    private static final Logger log = LoggerFactory.getLogger(AppointmentEventPublisher.class);

    private final KafkaTemplate<String, Object> kafkaTemplate;

    public void publishCreated(Appointment appointment) {
        var event = new AppointmentCreated(
            appointment.getId(),
            appointment.getClinic().getId(),
            appointment.getPatient().getId(),
            appointment.getDateTime(),
            appointment.getType().name().toLowerCase(),
            Instant.now()
        );
        send("appointments.appointments.created", appointment.getClinic().getId(), event);
    }

    public void publishCancelled(Appointment appointment, String reason) {
        var event = new AppointmentCancelled(
            appointment.getId(),
            appointment.getClinic().getId(),
            "system",
            reason,
            Instant.now()
        );
        send("appointments.appointments.cancelled", appointment.getClinic().getId(), event);
    }

    private void send(String topic, String key, Object payload) {
        kafkaTemplate.send(topic, key, payload)
            .whenComplete((result, ex) -> {
                if (ex != null) {
                    log.error("Failed to publish {} to topic {}",
                        payload.getClass().getSimpleName(), topic, ex);
                } else {
                    log.debug("Published to {}-{} @ offset {}",
                        result.getRecordMetadata().topic(),
                        result.getRecordMetadata().partition(),
                        result.getRecordMetadata().offset());
                }
            });
    }
}
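
The callback above is fire-and-forget from the caller's point of view. When the caller must confirm delivery before proceeding (say, before returning an HTTP response), the returned future can be awaited instead. A sketch of such a variant (sendSync is a hypothetical helper; the 5-second timeout is an arbitrary placeholder):

Java
// Blocking variant: waits for broker acknowledgement or fails fast.
// This couples caller latency to Kafka latency, so use it sparingly.
private void sendSync(String topic, String key, Object payload) {
    try {
        kafkaTemplate.send(topic, key, payload).get(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException("Interrupted publishing to " + topic, e);
    } catch (ExecutionException | TimeoutException e) {
        throw new IllegalStateException("Publish to " + topic + " failed", e);
    }
}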

Consumer with Manual Commit

Java
@Component
@RequiredArgsConstructor
public class AppointmentEventConsumer {

    private static final Logger log = LoggerFactory.getLogger(AppointmentEventConsumer.class);

    private final NotificationService notificationService;
    private final AnalyticsService    analyticsService;

    @KafkaListener(
        topics       = "appointments.appointments.created",
        groupId      = "notification-service",
        concurrency  = "3"    // 3 consumer threads — one per partition
    )
    public void onAppointmentCreated(
        @Payload AppointmentCreated event,
        Acknowledgment ack,
        @Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
        @Header(KafkaHeaders.OFFSET) long offset
    ) {
        log.info("Processing AppointmentCreated {} from partition {} offset {}",
            event.appointmentId(), partition, offset);

        try {
            notificationService.sendAppointmentConfirmation(
                event.patientId(),
                event.scheduledFor()
            );
            ack.acknowledge();   // commit offset only after success
        } catch (TransientException e) {
            // Don't ack; rethrow so the DefaultErrorHandler (configured
            // below) redelivers the record with backoff
            log.warn("Transient failure processing {}, will retry: {}",
                event.appointmentId(), e.getMessage());
            throw e;
        } catch (Exception e) {
            // Permanent failure: also rethrow. After retries are exhausted
            // (or skipped, for exceptions registered as not retryable), the
            // DefaultErrorHandler publishes the record to the DLT and commits,
            // which unblocks the partition without silently dropping the event.
            log.error("Permanent failure processing {}", event.appointmentId(), e);
            throw e;
        }
    }
}
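
TransientException above is not a framework class; it stands in for whatever the application uses to mark retryable errors. A minimal definition might look like:

Java
// Marker for errors worth retrying: timeouts, connection resets,
// 5xx responses from a downstream service, and the like.
public class TransientException extends RuntimeException {
    public TransientException(String message, Throwable cause) {
        super(message, cause);
    }
}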

Dead Letter Topic Configuration

Java
@Configuration
public class KafkaConfig {

    @Bean
    public DefaultErrorHandler errorHandler(KafkaOperations<String, Object> template) {
        // Retry 3 times with exponential backoff, then send to DLT
        ExponentialBackOffWithMaxRetries backOff = new ExponentialBackOffWithMaxRetries(3);
        backOff.setInitialInterval(1000L);
        backOff.setMultiplier(2.0);

        DeadLetterPublishingRecoverer recoverer =
            new DeadLetterPublishingRecoverer(template,
                (record, ex) -> new TopicPartition(
                    record.topic() + ".DLT",
                    record.partition()
                )
            );

        DefaultErrorHandler handler = new DefaultErrorHandler(recoverer, backOff);
        // Optionally skip retries for errors that can never succeed,
        // sending them straight to the DLT:
        // handler.addNotRetryableExceptions(IllegalArgumentException.class);
        return handler;
    }
}
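
The recoverer also stamps failure details onto each dead-lettered record as headers (KafkaHeaders.DLT_EXCEPTION_MESSAGE, DLT_ORIGINAL_TOPIC, and so on), so a separate listener can watch the DLT for alerting or manual replay. A sketch; the dlt-monitor group id and onDeadLetter name are this example's choices:

Java
@KafkaListener(topics = "appointments.appointments.created.DLT", groupId = "dlt-monitor")
public void onDeadLetter(
    ConsumerRecord<String, Object> record,
    @Header(KafkaHeaders.DLT_EXCEPTION_MESSAGE) byte[] errorMessage
) {
    log.error("Dead-lettered record from {}[{}]@{}: {}",
        record.topic(), record.partition(), record.offset(),
        new String(errorMessage, StandardCharsets.UTF_8));
    // Alert, persist for manual replay, etc.
}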

Batch Consumer

Java
@KafkaListener(
    topics      = "calls.transcripts.created",
    groupId     = "analytics-service",
    concurrency = "6",
    batch       = "true"   // required for a List<ConsumerRecord<...>> payload
)
public void onTranscriptsBatch(
    List<ConsumerRecord<String, TranscriptCreated>> records,
    Acknowledgment ack
) {
    log.info("Processing batch of {} transcripts", records.size());

    List<TranscriptCreated> events = records.stream()
        .map(ConsumerRecord::value)
        .toList();

    try {
        analyticsService.processBatch(events);
        ack.acknowledge();
    } catch (Exception e) {
        log.error("Batch processing failed", e);
        throw e;   // the error handler redelivers the whole batch
    }
}

Python — confluent-kafka

Bash
pip install confluent-kafka

Producer

Python
# kafka/producer.py
import json
import logging
from datetime import datetime, timezone
from confluent_kafka import Producer, KafkaException

logger = logging.getLogger(__name__)


def create_producer(brokers: str) -> Producer:
    return Producer({
        "bootstrap.servers":    brokers,
        "acks":                 "all",
        "retries":              2_147_483_647,
        "enable.idempotence":   True,
        "compression.type":     "snappy",
        "linger.ms":            5,
        "batch.size":           65_536,
    })


class AppointmentEventPublisher:
    def __init__(self, producer: Producer):
        self._producer = producer

    def publish_created(self, appointment: dict) -> None:
        event = {
            "event_type":       "AppointmentCreated",
            "appointment_id":   appointment["id"],
            "clinic_id":        appointment["clinic_id"],
            "patient_id":       appointment["patient_id"],
            "scheduled_for":    appointment["datetime"],
            "type":             appointment["type"],
            "occurred_at":      datetime.utcnow().isoformat()
        }
        self._send(
            topic="appointments.appointments.created",
            key=appointment["clinic_id"],
            value=event
        )

    def _send(self, topic: str, key: str, value: dict) -> None:
        self._producer.produce(
            topic     = topic,
            key       = key.encode("utf-8"),
            value     = json.dumps(value).encode("utf-8"),
            on_delivery = self._on_delivery
        )
        self._producer.poll(0)   # trigger callbacks

    @staticmethod
    def _on_delivery(err, msg) -> None:
        if err:
            logger.error("Delivery failed for %s: %s", msg.topic(), err)
        else:
            logger.debug("Delivered to %s [%d] @ %d",
                         msg.topic(), msg.partition(), msg.offset())

    def flush(self) -> None:
        """Call before shutdown — ensures all buffered messages are sent."""
        self._producer.flush(timeout=30)
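
Typical wiring, with placeholder broker address and appointment fields:

Python
producer  = create_producer("localhost:9092")
publisher = AppointmentEventPublisher(producer)

publisher.publish_created({
    "id":         "apt-123",        # placeholder values throughout
    "clinic_id":  "clinic-9",
    "patient_id": "pat-42",
    "datetime":   "2024-06-01T10:30:00",
    "type":       "checkup",
})

publisher.flush()   # drain the buffer before the process exits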

Consumer

Python
# kafka/consumer.py
import json
import logging
import signal
from confluent_kafka import Consumer, KafkaError, KafkaException

logger = logging.getLogger(__name__)

class AppointmentEventConsumer:
    def __init__(self, brokers: str, group_id: str, handler):
        self._consumer = Consumer({
            "bootstrap.servers":  brokers,
            "group.id":           group_id,
            "auto.offset.reset":  "earliest",
            "enable.auto.commit": False,        # manual commit
            "max.poll.interval.ms": 300_000,
            "session.timeout.ms":   45_000,
        })
        self._handler = handler
        self._running = True
        signal.signal(signal.SIGTERM, self._shutdown)
        signal.signal(signal.SIGINT,  self._shutdown)

    def run(self, topics: list[str]) -> None:
        self._consumer.subscribe(topics)
        logger.info("Subscribed to %s", topics)

        try:
            while self._running:
                msg = self._consumer.poll(timeout=1.0)

                if msg is None:
                    continue
                if msg.error():
                    if msg.error().code() == KafkaError._PARTITION_EOF:
                        continue
                    raise KafkaException(msg.error())

                self._process(msg)

        finally:
            self._consumer.close()

    def _process(self, msg) -> None:
        try:
            event = json.loads(msg.value().decode("utf-8"))
            logger.info("Processing %s from %s[%d]@%d",
                        event.get("event_type"), msg.topic(),
                        msg.partition(), msg.offset())

            self._handler(event)

            # Commit after successful processing
            self._consumer.commit(message=msg, asynchronous=False)

        except Exception as e:
            logger.exception("Failed to process message at %s[%d]@%d: %s",
                             msg.topic(), msg.partition(), msg.offset(), e)
            # Don't commit — message will be redelivered
            # For permanent errors, publish to DLT and commit

    def _shutdown(self, *_) -> None:
        logger.info("Shutdown signal received")
        self._running = False


# Usage
def handle_appointment_created(event: dict) -> None:
    appointment_id = event["appointment_id"]
    clinic_id      = event["clinic_id"]
    # process...

consumer = AppointmentEventConsumer(
    brokers  = "localhost:9092",
    group_id = "notification-service",
    handler  = handle_appointment_created
)
consumer.run(["appointments.appointments.created"])

Idempotent Consumers

Design consumers to be safe to re-run. Use an idempotency key (usually the event ID) stored in a DB:

Python
def handle_appointment_created(event: dict) -> None:
    event_id = event["appointment_id"] + "_" + event["occurred_at"]

    # Check if already processed
    if already_processed(event_id):
        logger.info("Skipping duplicate event %s", event_id)
        return

    # Process
    send_confirmation_email(event["patient_id"], event["scheduled_for"])

    # Mark as processed, ideally in the same DB transaction as other writes
    mark_processed(event_id)

Python
import time
from datetime import datetime, timezone

import boto3

# Placeholder table name; assumes string PK/SK keys and TTL enabled on "ttl"
table = boto3.resource("dynamodb").Table("processed-events")


def already_processed(event_id: str) -> bool:
    result = table.get_item(Key={"PK": f"EVENT#{event_id}", "SK": "PROCESSED"})
    return "Item" in result

def mark_processed(event_id: str) -> None:
    table.put_item(
        Item={
            "PK": f"EVENT#{event_id}",
            "SK": "PROCESSED",
            "processed_at": datetime.now(timezone.utc).isoformat(),
            "ttl": int(time.time()) + 7 * 24 * 3600,  # TTL: 7 days
        },
        ConditionExpression="attribute_not_exists(PK)"
    )
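
The check-then-mark sequence can still race when two consumers handle the same event concurrently (around a rebalance, for instance); the ConditionExpression is what actually enforces single marking. A sketch of handling the losing side of that race (mark_processed_once is a hypothetical wrapper):

Python
from botocore.exceptions import ClientError

def mark_processed_once(event_id: str) -> bool:
    """Returns False if another consumer already marked this event."""
    try:
        mark_processed(event_id)
        return True
    except ClientError as e:
        if e.response["Error"]["Code"] == "ConditionalCheckFailedException":
            return False   # duplicate: another writer won the conditional put
        raise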