Kafka & Event-Driven Architecture · Lesson 7 of 7
Project: Event-Driven Scheduling System
What We're Building
A production-grade appointment scheduling system for an optometry call center, built event-first:
React Dashboard
↓ REST commands
Spring Boot API (write side)
↓ persists + publishes (outbox)
Kafka
├── Scheduling Service (saga coordinator)
├── Insurance Verification Service
├── Notification Service
└── Kafka Streams (real-time metrics)
↓
Read Model DB (PostgreSQL)
↓
React Dashboard ← REST queries

System Events
appointments.commands.schedule → ScheduleAppointment
appointments.commands.cancel → CancelAppointment
appointments.appointments.created → AppointmentCreated
appointments.appointments.confirmed → AppointmentConfirmed
appointments.appointments.cancelled → AppointmentCancelled
insurance.commands.verify → VerifyInsurance
insurance.events.verified → InsuranceVerified
insurance.events.failed → InsuranceFailed
notifications.commands.send → SendConfirmation
calls.calls.started → CallStarted
calls.calls.ended → CallEnded

Step 1: Avro Schemas
JSON
// schemas/AppointmentCreated.avsc
{
  "type": "record",
  "name": "AppointmentCreated",
  "namespace": "com.clinic.events",
  "fields": [
    { "name": "appointment_id", "type": "string" },
    { "name": "clinic_id", "type": "string" },
    { "name": "patient_id", "type": "string" },
    { "name": "patient_name", "type": "string" },
    { "name": "patient_email", "type": "string" },
    { "name": "scheduled_for", "type": "string" },
    { "name": "type", "type": { "type": "enum", "name": "AppointmentType",
      "symbols": ["ROUTINE", "FOLLOW_UP", "EMERGENCY", "CONTACT_LENS"] }},
    { "name": "saga_id", "type": "string",
      "doc": "Correlates events in the same saga" },
    { "name": "occurred_at",
      "type": { "type": "long", "logicalType": "timestamp-millis" },
      "doc": "Event time. The logicalType annotation must be inside the type object; placed at field level it is silently ignored by Avro." }
  ]
}
Register all schemas before running services:
Bash
# register-schemas.sh
# Registers every Avro schema with the Schema Registry under the
# TopicNameStrategy subject "<name>-value".
set -euo pipefail  # abort on the first failed registration

for schema in schemas/*.avsc; do
  # Quote expansions so paths with spaces don't word-split.
  subject=$(basename "$schema" .avsc)
  # --fail turns HTTP errors into a non-zero exit; jq reads the file
  # directly (no useless cat) and JSON-escapes it with -Rsc.
  curl --fail -sS -X POST "http://localhost:8081/subjects/${subject}-value/versions" \
    -H "Content-Type: application/vnd.schemaregistry.v1+json" \
    -d "{\"schema\": $(jq -Rsc . < "$schema")}"
done
Step 2: Write Service (Spring Boot)
JAVA
// AppointmentCommandController.java
/**
 * Write-side REST entry point. Accepts scheduling commands, delegates to the
 * command handler, and returns 202 Accepted — the saga completes
 * asynchronously and the client observes progress through the read model.
 */
@RestController
@RequestMapping("/api/v1/appointments")
public class AppointmentCommandController {

    private final AppointmentCommandHandler handler;

    // Explicit constructor: the original declared a final field with no
    // constructor and no @RequiredArgsConstructor, so Spring could not
    // inject the handler.
    public AppointmentCommandController(AppointmentCommandHandler handler) {
        this.handler = handler;
    }

    /**
     * Schedules an appointment.
     *
     * @param req  validated scheduling request
     * @param auth caller identity (currently unused — NOTE(review): consider
     *             stamping the requesting user onto the command for auditing)
     * @return 202 with the generated appointment id and saga id for correlation
     */
    @PostMapping
    public ResponseEntity<Map<String, String>> schedule(
            @Valid @RequestBody ScheduleAppointmentRequest req,
            Authentication auth
    ) {
        String commandId = UUID.randomUUID().toString();
        String sagaId = UUID.randomUUID().toString();
        String appointmentId = handler.handle(new ScheduleAppointment(
                commandId, sagaId,
                req.clinicId(), req.patientId(),
                req.dateTime(), req.type(), req.notes()
        ));
        return ResponseEntity.accepted().body(Map.of(
                "appointmentId", appointmentId,
                "sagaId", sagaId,
                "status", "PROCESSING"
        ));
    }
}
// AppointmentCommandHandler.java
/**
 * Handles ScheduleAppointment: persists the aggregate and an outbox row in
 * the SAME database transaction, so the event is published if and only if the
 * write commits (transactional outbox pattern).
 */
