skills/dawiddutoit/custom-claude/kafka-integration-testing

kafka-integration-testing

SKILL.md

Kafka Integration Testing

Purpose

Write production-grade integration tests for Kafka producers and consumers using testcontainers. Covers setting up temporary test brokers, testing producer/consumer workflows, verifying message ordering guarantees, testing error scenarios, and validating delivery semantics without mocking external services.

When to Use This Skill

Use when testing Kafka producer/consumer workflows end-to-end with "test Kafka integration", "verify message ordering", "test Kafka roundtrip", or "validate exactly-once semantics".

Do NOT use for unit testing with mocked Kafka (use pytest-adapter-integration-testing), implementing producers/consumers (use respective kafka-*-implementation skills), or schema validation (use kafka-schema-management).

Quick Start

Create a producer/consumer round-trip test in 3 steps:

  1. Add dependency:
pip install testcontainers[kafka]>=4.0.0
  1. Write test:
import pytest
from testcontainers.kafka import KafkaContainer
from app.extraction.adapters.kafka.producer import OrderEventPublisher
from app.storage.adapters.kafka.consumer import OrderEventConsumer

@pytest.fixture
def kafka_container():
    with KafkaContainer() as kafka:
        yield kafka

def test_producer_consumer_roundtrip(kafka_container):
    brokers = [kafka_container.get_bootstrap_server()]

    # Produce
    publisher = OrderEventPublisher(brokers, "test-orders")
    event = OrderEventMessage(order_id="test_123", ...)
    publisher.publish_order(event)
    publisher.flush()

    # Consume
    consumer = OrderEventConsumer(brokers, "test-orders", "test-group")
    message = consumer.consume(timeout=5.0)

    assert message is not None
    assert message.order_id == "test_123"
  1. Run:
pytest tests/integration/test_kafka_roundtrip.py -v

Implementation Steps

1. Set Up Test Environment

Configure pytest fixtures for Kafka container management:

# tests/integration/conftest.py
import pytest
from testcontainers.kafka import KafkaContainer

@pytest.fixture(scope="function")
def kafka_container() -> KafkaContainer:
    """Start isolated Kafka container for each test."""
    container = KafkaContainer()
    container.start()

    try:
        import time
        time.sleep(2)  # Give broker time to become ready
        yield container
    finally:
        container.stop()

@pytest.fixture
def kafka_brokers(kafka_container: KafkaContainer) -> list[str]:
    """Get broker addresses."""
    return [kafka_container.get_bootstrap_server()]

2. Test Producer Functionality

def test_publisher_publishes_message_to_kafka(kafka_brokers: list[str]) -> None:
    """Test publisher successfully publishes message."""
    publisher = OrderEventPublisher(brokers=kafka_brokers, topic="orders")

    event = OrderEventMessage(order_id="order_123", ...)
    publisher.publish_order(event)
    publisher.flush()

3. Test Consumer Functionality

def test_consumer_receives_published_message(kafka_brokers: list[str]) -> None:
    """Test consumer receives published message."""
    topic = "test-orders"

    # Publish
    publisher = OrderEventPublisher(brokers=kafka_brokers, topic=topic)
    publisher.publish_order(event)
    publisher.flush()

    # Consume
    consumer = OrderEventConsumer(brokers=kafka_brokers, topic=topic, group_id="test-group")
    message = consumer.consume(timeout=5.0)

    assert message is not None
    assert message.order_id == "order_123"
    consumer.close()

4. Test Error Scenarios

def test_consumer_handles_malformed_message(kafka_brokers: list[str]) -> None:
    """Test consumer raises error on malformed JSON."""
    from confluent_kafka import Producer

    producer = Producer({"bootstrap.servers": ",".join(kafka_brokers)})
    producer.produce("test-orders", key=b"bad", value=b"not valid json")
    producer.flush()

    consumer = OrderEventConsumer(brokers=kafka_brokers, topic="test-orders", group_id="test-group")
    with pytest.raises(KafkaConsumerException):
        consumer.consume(timeout=5.0)

5. Test Message Ordering

def test_messages_ordered_within_partition(kafka_brokers: list[str]) -> None:
    """Test messages with same key maintain order."""
    publisher = OrderEventPublisher(brokers=kafka_brokers, topic="ordered-orders")
    order_id = "order_123"

    # Publish 5 messages with same order_id (same partition key)
    for i in range(5):
        event = OrderEventMessage(order_id=order_id, created_at=f"2024-01-01T12:00:{i:02d}Z", ...)
        publisher.publish_order(event)
    publisher.flush()

    # Consume and verify order
    consumer = OrderEventConsumer(brokers=kafka_brokers, topic="ordered-orders", group_id="test-group")
    for i in range(5):
        message = consumer.consume(timeout=5.0)
        assert message is not None
        assert message.created_at == f"2024-01-01T12:00:{i:02d}Z"

Requirements

  • testcontainers>=4.0.0 - Container management
  • testcontainers[kafka]>=4.0.0 - Kafka container support
  • confluent-kafka>=2.3.0 - Kafka client
  • msgspec>=0.18.6 - Message serialization
  • pytest>=7.4.3 - Test framework
  • pytest-asyncio>=0.21.1 - Async test support
  • Docker - Required for testcontainers
  • Python 3.11+ with type checking

Running Tests

# All integration tests
pytest tests/integration/ -v

# Specific test class
pytest tests/integration/test_kafka_producer_integration.py::TestOrderEventPublisherIntegration -v

# With coverage
pytest tests/integration/ --cov=app.extraction.adapters.kafka --cov=app.storage.adapters.kafka

Debugging Failed Tests

Container logs:

docker logs <container_id>

Verbose output:

pytest tests/integration/test_kafka.py::test_name -vv -s

Common Issues

Container fails to start: Check Docker is running and has available resources.

Test hangs on consume(): Verify publisher called flush() before consuming.

Malformed message exceptions: Check message schema matches expected structure.

Port already in use: Testcontainers uses random ports, conflicts are rare.

Intermittent failures: Add 2s sleep after container start for broker readiness.

Best Practices

  • Use unique consumer groups per test for isolation
  • Always flush producers before consuming
  • Clean up resources with try/finally or fixtures
  • Use timeouts on consume() to avoid hanging tests
  • Test with real Kafka (don't mock) for confidence

Example Test Patterns

See examples/examples.md for comprehensive examples:

  • Basic producer/consumer tests
  • Round-trip workflow testing
  • Message ordering verification
  • Exactly-once semantics validation
  • Error handling scenarios
  • Async testing patterns
  • Custom fixtures for pre-populated topics

Troubleshooting Guide

See references/reference.md for detailed troubleshooting:

  • Container startup issues
  • Network configuration
  • Performance optimization
  • Docker Compose integration
  • CI/CD pipeline setup

See Also

  • kafka-producer-implementation skill - Producer implementation
  • kafka-consumer-implementation skill - Consumer implementation
  • kafka-schema-management skill - Schema design
  • examples/examples.md - Complete test examples
  • references/reference.md - Troubleshooting guide
Weekly Installs
3
First Seen
Feb 24, 2026
Installed on
mcpjam3
iflow-cli3
claude-code3
junie3
windsurf3
zencoder3