confluent-kafka-connect
Confluent Kafka Connect Skill
Expert knowledge of Kafka Connect for building data pipelines with source and sink connectors.
What I Know
Connector Types
Source Connectors (External System → Kafka):
- JDBC Source: Databases → Kafka
- Debezium: CDC (MySQL, PostgreSQL, MongoDB) → Kafka
- S3 Source: AWS S3 files → Kafka
- File Source: Local files → Kafka
Sink Connectors (Kafka → External System):
- JDBC Sink: Kafka → Databases
- Elasticsearch Sink: Kafka → Elasticsearch
- S3 Sink: Kafka → AWS S3
- HDFS Sink: Kafka → Hadoop HDFS
Single Message Transforms (SMTs):
- Field operations: Insert, Mask, Replace, TimestampConverter
- Routing: RegexRouter, TimestampRouter
- Filtering: Filter, Predicates
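A connector class can only be used if its plugin is installed on the Connect worker. A quick check against the REST API (assuming the worker listens on the default port 8083):
# List the connector plugins installed on this Connect worker
# Each entry shows the plugin class, type (source/sink), and version
curl -s http://localhost:8083/connector-plugins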
When to Use This Skill
Activate me when you need help with:
- Connector setup ("Configure JDBC connector")
- CDC patterns ("Debezium MySQL CDC")
- Data pipelines ("Stream database changes to Kafka")
- SMT transforms ("Mask sensitive fields")
- Connector troubleshooting ("Connector task failed")
Common Patterns
Pattern 1: JDBC Source (Database → Kafka)
Use Case: Stream database table changes to Kafka
Configuration:
{
"name": "jdbc-source-users",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"tasks.max": "1",
"connection.url": "jdbc:postgresql://localhost:5432/mydb",
"connection.user": "postgres",
"connection.password": "password",
"mode": "incrementing",
"incrementing.column.name": "id",
"topic.prefix": "postgres-",
"table.whitelist": "users,orders",
"poll.interval.ms": "5000"
}
}
Modes:
- incrementing: Track new rows by an auto-incrementing ID column (inserts only; updates are missed)
- timestamp: Track new and updated rows by a timestamp column
- timestamp+incrementing: Both columns combined (most reliable; deletes still require CDC)
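As a sketch, the mode-related keys for timestamp+incrementing look like this, assuming the table has both an auto-increment id and an updated_at timestamp column (both column names are illustrative; adjust them to your schema):
# Hypothetical variant of the connector above using timestamp+incrementing tracking
curl -s -X PUT http://localhost:8083/connectors/jdbc-source-users/config \
  -H "Content-Type: application/json" \
  -d '{
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:postgresql://localhost:5432/mydb",
    "connection.user": "postgres",
    "connection.password": "password",
    "mode": "timestamp+incrementing",
    "timestamp.column.name": "updated_at",
    "incrementing.column.name": "id",
    "table.whitelist": "users,orders",
    "topic.prefix": "postgres-",
    "poll.interval.ms": "5000"
  }'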
Pattern 2: Debezium CDC (MySQL → Kafka)
Use Case: Capture all database changes (INSERT/UPDATE/DELETE)
Configuration:
{
"name": "debezium-mysql-cdc",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "localhost",
"database.port": "3306",
"database.user": "debezium",
"database.password": "password",
"database.server.id": "1",
"database.server.name": "mysql",
"database.include.list": "mydb",
"table.include.list": "mydb.users,mydb.orders",
"schema.history.internal.kafka.bootstrap.servers": "localhost:9092",
"schema.history.internal.kafka.topic": "schema-changes.mydb"
}
}
Output Format (Debezium Envelope):
{
"before": null,
"after": {
"id": 1,
"name": "John Doe",
"email": "john@example.com"
},
"source": {
"version": "1.9.0",
"connector": "mysql",
"name": "mysql",
"ts_ms": 1620000000000,
"snapshot": "false",
"db": "mydb",
"table": "users",
"server_id": 1,
"gtid": null,
"file": "mysql-bin.000001",
"pos": 12345,
"row": 0,
"thread": null,
"query": null
},
"op": "c", // c=CREATE, u=UPDATE, d=DELETE, r=READ
"ts_ms": 1620000000000
}
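Most sinks want flat rows rather than this envelope. Debezium ships an ExtractNewRecordState SMT that keeps only the "after" state; a minimal sketch that merges it into the connector config above (the SMT property values here are illustrative and depend on your Debezium version):
# Re-deploy the CDC connector with the envelope-unwrapping SMT added
curl -s -X PUT http://localhost:8083/connectors/debezium-mysql-cdc/config \
  -H "Content-Type: application/json" \
  -d '{
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "localhost",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "password",
    "database.server.id": "1",
    "database.server.name": "mysql",
    "database.include.list": "mydb",
    "table.include.list": "mydb.users,mydb.orders",
    "schema.history.internal.kafka.bootstrap.servers": "localhost:9092",
    "schema.history.internal.kafka.topic": "schema-changes.mydb",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "false"
  }'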
Pattern 3: JDBC Sink (Kafka → Database)
Use Case: Write Kafka events to PostgreSQL
Configuration:
{
"name": "jdbc-sink-enriched-orders",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "3",
"topics": "enriched-orders",
"connection.url": "jdbc:postgresql://localhost:5432/analytics",
"connection.user": "postgres",
"connection.password": "password",
"auto.create": "true",
"auto.evolve": "true",
"insert.mode": "upsert",
"pk.mode": "record_value",
"pk.fields": "order_id",
"table.name.format": "orders_${topic}"
}
}
Insert Modes:
- insert: Append only (fails on duplicate keys)
- update: Update existing rows only (requires a primary key)
- upsert: INSERT or UPDATE as needed (recommended)
Pattern 4: S3 Sink (Kafka → AWS S3)
Use Case: Archive Kafka topics to S3
Configuration:
{
"name": "s3-sink-events",
"config": {
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"tasks.max": "3",
"topics": "user-events,order-events",
"s3.region": "us-east-1",
"s3.bucket.name": "my-kafka-archive",
"s3.part.size": "5242880",
"flush.size": "1000",
"rotate.interval.ms": "60000",
"rotate.schedule.interval.ms": "3600000",
"timezone": "UTC",
"format.class": "io.confluent.connect.s3.format.json.JsonFormat",
"partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
"path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH",
"locale": "US",
"timestamp.extractor": "Record"
}
}
Partitioning (S3 folder structure):
s3://my-kafka-archive/
topics/user-events/year=2025/month=01/day=15/hour=10/
user-events+0+0000000000.json
user-events+0+0000001000.json
topics/order-events/year=2025/month=01/day=15/hour=10/
order-events+0+0000000000.json
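To spot-check that objects are landing with the expected layout, list the archive path directly (assumes the AWS CLI is installed and has read access to the bucket):
# List archived objects for one topic's partition path
aws s3 ls s3://my-kafka-archive/topics/user-events/ --recursive | head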
Pattern 5: Elasticsearch Sink (Kafka → Elasticsearch)
Use Case: Index Kafka events for search
Configuration:
{
"name": "elasticsearch-sink-logs",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"tasks.max": "3",
"topics": "application-logs",
"connection.url": "http://localhost:9200",
"connection.username": "elastic",
"connection.password": "password",
"key.ignore": "true",
"schema.ignore": "true",
"type.name": "_doc",
"index.write.wait_for_active_shards": "1"
}
}
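By default the sink writes each topic to an Elasticsearch index named after the topic, so a quick way to confirm documents are arriving (credentials as configured above) is to count them:
# Count indexed documents in the application-logs index
curl -s -u elastic:password "http://localhost:9200/application-logs/_count"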
Single Message Transforms (SMTs)
Transform 1: Mask Sensitive Fields
Use Case: Hide email/phone in Kafka topics
Configuration:
{
"transforms": "maskEmail",
"transforms.maskEmail.type": "org.apache.kafka.connect.transforms.MaskField$Value",
"transforms.maskEmail.fields": "email,phone"
}
Before:
{"id": 1, "name": "John", "email": "john@example.com", "phone": "555-1234"}
After (string fields are masked to their type's empty value):
{"id": 1, "name": "John", "email": "", "phone": ""}
Transform 2: Add Timestamp
Use Case: Add processing timestamp to all messages
Configuration:
{
"transforms": "insertTimestamp",
"transforms.insertTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.insertTimestamp.timestamp.field": "processed_at"
}
Transform 3: Conditional Topic Routing (RegexRouter + Predicate)
Use Case: Append a suffix to records from matching topics, applying the router only when a predicate is satisfied
Configuration:
{
"transforms": "routeByValue",
"transforms.routeByValue.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.routeByValue.regex": "(.*)",
"transforms.routeByValue.replacement": "$1-high-value",
"transforms.routeByValue.predicate": "isHighValue",
"predicates": "isHighValue",
"predicates.isHighValue.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
"predicates.isHighValue.pattern": "orders"
}
Note: the built-in predicates (TopicNameMatches, HasHeaderKey, RecordIsTombstone) test topic names, headers, or tombstones rather than field values, so routing on an actual record field (e.g. order amount) requires a custom predicate or a stream processor upstream.
Transform 4: Flatten Nested JSON
Use Case: Flatten nested structures for JDBC sink
Configuration:
{
"transforms": "flatten",
"transforms.flatten.type": "org.apache.kafka.connect.transforms.Flatten$Value",
"transforms.flatten.delimiter": "_"
}
Before:
{
"user": {
"id": 1,
"profile": {
"name": "John",
"email": "john@example.com"
}
}
}
After:
{
"user_id": 1,
"user_profile_name": "John",
"user_profile_email": "john@example.com"
}
Best Practices
1. Use Idempotent Connectors
✅ DO:
// JDBC Sink with upsert mode
{
"insert.mode": "upsert",
"pk.mode": "record_value",
"pk.fields": "id"
}
❌ DON'T:
// WRONG: insert mode (duplicates on restart!)
{
"insert.mode": "insert"
}
2. Monitor Connector Status
# Check connector status
curl http://localhost:8083/connectors/jdbc-source-users/status
# Check task status
curl http://localhost:8083/connectors/jdbc-source-users/tasks/0/status
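To surface only failing tasks and their stack traces, pipe the status output through jq (assuming jq is installed):
# Show only tasks in the FAILED state, with their error trace
curl -s http://localhost:8083/connectors/jdbc-source-users/status \
  | jq '.tasks[] | select(.state == "FAILED") | {id, trace}'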
3. Use Schema Registry
✅ DO:
{
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://localhost:8081"
}
4. Configure Error Handling
{
"errors.tolerance": "all",
"errors.log.enable": "true",
"errors.log.include.messages": "true",
"errors.deadletterqueue.topic.name": "dlq-jdbc-sink",
"errors.deadletterqueue.context.headers.enable": "true"
}
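With the dead letter queue enabled, failed records can be inspected straight from the DLQ topic. A sketch using the console consumer (the script may be named kafka-console-consumer.sh depending on your distribution, and print.headers requires a reasonably recent client):
# Read the DLQ, printing the error-context headers added by Connect
kafka-console-consumer --bootstrap-server localhost:9092 \
  --topic dlq-jdbc-sink \
  --from-beginning \
  --property print.headers=true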
Connector Management
Deploy Connector
# Create connector via REST API
curl -X POST http://localhost:8083/connectors \
-H "Content-Type: application/json" \
-d @jdbc-source.json
# Update connector configuration
# Note: PUT .../config takes only the bare config object as its body,
# not the {"name": ..., "config": ...} wrapper used with POST
curl -X PUT http://localhost:8083/connectors/jdbc-source-users/config \
-H "Content-Type: application/json" \
-d @jdbc-source-config.json
Monitor Connectors
# List all connectors
curl http://localhost:8083/connectors
# Get connector info
curl http://localhost:8083/connectors/jdbc-source-users
# Get connector status
curl http://localhost:8083/connectors/jdbc-source-users/status
# Get connector tasks
curl http://localhost:8083/connectors/jdbc-source-users/tasks
Pause/Resume Connectors
# Pause connector
curl -X PUT http://localhost:8083/connectors/jdbc-source-users/pause
# Resume connector
curl -X PUT http://localhost:8083/connectors/jdbc-source-users/resume
# Restart connector
curl -X POST http://localhost:8083/connectors/jdbc-source-users/restart
# Restart task
curl -X POST http://localhost:8083/connectors/jdbc-source-users/tasks/0/restart
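On Kafka Connect 3.0+ (KIP-745), the restart endpoint also accepts query parameters to restart the connector together with only its failed tasks in a single call:
# Restart the connector instance plus any failed tasks
curl -X POST "http://localhost:8083/connectors/jdbc-source-users/restart?includeTasks=true&onlyFailed=true"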
Common Issues & Solutions
Issue 1: Connector Task Failed
Symptoms: Task state = FAILED
Solutions:
- Check the connector logs: docker logs connect-worker
- Validate the configuration (the validate endpoint is a PUT that takes the config JSON as the request body; see the sketch below): curl -X PUT http://localhost:8083/connector-plugins/<class>/config/validate
- Restart the failed task: curl -X POST http://localhost:8083/connectors/<name>/tasks/0/restart
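A sketch of the validation call for the JDBC source (the plugin name in the path can be the simple or fully-qualified class name; jq is optional and only used to show the error count):
# Validate a candidate config; a non-zero error_count pinpoints the bad fields
curl -s -X PUT http://localhost:8083/connector-plugins/JdbcSourceConnector/config/validate \
  -H "Content-Type: application/json" \
  -d '{
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:postgresql://localhost:5432/mydb",
    "connection.user": "postgres",
    "connection.password": "password",
    "mode": "incrementing",
    "incrementing.column.name": "id",
    "topic.prefix": "postgres-"
  }' | jq '.error_count'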
Issue 2: Schema Evolution Error
Error: Incompatible schema detected
Solution: Enable auto-evolution:
{
"auto.create": "true",
"auto.evolve": "true"
}
Issue 3: JDBC Connection Pool Exhausted
Error: Could not get JDBC connection
Solution: Configure connection retries and back-off (and check the database's connection limit against tasks.max):
{
"connection.attempts": "3",
"connection.backoff.ms": "10000"
}
References
- Kafka Connect Documentation: https://kafka.apache.org/documentation/#connect
- Confluent Hub: https://www.confluent.io/hub/
- Debezium Documentation: https://debezium.io/documentation/
- Transform Reference: https://kafka.apache.org/documentation/#connect_transforms
Invoke me when you need Kafka Connect, connectors, CDC, or data pipeline expertise!