@Service
@Transactional
public class AppointmentCommandHandler {

    // These were referenced but never declared in the original — the class
    // could not compile. TODO confirm the repository type names against the
    // rest of the project.
    private final AppointmentRepository appointmentRepo;
    private final OutboxRepository outboxRepo;

    public AppointmentCommandHandler(AppointmentRepository appointmentRepo,
                                     OutboxRepository outboxRepo) {
        this.appointmentRepo = appointmentRepo;
        this.outboxRepo = outboxRepo;
    }

    /**
     * Persists the appointment in PENDING_INSURANCE and stages the
     * AppointmentCreated event in the outbox.
     *
     * @return the generated appointment id
     */
    public String handle(ScheduleAppointment cmd) {
        Appointment appt = new Appointment(
                UUID.randomUUID().toString(),
                cmd.clinicId(), cmd.patientId(),
                cmd.dateTime(), AppointmentStatus.PENDING_INSURANCE,
                cmd.type()
        );
        appointmentRepo.save(appt);
        // Outbox — same transaction. Keyed by clinic_id so events for one
        // clinic stay on one partition (per-clinic ordering).
        // NOTE(review): the Avro schema declares patient_name and
        // patient_email as required, but they are never set here, so
        // build() will fail at runtime. Either make them nullable in the
        // schema or populate them from a patient lookup — confirm intent.
        outboxRepo.save(OutboxEvent.of(
                "appointments.appointments.created",
                cmd.clinicId(),
                AppointmentCreated.newBuilder()
                        .setAppointmentId(appt.getId())
                        .setClinicId(cmd.clinicId())
                        .setPatientId(cmd.patientId())
                        .setSagaId(cmd.sagaId())
                        .setScheduledFor(cmd.dateTime().toString())
                        .setType(toAvroType(cmd.type()))
                        .setOccurredAt(Instant.now().toEpochMilli())
                        .build()
        ));
        return appt.getId();
    }
}
Step 3: Scheduling Saga
JAVA
/**
 * Orchestration-style saga for the appointment lifecycle:
 * created → verify insurance → send confirmation → confirm, with a
 * compensating cancel when insurance verification fails.
 * Outgoing messages are keyed by clinic id to preserve per-clinic ordering.
 */
@Component
@RequiredArgsConstructor
public class AppointmentSchedulingSaga {
    private final KafkaTemplate<String, Object> kafkaTemplate;
    private final SagaStateRepository sagaRepo;

    /** Saga start: record coordinator state, then kick off insurance verification. */
    @KafkaListener(topics = "appointments.appointments.created", groupId = "saga-coordinator")
    public void onAppointmentCreated(AppointmentCreated event) {
        SagaState saga = new SagaState(
                event.getSagaId().toString(),
                event.getAppointmentId().toString(),
                SagaStep.INSURANCE_VERIFICATION
        );
        sagaRepo.save(saga);
        // Step 1: verify insurance
        kafkaTemplate.send("insurance.commands.verify",
                event.getClinicId().toString(),
                new VerifyInsurance(
                        event.getAppointmentId().toString(),
                        event.getPatientId().toString(),
                        event.getSagaId().toString()
                )
        );
    }

    /** Step 2 on success: advance state and request the confirmation email. */
    @KafkaListener(topics = "insurance.events.verified", groupId = "saga-coordinator")
    public void onInsuranceVerified(InsuranceVerified event) {
        // .toString() for consistency with onAppointmentCreated (Avro string
        // fields may surface as CharSequence); requireSaga replaces the
        // original's unchecked findBySagaId, which NPE'd on missing state.
        SagaState saga = requireSaga(event.getSagaId().toString());
        saga.setStep(SagaStep.SENDING_CONFIRMATION);
        sagaRepo.save(saga);
        kafkaTemplate.send("notifications.commands.send",
                event.getClinicId(),
                new SendConfirmation(
                        saga.getAppointmentId(),
                        event.getPatientEmail(),
                        event.getSagaId()
                )
        );
    }

