Kafka & Event-Driven Architecture · Lesson 5 of 7

Schema Registry & Avro: Data Contracts

The Problem: Schema Evolution Without Contracts

Without Schema Registry, producers and consumers share an implicit contract — the event structure is documented (maybe), enforced (never). A team refactors a field name, deployments stagger, and consumers start crashing with deserialization errors.

Schema Registry enforces that:

  • Every message has a registered schema
  • Schema changes are validated against a compatibility rule
  • Consumers always know what they're getting (see the wire-format sketch below)
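
How does a consumer "always know"? Every Confluent-framed message carries a 5-byte header: a magic byte (0) followed by the 4-byte, big-endian ID of the schema it was written with. A minimal sketch of reading that header from a raw consumer record:

Python
import struct

def embedded_schema_id(payload: bytes) -> int:
    """Return the Schema Registry ID embedded in a Confluent-framed message."""
    magic, schema_id = struct.unpack(">bI", payload[:5])
    if magic != 0:
        raise ValueError("not Confluent wire format")
    return schema_id  # the deserializer fetches this schema, then decodes the rest

# e.g. embedded_schema_id(msg.value()) on a raw consumer record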

Avro

Avro is the de-facto standard serialization format in the Kafka ecosystem. It's:

  • Compact — binary, much smaller than JSON (see the size check after this list)
  • Fast — no field names in each record (only in the schema)
  • Schema-first — schema is required to read or write data
  • Evolvable — supports forward/backward compatible changes
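
To make "much smaller" concrete, here is a rough size check against the AppointmentCreated schema defined in the next section. It assumes the fastavro package (pip install fastavro), which is not otherwise used in this lesson:

Python
import io, json
import fastavro  # assumption: pip install fastavro

with open("src/main/avro/AppointmentCreated.avsc") as f:
    schema = fastavro.parse_schema(json.load(f))

event = {
    "appointment_id": "7c9e6679-7425-40de-944b-e07fc1f90ae7",
    "clinic_id":      "clinic-42",
    "patient_id":     "patient-99",
    "scheduled_for":  "2024-06-01T09:30:00Z",
    "type":           "ROUTINE",
    "notes":          None,
    "occurred_at":    1717231800000,
}

buf = io.BytesIO()
fastavro.schemaless_writer(buf, schema, event)   # binary body, no field names
print(f"Avro: {len(buf.getvalue())} bytes, JSON: {len(json.dumps(event))} bytes")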

Schema Definition

JSON
// src/main/avro/AppointmentCreated.avsc
{
  "type":      "record",
  "name":      "AppointmentCreated",
  "namespace": "com.clinic.events",
  "doc":       "Fired when a new appointment is successfully scheduled",
  "fields": [
    { "name": "appointment_id", "type": "string",
      "doc": "UUID of the appointment" },
    { "name": "clinic_id",      "type": "string" },
    { "name": "patient_id",     "type": "string" },
    { "name": "scheduled_for",  "type": "string",
      "doc": "ISO-8601 datetime string" },
    { "name": "type",           "type": {
        "type": "enum",
        "name": "AppointmentType",
        "symbols": ["ROUTINE", "FOLLOW_UP", "EMERGENCY", "CONTACT_LENS"]
      }
    },
    { "name": "notes",          "type": ["null", "string"], "default": null },
    { "name": "occurred_at",    "type": "long",
      "logicalType": "timestamp-millis" }
  ]
}

Schema Registry Setup

Local Development (Docker)

YAML
# docker-compose.yml
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.6.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  kafka:
    image: confluentinc/cp-kafka:7.6.0
    depends_on: [zookeeper]
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      # Two listeners: kafka:29092 for other containers, localhost:9092 for host clients
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1  # single broker, so replication factor 1
    ports: ["9092:9092"]

  schema-registry:
    image: confluentinc/cp-schema-registry:7.6.0
    depends_on: [kafka]
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka:29092
    ports: ["8081:8081"]

Register a Schema

The subject names below follow the registry's default TopicNameStrategy: the topic name plus a "-value" suffix for value schemas (key schemas would use "-key").

Bash
# Register schema via REST API
curl -X POST http://localhost:8081/subjects/appointments.appointments.created-value/versions \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  -d "{\"schema\": $(cat src/main/avro/AppointmentCreated.avsc | jq -Rsc .)}"

# Get latest schema
curl http://localhost:8081/subjects/appointments.appointments.created-value/versions/latest

# Check compatibility before registering
curl -X POST http://localhost:8081/compatibility/subjects/appointments.appointments.created-value/versions/latest \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  -d "{\"schema\": $(cat AppointmentCreated_v2.avsc | jq -Rsc .)}"

Java: Spring Boot + Avro

Maven Plugin — Generate Java Classes from Avro

XML
<plugin>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro-maven-plugin</artifactId>
    <version>1.11.3</version>
    <executions>
        <execution>
            <goals><goal>schema</goal></goals>
            <configuration>
                <sourceDirectory>${project.basedir}/src/main/avro</sourceDirectory>
                <outputDirectory>${project.build.directory}/generated-sources/avro</outputDirectory>
            </configuration>
        </execution>
    </executions>
</plugin>

<dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-avro-serializer</artifactId>
    <version>7.6.0</version>
</dependency>

Configuration

YAML
spring:
  kafka:
    producer:
      value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
    consumer:
      value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
    properties:
      schema.registry.url: ${SCHEMA_REGISTRY_URL:http://localhost:8081}
      specific.avro.reader: true   # use generated classes, not GenericRecord

Producer with Avro

JAVA
@Service
@RequiredArgsConstructor
public class AppointmentEventPublisher {

    private final KafkaTemplate<String, AppointmentCreated> kafkaTemplate;

    public void publishCreated(Appointment appointment) {
        // AppointmentCreated is the Avro-generated class
        AppointmentCreated event = AppointmentCreated.newBuilder()
            .setAppointmentId(appointment.getId())
            .setClinicId(appointment.getClinic().getId())
            .setPatientId(appointment.getPatient().getId())
            .setScheduledFor(appointment.getDateTime().toString())
            .setType(AppointmentType.valueOf(appointment.getType().name()))
            .setNotes(appointment.getNotes())
            .setOccurredAt(Instant.now())   // timestamp-millis maps to java.time.Instant
            .build();

        kafkaTemplate.send("appointments.appointments.created",
            appointment.getClinic().getId(), event);
    }
}

Consumer with Avro

JAVA
@KafkaListener(topics = "appointments.appointments.created", groupId = "analytics")
public void onAppointmentCreated(AppointmentCreated event) {
    // Fully typed — IDE autocomplete works
    String  clinicId   = event.getClinicId().toString();
    String  type       = event.getType().toString();
    Instant occurredAt = event.getOccurredAt();   // timestamp-millis arrives as Instant

    analyticsService.record(clinicId, type, occurredAt);
}

Python with Schema Registry

Bash
pip install "confluent-kafka[avro]"   # quotes keep zsh from globbing the brackets
Python
import time
from confluent_kafka import Producer, Consumer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer, AvroDeserializer
from confluent_kafka.serialization import SerializationContext, MessageField

# Schema Registry client
schema_registry = SchemaRegistryClient({"url": "http://localhost:8081"})

# Load schema
with open("schemas/AppointmentCreated.avsc") as f:
    schema_str = f.read()

avro_serializer = AvroSerializer(
    schema_registry,
    schema_str,
    lambda obj, ctx: obj   # dict -> dict (no transformation needed)
)

avro_deserializer = AvroDeserializer(
    schema_registry,
    schema_str,
    lambda obj, ctx: obj
)

# Producer
producer = Producer({"bootstrap.servers": "localhost:9092"})

def publish_appointment_created(appointment: dict) -> None:
    event = {
        "appointment_id": appointment["id"],
        "clinic_id":      appointment["clinic_id"],
        "patient_id":     appointment["patient_id"],
        "scheduled_for":  appointment["datetime"],
        "type":           appointment["type"].upper(),
        "notes":          appointment.get("notes"),
        "occurred_at":    int(time.time() * 1000),
    }

    producer.produce(
        topic     = "appointments.appointments.created",
        key       = appointment["clinic_id"],
        value     = avro_serializer(
            event,
            SerializationContext("appointments.appointments.created", MessageField.VALUE)
        ),
        on_delivery = lambda err, msg: print(f"Delivered: {msg.offset()}" if not err else f"Error: {err}")
    )
    producer.flush()
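
The avro_deserializer built above is the consumer-side mirror of the serializer. A minimal sketch of the matching consumer loop (group ID and error handling kept deliberately simple):

Python
consumer = Consumer({
    "bootstrap.servers": "localhost:9092",
    "group.id":          "analytics",
    "auto.offset.reset": "earliest",
})
consumer.subscribe(["appointments.appointments.created"])

while True:
    msg = consumer.poll(timeout=1.0)
    if msg is None or msg.error():
        continue
    event = avro_deserializer(
        msg.value(),
        SerializationContext(msg.topic(), MessageField.VALUE)
    )
    # occurred_at comes back as a datetime (timestamp-millis logical type)
    print(event["appointment_id"], event["type"], event["occurred_at"])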

Schema Evolution Rules

Backward Compatible Changes (BACKWARD — the default)

New schema can read data written with the old schema:

  • Add a field with a default value ✓
  • Remove a field ✓
  • Change a field type to a compatible type ✓ (e.g., int → long)

Forward Compatible Changes (FORWARD)

Old schema can read data written with the new schema:

  • Add a field ✓ (old consumer skips fields it doesn't know)
  • Remove a field that has a default in the old schema ✓ (old consumer falls back to the default)

Breaking Changes (never allowed with FULL compatibility)

  • Remove a required field with no default
  • Rename a field (it's a delete + add, which breaks)
  • Change field type to an incompatible type
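
These rules can be verified before deploying. Besides the curl compatibility check shown earlier, the Python registry client can run the same check programmatically, e.g. in CI (the v2 file name is illustrative):

Python
from confluent_kafka.schema_registry import SchemaRegistryClient, Schema

client = SchemaRegistryClient({"url": "http://localhost:8081"})

with open("schemas/AppointmentCreated_v2.avsc") as f:
    candidate = Schema(f.read(), schema_type="AVRO")

# True if the candidate is compatible with the latest registered version
ok = client.test_compatibility("appointments.appointments.created-value", candidate)
print("compatible" if ok else "breaking change: do not register")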

Evolving the Schema Safely

JSON
// Version 1
{"name": "notes", "type": ["null", "string"], "default": null}

// Version 2 — add new optional field (backward compatible)
{"name": "notes",          "type": ["null", "string"], "default": null},
{"name": "appointment_tag","type": ["null", "string"], "default": null}

// Rename (BREAKING — don't do this, add new field instead)
// Instead of renaming "type" to "appointment_type":
{"name": "type",            "type": "AppointmentType"},
{"name": "appointment_type","type": ["null", "string"], "default": null}
// Deprecate "type" and migrate consumers over time

Setting Compatibility Per Subject

Bash
# Set FULL_TRANSITIVE (the strictest level; best for production)
curl -X PUT http://localhost:8081/config/appointments.appointments.created-value \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  -d '{"compatibility": "FULL_TRANSITIVE"}'

| Level           | New reads old     | Old reads new     |
|-----------------|-------------------|-------------------|
| BACKWARD        | ✓                 | ✗                 |
| FORWARD         | ✗                 | ✓                 |
| FULL            | ✓                 | ✓                 |
| FULL_TRANSITIVE | ✓ vs all versions | ✓ vs all versions |