Kafka & Event-Driven Architecture · Lesson 4 of 7
EDA Patterns: CQRS, Event Sourcing & Saga
CQRS — Command Query Responsibility Segregation
CQRS separates the write model (commands that change state) from the read model (queries that return data).
Why? Write models are normalized for consistency. Read models are denormalized for query performance. Trying to serve both from one model leads to compromises.
Client
├── Command → Command Handler → Write DB (normalized)
│                                    ↓ event
└── Query → Query Handler ← Read DB (denormalized/projected)

Command Side (Spring Boot)
// Commands — intent to change state
public sealed interface AppointmentCommand permits
        ScheduleAppointment, ConfirmAppointment, CancelAppointment {}

public record ScheduleAppointment(
        String commandId,          // for idempotency
        String clinicId,
        String patientId,
        LocalDateTime requestedAt,
        String type
) implements AppointmentCommand {}
// Command handler — validates + executes + publishes event
@Service
@Transactional
@RequiredArgsConstructor
public class AppointmentCommandHandler {

    private final AppointmentRepository repository;
    private final AppointmentEventPublisher events;
    private final ProcessedCommandRepository processedCommands;

    public String handle(ScheduleAppointment cmd) {
        // Idempotency — reject duplicate commands
        if (processedCommands.exists(cmd.commandId())) {
            return processedCommands.getResultId(cmd.commandId());
        }
        // Business validation
        validateNoConflict(cmd.patientId(), cmd.requestedAt());
        // Persist
        Appointment appt = new Appointment(
            UUID.randomUUID().toString(),
            cmd.clinicId(),
            cmd.patientId(),
            cmd.requestedAt(),
            AppointmentStatus.SCHEDULED,
            cmd.type()
        );
        repository.save(appt);
        // Publish event (via outbox pattern — see below)
        events.publishCreated(appt);
        processedCommands.record(cmd.commandId(), appt.getId());
        return appt.getId();
    }
}

Query Side — Projection
The read model is built from events:
// Read model — optimized for queries
@Entity
@Table(name = "appointment_summaries")
public class AppointmentSummary {
    @Id
    private String id;
    private String clinicId;
    private String patientName;
    private String patientEmail;
    private LocalDateTime dateTime;
    private String status;
    private String type;
    private int cancellationCount;
    // ... additional denormalized fields
}
// Projection — updates read model from events.
// Class-level @KafkaListener dispatches to @KafkaHandler methods by payload type;
// wildcard subscriptions use topicPattern (a regex), not topics.
@Component
@KafkaListener(topicPattern = "appointments\\.appointments\\..*", groupId = "cqrs-projection")
public class AppointmentProjection {

    @Autowired
    private AppointmentSummaryRepository summaryRepo;

    @KafkaHandler
    public void on(AppointmentCreated event) {
        AppointmentSummary summary = new AppointmentSummary();
        summary.setId(event.appointmentId());
        summary.setClinicId(event.clinicId());
        summary.setStatus("SCHEDULED");
        // fetch patient details if needed
        summaryRepo.save(summary);
    }

    @KafkaHandler
    public void on(AppointmentCancelled event) {
        summaryRepo.findById(event.appointmentId()).ifPresent(s -> {
            s.setStatus("CANCELLED");
            s.setCancellationCount(s.getCancellationCount() + 1);
            summaryRepo.save(s);
        });
    }
}
// Query handler — reads from read model
@Service
@RequiredArgsConstructor
public class AppointmentQueryHandler {

    private final AppointmentSummaryRepository repo;

    public Page<AppointmentSummary> getByClinic(String clinicId, Pageable pageable) {
        return repo.findByClinicIdOrderByDateTimeAsc(clinicId, pageable);
    }

    public AppointmentStats getStats(String clinicId, LocalDate date) {
        return repo.getStatsByClinicAndDate(clinicId, date);
    }
}

Event Sourcing
Instead of storing the current state, store the sequence of events that led to it. The current state is derived by replaying events.
Append-only event store:
[0] AppointmentCreated     {id: APT-1, clinic: CLN-001, patient: PAT-001, date: 2026-04-16}
[1] AppointmentConfirmed   {id: APT-1, confirmedBy: staff-1}
[2] AppointmentRescheduled {id: APT-1, newDate: 2026-04-17}
[3] AppointmentCompleted   {id: APT-1, duration: 35}

→ Replay events 0–3 → current state: {id: APT-1, status: COMPLETED, date: 2026-04-17, ...}

Event Store Implementation
@Entity
@Table(name = "appointment_events")
public class StoredEvent {
    @Id
    private String eventId;
    private String aggregateId;    // e.g., appointment ID
    private String aggregateType;  // e.g., "Appointment"
    private String eventType;      // e.g., "AppointmentCreated"
    @Column(columnDefinition = "jsonb")
    private String payload;        // JSON
    private long sequenceNumber;
    private Instant occurredAt;
}

@Repository
public interface EventStoreRepository extends JpaRepository<StoredEvent, String> {
    List<StoredEvent> findByAggregateIdOrderBySequenceNumberAsc(String aggregateId);
    List<StoredEvent> findByAggregateTypeAndOccurredAtAfter(String type, Instant after);
}
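Appending new events is where concurrency control lives: before writing, compare the stream's current length against the sequence number the writer expected, so two writers cannot both extend the same aggregate's stream. A minimal in-memory sketch of that check (plain Java, no JPA; `InMemoryEventStore` is an illustrative name, not part of the lesson's code):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Minimal in-memory event store illustrating append-with-expected-version.
class InMemoryEventStore {
    private final Map<String, List<String>> streams = new HashMap<>();

    // Append events, failing if another writer advanced the stream first.
    void append(String aggregateId, long expectedVersion, List<String> events) {
        List<String> stream = streams.computeIfAbsent(aggregateId, id -> new ArrayList<>());
        if (stream.size() != expectedVersion) {
            throw new IllegalStateException(
                "Concurrent modification: expected version " + expectedVersion
                + " but stream is at " + stream.size());
        }
        stream.addAll(events);
    }

    List<String> load(String aggregateId) {
        return List.copyOf(streams.getOrDefault(aggregateId, List.of()));
    }
}
```

With the JPA store above, the same effect falls out of a unique constraint on `(aggregateId, sequenceNumber)`: a stale writer's insert violates the constraint and the transaction rolls back.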
// Aggregate root — applies events to build current state
public class Appointment {

    private String id;
    private String clinicId;
    private AppointmentStatus status;
    private LocalDateTime dateTime;
    private final List<Object> uncommittedEvents = new ArrayList<>();

    public static Appointment reconstitute(List<StoredEvent> events) {
        Appointment appt = new Appointment();
        events.forEach(appt::apply);
        return appt;
    }

    private void apply(StoredEvent stored) {
        switch (stored.getEventType()) {
            case "AppointmentCreated" -> applyCreated(deserialize(stored));
            case "AppointmentConfirmed" -> applyConfirmed(deserialize(stored));
            case "AppointmentCancelled" -> status = AppointmentStatus.CANCELLED;
            case "AppointmentCompleted" -> status = AppointmentStatus.COMPLETED;
        }
    }

    public void cancel(String reason) {
        if (status == AppointmentStatus.COMPLETED) {
            throw new IllegalStateException("Cannot cancel a completed appointment");
        }
        raiseEvent(new AppointmentCancelled(id, clinicId, reason, Instant.now()));
    }

    private void raiseEvent(Object event) {
        apply(toStoredEvent(event));
        uncommittedEvents.add(event);
    }
}

Transactional Outbox Pattern
The problem: you save to the database AND publish to Kafka in the same operation. If the DB commit succeeds but Kafka publish fails (or vice versa), you have inconsistency.
Solution: Outbox pattern — write the event to an outbox table in the same DB transaction, then a relay process publishes it to Kafka.
Transaction:
├── INSERT into appointments
└── INSERT into outbox_events (not yet published)

Relay process (separate):
└── Poll outbox → Publish to Kafka → Mark as published

@Entity
@Table(name = "outbox_events")
public class OutboxEvent {
    @Id
    private String eventId;
    private String topic;
    private String key;
    @Column(columnDefinition = "jsonb")
    private String payload;
    private boolean published;
    private Instant createdAt;
    private Instant publishedAt;
}
@Service
@Transactional
@RequiredArgsConstructor
public class AppointmentService {

    private final AppointmentRepository appointmentRepository;
    private final OutboxRepository outboxRepository;
    private final ObjectMapper objectMapper;

    public Appointment create(AppointmentRequest request) throws JsonProcessingException {
        Appointment appt = new Appointment(...);
        appointmentRepository.save(appt);
        // Same transaction — if either insert fails, both roll back
        OutboxEvent outboxEvent = new OutboxEvent(
            UUID.randomUUID().toString(),
            "appointments.appointments.created",
            appt.getClinic().getId(),   // message key — partitions by clinic
            objectMapper.writeValueAsString(AppointmentCreated.from(appt))
        );
        outboxRepository.save(outboxEvent);
        return appt;
    }
}
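Note the delivery guarantee the outbox gives: if the relay publishes to Kafka but crashes before marking the row as published, the event goes out again on the next poll — at-least-once, not exactly-once. Consumers therefore deduplicate by event ID. A minimal sketch of that guard (plain Java, in-memory; a real consumer would persist the processed IDs in the same transaction as the read-model update):

```java
import java.util.HashSet;
import java.util.Set;

// Consumer-side idempotency: apply each event ID at most once.
class IdempotentHandler {
    private final Set<String> processedEventIds = new HashSet<>();
    private int applied = 0;

    // Returns true if the event was applied, false if it was a duplicate delivery.
    boolean handle(String eventId) {
        if (!processedEventIds.add(eventId)) {
            return false; // already seen — skip the side effect
        }
        applied++; // projection update / email send would happen here
        return true;
    }

    int appliedCount() {
        return applied;
    }
}
```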
// Relay — runs in background, polls and publishes
@Component
@RequiredArgsConstructor
public class OutboxRelay {

    private final OutboxRepository outboxRepository;
    private final KafkaTemplate<String, String> kafkaTemplate;

    @Scheduled(fixedDelay = 1000)  // @Scheduled belongs on the method, not the class
    public void relay() {
        List<OutboxEvent> pending = outboxRepository.findByPublishedFalse(Limit.of(100));
        for (OutboxEvent event : pending) {
            try {
                kafkaTemplate.send(event.getTopic(), event.getKey(), event.getPayload())
                    .get(5, SECONDS);
                event.setPublished(true);
                event.setPublishedAt(Instant.now());
                outboxRepository.save(event);
            } catch (Exception e) {
                // Leave unpublished — it will be retried on the next poll
                log.error("Failed to relay event {}: {}", event.getEventId(), e.getMessage());
            }
        }
    }
}

Saga Pattern — Distributed Transactions
When a business process spans multiple services, you need coordination without a distributed transaction: each service commits locally, and failures are undone with compensating actions.
Choreography Saga (event-driven, each service reacts to events):

1. AppointmentService  → publishes AppointmentCreated
2. InsuranceService    ← listens, verifies insurance
                       → publishes InsuranceVerified OR InsuranceFailed
3. NotificationService ← listens to InsuranceVerified
                       → sends confirmation email
                       → publishes ConfirmationSent

On failure:
InsuranceFailed → AppointmentService cancels the appointment (compensating transaction)

Orchestration Saga (central coordinator):
@Component
@RequiredArgsConstructor
public class AppointmentSchedulingSaga {

    private final KafkaTemplate<String, Object> kafkaTemplate;

    // Saga entry point
    @KafkaListener(topics = "appointments.appointments.created")
    public void onCreated(AppointmentCreated event) {
        // Step 1: verify insurance
        kafkaTemplate.send("insurance.commands.verify",
            new VerifyInsurance(event.appointmentId(), event.patientId(), event.clinicId()));
    }

    @KafkaListener(topics = "insurance.events.verified")
    public void onInsuranceVerified(InsuranceVerified event) {
        // Step 2: send confirmation
        kafkaTemplate.send("notifications.commands.send",
            new SendConfirmation(event.appointmentId(), event.patientId()));
    }

    @KafkaListener(topics = "insurance.events.failed")
    public void onInsuranceFailed(InsuranceFailed event) {
        // Compensating transaction — cancel the appointment
        kafkaTemplate.send("appointments.commands.cancel",
            new CancelAppointment(event.appointmentId(), "Insurance verification failed"));
    }

    @KafkaListener(topics = "notifications.events.sent")
    public void onConfirmationSent(ConfirmationSent event) {
        // Saga complete — update status
        kafkaTemplate.send("appointments.commands.confirm",
            new ConfirmAppointment(event.appointmentId()));
    }
}

Pattern Selection Guide
| Situation | Pattern |
|-----------|---------|
| Read-heavy, different read/write shapes | CQRS |
| Full audit trail required | Event Sourcing |
| Guaranteed event publishing with DB write | Outbox |
| Business process across multiple services | Saga |
| Simple CRUD, single service | None needed |
| High query volume, complex filters | CQRS read model |