Kafka & Event-Driven Architecture · Lesson 6 of 7

Kafka Connect: Source & Sink Data Pipelines

What Is Kafka Connect?

Kafka Connect is a framework for streaming data between Kafka and external systems without writing producer/consumer code: you deploy and configure connectors to move data instead.

PostgreSQL → [Source Connector] → Kafka → [Sink Connector] → DynamoDB
S3         → [Source Connector] → Kafka → [Sink Connector] → Elasticsearch

Source connectors pull data into Kafka. Sink connectors push data from Kafka to a target.

Connect handles:

  • Fault tolerance and retries
  • Exactly-once delivery (for supporting source connectors; sinks need an idempotent target)
  • Offset tracking
  • Schema integration with Schema Registry
  • Horizontal scaling

Running Kafka Connect (Docker)

YAML
# docker-compose.yml
services:
  connect:
    image: confluentinc/cp-kafka-connect:7.6.0
    depends_on: [kafka, schema-registry]
    ports: ["8083:8083"]
    environment:
      CONNECT_BOOTSTRAP_SERVERS:          kafka:9092
      CONNECT_REST_PORT:                  8083
      CONNECT_REST_ADVERTISED_HOST_NAME:  connect
      CONNECT_GROUP_ID:                   connect-cluster
      CONNECT_CONFIG_STORAGE_TOPIC:       _connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC:       _connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC:       _connect-status
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1   # 1 is fine for a single-broker dev stack
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER:              org.apache.kafka.connect.storage.StringConverter
      CONNECT_VALUE_CONVERTER:            io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONNECT_PLUGIN_PATH:                /usr/share/java,/usr/share/confluent-hub-components
    volumes:
      - ./connect-plugins:/usr/share/confluent-hub-components
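
Once the stack is up, a quick hit on the REST API confirms the worker is healthy (jq is assumed throughout):

Bash
# Returns the worker version and the Kafka cluster ID
curl -s http://localhost:8083/ | jq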

Install plugins (run inside the Connect container, then restart the worker so it discovers them):

Bash
# Install Debezium PostgreSQL source + DynamoDB sink
docker compose exec connect confluent-hub install --no-prompt debezium/debezium-connector-postgresql:2.5.0
docker compose exec connect confluent-hub install --no-prompt confluentinc/kafka-connect-aws-dynamodb:1.2.0
docker compose restart connect
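
After the restart, both plugins should show up in the worker's plugin list:

Bash
curl -s http://localhost:8083/connector-plugins | jq -r '.[].class'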

Debezium: CDC from PostgreSQL

Debezium captures every INSERT, UPDATE, DELETE from PostgreSQL's write-ahead log (WAL) and publishes them as Kafka events. This is Change Data Capture (CDC).

Configure PostgreSQL for CDC

INI
# postgresql.conf
wal_level = logical
max_replication_slots = 4
max_wal_senders = 4
SQL
-- Create replication user
CREATE USER debezium REPLICATION LOGIN PASSWORD 'debezium_password';
GRANT CONNECT ON DATABASE clinic_db TO debezium;
GRANT USAGE ON SCHEMA public TO debezium;
GRANT SELECT ON ALL TABLES IN SCHEMA public TO debezium;

-- Create publication (publish changes for these tables)
CREATE PUBLICATION dbz_publication FOR TABLE appointments, patients, clinics;
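
Before registering the connector, verify the settings took effect. A quick check, assuming the database runs as a postgres service next to the Connect stack:

Bash
# wal_level must report "logical" (changing it requires a PostgreSQL restart)
docker compose exec postgres psql -U postgres -d clinic_db -c "SHOW wal_level;"

# The publication should cover the three tables
docker compose exec postgres psql -U postgres -d clinic_db \
  -c "SELECT * FROM pg_publication_tables WHERE pubname = 'dbz_publication';"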

Register Debezium Connector

Bash
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "clinic-pg-source",
    "config": {
      "connector.class":          "io.debezium.connector.postgresql.PostgresConnector",
      "database.hostname":        "postgres",
      "database.port":            "5432",
      "database.user":            "debezium",
      "database.password":        "debezium_password",
      "database.dbname":          "clinic_db",
      "database.server.name":     "clinic-pg",
      "table.include.list":       "public.appointments,public.patients",
      "plugin.name":              "pgoutput",
      "publication.name":         "dbz_publication",
      "slot.name":                "dbz_slot",
      "topic.prefix":             "clinic",
      "transforms":               "unwrap",
      "transforms.unwrap.type":   "io.debezium.transforms.ExtractNewRecordState",
      "transforms.unwrap.drop.tombstones": "false",
      "key.converter":            "org.apache.kafka.connect.storage.StringConverter",
      "value.converter":          "io.confluent.connect.avro.AvroConverter",
      "value.converter.schema.registry.url": "http://schema-registry:8081"
    }
  }'
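
Once the connector reports RUNNING, you can watch the change stream directly. A sketch using the Avro console consumer that ships in the Schema Registry image:

Bash
docker compose exec schema-registry kafka-avro-console-consumer \
  --bootstrap-server kafka:9092 \
  --topic clinic.public.appointments \
  --from-beginning \
  --property schema.registry.url=http://schema-registry:8081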

CDC events look like:

JSON
{
  "before": null,
  "after": {
    "id":           "APT-001",
    "clinic_id":    "CLN-001",
    "patient_id":   "PAT-001",
    "date_time":    "2026-04-16T10:00:00Z",
    "status":       "SCHEDULED"
  },
  "op":  "c",   // c=create, u=update, d=delete, r=read (snapshot)
  "ts_ms": 1713268800000
}

With the ExtractNewRecordState transform configured above, the fields under after are promoted to the top level, so consumers see a flat record instead of the before/after envelope.


Reshaping CDC Events with SMTs

Use Single Message Transforms (SMTs) to reshape events in-flight. This pair trims each record down to four fields, then adds the record's timestamp as a cdc_timestamp field:

JSON
"transforms":                          "trimFields,addTimestamp",
"transforms.trimFields.type":          "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.trimFields.include":       "id,clinic_id,status,date_time",

"transforms.addTimestamp.type":        "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.addTimestamp.timestamp.field": "cdc_timestamp"

DynamoDB Sink Connector

Sync Kafka events to DynamoDB for a serverless read model:

Bash
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "appointments-dynamodb-sink",
    "config": {
      "connector.class":    "io.confluent.connect.aws.dynamodb.DynamoDbSinkConnector",
      "tasks.max":          "3",
      "topics":             "clinic.public.appointments",
      "aws.dynamodb.region":"us-east-1",
      "aws.dynamodb.table.name": "portal-prod",

      "transforms":         "rekey",
      "transforms.rekey.type": "org.apache.kafka.connect.transforms.ValueToKey",
      "transforms.rekey.fields": "id"
    }
  }'
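
Once the sink is RUNNING, rows should appear in the table within seconds. A quick verification pass (assumes the AWS CLI is configured for the same account and region):

Bash
# Sink healthy?
curl -s http://localhost:8083/connectors/appointments-dynamodb-sink/status | jq '.connector.state'

# Sample what landed
aws dynamodb scan --table-name portal-prod --region us-east-1 --limit 5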

Custom Sink Connector (Java)

When no off-the-shelf connector exists, write one:

JAVA
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.sink.SinkConnector;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;

public class AppointmentAnalyticsSinkConnector extends SinkConnector {

    private Map<String, String> config;

    @Override
    public void start(Map<String, String> props) {
        this.config = props;
    }

    @Override
    public Class<? extends Task> taskClass() {
        return AppointmentAnalyticsSinkTask.class;
    }

    @Override
    public List<Map<String, String>> taskConfigs(int maxTasks) {
        return IntStream.range(0, maxTasks)
            .mapToObj(i -> new HashMap<>(config))
            .collect(Collectors.toList());
    }

    @Override
    public ConfigDef config() {
        return new ConfigDef()
            .define("analytics.api.url", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "Analytics API URL")
            .define("analytics.api.key", ConfigDef.Type.PASSWORD, ConfigDef.Importance.HIGH, "API key");
    }

    @Override
    public void stop() {
        // No resources to release at the connector level
    }

    @Override
    public String version() {
        return "1.0.0";
    }
}

public class AppointmentAnalyticsSinkTask extends SinkTask {

    private AnalyticsClient client;

    @Override
    public void start(Map<String, String> props) {
        client = new AnalyticsClient(
            props.get("analytics.api.url"),
            props.get("analytics.api.key")
        );
    }

    @Override
    public void put(Collection<SinkRecord> records) {
        List<AnalyticsEvent> events = records.stream()
            .map(this::toAnalyticsEvent)
            .toList();

        client.batchIngest(events);
        // Connect handles offset commit after put() returns successfully
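        // Throwing org.apache.kafka.connect.errors.RetriableException makes Connect retry the batch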
    }

    private AnalyticsEvent toAnalyticsEvent(SinkRecord record) {
        // With the Avro value converter, record.value() arrives as a Connect Struct
        // (a Map only appears with schemaless JSON)
        Struct value = (Struct) record.value();
        return new AnalyticsEvent(
            value.getString("id"),
            value.getString("clinic_id"),
            value.getString("status"),
            record.timestamp()
        );
    }

    @Override
    public void stop() {
        // Release the client's connections here if it holds any
    }

    @Override
    public String version() {
        return "1.0.0";
    }
}
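
To deploy it, package the classes and drop the jar onto the worker's plugin path. A sketch, assuming a Maven build that produces a shaded jar (the jar name, API URL, and key below are illustrative):

Bash
# Build a self-contained jar and place it on CONNECT_PLUGIN_PATH
mvn -q package
cp target/appointment-analytics-sink-1.0.jar ./connect-plugins/
docker compose restart connect

# Register it like any other connector (the short class name works when unambiguous)
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "appointment-analytics-sink",
    "config": {
      "connector.class": "AppointmentAnalyticsSinkConnector",
      "topics": "clinic.public.appointments",
      "analytics.api.url": "https://analytics.example.com/ingest",
      "analytics.api.key": "changeme"
    }
  }'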

Managing Connectors via REST API

Bash
# List all connectors
curl http://localhost:8083/connectors

# Check connector status
curl http://localhost:8083/connectors/clinic-pg-source/status

# Pause a connector
curl -X PUT http://localhost:8083/connectors/clinic-pg-source/pause

# Resume
curl -X PUT http://localhost:8083/connectors/clinic-pg-source/resume

# Restart a failed task
curl -X POST http://localhost:8083/connectors/clinic-pg-source/tasks/0/restart

# Update config (PUT replaces the entire config, so send every key, not just the change)
curl -X PUT http://localhost:8083/connectors/clinic-pg-source/config \
  -H "Content-Type: application/json" \
  -d '{"table.include.list": "public.appointments,public.patients,public.clinics", ...}'

# Delete
curl -X DELETE http://localhost:8083/connectors/clinic-pg-source
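
For a fleet-wide health sweep, the ?expand=status query parameter returns every connector's state in a single call:

Bash
# Any connector or task not RUNNING?
curl -s "http://localhost:8083/connectors?expand=status" \
  | jq -r '.[] | .status | "\(.name): \(.connector.state) tasks=\([.tasks[].state] | join(","))"'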

Monitoring Kafka Connect

Key metrics (exposed via JMX / Prometheus JMX exporter):

| Metric | Alert If |
|--------|----------|
| kafka.connect:type=connector-metrics,connector=*,attribute=connector-status | Not RUNNING |
| kafka.connect:type=sink-task-metrics,attribute=offset-commit-failure-percentage | > 0 |
| kafka.connect:type=source-task-metrics,attribute=poll-batch-avg-time-ms | > 1000 ms |
| Consumer lag on a sink connector's consumer group (connect-<name>) | Growing |

YAML
# Prometheus alerting rule (chart the same metric in Grafana)
- alert: ConnectorNotRunning
  expr: kafka_connect_connector_status != 1   # assumes the JMX exporter maps RUNNING to 1
  for: 2m
  annotations:
    summary: "Kafka Connect connector {{ $labels.connector }} is not running"