
Kafka Connect: Source & Sink Data Pipelines

Build data pipelines with Kafka Connect — stream database changes with Debezium CDC, sync data to DynamoDB and PostgreSQL with custom sink connectors, manage connectors via REST API, and monitor pipeline health.

Learnixo · April 16, 2026 · 4 min read
Kafka Connect · Kafka · CDC · Debezium · Data Pipeline · DynamoDB · PostgreSQL

What Is Kafka Connect?

Kafka Connect is a framework for streaming data between Kafka and external systems — without writing producer/consumer code. You configure connectors (not code) to move data.

PostgreSQL → [Source Connector] → Kafka → [Sink Connector] → DynamoDB
S3         → [Source Connector] → Kafka → [Sink Connector] → Elasticsearch

Source connectors pull data into Kafka. Sink connectors push data from Kafka to a target.

Connect handles:

  • Fault tolerance and retries
  • Exactly-once delivery (with compatible connectors)
  • Offset tracking
  • Schema integration with Schema Registry
  • Horizontal scaling

Running Kafka Connect (Docker)

YAML
# docker-compose.yml
services:
  connect:
    image: confluentinc/cp-kafka-connect:7.6.0
    depends_on: [kafka, schema-registry]
    ports: ["8083:8083"]
    environment:
      CONNECT_BOOTSTRAP_SERVERS:          kafka:9092
      CONNECT_REST_PORT:                  8083
      CONNECT_REST_ADVERTISED_HOST_NAME:  connect
      CONNECT_GROUP_ID:                   connect-cluster
      CONNECT_CONFIG_STORAGE_TOPIC:       _connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC:       _connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC:       _connect-status
      # Single-broker dev setup: internal topics can't use the default replication factor of 3
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER:              org.apache.kafka.connect.storage.StringConverter
      CONNECT_VALUE_CONVERTER:            io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONNECT_PLUGIN_PATH:                /usr/share/java,/usr/share/confluent-hub-components
    volumes:
      - ./connect-plugins:/usr/share/confluent-hub-components

Install plugins:

Bash
# Install Debezium PostgreSQL source + DynamoDB sink
# (run inside the Connect container or bake into a custom image, then restart the worker)
confluent-hub install debezium/debezium-connector-postgresql:2.5.0
confluent-hub install confluentinc/kafka-connect-aws-dynamodb:1.2.0

Debezium: CDC from PostgreSQL

Debezium captures every INSERT, UPDATE, and DELETE from PostgreSQL's write-ahead log (WAL) and publishes each one as a Kafka event. This is Change Data Capture (CDC).

Configure PostgreSQL for CDC

INI
# postgresql.conf
wal_level = logical
max_replication_slots = 4
max_wal_senders = 4
SQL
-- Create replication user
CREATE USER debezium REPLICATION LOGIN PASSWORD 'debezium_password';
GRANT CONNECT ON DATABASE clinic_db TO debezium;
GRANT USAGE ON SCHEMA public TO debezium;
GRANT SELECT ON ALL TABLES IN SCHEMA public TO debezium;

-- Create publication (publish changes for these tables)
CREATE PUBLICATION dbz_publication FOR TABLE appointments, patients, clinics;

Register Debezium Connector

Bash
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "clinic-pg-source",
    "config": {
      "connector.class":          "io.debezium.connector.postgresql.PostgresConnector",
      "database.hostname":        "postgres",
      "database.port":            "5432",
      "database.user":            "debezium",
      "database.password":        "debezium_password",
      "database.dbname":          "clinic_db",
      "database.server.name":     "clinic-pg",
      "table.include.list":       "public.appointments,public.patients",
      "plugin.name":              "pgoutput",
      "publication.name":         "dbz_publication",
      "slot.name":                "dbz_slot",
      "topic.prefix":             "clinic",
      "transforms":               "unwrap",
      "transforms.unwrap.type":   "io.debezium.transforms.ExtractNewRecordState",
      "transforms.unwrap.drop.tombstones": "false",
      "key.converter":            "org.apache.kafka.connect.storage.StringConverter",
      "value.converter":          "io.confluent.connect.avro.AvroConverter",
      "value.converter.schema.registry.url": "http://schema-registry:8081"
    }
  }'

CDC events look like:

JSON
{
  "before": null,
  "after": {
    "id":           "APT-001",
    "clinic_id":    "CLN-001",
    "patient_id":   "PAT-001",
    "date_time":    "2026-04-16T10:00:00Z",
    "status":       "SCHEDULED"
  },
  "op":  "c",   // c=create, u=update, d=delete, r=read (snapshot)
  "ts_ms": 1713268800000
}

With the ExtractNewRecordState transform applied, the after payload is unwrapped to the top level, so sinks receive a plain row instead of the full CDC envelope.
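Conceptually, the unwrap transform just lifts after out of the envelope and turns deletes into tombstones. A Python sketch of that behavior (this mimics the SMT's effect for illustration; it is not the real implementation):

```python
def extract_new_record_state(envelope):
    """Sketch of Debezium's ExtractNewRecordState: flatten the CDC
    envelope so downstream sinks see only the row itself."""
    if envelope is None:
        # Tombstone record (follows a delete); kept because
        # drop.tombstones is "false" in the config above
        return None
    if envelope["op"] == "d":
        # Delete event: "after" is null; the SMT emits a tombstone by default
        return None
    return envelope["after"]

event = {
    "before": None,
    "after": {"id": "APT-001", "clinic_id": "CLN-001", "status": "SCHEDULED"},
    "op": "c",
    "ts_ms": 1713268800000,
}
print(extract_new_record_state(event))
# {'id': 'APT-001', 'clinic_id': 'CLN-001', 'status': 'SCHEDULED'}
```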


Reshaping CDC Events with SMTs

Use Single Message Transforms (SMTs) to reshape events in-flight:

JSON
"transforms":                          "route,addTimestamp",
"transforms.route.type":               "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.route.include":            "id,clinic_id,status,date_time",

"transforms.addTimestamp.type":        "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.addTimestamp.timestamp.field": "cdc_timestamp"

DynamoDB Sink Connector

Sync Kafka events to DynamoDB for a serverless read model:

Bash
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "appointments-dynamodb-sink",
    "config": {
      "connector.class":    "io.confluent.connect.aws.dynamodb.DynamoDbSinkConnector",
      "tasks.max":          "3",
      "topics":             "clinic.public.appointments",
      "aws.dynamodb.region":"us-east-1",
      "aws.dynamodb.table.name": "portal-prod",

      "transforms":         "rekey",
      "transforms.rekey.type": "org.apache.kafka.connect.transforms.ValueToKey",
      "transforms.rekey.fields": "id"
    }
  }'

Custom Sink Connector (Java)

When no off-the-shelf connector exists, write one:

Java
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.sink.SinkConnector;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;

public class AppointmentAnalyticsSinkConnector extends SinkConnector {

    private Map<String, String> config;

    @Override
    public void start(Map<String, String> props) {
        this.config = props;
    }

    @Override
    public Class<? extends Task> taskClass() {
        return AppointmentAnalyticsSinkTask.class;
    }

    @Override
    public List<Map<String, String>> taskConfigs(int maxTasks) {
        // Every task gets the same config; Connect balances partitions across tasks
        return IntStream.range(0, maxTasks)
            .mapToObj(i -> new HashMap<>(config))
            .collect(Collectors.toList());
    }

    @Override
    public ConfigDef config() {
        return new ConfigDef()
            .define("analytics.api.url", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "Analytics API URL")
            .define("analytics.api.key", ConfigDef.Type.PASSWORD, ConfigDef.Importance.HIGH, "API key");
    }

    @Override
    public void stop() {
        // Nothing to clean up at the connector level
    }

    @Override
    public String version() {
        return "1.0.0";
    }
}

public class AppointmentAnalyticsSinkTask extends SinkTask {

    private AnalyticsClient client;

    @Override
    public void start(Map<String, String> props) {
        client = new AnalyticsClient(
            props.get("analytics.api.url"),
            props.get("analytics.api.key")
        );
    }

    @Override
    public void put(Collection<SinkRecord> records) {
        List<AnalyticsEvent> events = records.stream()
            .map(this::toAnalyticsEvent)
            .toList();

        // Throw org.apache.kafka.connect.errors.RetriableException on transient
        // failures so Connect retries the batch; offsets are only committed
        // after put() returns successfully
        client.batchIngest(events);
    }

    private AnalyticsEvent toAnalyticsEvent(SinkRecord record) {
        // Assumes a schemaless Map value (e.g. JsonConverter without schemas);
        // with the Avro converter the value arrives as a Connect Struct instead
        Map<String, Object> value = (Map<String, Object>) record.value();
        return new AnalyticsEvent(
            (String) value.get("id"),
            (String) value.get("clinic_id"),
            (String) value.get("status"),
            record.timestamp()
        );
    }

    @Override
    public void stop() {
        if (client != null) client.close();
    }

    @Override
    public String version() {
        return "1.0.0";
    }
}
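The put() contract is language-agnostic: deliver the whole batch, then let Connect commit offsets. Because many analytics APIs cap request sizes, a task often re-chunks the records it receives in one poll. A Python sketch of that re-chunking (the batch limit of 100 is a hypothetical API constraint, not anything Connect imposes):

```python
def chunk(records, max_batch=100):
    # Split one poll's worth of records into API-sized batches; if any
    # batch fails, raise so Connect retries without committing offsets
    for i in range(0, len(records), max_batch):
        yield records[i:i + max_batch]

batches = list(chunk(list(range(250)), max_batch=100))
print([len(b) for b in batches])  # [100, 100, 50]
```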

Managing Connectors via REST API

Bash
# List all connectors
curl http://localhost:8083/connectors

# Check connector status
curl http://localhost:8083/connectors/clinic-pg-source/status

# Pause a connector
curl -X PUT http://localhost:8083/connectors/clinic-pg-source/pause

# Resume
curl -X PUT http://localhost:8083/connectors/clinic-pg-source/resume

# Restart a failed task
curl -X POST http://localhost:8083/connectors/clinic-pg-source/tasks/0/restart

# Update config
curl -X PUT http://localhost:8083/connectors/clinic-pg-source/config \
  -H "Content-Type: application/json" \
  -d '{"table.include.list": "public.appointments,public.patients,public.clinics", ...}'

# Delete
curl -X DELETE http://localhost:8083/connectors/clinic-pg-source
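The /status payload is easy to script against for automated checks. A small sketch that flags any connector or task not in the RUNNING state (the payload below is invented, but follows the shape GET /connectors/&lt;name&gt;/status returns):

```python
import json

def unhealthy(status):
    """Collect problems from a Kafka Connect /status payload."""
    problems = []
    if status["connector"]["state"] != "RUNNING":
        problems.append("connector is " + status["connector"]["state"])
    for task in status["tasks"]:
        if task["state"] != "RUNNING":
            problems.append(f"task {task['id']} is {task['state']}")
    return problems

# Invented sample in the shape of a /status response
sample = json.loads("""
{
  "name": "clinic-pg-source",
  "connector": {"state": "RUNNING", "worker_id": "connect:8083"},
  "tasks": [
    {"id": 0, "state": "RUNNING", "worker_id": "connect:8083"},
    {"id": 1, "state": "FAILED", "worker_id": "connect:8083"}
  ]
}
""")
print(unhealthy(sample))  # ['task 1 is FAILED']
```

A FAILED task stays failed until restarted, so a check like this pairs naturally with the task-restart endpoint shown above.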

Monitoring Kafka Connect

Key metrics (exposed via JMX / Prometheus JMX exporter):

| Metric | Alert if |
|--------|----------|
| kafka.connect:type=connector-metrics,connector=*,attribute=connector-status | Not RUNNING |
| kafka.connect:type=sink-task-metrics,attribute=offset-commit-failure-percentage | > 0 |
| kafka.connect:type=source-task-metrics,attribute=poll-batch-avg-time-ms | > 1000 ms |
| Consumer lag on the connector's internal topics | Growing |

YAML
# Prometheus alert rule (visualized in Grafana)
- alert: ConnectorNotRunning
  expr: kafka_connect_connector_status != 1
  for: 2m
  annotations:
    summary: "Kafka Connect connector {{ $labels.connector }} is not running"

Enjoyed this article?

Explore the Distributed Systems learning path for more.