    /** Compensating transaction — cancel the appointment when insurance fails. */
    @KafkaListener(topics = "insurance.events.failed", groupId = "saga-coordinator")
    public void onInsuranceFailed(InsuranceFailed event) {
        // NOTE(review): the saga state row is never marked failed/compensating
        // here, so it stays in INSURANCE_VERIFICATION forever — confirm whether
        // InsuranceFailed carries a saga id and record the terminal state.
        kafkaTemplate.send("appointments.commands.cancel",
                event.getClinicId(),
                new CancelAppointment(
                        event.getAppointmentId(),
                        "Insurance verification failed: " + event.getReason()
                )
        );
    }

    /** Final step: confirmation delivered → mark saga complete, confirm appointment. */
    @KafkaListener(topics = "notifications.events.sent", groupId = "saga-coordinator")
    public void onConfirmationSent(ConfirmationSent event) {
        SagaState saga = requireSaga(event.getSagaId().toString());
        saga.setStep(SagaStep.COMPLETED);
        sagaRepo.save(saga);
        kafkaTemplate.send("appointments.commands.confirm",
                saga.getAppointmentId(),
                new ConfirmAppointment(saga.getAppointmentId())
        );
    }

    /** Loads saga state or fails loudly — a missing row means lost or misrouted events. */
    private SagaState requireSaga(String sagaId) {
        SagaState saga = sagaRepo.findBySagaId(sagaId);
        if (saga == null) {
            throw new IllegalStateException("No saga state found for sagaId=" + sagaId);
        }
        return saga;
    }
}
Step 4: CQRS Read Model Projection
JAVA
/**
 * CQRS read-model projector: folds appointment lifecycle events into the
 * denormalized AppointmentSummary table that the dashboard queries.
 * Manual ack after a successful write gives at-least-once delivery; each
 * branch is idempotent (create overwrites by id, updates set a status), so
 * redelivery is safe.
 */
@Component
public class AppointmentProjection {

    // Referenced but never declared in the original — the class could not
    // compile. TODO confirm the repository type name against the project.
    private final AppointmentSummaryRepository summaryRepo;

    public AppointmentProjection(AppointmentSummaryRepository summaryRepo) {
        this.summaryRepo = summaryRepo;
    }

    @KafkaListener(topics = {
        "appointments.appointments.created",
        "appointments.appointments.confirmed",
        "appointments.appointments.cancelled",
        "appointments.appointments.completed"
    }, groupId = "read-model-projector")
    public void project(ConsumerRecord<String, Object> record, Acknowledgment ack) {
        Object event = record.value();
        if (event instanceof AppointmentCreated e) {
            summaryRepo.save(AppointmentSummary.builder()
                    .id(e.getAppointmentId().toString())
                    .clinicId(e.getClinicId().toString())
                    .patientId(e.getPatientId().toString())
                    .scheduledFor(LocalDateTime.parse(e.getScheduledFor().toString()))
                    .status("PENDING_INSURANCE")
                    .type(e.getType().toString())
                    .build());
        } else if (event instanceof AppointmentConfirmed e) {
            summaryRepo.findById(e.getAppointmentId().toString())
                    .ifPresent(s -> { s.setStatus("CONFIRMED"); summaryRepo.save(s); });
        } else if (event instanceof AppointmentCancelled e) {
            summaryRepo.findById(e.getAppointmentId().toString())
                    .ifPresent(s -> { s.setStatus("CANCELLED"); s.setCancelReason(e.getReason().toString()); summaryRepo.save(s); });
        } else if (event instanceof AppointmentCompleted e) {
            // The completed topic was subscribed but its events were silently
            // dropped in the original — the read model never left CONFIRMED.
            summaryRepo.findById(e.getAppointmentId().toString())
                    .ifPresent(s -> { s.setStatus("COMPLETED"); summaryRepo.save(s); });
        }
        ack.acknowledge();
    }
}
Step 5: Kafka Streams — Real-Time Metrics
JAVA
/**
 * Kafka Streams topology for real-time scheduling metrics:
 * (1) hourly appointment counts per clinic, written to a metrics topic, and
 * (2) 30-minute funnel counts materialized as named stores for interactive queries.
 */
