Kafka & Event-Driven Architecture · Lesson 6 of 7
Kafka Connect: Source & Sink Data Pipelines
What Is Kafka Connect?
Kafka Connect is a framework for streaming data between Kafka and external systems — without writing producer/consumer code. You configure connectors (not code) to move data.
PostgreSQL → [Source Connector] → Kafka → [Sink Connector] → DynamoDB
S3 → [Source Connector] → Kafka → [Sink Connector] → Elasticsearch

Source connectors pull data into Kafka. Sink connectors push data from Kafka to a target.
Connect handles:
- Fault tolerance and retries
- Exactly-once semantics for supported source connectors (Kafka 3.3+); sinks depend on idempotent or transactional writes to the target
- Offset tracking
- Schema integration with Schema Registry
- Horizontal scaling
Running Kafka Connect (Docker)
# docker-compose.yml
connect:
  image: confluentinc/cp-kafka-connect:7.6.0
  depends_on: [kafka, schema-registry]
  ports: ["8083:8083"]
  environment:
    CONNECT_BOOTSTRAP_SERVERS: kafka:9092
    CONNECT_REST_PORT: 8083
    CONNECT_REST_ADVERTISED_HOST_NAME: connect
    CONNECT_GROUP_ID: connect-cluster
    CONNECT_CONFIG_STORAGE_TOPIC: _connect-configs
    CONNECT_OFFSET_STORAGE_TOPIC: _connect-offsets
    CONNECT_STATUS_STORAGE_TOPIC: _connect-status
    # single-broker dev setup: internal topics can't use the default replication factor of 3
    CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
    CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
    CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
    CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
    CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
    CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
    CONNECT_PLUGIN_PATH: /usr/share/java,/usr/share/confluent-hub-components
  volumes:
    - ./connect-plugins:/usr/share/confluent-hub-components

Install plugins (run inside the Connect container, or bake them into a custom image):
# Install Debezium PostgreSQL source + DynamoDB sink
confluent-hub install debezium/debezium-connector-postgresql:2.5.0
confluent-hub install confluentinc/kafka-connect-aws-dynamodb:1.2.0

Debezium: CDC from PostgreSQL
Debezium captures every INSERT, UPDATE, and DELETE from PostgreSQL's write-ahead log (WAL) and publishes each change as a Kafka event. This is Change Data Capture (CDC).
Configure PostgreSQL for CDC
# postgresql.conf
wal_level = logical
max_replication_slots = 4
max_wal_senders = 4

-- Create replication user
CREATE USER debezium REPLICATION LOGIN PASSWORD 'debezium_password';
GRANT CONNECT ON DATABASE clinic_db TO debezium;
GRANT USAGE ON SCHEMA public TO debezium;
GRANT SELECT ON ALL TABLES IN SCHEMA public TO debezium;
-- Create publication (publish changes for these tables)
CREATE PUBLICATION dbz_publication FOR TABLE appointments, patients, clinics;
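Before registering the connector, it can help to confirm that logical replication is on and the publication covers the right tables. A minimal JDBC sketch, assuming the PostgreSQL JDBC driver is on the classpath; the class name and connection URL are illustrative:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

public class CdcPreflightCheck {
    public static void main(String[] args) throws SQLException {
        String url = "jdbc:postgresql://localhost:5432/clinic_db"; // adjust to your environment
        try (Connection conn = DriverManager.getConnection(url, "debezium", "debezium_password");
             Statement stmt = conn.createStatement()) {

            // wal_level must be 'logical' for Debezium to stream changes
            try (ResultSet rs = stmt.executeQuery("SHOW wal_level")) {
                rs.next();
                System.out.println("wal_level = " + rs.getString(1));
            }

            // the publication created above should list the captured tables
            try (ResultSet rs = stmt.executeQuery(
                    "SELECT tablename FROM pg_publication_tables WHERE pubname = 'dbz_publication'")) {
                while (rs.next()) {
                    System.out.println("published table: " + rs.getString(1));
                }
            }
        }
    }
}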
Register Debezium Connector
curl -X POST http://localhost:8083/connectors \
-H "Content-Type: application/json" \
-d '{
"name": "clinic-pg-source",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "debezium",
"database.password": "debezium_password",
"database.dbname": "clinic_db",
"database.server.name": "clinic-pg",
"table.include.list": "public.appointments,public.patients",
"plugin.name": "pgoutput",
"publication.name": "dbz_publication",
"slot.name": "dbz_slot",
"topic.prefix": "clinic",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081"
}
}'

CDC events look like:
{
"before": null,
"after": {
"id": "APT-001",
"clinic_id": "CLN-001",
"patient_id": "PAT-001",
"date_time": "2026-04-16T10:00:00Z",
"status": "SCHEDULED"
},
"op": "c", // c=create, u=update, d=delete, r=read (snapshot)
"ts_ms": 1713268800000
}

With the ExtractNewRecordState transform, the after field is unwrapped to the top level.
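Downstream services consume the CDC topic like any other Kafka topic. A minimal sketch, assuming the unwrap transform above is enabled (so each record's value is the row itself) and the worker's Avro converter has registered schemas; the bootstrap address and group id are illustrative:

import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.List;
import java.util.Properties;

public class AppointmentCdcReader {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // illustrative
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "appointment-cdc-reader");  // illustrative
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
        props.put("schema.registry.url", "http://localhost:8081");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        try (KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("clinic.public.appointments"));
            while (true) {
                ConsumerRecords<String, GenericRecord> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, GenericRecord> record : records) {
                    GenericRecord row = record.value();
                    if (row == null) continue; // delete tombstones carry a null value
                    System.out.printf("appointment %s -> %s%n", row.get("id"), row.get("status"));
                }
            }
        }
    }
}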
Reshaping CDC Events with SMTs
Use Single Message Transforms (SMTs) to reshape events in-flight:
"transforms": "route,addTimestamp",
"transforms.route.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.route.include": "id,clinic_id,status,date_time",
"transforms.addTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.addTimestamp.timestamp.field": "cdc_timestamp"DynamoDB Sink Connector
DynamoDB Sink Connector
Sync Kafka events to DynamoDB for a serverless read model:
curl -X POST http://localhost:8083/connectors \
-H "Content-Type: application/json" \
-d '{
"name": "appointments-dynamodb-sink",
"config": {
"connector.class": "io.confluent.connect.aws.dynamodb.DynamoDbSinkConnector",
"tasks.max": "3",
"topics": "clinic.public.appointments",
"aws.dynamodb.region":"us-east-1",
"aws.dynamodb.table.name": "portal-prod",
"transforms": "rekey",
"transforms.rekey.type": "org.apache.kafka.connect.transforms.ValueToKey",
"transforms.rekey.fields": "id"
}
}'
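Once the sink is running, the table behaves as a key-value read model. A minimal AWS SDK v2 sketch of reading one synced appointment, assuming the item's partition key is the id field set by the rekey transform; the key mapping and class name are illustrative:

import java.util.Map;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.GetItemRequest;

public class AppointmentReadModel {
    public static void main(String[] args) {
        try (DynamoDbClient dynamo = DynamoDbClient.builder().region(Region.US_EAST_1).build()) {
            GetItemRequest request = GetItemRequest.builder()
                    .tableName("portal-prod")
                    .key(Map.of("id", AttributeValue.builder().s("APT-001").build()))
                    .build();
            Map<String, AttributeValue> item = dynamo.getItem(request).item();
            // the sink keeps this item in step with the appointments row in PostgreSQL
            System.out.println("status = " + item.get("status").s());
        }
    }
}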
Custom Sink Connector (Java)
When no off-the-shelf connector exists, write one:
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.sink.SinkConnector;

public class AppointmentAnalyticsSinkConnector extends SinkConnector {
    private Map<String, String> config;

    @Override
    public void start(Map<String, String> props) {
        this.config = props;
    }

    @Override
    public Class<? extends Task> taskClass() {
        return AppointmentAnalyticsSinkTask.class;
    }

    @Override
    public List<Map<String, String>> taskConfigs(int maxTasks) {
        // Every task gets a copy of the connector config; Connect spreads tasks across workers
        return IntStream.range(0, maxTasks)
                .mapToObj(i -> (Map<String, String>) new HashMap<>(config))
                .collect(Collectors.toList());
    }

    @Override
    public ConfigDef config() {
        return new ConfigDef()
                .define("analytics.api.url", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "Analytics API URL")
                .define("analytics.api.key", ConfigDef.Type.PASSWORD, ConfigDef.Importance.HIGH, "API key");
    }

    @Override
    public void stop() { } // nothing to clean up

    @Override
    public String version() { return "1.0.0"; }
}
import java.util.Collection;
import java.util.List;
import java.util.Map;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;

public class AppointmentAnalyticsSinkTask extends SinkTask {
    private AnalyticsClient client;

    @Override
    public void start(Map<String, String> props) {
        client = new AnalyticsClient(
                props.get("analytics.api.url"),
                props.get("analytics.api.key")
        );
    }

    @Override
    public void put(Collection<SinkRecord> records) {
        List<AnalyticsEvent> events = records.stream()
                .map(this::toAnalyticsEvent)
                .toList();
        client.batchIngest(events);
        // Connect commits offsets after put() returns successfully.
        // For transient failures, throw org.apache.kafka.connect.errors.RetriableException
        // and Connect will redeliver the same batch instead of skipping it.
    }

    private AnalyticsEvent toAnalyticsEvent(SinkRecord record) {
        // With the Avro value converter, record.value() is a Connect Struct, not a Map
        Struct value = (Struct) record.value();
        return new AnalyticsEvent(
                value.getString("id"),
                value.getString("clinic_id"),
                value.getString("status"),
                record.timestamp()
        );
    }

    @Override
    public void stop() { } // nothing to clean up

    @Override
    public String version() { return "1.0.0"; }
}
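To see how Connect hands data to the task, you can drive it directly with a hand-built SinkRecord. A minimal sketch; the Struct schema mirrors the appointment fields used above, while the endpoint, key, and offset are illustrative and AnalyticsClient/AnalyticsEvent are the placeholders from the task:

import java.util.List;
import java.util.Map;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.sink.SinkRecord;

public class AppointmentAnalyticsSinkTaskSketch {
    public static void main(String[] args) {
        // Shape of the value Connect would build from an unwrapped CDC event
        Schema schema = SchemaBuilder.struct()
                .field("id", Schema.STRING_SCHEMA)
                .field("clinic_id", Schema.STRING_SCHEMA)
                .field("status", Schema.STRING_SCHEMA)
                .build();
        Struct value = new Struct(schema)
                .put("id", "APT-001")
                .put("clinic_id", "CLN-001")
                .put("status", "SCHEDULED");

        // topic, partition, key schema, key, value schema, value, offset, timestamp, timestamp type
        SinkRecord record = new SinkRecord("clinic.public.appointments", 0,
                Schema.STRING_SCHEMA, "APT-001", schema, value, 42L,
                System.currentTimeMillis(), TimestampType.CREATE_TIME);

        AppointmentAnalyticsSinkTask task = new AppointmentAnalyticsSinkTask();
        task.start(Map.of(
                "analytics.api.url", "http://localhost:9999/ingest", // illustrative endpoint
                "analytics.api.key", "test-key"));
        task.put(List.of(record)); // in production, the Connect worker makes these calls
    }
}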
Managing Connectors via REST API
# List all connectors
curl http://localhost:8083/connectors
# Check connector status
curl http://localhost:8083/connectors/clinic-pg-source/status
# Pause a connector
curl -X PUT http://localhost:8083/connectors/clinic-pg-source/pause
# Resume
curl -X PUT http://localhost:8083/connectors/clinic-pg-source/resume
# Restart a failed task
curl -X POST http://localhost:8083/connectors/clinic-pg-source/tasks/0/restart
# Update config
curl -X PUT http://localhost:8083/connectors/clinic-pg-source/config \
-H "Content-Type: application/json" \
-d '{"table.include.list": "public.appointments,public.patients,public.clinics", ...}'
# Delete
curl -X DELETE http://localhost:8083/connectors/clinic-pg-source

Monitoring Kafka Connect
Key metrics (exposed via JMX / Prometheus JMX exporter):
| Metric | Alert If |
|--------|----------|
| kafka.connect:type=connector-metrics,connector=*,attribute=connector-status | Not RUNNING |
| kafka.connect:type=sink-task-metrics,attribute=offset-commit-failure-percentage | > 0 |
| kafka.connect:type=source-task-metrics,attribute=poll-batch-avg-time-ms | > 1000ms |
| Consumer lag on connector's internal topics | Growing |
# Prometheus alert rule (evaluated against the Connect metrics scraped via the JMX exporter)
- alert: ConnectorNotRunning
  expr: kafka_connect_connector_status != 1
  for: 2m
  annotations:
    summary: "Kafka Connect connector {{ $labels.connector }} is not running"