Kafka & Event-Driven Architecture · Lesson 7 of 7

Project: Event-Driven Scheduling System

What We're Building

A production-grade appointment scheduling system for an optometry call center, built event-first:

React Dashboard
    ↓  REST commands
Spring Boot API (write side)
    ↓  persists + publishes (outbox)
Kafka
    ├── Scheduling Service (saga coordinator)
    ├── Insurance Verification Service
    ├── Notification Service
    └── Kafka Streams (real-time metrics)
              ↓
          Read Model DB (PostgreSQL)
              ↓
React Dashboard ← REST queries

System Events

appointments.commands.schedule     → ScheduleAppointment
appointments.commands.cancel       → CancelAppointment
appointments.appointments.created  → AppointmentCreated
appointments.appointments.confirmed → AppointmentConfirmed
appointments.appointments.cancelled → AppointmentCancelled
insurance.commands.verify          → VerifyInsurance
insurance.events.verified          → InsuranceVerified
insurance.events.failed            → InsuranceFailed
notifications.commands.send        → SendConfirmation
calls.calls.started                → CallStarted
calls.calls.ended                  → CallEnded

Step 1: Avro Schemas

JSON
// schemas/AppointmentCreated.avsc
{
  "type": "record",
  "name": "AppointmentCreated",
  "namespace": "com.clinic.events",
  "fields": [
    { "name": "appointment_id", "type": "string" },
    { "name": "clinic_id",      "type": "string" },
    { "name": "patient_id",     "type": "string" },
    { "name": "patient_name",   "type": "string" },
    { "name": "patient_email",  "type": "string" },
    { "name": "scheduled_for",  "type": "string" },
    { "name": "type",           "type": { "type": "enum", "name": "AppointmentType",
        "symbols": ["ROUTINE", "FOLLOW_UP", "EMERGENCY", "CONTACT_LENS"] }},
    { "name": "saga_id",        "type": "string",
      "doc": "Correlates events in the same saga" },
    { "name": "occurred_at",    "type": "long", "logicalType": "timestamp-millis" }
  ]
}

Register all schemas before running services:

