kafka-streams-topology
Kafka Streams Topology Skill
Expert knowledge of the Kafka Streams library for building stream processing topologies in Java/Kotlin.
What I Know
Core Abstractions
KStream (Event Stream - Unbounded, Append-Only):
- Represents immutable event sequences
- Each record is an independent event
- Use for: Clickstreams, transactions, sensor readings
KTable (Changelog Stream - Latest State by Key):
- Represents mutable state (compacted topic)
- Updates override previous values (by key)
- Use for: User profiles, product catalog, account balances
GlobalKTable (Replicated Table - Available on All Instances):
- Full table replicated to every stream instance
- No partitioning (broadcast)
- Use for: Reference data (countries, products), lookups
Key Differences:
// KStream: Every event is independent
KStream<Long, Click> clicks = builder.stream("clicks");
clicks.foreach((key, value) -> {
System.out.println(value); // Prints every click event
});
// KTable: Latest value wins (by key)
KTable<Long, User> users = builder.table("users");
users.toStream().foreach((key, value) -> {
System.out.println(value); // Prints each update; the latest value per key is the table's state
});
// GlobalKTable: Replicated to all instances (no partitioning)
GlobalKTable<Long, Product> products = builder.globalTable("products");
// Available for lookups on any instance (no repartitioning needed)
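The difference between the stream and table views can be sketched without any Kafka dependency. The class below is a hypothetical plain-Java model, not part of the Kafka Streams API: it replays the same keyed records once as an append-only list and once as a latest-value-per-key map.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model (no Kafka dependency) of stream vs. table semantics.
// Each record is a two-element array: {key, value}.
class StreamVsTable {
    // KStream view: every record is kept as an independent event.
    static List<String> asStream(List<String[]> records) {
        List<String> events = new ArrayList<>();
        for (String[] r : records) {
            events.add(r[0] + "=" + r[1]);
        }
        return events;
    }

    // KTable view: a later record with the same key overwrites the earlier one.
    static Map<String, String> asTable(List<String[]> records) {
        Map<String, String> state = new LinkedHashMap<>();
        for (String[] r : records) {
            state.put(r[0], r[1]);
        }
        return state;
    }
}
```

Feeding three records where two share a key yields three stream events but only two table entries, with the later value winning for the shared key.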
When to Use This Skill
Activate me when you need help with:
- Topology design ("How to design Kafka Streams topology?")
- KStream vs KTable ("When to use KStream vs KTable?")
- Stream operations ("Filter and transform events")
- Joins ("Join KStream with KTable")
- Windowing ("Tumbling vs hopping vs session windows")
- Exactly-once semantics ("Enable EOS")
- Topology optimization ("Optimize stream processing")
Common Patterns
Pattern 1: Filter and Transform
Use Case: Clean and enrich events
StreamsBuilder builder = new StreamsBuilder();
// Input stream
KStream<Long, ClickEvent> clicks = builder.stream("clicks");
// Filter out bot clicks
KStream<Long, ClickEvent> humanClicks = clicks
.filter((key, value) -> !value.isBot());
// Transform: Extract page from URL
KStream<Long, String> pages = humanClicks
.mapValues(click -> extractPage(click.getUrl()));
// Write to output topic
pages.to("pages");
Pattern 2: Branch by Condition
Use Case: Route events to different paths
Map<String, KStream<Long, Order>> branches = orders
.split(Named.as("order-"))
.branch((key, order) -> order.getTotal() > 1000, Branched.as("high-value"))
.branch((key, order) -> order.getTotal() > 100, Branched.as("medium-value"))
.defaultBranch(Branched.as("low-value"));
// High-value orders → priority processing
branches.get("order-high-value").to("priority-orders");
// Low-value orders → standard processing
branches.get("order-low-value").to("standard-orders");
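With split() and branch(), each record goes to the first branch whose predicate matches, so predicate order matters. The following is a minimal plain-Java sketch of that first-match routing; the class and method names are hypothetical and only mirror the thresholds and branch names used above.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.DoublePredicate;

// Plain-Java model of first-match branch routing (no Kafka dependency).
class BranchRouting {
    // Predicates in declaration order, mirroring the branch() calls above.
    private static final Map<String, DoublePredicate> BRANCHES = new LinkedHashMap<>();
    static {
        BRANCHES.put("order-high-value", total -> total > 1000);
        BRANCHES.put("order-medium-value", total -> total > 100);
    }

    // Returns the name of the first branch whose predicate matches, else the default branch.
    static String route(double total) {
        for (Map.Entry<String, DoublePredicate> e : BRANCHES.entrySet()) {
            if (e.getValue().test(total)) {
                return e.getKey();
            }
        }
        return "order-low-value";
    }
}
```

An order of 1500 satisfies both predicates but is routed only to the high-value branch, because that predicate is evaluated first.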
Pattern 3: Enrich Stream with Table (Stream-Table Join)
Use Case: Add user details to click events
// Users table (current state)
KTable<Long, User> users = builder.table("users");
// Clicks stream
KStream<Long, ClickEvent> clicks = builder.stream("clicks");
// Enrich clicks with user data (left join)
KStream<Long, EnrichedClick> enriched = clicks.leftJoin(
users,
(click, user) -> new EnrichedClick(
click.getPage(),
user != null ? user.getName() : "unknown",
user != null ? user.getEmail() : "unknown"
),
Joined.with(Serdes.Long(), clickSerde, userSerde)
);
enriched.to("enriched-clicks");
Pattern 4: Aggregate with Windowing
Use Case: Count clicks per user, per 5-minute window
KTable<Windowed<Long>, Long> clickCounts = clicks
.groupByKey()
.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5))) // TimeWindows.of() is deprecated since 3.0
.count(Materialized.as("click-counts-store"));
// Convert to stream for output
clickCounts.toStream()
.map((windowedKey, count) -> {
Long userId = windowedKey.key();
Instant start = windowedKey.window().startTime();
Instant end = windowedKey.window().endTime();
return KeyValue.pair(userId, new WindowedCount(userId, start, end, count));
})
.to("click-counts");
Pattern 5: Stateful Processing with State Store
Use Case: Detect duplicate events within 10 minutes
// Define state store
StoreBuilder<KeyValueStore<Long, Long>> storeBuilder =
Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore("dedup-store"),
Serdes.Long(),
Serdes.Long()
);
builder.addStateStore(storeBuilder);
// Deduplicate events
// Note: transformValues() is deprecated since Kafka 3.3 in favor of processValues()
KStream<Long, Event> deduplicated = events.transformValues(
    () -> new ValueTransformerWithKey<Long, Event, Event>() {
        private KeyValueStore<Long, Long> store;
        @Override
        public void init(ProcessorContext context) {
            this.store = context.getStateStore("dedup-store");
        }
        @Override
        public Event transform(Long key, Event value) {
            Long lastSeen = store.get(key);
            long now = System.currentTimeMillis(); // Wall-clock time, not event time
            // Duplicate detected (within 10 minutes)
            if (lastSeen != null && (now - lastSeen) < 600_000) {
                return null; // Drop duplicate
            }
            // Not a duplicate: store the timestamp (entries are never deleted,
            // so the store grows with the number of distinct keys)
            store.put(key, now);
            return value;
        }
        @Override
        public void close() {
            // Nothing to release; Kafka Streams manages the store's lifecycle
        }
    },
    "dedup-store"
).filter((key, value) -> value != null); // Remove nulls
deduplicated.to("unique-events");
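The deduplication rule itself can be checked in isolation. Below is a hypothetical plain-Java sketch of the same "last seen within a time limit" logic; it takes the timestamp as a parameter (an assumption made here so the behavior is deterministic and testable), and an in-memory map stands in for the state store.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java model of time-bounded deduplication (no Kafka dependency).
class DedupSketch {
    private final Map<Long, Long> lastSeen = new HashMap<>(); // stands in for the state store
    private final long ttlMs;

    DedupSketch(long ttlMs) {
        this.ttlMs = ttlMs;
    }

    // Returns true if the event should be forwarded, false if it is a duplicate.
    boolean accept(long key, long timestampMs) {
        Long previous = lastSeen.get(key);
        if (previous != null && timestampMs - previous < ttlMs) {
            return false; // seen within the TTL: drop
        }
        lastSeen.put(key, timestampMs); // first sighting, or the TTL has elapsed
        return true;
    }
}
```

As in the transformer above, a dropped duplicate does not refresh the stored timestamp, so the 10-minute limit is measured from the last forwarded event.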
Join Types
1. Stream-Stream Join (Inner)
Use Case: Correlate related events within time window
// Page views and clicks within 10 minutes
KStream<Long, PageView> views = builder.stream("page-views");
KStream<Long, Click> clicks = builder.stream("clicks");
KStream<Long, ClickWithView> joined = clicks.join(
views,
(click, view) -> new ClickWithView(click, view),
JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(10)), // JoinWindows.of() is deprecated since 3.0
StreamJoined.with(Serdes.Long(), clickSerde, viewSerde)
);
2. Stream-Table Join (Left)
Use Case: Enrich events with current state
// Add product details to order items
KTable<Long, Product> products = builder.table("products");
KStream<Long, OrderItem> items = builder.stream("order-items");
KStream<Long, EnrichedOrderItem> enriched = items.leftJoin(
products,
(item, product) -> new EnrichedOrderItem(
item,
product != null ? product.getName() : "Unknown",
product != null ? product.getPrice() : 0.0
)
);
3. Table-Table Join (Inner)
Use Case: Combine two tables (latest state)
// Join users with their current shopping cart
KTable<Long, User> users = builder.table("users");
KTable<Long, Cart> carts = builder.table("shopping-carts");
KTable<Long, UserWithCart> joined = users.join(
carts,
(user, cart) -> new UserWithCart(user.getName(), cart.getTotal())
);
4. Stream-GlobalKTable Join
Use Case: Enrich with reference data (no repartitioning)
// Add country details to user registrations
GlobalKTable<String, Country> countries = builder.globalTable("countries");
KStream<Long, UserRegistration> registrations = builder.stream("registrations");
KStream<Long, EnrichedRegistration> enriched = registrations.leftJoin(
countries,
(userId, registration) -> registration.getCountryCode(), // Key extractor
(registration, country) -> new EnrichedRegistration(
registration,
country != null ? country.getName() : "Unknown"
)
);
Windowing Strategies
Tumbling Windows (Non-Overlapping)
Use Case: Aggregate per fixed time period
// Count events every 5 minutes
KTable<Windowed<Long>, Long> counts = events
.groupByKey()
.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
.count();
// Windows: [0:00-0:05), [0:05-0:10), [0:10-0:15)
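Tumbling windows are aligned to the epoch, so the window a record falls into depends only on its timestamp and the window size. A minimal plain-Java sketch of that arithmetic follows; the helper names are hypothetical and not part of the Kafka Streams API.

```java
import java.time.Duration;

// Plain-Java sketch of epoch-aligned tumbling window boundaries.
class TumblingWindowMath {
    // Inclusive start (ms) of the tumbling window that contains the timestamp.
    static long windowStart(long timestampMs, Duration size) {
        long sizeMs = size.toMillis();
        return timestampMs - Math.floorMod(timestampMs, sizeMs);
    }

    // Exclusive end (ms) of the same window.
    static long windowEnd(long timestampMs, Duration size) {
        return windowStart(timestampMs, size) + size.toMillis();
    }
}
```

With 5-minute windows, a record at minute 7 lands in [0:05-0:10), and a record at exactly minute 5 starts that window rather than ending the previous one.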
Hopping Windows (Overlapping)
Use Case: Moving average or overlapping aggregates
// Count events in 10-minute windows, advancing every 5 minutes
KTable<Windowed<Long>, Long> counts = events
.groupByKey()
.windowedBy(TimeWindows.ofSizeAndGrace(
Duration.ofMinutes(10), // Window size
Duration.ofMinutes(5) // Grace period for late records
).advanceBy(Duration.ofMinutes(5))) // Hop interval
.count();
// Windows: [0:00-0:10), [0:05-0:15), [0:10-0:20)
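Because hopping windows overlap, one record can belong to several windows. The sketch below is a hypothetical plain-Java helper, not a Kafka Streams class, that lists the start times of all epoch-aligned windows containing a given timestamp.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch of hopping window assignment (no Kafka dependency).
class HoppingWindowMath {
    // Start times (ms) of every window [start, start + size) that contains the timestamp.
    static List<Long> windowStartsFor(long timestampMs, long sizeMs, long advanceMs) {
        List<Long> starts = new ArrayList<>();
        // Earliest epoch-aligned window that can still contain the timestamp.
        long start = (Math.max(0, timestampMs - sizeMs + advanceMs) / advanceMs) * advanceMs;
        while (start <= timestampMs) {
            starts.add(start);
            start += advanceMs;
        }
        return starts;
    }
}
```

With a 10-minute size and a 5-minute advance, a record at minute 7 is counted in both [0:00-0:10) and [0:05-0:15).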
Session Windows (Event-Based)
Use Case: User sessions with inactivity gap
// Session ends after 30 minutes of inactivity
KTable<Windowed<Long>, Long> sessionCounts = events
.groupByKey()
.windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMinutes(30)))
.count();
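Session boundaries are driven by the data rather than by the clock. The following plain-Java sketch groups the ascending timestamps of a single key into sessions by inactivity gap. The names are hypothetical, and treating a gap exactly equal to the limit as part of the same session is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of session windowing for one key (no Kafka dependency).
class SessionSketch {
    // Splits ascending timestamps into sessions, each returned as {start, end} in ms.
    static List<long[]> sessions(long[] sortedTimestampsMs, long gapMs) {
        List<long[]> result = new ArrayList<>();
        for (long ts : sortedTimestampsMs) {
            long[] current = result.isEmpty() ? null : result.get(result.size() - 1);
            if (current != null && ts - current[1] <= gapMs) {
                current[1] = ts; // within the gap: extend the current session
            } else {
                result.add(new long[] {ts, ts}); // gap exceeded: start a new session
            }
        }
        return result;
    }
}
```

Events at minutes 0, 10, 20, 70, and 80 with a 30-minute gap produce two sessions: one from minute 0 to 20 and one from minute 70 to 80.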
Sliding Windows (Continuous)
Use Case: Anomaly detection over sliding time window
// Count events per key over every 1-minute span; flag counts above 100 in a downstream filter
KTable<Windowed<Long>, Long> slidingCounts = events
.groupByKey()
.windowedBy(SlidingWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(1)))
.count();
Best Practices
1. Partition Keys Correctly
✅ DO:
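// Re-key by user_id before aggregating. selectKey() marks the stream for
// repartitioning, so groupByKey() shuffles records once through an internal topic.
KStream<Long, Event> byUser = events
.selectKey((key, value) -> value.getUserId());
// All records for a user now land on the same partition
KTable<Long, Long> userCounts = byUser
.groupByKey()
.count();
// Equivalent shorthand: events.groupBy((key, value) -> value.getUserId()).count()
❌ DON'T:
// WRONG (unless the topic is already keyed by user_id): groupByKey() on the
// original key skips the repartition but counts per existing key, not per user
KTable<Long, Long> userCounts = events
.groupByKey()
.count();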
2. Use Appropriate Serdes
✅ DO:
// Define a custom serde for complex types
// (JsonSerde is not part of Kafka Streams; Spring Kafka provides one, for example)
Serde<User> userSerde = new JsonSerde<>(User.class);
KStream<Long, User> users = builder.stream(
"users",
Consumed.with(Serdes.Long(), userSerde)
);
❌ DON'T:
// WRONG: No serde specified (falls back to the default serdes in StreamsConfig, which must match the topic's data)
KStream<Long, User> users = builder.stream("users");
3. Enable Exactly-Once Semantics
✅ DO:
Properties props = new Properties();
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
StreamsConfig.EXACTLY_ONCE_V2); // EOS v2 (recommended)
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100); // Commit frequently (100 ms is already the default under EOS)
4. Use Materialized Stores for Queries
✅ DO:
// Named store for interactive queries
KTable<Long, Long> counts = events
.groupByKey()
.count(Materialized.<Long, Long, KeyValueStore<Bytes, byte[]>>as("user-counts")
.withKeySerde(Serdes.Long())
.withValueSerde(Serdes.Long()));
// Query store from REST API
ReadOnlyKeyValueStore<Long, Long> store =
streams.store(StoreQueryParameters.fromNameAndType(
"user-counts",
QueryableStoreTypes.keyValueStore()
));
Long count = store.get(userId);
Topology Optimization
1. Combine Operations
GOOD (Single pass):
KStream<Long, String> result = events
.filter((key, value) -> value.isValid())
.mapValues(value -> value.toString().toUpperCase()) // assumes toString() yields the text to normalize
.filterNot((key, value) -> value.contains("test"));
BAD (Unnecessary intermediate topic):
KStream<Long, Event> valid = events.filter((key, value) -> value.isValid());
valid.to("valid-events"); // Unnecessary write
KStream<Long, Event> fromValid = builder.stream("valid-events"); // Unnecessary read
KStream<Long, String> upper = fromValid.mapValues(v -> v.toString().toUpperCase());
2. Reuse KTables
GOOD (Shared table):
KTable<Long, User> users = builder.table("users");
KStream<Long, EnrichedClick> enrichedClicks = clicks.leftJoin(users, ...);
KStream<Long, EnrichedOrder> enrichedOrders = orders.leftJoin(users, ...);
BAD (Duplicate tables):
KTable<Long, User> users1 = builder.table("users");
KTable<Long, User> users2 = builder.table("users"); // Duplicate!
Testing Topologies
Topology Test Driver
@Test
public void testClickFilter() {
    // Set up the topology with explicit serdes (no default serdes are configured in this test)
    StreamsBuilder builder = new StreamsBuilder();
    KStream<Long, Click> clicks = builder.stream("clicks", Consumed.with(Serdes.Long(), clickSerde));
    clicks.filter((key, value) -> !value.isBot())
        .to("human-clicks", Produced.with(Serdes.Long(), clickSerde));
    Topology topology = builder.build();
    // try-with-resources closes the driver even if an assertion fails
    try (TopologyTestDriver testDriver = new TopologyTestDriver(topology)) {
        // Input topic
        TestInputTopic<Long, Click> inputTopic = testDriver.createInputTopic(
            "clicks",
            Serdes.Long().serializer(),
            clickSerde.serializer()
        );
        // Output topic
        TestOutputTopic<Long, Click> outputTopic = testDriver.createOutputTopic(
            "human-clicks",
            Serdes.Long().deserializer(),
            clickSerde.deserializer()
        );
        // Send test data
        inputTopic.pipeInput(1L, new Click(1L, "page1", false)); // Human
        inputTopic.pipeInput(2L, new Click(2L, "page2", true)); // Bot
        // Assert output
        List<Click> output = outputTopic.readValuesToList();
        assertEquals(1, output.size()); // Only human click
        assertFalse(output.get(0).isBot());
    }
}
Common Issues & Solutions
Issue 1: StreamsException - Not Co-Partitioned
Error: Topics not co-partitioned for join
Root Cause: Joined streams/tables have different partition counts (or are partitioned by different strategies)
Solution: Repartition one side so the partition counts match:
// repartition() replaces through(), which is deprecated since 2.6;
// Kafka Streams creates and manages the internal repartition topic
KStream<Long, Event> repartitioned = events
.repartition(Repartitioned.with(Serdes.Long(), eventSerde)
.withName("events-repartitioned")
.withNumberOfPartitions(12)); // Match the other side's partition count
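Why the partition counts must match can be shown with a small sketch. The partitioner below is a simplified stand-in (an assumption of this example): Kafka's default partitioner applies a murmur2 hash to the serialized key, whereas this sketch uses Long.hashCode to stay dependency-free. The modulo step is what makes the partition count matter.

```java
// Plain-Java sketch of why joins need equal partition counts (no Kafka dependency).
class CoPartitionSketch {
    // Simplified stand-in for a key partitioner: hash modulo partition count.
    static int partitionFor(long key, int numPartitions) {
        return Math.floorMod(Long.hashCode(key), numPartitions);
    }

    // True if the same key maps to the same partition number on both topics,
    // which is what allows one task to see both sides of the join.
    static boolean sameTask(long key, int leftPartitions, int rightPartitions) {
        return partitionFor(key, leftPartitions) == partitionFor(key, rightPartitions);
    }
}
```

With 6 partitions on one side and 12 on the other, key 7 maps to partition 1 and partition 7 respectively, so the task reading one side never sees the matching record on the other.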
Issue 2: Out of Memory (Large State Store)
Error: Java heap space
Root Cause: State store too large, windowing not used
Solution: Add time-based cleanup:
// Use windowing to limit state size
KTable<Windowed<Long>, Long> counts = events
.groupByKey()
.windowedBy(TimeWindows.ofSizeAndGrace(
Duration.ofHours(24), // Window size
Duration.ofHours(1) // Grace period
))
.count();
Issue 3: High Lag, Slow Processing
Root Cause: Blocking operations, inefficient transformations
Solution: Use async processing:
// BAD: Blocking HTTP call
events.mapValues(value -> {
return httpClient.get(value.getUrl()); // BLOCKS!
});
// GOOD: Async processing with state store
// (AsyncEnricher is a user-defined transformer, not a Kafka Streams class)
events.transformValues(() -> new AsyncEnricher());
References
- Kafka Streams Documentation: https://kafka.apache.org/documentation/streams/
- Kafka Streams Tutorial: https://kafka.apache.org/documentation/streams/tutorial
- Testing Guide: https://kafka.apache.org/documentation/streams/developer-guide/testing.html
Invoke me when you need topology design, joins, windowing, or exactly-once semantics expertise!