Kafka & Event-Driven Architecture · Lesson 2 of 7
Producers & Consumers in Java and Python
Java — Spring Kafka
Dependency
XML
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
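If your build uses Gradle rather than Maven, an equivalent declaration is sketched below; with Spring Boot's dependency management in place, the version can be omitted in either build.
Groovy
implementation 'org.springframework.kafka:spring-kafka'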
Configuration
YAML
# application.yml
spring:
  kafka:
    bootstrap-servers: ${KAFKA_BROKERS:localhost:9092}
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      acks: all
      retries: 2147483647
      properties:
        enable.idempotence: true
        compression.type: snappy
        linger.ms: 5
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      auto-offset-reset: earliest
      enable-auto-commit: false
      properties:
        spring.json.trusted.packages: "com.clinic.*"
    listener:
      ack-mode: MANUAL_IMMEDIATE
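Spring can also create the topics at startup if you register them as beans. A minimal sketch, assuming three partitions (to match the listener concurrency used below) and a replication factor of 1 for local development; use 3 or more replicas in production:
JAVA
@Configuration
public class TopicConfig {

    @Bean
    public NewTopic appointmentsCreated() {
        return TopicBuilder.name("appointments.appointments.created")
            .partitions(3)
            .replicas(1)
            .build();
    }

    @Bean
    public NewTopic appointmentsCancelled() {
        return TopicBuilder.name("appointments.appointments.cancelled")
            .partitions(3)
            .replicas(1)
            .build();
    }
}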
Events (Domain Events as Records)
JAVA
// src/main/java/com/clinic/events/ (one top-level public type per file)
package com.clinic.events;

import java.time.Instant;
import java.time.LocalDateTime;

public sealed interface AppointmentEvent permits
        AppointmentCreated, AppointmentCancelled, AppointmentCompleted {
    String appointmentId();
    String clinicId();
    Instant occurredAt();
}

public record AppointmentCreated(
    String appointmentId,
    String clinicId,
    String patientId,
    LocalDateTime scheduledFor,
    String type,
    Instant occurredAt
) implements AppointmentEvent {}

public record AppointmentCancelled(
    String appointmentId,
    String clinicId,
    String cancelledBy,
    String reason,
    Instant occurredAt
) implements AppointmentEvent {}

// AppointmentCompleted (listed in the permits clause) follows the same shape.
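By default, JsonSerializer writes the record's fully qualified class name into a __TypeId__ header, which couples consumers to the producer's package layout. A hedged sketch of the spring.json.type.mapping property, which substitutes a stable token for the class name on both sides; the tokens here are illustrative:
YAML
spring:
  kafka:
    producer:
      properties:
        spring.json.type.mapping: "created:com.clinic.events.AppointmentCreated,cancelled:com.clinic.events.AppointmentCancelled"
    consumer:
      properties:
        spring.json.type.mapping: "created:com.clinic.events.AppointmentCreated,cancelled:com.clinic.events.AppointmentCancelled"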
Producer
JAVA
@Service
@RequiredArgsConstructor
public class AppointmentEventPublisher {

    private static final Logger log = LoggerFactory.getLogger(AppointmentEventPublisher.class);

    private final KafkaTemplate<String, Object> kafkaTemplate;

    public void publishCreated(Appointment appointment) {
        var event = new AppointmentCreated(
            appointment.getId(),
            appointment.getClinic().getId(),
            appointment.getPatient().getId(),
            appointment.getDateTime(),
            appointment.getType().name().toLowerCase(),
            Instant.now()
        );
        send("appointments.appointments.created", appointment.getClinic().getId(), event);
    }

    public void publishCancelled(Appointment appointment, String reason) {
        var event = new AppointmentCancelled(
            appointment.getId(),
            appointment.getClinic().getId(),
            "system",
            reason,
            Instant.now()
        );
        send("appointments.appointments.cancelled", appointment.getClinic().getId(), event);
    }

    // Keying by clinic ID sends all of a clinic's events to the same partition,
    // preserving per-clinic ordering.
    private void send(String topic, String key, Object payload) {
        kafkaTemplate.send(topic, key, payload)
            .whenComplete((result, ex) -> {
                if (ex != null) {
                    log.error("Failed to publish {} to topic {}: {}",
                        payload.getClass().getSimpleName(), topic, ex.getMessage());
                } else {
                    log.debug("Published to {}-{} @ offset {}",
                        result.getRecordMetadata().topic(),
                        result.getRecordMetadata().partition(),
                        result.getRecordMetadata().offset());
                }
            });
    }
}
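send() here is fire-and-forget with logging, which suits non-critical events. Where the caller must know the publish succeeded before continuing (for example, before returning an HTTP response), you can block on the returned future. A minimal sketch, assuming Spring Kafka 3.x (where send() returns a CompletableFuture, as the whenComplete above already implies); publishOrThrow is a hypothetical helper, not part of the class above:
JAVA
// Hypothetical blocking variant: wait for the broker acknowledgment, propagate failures.
public void publishOrThrow(String topic, String key, Object payload) {
    try {
        kafkaTemplate.send(topic, key, payload).get(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException("Interrupted while publishing to " + topic, e);
    } catch (ExecutionException | TimeoutException e) {
        throw new IllegalStateException("Publish to " + topic + " failed", e);
    }
}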
Consumer with Manual Commit
JAVA
@Component
@RequiredArgsConstructor
public class AppointmentEventConsumer {

    private static final Logger log = LoggerFactory.getLogger(AppointmentEventConsumer.class);

    private final NotificationService notificationService;
    private final AnalyticsService analyticsService;

    @KafkaListener(
        topics = "appointments.appointments.created",
        groupId = "notification-service",
        concurrency = "3" // 3 consumer threads; effective only if the topic has at least 3 partitions
    )
    public void onAppointmentCreated(
            @Payload AppointmentCreated event,
            Acknowledgment ack,
            @Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
            @Header(KafkaHeaders.OFFSET) long offset
    ) {
        log.info("Processing AppointmentCreated {} from partition {} offset {}",
            event.appointmentId(), partition, offset);
        try {
            notificationService.sendAppointmentConfirmation(
                event.patientId(),
                event.scheduledFor()
            );
            ack.acknowledge(); // commit the offset only after successful processing
        } catch (TransientException e) {
            // Don't ack; rethrow so the error handler (next section) retries with backoff.
            log.warn("Transient failure processing {}, will retry: {}",
                event.appointmentId(), e.getMessage());
            throw e;
        } catch (Exception e) {
            // Permanent failure: acknowledge so the partition isn't blocked by endless
            // redelivery. In practice, publish the record to a dead letter topic first;
            // the error handler configured below automates this for thrown exceptions.
            log.error("Permanent failure processing {}: {}",
                event.appointmentId(), e.getMessage());
            ack.acknowledge();
        }
    }
}
Dead Letter Topic Configuration
JAVA
@Configuration
public class KafkaConfig {

    // Spring Boot applies a single CommonErrorHandler bean (DefaultErrorHandler here)
    // to the auto-configured listener container factory.
    @Bean
    public DefaultErrorHandler errorHandler(KafkaOperations<String, Object> template) {
        // Retry 3 times with exponential backoff, then publish the failed record to the DLT
        ExponentialBackOffWithMaxRetries backOff = new ExponentialBackOffWithMaxRetries(3);
        backOff.setInitialInterval(1000L);
        backOff.setMultiplier(2.0);

        // Route to "<original-topic>.DLT", preserving the partition; the DLT therefore
        // needs at least as many partitions as the source topic.
        DeadLetterPublishingRecoverer recoverer =
            new DeadLetterPublishingRecoverer(template,
                (record, ex) -> new TopicPartition(
                    record.topic() + ".DLT",
                    record.partition()
                )
            );
        return new DefaultErrorHandler(recoverer, backOff);
    }
}
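Records that exhaust their retries land on the DLT, which should be monitored rather than left to accumulate silently. A minimal sketch of a DLT listener method (placed in a component like the consumer above); the dlt-monitor group ID is illustrative, and it assumes the configured JsonDeserializer can still read the payload from the preserved type headers:
JAVA
@KafkaListener(topics = "appointments.appointments.created.DLT", groupId = "dlt-monitor")
public void onDeadLetter(
        ConsumerRecord<String, Object> record,
        @Header(KafkaHeaders.DLT_EXCEPTION_MESSAGE) String exceptionMessage
) {
    // Alert, persist for inspection, or trigger a manual replay.
    log.error("Dead-lettered record from {}-{} @ {}: {}",
        record.topic(), record.partition(), record.offset(), exceptionMessage);
}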
Batch Consumer
JAVA
@KafkaListener(
    topics = "calls.transcripts.created",
    groupId = "analytics-service",
    concurrency = "6",
    batch = "true" // receive a List per poll instead of one record at a time
)
public void onTranscriptsBatch(
        List<ConsumerRecord<String, TranscriptCreated>> records,
        Acknowledgment ack
) {
    log.info("Processing batch of {} transcripts", records.size());
    List<TranscriptCreated> events = records.stream()
        .map(ConsumerRecord::value)
        .toList();
    try {
        analyticsService.processBatch(events);
        ack.acknowledge(); // one commit for the whole batch
    } catch (Exception e) {
        // Rethrow so the error handler redelivers the batch; acknowledging here would skip it
        log.error("Batch processing failed: {}", e.getMessage());
        throw e;
    }
}
Python — confluent-kafka
Bash
pip install confluent-kafka
Producer
Python
# kafka/producer.py
import json
import logging
from datetime import datetime, timezone

from confluent_kafka import Producer

logger = logging.getLogger(__name__)


def create_producer(brokers: str) -> Producer:
    return Producer({
        "bootstrap.servers": brokers,
        "acks": "all",
        "retries": 2_147_483_647,
        "enable.idempotence": True,
        "compression.type": "snappy",
        "linger.ms": 5,
        "batch.size": 65_536,
    })


class AppointmentEventPublisher:
    def __init__(self, producer: Producer):
        self._producer = producer

    def publish_created(self, appointment: dict) -> None:
        event = {
            "event_type": "AppointmentCreated",
            "appointment_id": appointment["id"],
            "clinic_id": appointment["clinic_id"],
            "patient_id": appointment["patient_id"],
            "scheduled_for": appointment["datetime"],
            "type": appointment["type"],
            "occurred_at": datetime.now(timezone.utc).isoformat(),
        }
        self._send(
            topic="appointments.appointments.created",
            key=appointment["clinic_id"],  # key by clinic for per-clinic ordering
            value=event,
        )

    def _send(self, topic: str, key: str, value: dict) -> None:
        self._producer.produce(
            topic=topic,
            key=key.encode("utf-8"),
            value=json.dumps(value).encode("utf-8"),
            on_delivery=self._on_delivery,
        )
        self._producer.poll(0)  # serve delivery callbacks without blocking

    @staticmethod
    def _on_delivery(err, msg) -> None:
        if err:
            logger.error("Delivery failed for %s: %s", msg.topic(), err)
        else:
            logger.debug("Delivered to %s [%d] @ %d",
                         msg.topic(), msg.partition(), msg.offset())

    def flush(self) -> None:
        """Call before shutdown: ensures all buffered messages are sent."""
        self._producer.flush(timeout=30)
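Putting the producer together, a short usage sketch; the appointment dict matches the keys publish_created reads, and the broker address and field values are illustrative:
Python
producer = create_producer("localhost:9092")
publisher = AppointmentEventPublisher(producer)

publisher.publish_created({
    "id": "appt-123",
    "clinic_id": "clinic-42",
    "patient_id": "patient-7",
    "datetime": "2025-06-01T09:30:00",
    "type": "checkup",
})

publisher.flush()  # drain the buffer before the process exits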
Consumer
Python
# kafka/consumer.py
import json
import logging
import signal

from confluent_kafka import Consumer, KafkaError, KafkaException

logger = logging.getLogger(__name__)


class AppointmentEventConsumer:
    def __init__(self, brokers: str, group_id: str, handler):
        self._consumer = Consumer({
            "bootstrap.servers": brokers,
            "group.id": group_id,
            "auto.offset.reset": "earliest",
            "enable.auto.commit": False,  # manual commit
            "max.poll.interval.ms": 300_000,
            "session.timeout.ms": 45_000,
        })
        self._handler = handler
        self._running = True
        signal.signal(signal.SIGTERM, self._shutdown)
        signal.signal(signal.SIGINT, self._shutdown)

    def run(self, topics: list[str]) -> None:
        self._consumer.subscribe(topics)
        logger.info("Subscribed to %s", topics)
        try:
            while self._running:
                msg = self._consumer.poll(timeout=1.0)
                if msg is None:
                    continue
                if msg.error():
                    if msg.error().code() == KafkaError._PARTITION_EOF:
                        continue
                    raise KafkaException(msg.error())
                self._process(msg)
        finally:
            self._consumer.close()

    def _process(self, msg) -> None:
        try:
            event = json.loads(msg.value().decode("utf-8"))
            logger.info("Processing %s from %s[%d]@%d",
                        event.get("event_type"), msg.topic(),
                        msg.partition(), msg.offset())
            self._handler(event)
            # Commit only after successful processing
            self._consumer.commit(message=msg, asynchronous=False)
        except Exception:
            logger.exception("Failed to process message at %s[%d]@%d",
                             msg.topic(), msg.partition(), msg.offset())
            # Don't commit: the message will be redelivered.
            # For permanent errors, publish to a DLT and commit (see the sketch below).

    def _shutdown(self, *_) -> None:
        logger.info("Shutdown signal received")
        self._running = False


# Usage
def handle_appointment_created(event: dict) -> None:
    appointment_id = event["appointment_id"]
    clinic_id = event["clinic_id"]
    # process...


consumer = AppointmentEventConsumer(
    brokers="localhost:9092",
    group_id="notification-service",
    handler=handle_appointment_created,
)
consumer.run(["appointments.appointments.created"])
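For the permanent-error path mentioned above, a minimal dead-letter sketch: it republishes the raw message to a .DLT topic with the error attached as a header, then commits so the partition keeps moving. send_to_dlt is a hypothetical helper, and dlt_producer is assumed to be a Producer built with create_producer above:
Python
def send_to_dlt(dlt_producer, consumer, msg, error: Exception) -> None:
    dlt_producer.produce(
        topic=msg.topic() + ".DLT",
        key=msg.key(),
        value=msg.value(),  # forward the original bytes untouched
        headers=[("dlt.error", str(error).encode("utf-8"))],
    )
    dlt_producer.flush(timeout=10)
    # Commit the failed offset so this message is not redelivered
    consumer.commit(message=msg, asynchronous=False)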
Idempotent Consumers
Design consumers to be safe to re-run. Use an idempotency key (usually the event ID) stored in a DB:
Python
def handle_appointment_created(event: dict) -> None:
    # Derive a stable idempotency key from the event's identity
    event_id = event["appointment_id"] + "_" + event["occurred_at"]

    # Check if already processed
    if already_processed(event_id):
        logger.info("Skipping duplicate event %s", event_id)
        return

    # Process
    send_confirmation_email(event["patient_id"], event["scheduled_for"])

    # Mark as processed; the conditional write below makes this safe under races
    mark_processed(event_id)
Python
import time
from datetime import datetime, timezone

# `table` is assumed to be a boto3 DynamoDB Table resource, e.g.
# table = boto3.resource("dynamodb").Table("processed-events")


def already_processed(event_id: str) -> bool:
    result = table.get_item(Key={"PK": f"EVENT#{event_id}", "SK": "PROCESSED"})
    return "Item" in result


def mark_processed(event_id: str) -> None:
    table.put_item(
        Item={
            "PK": f"EVENT#{event_id}",
            "SK": "PROCESSED",
            "processed_at": datetime.now(timezone.utc).isoformat(),
            "ttl": int(time.time()) + 7 * 24 * 3600,  # TTL: 7 days
        },
        # Atomic guard: a concurrent consumer that loses the race gets
        # ConditionalCheckFailedException instead of silently overwriting
        ConditionExpression="attribute_not_exists(PK)",
    )