@Autowired
public void buildMetricsTopology(StreamsBuilder builder) {
    // Count appointments per clinic per hour
    builder.<String, AppointmentCreated>stream("appointments.appointments.created")
        .selectKey((k, v) -> v.getClinicId().toString())
        .groupByKey()
        .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofHours(1)))
        .count(Materialized.as("hourly-appointment-counts"))
        .toStream()
        // Unwrap Windowed<String> back to a plain String key before writing
        // out: the original kept the windowed key, which the default String
        // serde cannot serialize — the sink would fail at runtime.
        .map((windowedKey, count) -> KeyValue.pair(
            windowedKey.key(),
            Map.of(
                "clinic_id", windowedKey.key(),
                "count", count,
                "window_start", windowedKey.window().startTime().toString()
            )))
        .to("appointments.metrics.hourly-counts");

    // Real-time funnel: created → confirmed → cancelled counts per 30 minutes.
    // KTables are materialized when defined — the original's
    // Stream.of(...).forEach(t -> {}) no-op was dead code and is removed.
    builder.<String, AppointmentCreated>stream("appointments.appointments.created")
        .mapValues(v -> 1L)
        .groupByKey()
        .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(30)))
        .reduce(Long::sum, Materialized.as("created-30m"));
    builder.<String, AppointmentConfirmed>stream("appointments.appointments.confirmed")
        .mapValues(v -> 1L)
        .groupByKey()
        .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(30)))
        .reduce(Long::sum, Materialized.as("confirmed-30m"));
    builder.<String, AppointmentCancelled>stream("appointments.appointments.cancelled")
        .mapValues(v -> 1L)
        .groupByKey()
        .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(30)))
        .reduce(Long::sum, Materialized.as("cancelled-30m"));
}
Step 6: React Dashboard
TSX
// src/pages/AppointmentDashboard.tsx
import { useQuery } from '@tanstack/react-query'
import { AreaChart, Area, XAxis, YAxis, Tooltip, ResponsiveContainer } from 'recharts'

/**
 * Read-side dashboard. Polls the CQRS read model: funnel KPIs + hourly trend
 * every 30s, the recent-appointments table every 15s.
 */