Bash
# register-schemas.sh
for schema in schemas/*.avsc; do
  subject=$(basename $schema .avsc)
  curl -X POST http://localhost:8081/subjects/${subject}-value/versions \
    -H "Content-Type: application/vnd.schemaregistry.v1+json" \
    -d "{\"schema\": $(cat $schema | jq -Rsc .)}"
done

Step 2: Write Service (Spring Boot)

JAVA
// AppointmentCommandController.java
@RestController
@RequestMapping("/api/v1/appointments")
public class AppointmentCommandController {

    private final AppointmentCommandHandler handler;

    @PostMapping
    public ResponseEntity<Map<String, String>> schedule(
        @Valid @RequestBody ScheduleAppointmentRequest req,
        Authentication auth
    ) {
        String commandId     = UUID.randomUUID().toString();
        String sagaId        = UUID.randomUUID().toString();
        String appointmentId = handler.handle(new ScheduleAppointment(
            commandId, sagaId,
            req.clinicId(), req.patientId(),
            req.dateTime(), req.type(), req.notes()
        ));
        return ResponseEntity.accepted().body(Map.of(
            "appointmentId", appointmentId,
            "sagaId",        sagaId,
            "status",        "PROCESSING"
        ));
    }
}

// AppointmentCommandHandler.java
@Service
@Transactional
public class AppointmentCommandHandler {

    public String handle(ScheduleAppointment cmd) {
        Appointment appt = new Appointment(
            UUID.randomUUID().toString(),
            cmd.clinicId(), cmd.patientId(),
            cmd.dateTime(), AppointmentStatus.PENDING_INSURANCE,
            cmd.type()
        );
        appointmentRepo.save(appt);

        // Outbox — same transaction
        outboxRepo.save(OutboxEvent.of(
            "appointments.appointments.created",
            cmd.clinicId(),
            AppointmentCreated.newBuilder()
                .setAppointmentId(appt.getId())
                .setClinicId(cmd.clinicId())
                .setPatientId(cmd.patientId())
                .setSagaId(cmd.sagaId())
                .setScheduledFor(cmd.dateTime().toString())
                .setType(toAvroType(cmd.type()))
                .setOccurredAt(Instant.now().toEpochMilli())
                .build()
        ));

        return appt.getId();
    }
}

Step 3: Scheduling Saga

JAVA
@Component
@RequiredArgsConstructor
public class AppointmentSchedulingSaga {

    private final KafkaTemplate<String, Object> kafkaTemplate;
    private final SagaStateRepository sagaRepo;

    @KafkaListener(topics = "appointments.appointments.created", groupId = "saga-coordinator")
    public void onAppointmentCreated(AppointmentCreated event) {
        SagaState saga = new SagaState(
            event.getSagaId().toString(),
            event.getAppointmentId().toString(),
            SagaStep.INSURANCE_VERIFICATION
        );
        sagaRepo.save(saga);

        // Step 1: verify insurance
        kafkaTemplate.send("insurance.commands.verify",
            event.getClinicId().toString(),
            new VerifyInsurance(
                event.getAppointmentId().toString(),
                event.getPatientId().toString(),
                event.getSagaId().toString()
            )
        );
    }

    @KafkaListener(topics = "insurance.events.verified", groupId = "saga-coordinator")
    public void onInsuranceVerified(InsuranceVerified event) {
        SagaState saga = sagaRepo.findBySagaId(event.getSagaId());
        saga.setStep(SagaStep.SENDING_CONFIRMATION);
        sagaRepo.save(saga);

        // Step 2: send confirmation email
        kafkaTemplate.send("notifications.commands.send",
            event.getClinicId(),
            new SendConfirmation(
                saga.getAppointmentId(),
                event.getPatientEmail(),
                event.getSagaId()
            )
        );
    }

    @KafkaListener(topics = "insurance.events.failed", groupId = "saga-coordinator")
    public void onInsuranceFailed(InsuranceFailed event) {
        // Compensating transaction — cancel the appointment
        kafkaTemplate.send("appointments.commands.cancel",
            event.getClinicId(),
            new CancelAppointment(
                event.getAppointmentId(),
                "Insurance verification failed: " + event.getReason()
            )
        );
    }

    @KafkaListener(topics = "notifications.events.sent", groupId = "saga-coordinator")
    public void onConfirmationSent(ConfirmationSent event) {
        SagaState saga = sagaRepo.findBySagaId(event.getSagaId());
        saga.setStep(SagaStep.COMPLETED);
        sagaRepo.save(saga);

        // Final step — confirm the appointment
        kafkaTemplate.send("appointments.commands.confirm",
            saga.getAppointmentId(),
            new ConfirmAppointment(saga.getAppointmentId())
        );
    }
}

Step 4: CQRS Read Model Projection

JAVA
@Component
public class AppointmentProjection {

    @KafkaListener(topics = {
        "appointments.appointments.created",
        "appointments.appointments.confirmed",
        "appointments.appointments.cancelled",
        "appointments.appointments.completed"
    }, groupId = "read-model-projector")
    public void project(ConsumerRecord<String, Object> record, Acknowledgment ack) {
        Object event = record.value();

        if (event instanceof AppointmentCreated e) {
            summaryRepo.save(AppointmentSummary.builder()
                .id(e.getAppointmentId().toString())
                .clinicId(e.getClinicId().toString())
                .patientId(e.getPatientId().toString())
                .scheduledFor(LocalDateTime.parse(e.getScheduledFor().toString()))
                .status("PENDING_INSURANCE")
                .type(e.getType().toString())
                .build());

        } else if (event instanceof AppointmentConfirmed e) {
            summaryRepo.findById(e.getAppointmentId().toString())
                .ifPresent(s -> { s.setStatus("CONFIRMED"); summaryRepo.save(s); });

        } else if (event instanceof AppointmentCancelled e) {
            summaryRepo.findById(e.getAppointmentId().toString())
                .ifPresent(s -> { s.setStatus("CANCELLED"); s.setCancelReason(e.getReason().toString()); summaryRepo.save(s); });
        }

        ack.acknowledge();
    }
}

Step 5: Kafka Streams — Real-Time Metrics

JAVA
@Autowired
public void buildMetricsTopology(StreamsBuilder builder) {
    // Count appointments per clinic per hour
    builder.<String, AppointmentCreated>stream("appointments.appointments.created")
        .selectKey((k, v) -> v.getClinicId().toString())
        .groupByKey()
        .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofHours(1)))
        .count(Materialized.as("hourly-appointment-counts"))
        .toStream()
        .mapValues((windowedKey, count) -> Map.of(
            "clinic_id", windowedKey.key(),
            "count", count,
            "window_start", windowedKey.window().startTime().toString()
        ))
        .to("appointments.metrics.hourly-counts");

    // Real-time funnel: created → confirmed → completed rates
    KStream<String, Long> created   = builder.<String, AppointmentCreated>stream("appointments.appointments.created").mapValues(v -> 1L);
    KStream<String, Long> confirmed = builder.<String, AppointmentConfirmed>stream("appointments.appointments.confirmed").mapValues(v -> 1L);
    KStream<String, Long> cancelled = builder.<String, AppointmentCancelled>stream("appointments.appointments.cancelled").mapValues(v -> 1L);

    Stream.of(
        created.groupByKey().windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(30))).reduce(Long::sum, Materialized.as("created-30m")),
        confirmed.groupByKey().windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(30))).reduce(Long::sum, Materialized.as("confirmed-30m")),
        cancelled.groupByKey().windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(30))).reduce(Long::sum, Materialized.as("cancelled-30m"))
    ).forEach(t -> {}); // materialize for interactive queries
}

Step 6: React Dashboard

TSX
// src/pages/AppointmentDashboard.tsx
import { useQuery } from '@tanstack/react-query'
import { AreaChart, Area, XAxis, YAxis, Tooltip, ResponsiveContainer } from 'recharts'

export function AppointmentDashboard() {
  const { data: metrics } = useQuery({
    queryKey: ['appointment-metrics'],
    queryFn: () => api.getAppointmentMetrics(),
    refetchInterval: 30_000,
  })

  const { data: recent } = useQuery({
    queryKey: ['recent-appointments'],
    queryFn: () => api.getRecentAppointments({ page: 0, size: 20 }),
    refetchInterval: 15_000,
  })

  return (
    <div className="space-y-6 p-6">
      {/* Funnel metrics */}
      <div className="grid grid-cols-4 gap-4">
        {[
          { label: 'Scheduled', value: metrics?.created ?? 0,   color: 'text-blue-500'  },
          { label: 'Confirmed', value: metrics?.confirmed ?? 0,  color: 'text-green-500' },
          { label: 'Completed', value: metrics?.completed ?? 0,  color: 'text-violet-500'},
          { label: 'Cancelled', value: metrics?.cancelled ?? 0,  color: 'text-red-500'   },
        ].map(kpi => (
          <div key={kpi.label} className="rounded-xl border bg-card p-5">
            <p className="text-sm text-muted-foreground">{kpi.label}</p>
            <p className={`text-3xl font-bold mt-1 ${kpi.color}`}>{kpi.value}</p>
          </div>
        ))}
      </div>

      {/* Hourly trend */}
      <div className="rounded-xl border bg-card p-5">
        <h3 className="font-semibold mb-4">Appointments Today</h3>
        <ResponsiveContainer width="100%" height={200}>
          <AreaChart data={metrics?.hourly ?? []}>
            <defs>
              <linearGradient id="areaGrad" x1="0" y1="0" x2="0" y2="1">
                <stop offset="5%"  stopColor="#6366f1" stopOpacity={0.2} />
                <stop offset="95%" stopColor="#6366f1" stopOpacity={0} />
              </linearGradient>
            </defs>
            <XAxis dataKey="hour" tick={{ fontSize: 11 }} />
            <YAxis tick={{ fontSize: 11 }} />
            <Tooltip />
            <Area type="monotone" dataKey="count" stroke="#6366f1" fill="url(#areaGrad)" strokeWidth={2} />
          </AreaChart>
        </ResponsiveContainer>
      </div>

      {/* Recent appointments table */}
      <div className="rounded-xl border bg-card overflow-hidden">
        <div className="p-4 border-b"><h3 className="font-semibold">Recent Appointments</h3></div>
        <table className="w-full text-sm">
          <thead className="bg-muted/50">
            <tr>
              {['Patient', 'Clinic', 'Date & Time', 'Type', 'Status'].map(h => (
                <th key={h} className="px-4 py-3 text-left font-medium text-muted-foreground">{h}</th>
              ))}
            </tr>
          </thead>
          <tbody>
            {recent?.items.map(appt => (
              <tr key={appt.id} className="border-t hover:bg-muted/30">
                <td className="px-4 py-3 font-medium">{appt.patientName}</td>
                <td className="px-4 py-3 text-muted-foreground">{appt.clinicName}</td>
                <td className="px-4 py-3">{new Date(appt.scheduledFor).toLocaleString()}</td>
                <td className="px-4 py-3 capitalize">{appt.type.replace('_', ' ')}</td>
                <td className="px-4 py-3">
                  <span className={`inline-flex px-2 py-1 rounded-full text-xs font-medium ${
                    appt.status === 'CONFIRMED'  ? 'bg-green-500/15 text-green-500' :
                    appt.status === 'CANCELLED'  ? 'bg-red-500/15 text-red-500'    :
                    appt.status === 'COMPLETED'  ? 'bg-violet-500/15 text-violet-500' :
                    'bg-blue-500/15 text-blue-500'
                  }`}>{appt.status}</span>
                </td>
              </tr>
            ))}
          </tbody>
        </table>
      </div>
    </div>
  )
}

Infrastructure (Terraform)

HCL
# Kafka topics
resource "kafka_topic" "appointment_created" {
  name               = "appointments.appointments.created"
  replication_factor = 3
  partitions         = 12
  config = {
    "retention.ms" = "604800000"
    "cleanup.policy" = "delete"
  }
}

# ECS services for each microservice
resource "aws_ecs_service" "appointment_api" {
  name            = "appointment-api"
  cluster         = aws_ecs_cluster.main.id
  task_definition = aws_ecs_task_definition.appointment_api.arn
  desired_count   = 2
}

resource "aws_ecs_service" "saga_coordinator" {
  name          = "saga-coordinator"
  desired_count = 1  # single coordinator to avoid concurrent saga processing
}

resource "aws_ecs_service" "streams_processor" {
  name          = "streams-processor"
  desired_count = 3  # 3 instances × 12 partitions = 4 partitions each
}