Kafka & Event-Driven Architecture · Lesson 5 of 7
Schema Registry & Avro: Data Contracts
The Problem: Schema Evolution Without Contracts
Without Schema Registry, producers and consumers share an implicit contract — the event structure is documented (maybe), enforced (never). A team refactors a field name, deployments stagger, and consumers start crashing with deserialization errors.
Schema Registry enforces that:
- Every message has a registered schema
- Schema changes are validated against a compatibility rule
- Consumers always know what they're getting
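To make the failure mode concrete, here is a minimal sketch — plain dicts, no Kafka, and all the names are illustrative: a producer team renames `clinic_id` to `clinicId`, and a consumer written against the old implicit contract breaks at read time.

```python
# Hypothetical illustration of the implicit-contract failure: no real
# Kafka involved, just the shape of the problem.

def record_visit(event: dict) -> str:
    # Consumer code written against the original, undocumented contract.
    return event["clinic_id"]

old_event = {"clinic_id": "c-17", "patient_id": "p-9"}   # before the refactor
new_event = {"clinicId": "c-17", "patient_id": "p-9"}    # after a field rename

record_visit(old_event)        # works
try:
    record_visit(new_event)    # the staggered-deployment crash
except KeyError as err:
    missing = err.args[0]      # the field the consumer still expects
```

With a registry in place, the rename would be rejected at schema-registration time instead of surfacing as a runtime error in someone else's service.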
Avro
Avro is the most popular serialization format for Kafka. It's:
- Compact — binary, much smaller than JSON
- Fast — no field names in each record (only in the schema)
- Schema-first — schema is required to read or write data
- Evolvable — supports forward/backward compatible changes
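A rough intuition for the "no field names in each record" point — this is not Avro's actual encoding, just a stdlib sketch of why a layout both sides agree on beats JSON on size:

```python
import json
import struct

record = {
    "appointment_id": "a1b2c3d4",   # 8 bytes
    "clinic_id": "clinic-17",       # 9 bytes
    "occurred_at": 1700000000000,
}

# JSON repeats every field name in every record.
as_json = json.dumps(record).encode()

# With a shared schema, both sides know the layout, so only values travel.
as_packed = struct.pack(
    "!8s9sq",
    record["appointment_id"].encode(),
    record["clinic_id"].encode(),
    record["occurred_at"],
)

assert len(as_packed) < len(as_json)   # 25 vs 86 bytes here
```

Avro's real encoding is variable-length rather than fixed-width, but the saving comes from the same place: field names live once in the schema, not in every record.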
Schema Definition
```json
// src/main/avro/AppointmentCreated.avsc
{
  "type": "record",
  "name": "AppointmentCreated",
  "namespace": "com.clinic.events",
  "doc": "Fired when a new appointment is successfully scheduled",
  "fields": [
    { "name": "appointment_id", "type": "string",
      "doc": "UUID of the appointment" },
    { "name": "clinic_id", "type": "string" },
    { "name": "patient_id", "type": "string" },
    { "name": "scheduled_for", "type": "string",
      "doc": "ISO-8601 datetime string" },
    { "name": "type", "type": {
        "type": "enum",
        "name": "AppointmentType",
        "symbols": ["ROUTINE", "FOLLOW_UP", "EMERGENCY", "CONTACT_LENS"]
      }
    },
    { "name": "notes", "type": ["null", "string"], "default": null },
    { "name": "occurred_at",
      "type": { "type": "long", "logicalType": "timestamp-millis" } }
  ]
}
```

Note that `logicalType` must sit inside the type object, not alongside it at the field level — otherwise Avro silently ignores it.

Schema Registry Setup
Local Development (Docker)
```yaml
# docker-compose.yml
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.6.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  kafka:
    image: confluentinc/cp-kafka:7.6.0
    depends_on: [zookeeper]
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      # Two listeners: kafka:29092 for other containers, localhost:9092 for the host
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      # Single broker — internal topics can't use the default replication factor of 3
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    ports: ["9092:9092"]

  schema-registry:
    image: confluentinc/cp-schema-registry:7.6.0
    depends_on: [kafka]
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka:29092
    ports: ["8081:8081"]
```

Register a Schema
```bash
# Register schema via REST API
curl -X POST http://localhost:8081/subjects/appointments.appointments.created-value/versions \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  -d "{\"schema\": $(jq -Rsc . src/main/avro/AppointmentCreated.avsc)}"

# Get latest schema
curl http://localhost:8081/subjects/appointments.appointments.created-value/versions/latest

# Check compatibility before registering
curl -X POST http://localhost:8081/compatibility/subjects/appointments.appointments.created-value/versions/latest \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  -d "{\"schema\": $(jq -Rsc . AppointmentCreated_v2.avsc)}"
```

Java: Spring Boot + Avro
Maven Plugin — Generate Java Classes from Avro
```xml
<plugin>
  <groupId>org.apache.avro</groupId>
  <artifactId>avro-maven-plugin</artifactId>
  <version>1.11.3</version>
  <executions>
    <execution>
      <phase>generate-sources</phase>
      <goals><goal>schema</goal></goals>
      <configuration>
        <sourceDirectory>${project.basedir}/src/main/avro</sourceDirectory>
        <outputDirectory>${project.build.directory}/generated-sources/avro</outputDirectory>
      </configuration>
    </execution>
  </executions>
</plugin>

<!-- Published to the Confluent repository (https://packages.confluent.io/maven/),
     not Maven Central -->
<dependency>
  <groupId>io.confluent</groupId>
  <artifactId>kafka-avro-serializer</artifactId>
  <version>7.6.0</version>
</dependency>
```

Configuration
```yaml
spring:
  kafka:
    producer:
      value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
    consumer:
      value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
    properties:
      schema.registry.url: ${SCHEMA_REGISTRY_URL:http://localhost:8081}
      specific.avro.reader: true  # use generated classes, not GenericRecord
```

Producer with Avro
```java
@Service
@RequiredArgsConstructor
public class AppointmentEventPublisher {

    private final KafkaTemplate<String, AppointmentCreated> kafkaTemplate;

    public void publishCreated(Appointment appointment) {
        // AppointmentCreated is the Avro-generated class
        AppointmentCreated event = AppointmentCreated.newBuilder()
            .setAppointmentId(appointment.getId())
            .setClinicId(appointment.getClinic().getId())
            .setPatientId(appointment.getPatient().getId())
            .setScheduledFor(appointment.getDateTime().toString())
            .setType(AppointmentType.valueOf(appointment.getType().name()))
            .setNotes(appointment.getNotes())
            .setOccurredAt(Instant.now().toEpochMilli())
            .build();

        kafkaTemplate.send("appointments.appointments.created",
            appointment.getClinic().getId(), event);
    }
}
```

Consumer with Avro
```java
@KafkaListener(topics = "appointments.appointments.created", groupId = "analytics")
public void onAppointmentCreated(AppointmentCreated event) {
    // Fully typed — IDE autocomplete works
    String clinicId = event.getClinicId().toString();
    String type = event.getType().toString();
    Long occurredAt = event.getOccurredAt();

    analyticsService.record(clinicId, type, Instant.ofEpochMilli(occurredAt));
}
```

Python with Schema Registry
```bash
pip install "confluent-kafka[avro]"  # quotes keep zsh from globbing the brackets
```

```python
import time

from confluent_kafka import Producer, Consumer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer, AvroDeserializer
from confluent_kafka.serialization import SerializationContext, MessageField

# Schema Registry client
schema_registry = SchemaRegistryClient({"url": "http://localhost:8081"})

# Load schema
with open("schemas/AppointmentCreated.avsc") as f:
    schema_str = f.read()

avro_serializer = AvroSerializer(
    schema_registry,
    schema_str,
    lambda obj, ctx: obj,  # dict → dict (no transformation needed)
)

# Counterpart for the consuming side (pairs with the Consumer import above)
avro_deserializer = AvroDeserializer(
    schema_registry,
    schema_str,
    lambda obj, ctx: obj,
)

# Producer
producer = Producer({"bootstrap.servers": "localhost:9092"})

def publish_appointment_created(appointment: dict) -> None:
    event = {
        "appointment_id": appointment["id"],
        "clinic_id": appointment["clinic_id"],
        "patient_id": appointment["patient_id"],
        "scheduled_for": appointment["datetime"],
        "type": appointment["type"].upper(),
        "notes": appointment.get("notes"),
        "occurred_at": int(time.time() * 1000),
    }
    producer.produce(
        topic="appointments.appointments.created",
        key=appointment["clinic_id"],
        value=avro_serializer(
            event,
            SerializationContext("appointments.appointments.created", MessageField.VALUE),
        ),
        on_delivery=lambda err, msg: print(
            f"Error: {err}" if err else f"Delivered: {msg.offset()}"
        ),
    )
    producer.flush()
```

Schema Evolution Rules
Backward Compatible Changes (BACKWARD — the default)
New schema can read data written with the old schema:
- Add a field with a default value ✓
- Remove a field ✓
- Change a field type to a compatible type ✓ (e.g., int → long)
Forward Compatible Changes (FORWARD)
Old schema can read data written with the new schema:
- Add a field ✓ (old consumer ignores unknown fields)
- Remove an optional field ✓
Breaking Changes (never allowed with FULL compatibility)
- Remove a required field with no default
- Rename a field (it's a delete + add, which breaks)
- Change field type to an incompatible type
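The BACKWARD rule above can be sketched as a toy check — illustrative only, since the real resolution rules in the Avro specification also cover type promotion, unions, and aliases: a new reader schema can decode old records as long as every field it adds carries a default.

```python
def backward_compatible(old_fields: list[dict], new_fields: list[dict]) -> bool:
    """Toy BACKWARD check: can a reader on new_fields decode old records?"""
    old_names = {f["name"] for f in old_fields}
    for field in new_fields:
        if field["name"] not in old_names and "default" not in field:
            return False   # new reader would have no value for old records
    return True            # fields removed in the new schema are simply ignored

v1 = [{"name": "appointment_id", "type": "string"}]
v2_ok = v1 + [{"name": "notes", "type": ["null", "string"], "default": None}]
v2_bad = v1 + [{"name": "priority", "type": "string"}]  # required, no default

backward_compatible(v1, v2_ok)    # True  — added field has a default
backward_compatible(v1, v2_bad)   # False — old records can't supply it
backward_compatible(v2_ok, v1)    # True  — removing a field is fine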
Evolving the Schema Safely
```json
// Version 1
{ "name": "notes", "type": ["null", "string"], "default": null }

// Version 2 — add new optional field (backward compatible)
{ "name": "notes", "type": ["null", "string"], "default": null },
{ "name": "appointment_tag", "type": ["null", "string"], "default": null }

// Rename (BREAKING — don't do this; add a new field instead)
// Instead of renaming "type" to "appointment_type":
{ "name": "type", "type": "AppointmentType" },
{ "name": "appointment_type", "type": ["null", "string"], "default": null }
// Deprecate "type" and migrate consumers over time
```

Setting Compatibility Per Subject
```bash
# Set FULL_TRANSITIVE — most strict, best for production
curl -X PUT http://localhost:8081/config/appointments.appointments.created-value \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  -d '{"compatibility": "FULL_TRANSITIVE"}'
```

| Level | New schema reads old data | Old schema reads new data |
|-------|---------------------------|---------------------------|
| BACKWARD | ✓ | ✗ |
| FORWARD | ✗ | ✓ |
| FULL | ✓ | ✓ |
| FULL_TRANSITIVE | ✓ (all versions) | ✓ (all versions) |
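The difference the last row hints at can be modeled in a few lines — a toy sketch, not the registry's real algorithm: non-transitive levels validate a candidate schema against the latest registered version only, while `*_TRANSITIVE` levels validate it against every version in the subject.

```python
def validate(candidate, versions, compatible, transitive=False):
    """versions is oldest-to-newest; compatible(old, new) is the level's rule."""
    targets = versions if transitive else versions[-1:]
    return all(compatible(old, candidate) for old in targets)

# Stand-in rule for illustration: "new schema keeps every field of the old one".
keeps_fields = lambda old, new: set(old) <= set(new)

v1, v2, v3 = {"id"}, {"id", "notes"}, {"notes"}   # v3 dropped "id"
candidate = {"notes", "tag"}

validate(candidate, [v1, v2, v3], keeps_fields)                   # passes vs v3 only
validate(candidate, [v1, v2, v3], keeps_fields, transitive=True)  # fails: misses "id" from v1
```

This is why FULL_TRANSITIVE is the safest production setting: a change that happens to be compatible with the latest version can still break consumers replaying older records.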