export function AppointmentDashboard() {
  const { data: metrics } = useQuery({
    queryKey: ['appointment-metrics'],
    queryFn: () => api.getAppointmentMetrics(),
    refetchInterval: 30_000,
  })
  const { data: recent } = useQuery({
    queryKey: ['recent-appointments'],
    queryFn: () => api.getRecentAppointments({ page: 0, size: 20 }),
    refetchInterval: 15_000,
  })
  return (
    <div className="space-y-6 p-6">
      {/* Funnel metrics */}
      <div className="grid grid-cols-4 gap-4">
        {[
          { label: 'Scheduled', value: metrics?.created ?? 0, color: 'text-blue-500' },
          { label: 'Confirmed', value: metrics?.confirmed ?? 0, color: 'text-green-500' },
          { label: 'Completed', value: metrics?.completed ?? 0, color: 'text-violet-500' },
          { label: 'Cancelled', value: metrics?.cancelled ?? 0, color: 'text-red-500' },
        ].map(kpi => (
          <div key={kpi.label} className="rounded-xl border bg-card p-5">
            <p className="text-sm text-muted-foreground">{kpi.label}</p>
            <p className={`text-3xl font-bold mt-1 ${kpi.color}`}>{kpi.value}</p>
          </div>
        ))}
      </div>
      {/* Hourly trend */}
      <div className="rounded-xl border bg-card p-5">
        <h3 className="font-semibold mb-4">Appointments Today</h3>
        <ResponsiveContainer width="100%" height={200}>
          <AreaChart data={metrics?.hourly ?? []}>
            <defs>
              <linearGradient id="areaGrad" x1="0" y1="0" x2="0" y2="1">
                <stop offset="5%" stopColor="#6366f1" stopOpacity={0.2} />
                <stop offset="95%" stopColor="#6366f1" stopOpacity={0} />
              </linearGradient>
            </defs>
            <XAxis dataKey="hour" tick={{ fontSize: 11 }} />
            <YAxis tick={{ fontSize: 11 }} />
            <Tooltip />
            <Area type="monotone" dataKey="count" stroke="#6366f1" fill="url(#areaGrad)" strokeWidth={2} />
          </AreaChart>
        </ResponsiveContainer>
      </div>
      {/* Recent appointments table */}
      <div className="rounded-xl border bg-card overflow-hidden">
        <div className="p-4 border-b"><h3 className="font-semibold">Recent Appointments</h3></div>
        <table className="w-full text-sm">
          <thead className="bg-muted/50">
            <tr>
              {['Patient', 'Clinic', 'Date & Time', 'Type', 'Status'].map(h => (
                <th key={h} className="px-4 py-3 text-left font-medium text-muted-foreground">{h}</th>
              ))}
            </tr>
          </thead>
          <tbody>
            {/* Explicit empty state — the original rendered a bare table while loading */}
            {(!recent || recent.items.length === 0) && (
              <tr>
                <td colSpan={5} className="px-4 py-6 text-center text-muted-foreground">
                  No appointments yet
                </td>
              </tr>
            )}
            {recent?.items.map(appt => (
              <tr key={appt.id} className="border-t hover:bg-muted/30">
                <td className="px-4 py-3 font-medium">{appt.patientName}</td>
                <td className="px-4 py-3 text-muted-foreground">{appt.clinicName}</td>
                <td className="px-4 py-3">{new Date(appt.scheduledFor).toLocaleString()}</td>
                {/* Global regex — String.replace('_', ' ') only replaces the
                    first underscore, breaking any multi-word enum values */}
                <td className="px-4 py-3 capitalize">{appt.type.replace(/_/g, ' ')}</td>
                <td className="px-4 py-3">
                  <span className={`inline-flex px-2 py-1 rounded-full text-xs font-medium ${
                    appt.status === 'CONFIRMED' ? 'bg-green-500/15 text-green-500' :
                    appt.status === 'CANCELLED' ? 'bg-red-500/15 text-red-500' :
                    appt.status === 'COMPLETED' ? 'bg-violet-500/15 text-violet-500' :
                    'bg-blue-500/15 text-blue-500'
                  }`}>{appt.status}</span>
                </td>
              </tr>
            ))}
          </tbody>
        </table>
      </div>
    </div>
  )
}
Infrastructure (Terraform)
HCL
# Kafka topics
resource "kafka_topic" "appointment_created" {
  name               = "appointments.appointments.created"
  replication_factor = 3
  partitions         = 12
  config = {
    "retention.ms"   = "604800000" # 7 days
    "cleanup.policy" = "delete"
  }
}

# ECS services for each microservice
resource "aws_ecs_service" "appointment_api" {
  name            = "appointment-api"
  cluster         = aws_ecs_cluster.main.id
  task_definition = aws_ecs_task_definition.appointment_api.arn
  desired_count   = 2
}

resource "aws_ecs_service" "saga_coordinator" {
  name = "saga-coordinator"
  # cluster/task_definition were missing in the original — aws_ecs_service
  # requires both, so terraform plan would fail. TODO confirm the task
  # definition resource names match the rest of the stack.
  cluster         = aws_ecs_cluster.main.id
  task_definition = aws_ecs_task_definition.saga_coordinator.arn
  # NOTE(review): Kafka's consumer-group protocol already prevents two
  # instances from consuming the same partition; desired_count = 1 is a
  # single point of failure — consider 2 with group-based partitioning.
  desired_count = 1 # single coordinator to avoid concurrent saga processing
}

resource "aws_ecs_service" "streams_processor" {
  name            = "streams-processor"
  cluster         = aws_ecs_cluster.main.id
  task_definition = aws_ecs_task_definition.streams_processor.arn
  desired_count   = 3 # 3 instances × 12 partitions = 4 partitions each
}