Kafka Connect: Source & Sink Data Pipelines
Build data pipelines with Kafka Connect — stream database changes with Debezium CDC, sync data to DynamoDB and PostgreSQL with custom sink connectors, manage connectors via REST API, and monitor pipeline health.
What Is Kafka Connect?
Kafka Connect is a framework for streaming data between Kafka and external systems — without writing producer/consumer code. You configure connectors (not code) to move data.
PostgreSQL → [Source Connector] → Kafka → [Sink Connector] → DynamoDB
S3 → [Source Connector] → Kafka → [Sink Connector] → Elasticsearch

Source connectors pull data into Kafka. Sink connectors push data from Kafka to a target.
Connect handles:
- Fault tolerance and retries
- Exactly-once delivery (with compatible connectors)
- Offset tracking
- Schema integration with Schema Registry
- Horizontal scaling
Running Kafka Connect (Docker)
# docker-compose.yml
connect:
  image: confluentinc/cp-kafka-connect:7.6.0
  depends_on: [kafka, schema-registry]
  ports: ["8083:8083"]
  environment:
    CONNECT_BOOTSTRAP_SERVERS: kafka:9092
    CONNECT_REST_PORT: 8083
    CONNECT_REST_ADVERTISED_HOST_NAME: connect
    CONNECT_GROUP_ID: connect-cluster
    CONNECT_CONFIG_STORAGE_TOPIC: _connect-configs
    CONNECT_OFFSET_STORAGE_TOPIC: _connect-offsets
    CONNECT_STATUS_STORAGE_TOPIC: _connect-status
    # Single-broker dev setup: the internal topics need replication factor 1 (use 3 in production)
    CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
    CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
    CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
    CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
    CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
    CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
    CONNECT_PLUGIN_PATH: /usr/share/java,/usr/share/confluent-hub-components
  volumes:
    - ./connect-plugins:/usr/share/confluent-hub-components

Install plugins:
# Install Debezium PostgreSQL source + DynamoDB sink
confluent-hub install debezium/debezium-connector-postgresql:2.5.0
confluent-hub install confluentinc/kafka-connect-aws-dynamodb:1.2.0
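Restart the Connect worker after installing so it picks up the new plugins, then confirm they are visible via the worker's REST API (jq is optional, just for readability):
# Installed plugins show up under /connector-plugins
curl -s http://localhost:8083/connector-plugins | jq '.[].class'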
Debezium: CDC from PostgreSQL
Debezium captures every INSERT, UPDATE, DELETE from PostgreSQL's write-ahead log (WAL) and publishes them as Kafka events. This is Change Data Capture (CDC).
Configure PostgreSQL for CDC
# postgresql.conf
wal_level = logical
max_replication_slots = 4
max_wal_senders = 4

-- Create replication user
CREATE USER debezium REPLICATION LOGIN PASSWORD 'debezium_password';
GRANT CONNECT ON DATABASE clinic_db TO debezium;
GRANT USAGE ON SCHEMA public TO debezium;
GRANT SELECT ON ALL TABLES IN SCHEMA public TO debezium;
-- Create publication (publish changes for these tables)
CREATE PUBLICATION dbz_publication FOR TABLE appointments, patients, clinics;
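A quick sanity check that the publication covers the intended tables:
-- Should return one row each for appointments, patients, clinics
SELECT * FROM pg_publication_tables WHERE pubname = 'dbz_publication';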
Register Debezium Connector
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "clinic-pg-source",
    "config": {
      "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
      "database.hostname": "postgres",
      "database.port": "5432",
      "database.user": "debezium",
      "database.password": "debezium_password",
      "database.dbname": "clinic_db",
      "table.include.list": "public.appointments,public.patients",
      "plugin.name": "pgoutput",
      "publication.name": "dbz_publication",
      "slot.name": "dbz_slot",
      "topic.prefix": "clinic",
      "transforms": "unwrap",
      "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
      "transforms.unwrap.drop.tombstones": "false",
      "key.converter": "org.apache.kafka.connect.storage.StringConverter",
      "value.converter": "io.confluent.connect.avro.AvroConverter",
      "value.converter.schema.registry.url": "http://schema-registry:8081"
    }
  }'
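Once the connector reports RUNNING, you can tail one of its topics to watch changes arrive (values are Avro-encoded, so use the Avro console consumer that ships with Confluent Platform rather than the plain one):
kafka-avro-console-consumer --bootstrap-server kafka:9092 \
  --topic clinic.public.appointments --from-beginning \
  --property schema.registry.url=http://schema-registry:8081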
CDC events look like:
{
  "before": null,
  "after": {
    "id": "APT-001",
    "clinic_id": "CLN-001",
    "patient_id": "PAT-001",
    "date_time": "2026-04-16T10:00:00Z",
    "status": "SCHEDULED"
  },
  "op": "c",   // c=create, u=update, d=delete, r=read (snapshot)
  "ts_ms": 1713268800000
}

With the ExtractNewRecordState transform, the after field is unwrapped to the top level.
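For the sample event above, the record written to the topic becomes a flat row; the change envelope (before, op, ts_ms) is dropped, unless you re-add metadata with the transform's add.fields option:
{
  "id": "APT-001",
  "clinic_id": "CLN-001",
  "patient_id": "PAT-001",
  "date_time": "2026-04-16T10:00:00Z",
  "status": "SCHEDULED"
}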
Routing CDC Events with SMTs
Use Single Message Transforms (SMTs) to reshape events in-flight:
"transforms": "route,addTimestamp",
"transforms.route.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.route.include": "id,clinic_id,status,date_time",
"transforms.addTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.addTimestamp.timestamp.field": "cdc_timestamp"DynamoDB Sink Connector
DynamoDB Sink Connector
Sync Kafka events to DynamoDB for a serverless read model:
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "appointments-dynamodb-sink",
    "config": {
      "connector.class": "io.confluent.connect.aws.dynamodb.DynamoDbSinkConnector",
      "tasks.max": "3",
      "topics": "clinic.public.appointments",
      "aws.dynamodb.region": "us-east-1",
      "aws.dynamodb.table.name": "portal-prod",
      "transforms": "rekey",
      "transforms.rekey.type": "org.apache.kafka.connect.transforms.ValueToKey",
      "transforms.rekey.fields": "id"
    }
  }'
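With the sink running, a quick scan confirms records are landing (assumes the AWS CLI is configured with credentials for the portal-prod table in us-east-1):
# Spot-check a few synced items
aws dynamodb scan --table-name portal-prod --region us-east-1 --max-items 5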
Custom Sink Connector (Java)
When no off-the-shelf connector exists, write one:
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.sink.SinkConnector;

public class AppointmentAnalyticsSinkConnector extends SinkConnector {
    private Map<String, String> config;

    @Override
    public void start(Map<String, String> props) {
        this.config = props;
    }

    @Override
    public Class<? extends Task> taskClass() {
        return AppointmentAnalyticsSinkTask.class;
    }

    @Override
    public List<Map<String, String>> taskConfigs(int maxTasks) {
        // Every task gets the same config; Connect spreads topic partitions across the tasks
        return IntStream.range(0, maxTasks)
            .mapToObj(i -> new HashMap<>(config))
            .collect(Collectors.toList());
    }

    @Override
    public ConfigDef config() {
        return new ConfigDef()
            .define("analytics.api.url", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "Analytics API URL")
            .define("analytics.api.key", ConfigDef.Type.PASSWORD, ConfigDef.Importance.HIGH, "API key");
    }

    @Override
    public void stop() { }  // nothing to release at the connector level

    @Override
    public String version() { return "1.0.0"; }
}
import java.util.Collection;
import java.util.List;
import java.util.Map;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;

public class AppointmentAnalyticsSinkTask extends SinkTask {
    private AnalyticsClient client;  // client wrapper for the analytics API (implementation not shown)

    @Override
    public void start(Map<String, String> props) {
        client = new AnalyticsClient(
            props.get("analytics.api.url"),
            props.get("analytics.api.key")
        );
    }

    @Override
    public void put(Collection<SinkRecord> records) {
        List<AnalyticsEvent> events = records.stream()
            .map(this::toAnalyticsEvent)
            .toList();
        client.batchIngest(events);
        // Connect handles offset commit after put() returns successfully;
        // throw org.apache.kafka.connect.errors.RetriableException to have the batch retried
    }

    // Assumes a schemaless value (e.g. JsonConverter with schemas disabled), so record.value() is a Map
    private AnalyticsEvent toAnalyticsEvent(SinkRecord record) {
        Map<String, Object> value = (Map<String, Object>) record.value();
        return new AnalyticsEvent(
            (String) value.get("id"),
            (String) value.get("clinic_id"),
            (String) value.get("status"),
            record.timestamp()
        );
    }

    @Override
    public void stop() { }  // release client resources here if needed

    @Override
    public String version() { return "1.0.0"; }
}
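Package the connector as a JAR, drop it on the worker's plugin path, and register it like any other connector. Connect resolves the unqualified class name as long as it is unique on the plugin path; the name, topic, and analytics values below are placeholders, and the config keys match the ConfigDef above:
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "appointment-analytics-sink",
    "config": {
      "connector.class": "AppointmentAnalyticsSinkConnector",
      "tasks.max": "2",
      "topics": "clinic.public.appointments",
      "analytics.api.url": "https://analytics.example.internal/events",
      "analytics.api.key": "changeme"
    }
  }'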
Managing Connectors via REST API
# List all connectors
curl http://localhost:8083/connectors
# Check connector status
curl http://localhost:8083/connectors/clinic-pg-source/status
# Pause a connector
curl -X PUT http://localhost:8083/connectors/clinic-pg-source/pause
# Resume
curl -X PUT http://localhost:8083/connectors/clinic-pg-source/resume
# Restart a failed task
curl -X POST http://localhost:8083/connectors/clinic-pg-source/tasks/0/restart
# Update config
curl -X PUT http://localhost:8083/connectors/clinic-pg-source/config \
-H "Content-Type: application/json" \
-d '{"table.include.list": "public.appointments,public.patients,public.clinics", ...}'
# Delete
curl -X DELETE http://localhost:8083/connectors/clinic-pg-source

Monitoring Kafka Connect
Key metrics (exposed via JMX / Prometheus JMX exporter):
| Metric | Alert If |
|--------|----------|
| kafka.connect:type=connector-metrics,connector=*,attribute=status | Not RUNNING |
| kafka.connect:type=sink-task-metrics,attribute=offset-commit-failure-percentage | > 0 |
| kafka.connect:type=source-task-metrics,attribute=poll-batch-avg-time-ms | > 1000ms |
| Consumer lag for sink connectors' consumer groups (connect-<connector name>) | Growing |
# Grafana alert
- alert: ConnectorNotRunning
  expr: kafka_connect_connector_status != 1
  for: 2m
  annotations:
    summary: "Kafka Connect connector {{ $labels.connector }} is not running"