Confluent Kafka Connect Skill

Expert knowledge of Kafka Connect for building data pipelines with source and sink connectors.

What I Know

Connector Types

Source Connectors (External System → Kafka):

  • JDBC Source: Databases → Kafka
  • Debezium: CDC (MySQL, PostgreSQL, MongoDB) → Kafka
  • S3 Source: AWS S3 files → Kafka
  • File Source: Local files → Kafka

Sink Connectors (Kafka → External System):

  • JDBC Sink: Kafka → Databases
  • Elasticsearch Sink: Kafka → Elasticsearch
  • S3 Sink: Kafka → AWS S3
  • HDFS Sink: Kafka → Hadoop HDFS

Single Message Transforms (SMTs):

  • Field operations: Insert, Mask, Replace, TimestampConverter
  • Routing: RegexRouter, TimestampRouter
  • Filtering: Filter, Predicates

When to Use This Skill

Activate me when you need help with:

  • Connector setup ("Configure JDBC connector")
  • CDC patterns ("Debezium MySQL CDC")
  • Data pipelines ("Stream database changes to Kafka")
  • SMT transforms ("Mask sensitive fields")
  • Connector troubleshooting ("Connector task failed")

Common Patterns

Pattern 1: JDBC Source (Database → Kafka)

Use Case: Stream database table changes to Kafka

Configuration:

{
  "name": "jdbc-source-users",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "tasks.max": "1",
    "connection.url": "jdbc:postgresql://localhost:5432/mydb",
    "connection.user": "postgres",
    "connection.password": "password",
    "mode": "incrementing",
    "incrementing.column.name": "id",
    "topic.prefix": "postgres-",
    "table.whitelist": "users,orders",
    "poll.interval.ms": "5000"
  }
}

Modes:

  • incrementing: Track new rows by an auto-increment ID (misses updates to existing rows)
  • timestamp: Track new and updated rows by a timestamp column
  • timestamp+incrementing: Both (most reliable; see the sketch below)
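
For tables that receive updates as well as inserts, timestamp+incrementing is usually the safest choice; a minimal sketch (updated_at is an assumed column name in your schema):

{
  "mode": "timestamp+incrementing",
  "timestamp.column.name": "updated_at",
  "incrementing.column.name": "id"
}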

Pattern 2: Debezium CDC (MySQL → Kafka)

Use Case: Capture all database changes (INSERT/UPDATE/DELETE)

Configuration:

{
  "name": "debezium-mysql-cdc",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "localhost",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "password",
    "database.server.id": "1",
    "database.server.name": "mysql",
    "database.include.list": "mydb",
    "table.include.list": "mydb.users,mydb.orders",
    "schema.history.internal.kafka.bootstrap.servers": "localhost:9092",
    "schema.history.internal.kafka.topic": "schema-changes.mydb"
  }
}

Output Format (Debezium Envelope):

{
  "before": null,
  "after": {
    "id": 1,
    "name": "John Doe",
    "email": "john@example.com"
  },
  "source": {
    "version": "1.9.0",
    "connector": "mysql",
    "name": "mysql",
    "ts_ms": 1620000000000,
    "snapshot": "false",
    "db": "mydb",
    "table": "users",
    "server_id": 1,
    "gtid": null,
    "file": "mysql-bin.000001",
    "pos": 12345,
    "row": 0,
    "thread": null,
    "query": null
  },
  "op": "c",  // c=CREATE, u=UPDATE, d=DELETE, r=READ
  "ts_ms": 1620000000000
}
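
Downstream consumers usually only want the after state. Debezium ships an ExtractNewRecordState SMT that unwraps the envelope into a flat row; a minimal sketch added to the connector config above:

{
  "transforms": "unwrap",
  "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
  "transforms.unwrap.drop.tombstones": "false"
}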

Pattern 3: JDBC Sink (Kafka → Database)

Use Case: Write Kafka events to PostgreSQL

Configuration:

{
  "name": "jdbc-sink-enriched-orders",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "3",
    "topics": "enriched-orders",
    "connection.url": "jdbc:postgresql://localhost:5432/analytics",
    "connection.user": "postgres",
    "connection.password": "password",
    "auto.create": "true",
    "auto.evolve": "true",
    "insert.mode": "upsert",
    "pk.mode": "record_value",
    "pk.fields": "order_id",
    "table.name.format": "orders_${topic}"
  }
}

Insert Modes:

  • insert: Append only (fails on duplicate)
  • update: Update only (requires PK)
  • upsert: INSERT or UPDATE (recommended; see the sketch below)
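
On PostgreSQL, upsert mode issues statements roughly equivalent to the following (illustrative only; the table and column names here are placeholders derived from the record schema):

INSERT INTO enriched_orders (order_id, customer_id, total)
VALUES ($1, $2, $3)
ON CONFLICT (order_id) DO UPDATE SET customer_id = EXCLUDED.customer_id, total = EXCLUDED.total;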

Pattern 4: S3 Sink (Kafka → AWS S3)

Use Case: Archive Kafka topics to S3

Configuration:

{
  "name": "s3-sink-events",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "tasks.max": "3",
    "topics": "user-events,order-events",
    "s3.region": "us-east-1",
    "s3.bucket.name": "my-kafka-archive",
    "s3.part.size": "5242880",
    "flush.size": "1000",
    "rotate.interval.ms": "60000",
    "rotate.schedule.interval.ms": "3600000",
    "timezone": "UTC",
    "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
    "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
    "path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH",
    "locale": "US",
    "timestamp.extractor": "Record"
  }
}

Partitioning (S3 folder structure):

s3://my-kafka-archive/
  topics/user-events/year=2025/month=01/day=15/hour=10/
    user-events+0+0000000000.json
    user-events+0+0000001000.json
  topics/order-events/year=2025/month=01/day=15/hour=10/
    order-events+0+0000000000.json

Pattern 5: Elasticsearch Sink (Kafka → Elasticsearch)

Use Case: Index Kafka events for search

Configuration:

{
  "name": "elasticsearch-sink-logs",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max": "3",
    "topics": "application-logs",
    "connection.url": "http://localhost:9200",
    "connection.username": "elastic",
    "connection.password": "password",
    "key.ignore": "true",
    "schema.ignore": "true",
    "type.name": "_doc",
    "index.write.wait_for_active_shards": "1"
  }
}
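
Once the connector is running, documents are indexed into an index named after the topic. A quick spot check (credentials as configured above):

curl -u elastic:password "http://localhost:9200/application-logs/_search?size=1&pretty"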

Single Message Transforms (SMTs)

Transform 1: Mask Sensitive Fields

Use Case: Hide email/phone in Kafka topics

Configuration:

{
  "transforms": "maskEmail",
  "transforms.maskEmail.type": "org.apache.kafka.connect.transforms.MaskField$Value",
  "transforms.maskEmail.fields": "email,phone"
}

Before:

{"id": 1, "name": "John", "email": "john@example.com", "phone": "555-1234"}

After (MaskField masks each listed field with its type's default value, e.g. empty string for strings and 0 for numbers):

{"id": 1, "name": "John", "email": "", "phone": ""}

Transform 2: Add Timestamp

Use Case: Add processing timestamp to all messages

Configuration:

{
  "transforms": "insertTimestamp",
  "transforms.insertTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
  "transforms.insertTimestamp.timestamp.field": "processed_at"
}
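
For a record like the one below, the transform appends the field (values are illustrative; the timestamp comes from the Kafka record, and its exact serialized form depends on the converter — shown here as epoch milliseconds):

Before:

{"order_id": 42, "status": "shipped"}

After:

{"order_id": 42, "status": "shipped", "processed_at": 1620000000000}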

Transform 3: Conditional Topic Routing with Predicates

Use Case: Rename the destination topic only for records that match a predicate. Note that Connect's built-in predicates (TopicNameMatches, HasHeaderKey, RecordIsTombstone) match on topic name, headers, or tombstones, not on field values; routing by a field's value requires a custom predicate or a stream processor such as Kafka Streams or ksqlDB.

Configuration:

{
  "transforms": "routeByValue",
  "transforms.routeByValue.type": "org.apache.kafka.connect.transforms.RegexRouter",
  "transforms.routeByValue.regex": "(.*)",
  "transforms.routeByValue.replacement": "$1-high-value",
  "transforms.routeByValue.predicate": "isHighValue",
  "predicates": "isHighValue",
  "predicates.isHighValue.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
  "predicates.isHighValue.pattern": "orders"
}

Transform 4: Flatten Nested JSON

Use Case: Flatten nested structures for JDBC sink

Configuration:

{
  "transforms": "flatten",
  "transforms.flatten.type": "org.apache.kafka.connect.transforms.Flatten$Value",
  "transforms.flatten.delimiter": "_"
}

Before:

{
  "user": {
    "id": 1,
    "profile": {
      "name": "John",
      "email": "john@example.com"
    }
  }
}

After:

{
  "user_id": 1,
  "user_profile_name": "John",
  "user_profile_email": "john@example.com"
}

Best Practices

1. Use Idempotent Connectors

DO:

// JDBC Sink with upsert mode
{
  "insert.mode": "upsert",
  "pk.mode": "record_value",
  "pk.fields": "id"
}

DON'T:

// WRONG: insert mode (duplicates on restart!)
{
  "insert.mode": "insert"
}

2. Monitor Connector Status

# Check connector status
curl http://localhost:8083/connectors/jdbc-source-users/status

# Check task status
curl http://localhost:8083/connectors/jdbc-source-users/tasks/0/status
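
A healthy connector returns a status payload shaped roughly like this (worker IDs and task counts will differ):

{
  "name": "jdbc-source-users",
  "connector": {"state": "RUNNING", "worker_id": "connect:8083"},
  "tasks": [{"id": 0, "state": "RUNNING", "worker_id": "connect:8083"}],
  "type": "source"
}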

3. Use Schema Registry

DO:

{
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter.schema.registry.url": "http://localhost:8081"
}
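
A complete converter setup usually pins the key converter as well; a sketch assuming string keys and a local Schema Registry:

{
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter.schema.registry.url": "http://localhost:8081"
}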

4. Configure Error Handling

{
  "errors.tolerance": "all",
  "errors.log.enable": "true",
  "errors.log.include.messages": "true",
  "errors.deadletterqueue.topic.name": "dlq-jdbc-sink",
  "errors.deadletterqueue.context.headers.enable": "true"
}
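
Failed records land on the DLQ topic with failure details in their headers. To inspect them, assuming a local broker and the Confluent CLI tools on the path (use kafka-console-consumer.sh with plain Apache Kafka):

kafka-console-consumer --bootstrap-server localhost:9092 \
  --topic dlq-jdbc-sink \
  --from-beginning \
  --property print.headers=true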

Connector Management

Deploy Connector

# Create connector via REST API
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d @jdbc-source.json

# Update connector
curl -X PUT http://localhost:8083/connectors/jdbc-source-users/config \
  -H "Content-Type: application/json" \
  -d @jdbc-source.json
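
Removing a connector is also a REST call; the connector's topics and committed offsets are not deleted:

# Delete connector
curl -X DELETE http://localhost:8083/connectors/jdbc-source-users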

Monitor Connectors

# List all connectors
curl http://localhost:8083/connectors

# Get connector info
curl http://localhost:8083/connectors/jdbc-source-users

# Get connector status
curl http://localhost:8083/connectors/jdbc-source-users/status

# Get connector tasks
curl http://localhost:8083/connectors/jdbc-source-users/tasks

Pause/Resume Connectors

# Pause connector
curl -X PUT http://localhost:8083/connectors/jdbc-source-users/pause

# Resume connector
curl -X PUT http://localhost:8083/connectors/jdbc-source-users/resume

# Restart connector
curl -X POST http://localhost:8083/connectors/jdbc-source-users/restart

# Restart task
curl -X POST http://localhost:8083/connectors/jdbc-source-users/tasks/0/restart
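
On workers running Kafka 3.0 or newer, a single call can restart a connector together with only its failed tasks:

# Restart connector and failed tasks (Kafka 3.0+)
curl -X POST "http://localhost:8083/connectors/jdbc-source-users/restart?includeTasks=true&onlyFailed=true"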

Common Issues & Solutions

Issue 1: Connector Task Failed

Symptoms: Task state = FAILED

Solutions:

  1. Check connector logs: docker logs connect-worker
  2. Validate the configuration: the /connector-plugins/<class>/config/validate endpoint expects a PUT with the config JSON (see the example below)
  3. Restart task: curl -X POST .../tasks/0/restart
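
For example, to validate a JDBC source configuration before deploying it (trim the body to the settings you actually use; the endpoint expects the flat config map and returns per-field errors):

curl -X PUT http://localhost:8083/connector-plugins/io.confluent.connect.jdbc.JdbcSourceConnector/config/validate \
  -H "Content-Type: application/json" \
  -d '{"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector", "connection.url": "jdbc:postgresql://localhost:5432/mydb", "topic.prefix": "postgres-", "mode": "bulk"}'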

Issue 2: Schema Evolution Error

Error: Incompatible schema detected

Solution: Enable automatic table creation and evolution on the JDBC sink (auto.evolve adds new columns but never drops or alters existing ones):

{
  "auto.create": "true",
  "auto.evolve": "true"
}
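
If the failure comes from Schema Registry compatibility checks rather than the sink itself, verify the compatibility level (global setting shown here; subject-level overrides live under /config/<subject>):

curl http://localhost:8081/config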

Issue 3: JDBC Connection Pool Exhausted

Error: Could not get JDBC connection

Solution: Tune connection retries, and reduce tasks.max or raise the database's connection limit if the connector opens more connections than the database allows:

{
  "connection.attempts": "3",
  "connection.backoff.ms": "10000"
}

Invoke me when you need Kafka Connect, connectors, CDC, or data pipeline expertise!
