---
name: data-engineering-streaming
description: Real-time data ingestion and stream processing with Apache Kafka, MQTT, and NATS JetStream. Covers producers, consumers, and stream processing patterns for data engineering pipelines.
---

# Streaming Data Systems
## Quick Comparison
| Feature | Apache Kafka | MQTT | NATS JetStream |
|---|---|---|---|
| Use Case | High-throughput event streaming | IoT, mobile, constrained devices | Cloud-native, microservices |
| Throughput | Millions/sec | Thousands/sec | Hundreds of thousands/sec |
| Durability | Disk-based log, replayable | Ephemeral (configurable) | Disk-based persistence |
| Ordering | Per-partition | N/A (topic-based) | Per-subject |
| Python Client | confluent-kafka | paho-mqtt | nats-py |
| Best For | Event sourcing, CDC, log aggregation | Sensor data, telemetry | Service-to-service messaging |
## When to Use Which?
- Kafka: High-volume event streams, log aggregation, CDC, data lake ingestion
- MQTT: IoT devices, mobile push, constrained networks
- NATS JetStream: Microservices, request-reply, cloud-native, simpler ops than Kafka
## Skill Dependencies

- @data-engineering-core - Process stream data with Polars/DuckDB
- @data-engineering-orchestration - Orchestrate stream processing jobs
- @data-engineering-quality - Validate streaming data
- @data-engineering-storage-lakehouse - Persist streams to Delta/Iceberg
## Detailed Guides

### Apache Kafka

Install: `pip install confluent-kafka`

#### Producer
```python
from confluent_kafka import Producer
import socket
import json

def delivery_report(err, msg):
    if err is not None:
        print(f"Delivery failed: {err}")
    else:
        print(f"Delivered to {msg.topic()} [{msg.partition()}]")

conf = {
    'bootstrap.servers': 'localhost:9092',
    'client.id': socket.gethostname(),
    'acks': 'all'  # Wait for all in-sync replicas
}
producer = Producer(conf)

# Send messages asynchronously
for i in range(100):
    data = {'id': i, 'event': 'user_activity', 'value': i * 10}
    producer.produce(
        topic='user_activity_events',
        key=str(i),
        value=json.dumps(data).encode('utf-8'),
        callback=delivery_report
    )
    producer.poll(0)  # Trigger delivery callbacks

producer.flush()  # Block until all outstanding messages are delivered
```

With Schema Registry (Avro), using the `confluent-kafka[avro]` extra:

```python
from confluent_kafka import SerializingProducer
from confluent_kafka.serialization import StringSerializer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer

# schema_str: the Avro schema as a JSON string (defined elsewhere)
schema_registry_client = SchemaRegistryClient({'url': 'http://localhost:8081'})
avro_serializer = AvroSerializer(schema_registry_client, schema_str)

producer = SerializingProducer({
    'bootstrap.servers': 'localhost:9092',
    'key.serializer': StringSerializer('utf_8'),
    'value.serializer': avro_serializer,
})
```
#### Consumer
```python
from confluent_kafka import Consumer, KafkaError, KafkaException
import json

conf = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'my_consumer_group',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': False,  # Commit offsets manually after processing
    'max.poll.interval.ms': 300000
}
consumer = Consumer(conf)
consumer.subscribe(['user_activity_events'])

try:
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            continue
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                continue
            raise KafkaException(msg.error())
        # Process message
        data = json.loads(msg.value().decode('utf-8'))
        print(f"Received: {data}")
        # Manual commit after processing
        consumer.commit(asynchronous=False)
except KeyboardInterrupt:
    pass
finally:
    consumer.close()
```
#### Stream Processing (ksqlDB pattern)

For complex transformations, use ksqlDB or Kafka Streams. REST API example:
```python
import requests

ksql_query = {
    "ksql": """
        SELECT id,
               COUNT(*) AS event_count,
               SUM(value) AS total_value
        FROM user_activity_events
        WINDOW TUMBLING (SIZE 1 MINUTE)
        GROUP BY id
        EMIT CHANGES;
    """
}
# EMIT CHANGES is a push query: stream results line by line
response = requests.post(
    "http://localhost:8088/query",
    json=ksql_query,
    stream=True
)
for line in response.iter_lines():
    if line:
        print(line.decode("utf-8"))
```
### MQTT for IoT

Install: `pip install paho-mqtt` (the examples below use the paho-mqtt 1.x callback API; 2.x changed the `Client` constructor)

#### Publisher
```python
import paho.mqtt.client as mqtt
import json
import time

broker = "broker.emqx.io"
topic = "iot/sensors/temperature"

client = mqtt.Client(client_id="publisher_1", protocol=mqtt.MQTTv5)

def on_connect(client, userdata, flags, reason_code, properties=None):
    # MQTT v5 connect callbacks receive a reason code plus properties
    if reason_code == 0:
        print("Connected to broker")
    else:
        print(f"Connection failed: {reason_code}")

client.on_connect = on_connect
client.connect(broker)
client.loop_start()

for i in range(10):
    payload = {
        "sensor_id": "temp_sensor_1",
        "temperature": 20 + i * 0.5,
        "humidity": 45 + i,
        "timestamp": time.time()
    }
    client.publish(
        topic=topic,
        payload=json.dumps(payload),
        qos=1  # At-least-once delivery
    )
    time.sleep(5)

client.loop_stop()
client.disconnect()
```
#### Subscriber

```python
import paho.mqtt.client as mqtt
import json

broker = "broker.emqx.io"
topic = "iot/sensors/temperature"

client = mqtt.Client(client_id="subscriber_1", protocol=mqtt.MQTTv5)

def on_connect(client, userdata, flags, reason_code, properties=None):
    client.subscribe(topic, qos=1)

def on_message(client, userdata, msg):
    payload = json.loads(msg.payload.decode("utf-8"))
    print(f"[{msg.topic}] {payload}")

client.on_connect = on_connect
client.on_message = on_message
client.connect(broker)
client.loop_forever()  # Blocks; handles reconnects automatically
```
### NATS JetStream

Install: `pip install nats-py`

#### Producer/Consumer
```python
import asyncio
import nats
from nats.js.api import StorageType

async def main():
    nc = await nats.connect("nats://localhost:4222")
    js = nc.jetstream()

    # Create the stream
    await js.add_stream(
        name="events",
        subjects=["events.*"],
        storage=StorageType.FILE,
        max_msgs=10000,
        max_age=3600  # seconds
    )

    # Publish
    await js.publish("events.page_loaded", b'{"page": "/home"}')

    # Push consumer
    sub = await js.subscribe("events.*", durable="worker-1")
    async for msg in sub.messages:
        print(f"Received: {msg.data.decode()}")
        await msg.ack()
        break  # stop after the demo message so the connection can close

    await nc.close()

asyncio.run(main())
```
#### Work Queue Pattern

```python
import nats

async def worker(name: str):
    nc = await nats.connect("nats://localhost:4222")
    js = nc.jetstream()
    # Workers in the same queue group share the subject's messages
    sub = await js.subscribe("jobs.*", durable="workers", queue="processing")
    async for msg in sub.messages:
        job_data = msg.data.decode()
        print(f"Worker {name} processing: {job_data}")
        await msg.ack()
    await nc.close()
```
## Production Patterns

### Idempotent Processing

Stream processors may receive duplicates. Design idempotent consumers:
```python
# Pseudocode: skip messages whose IDs were already processed
processed_ids = load_checkpoint()  # From DB/Redis
if msg.id in processed_ids:
    ack()  # Skip duplicate
else:
    process(msg)
    save_checkpoint(msg.id)
    ack()
```
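The pattern above can be sketched as a runnable example, with an in-memory set standing in for the checkpoint store (in production this would be Redis or a database table) and a hypothetical `Message` type standing in for a consumer record:

```python
from dataclasses import dataclass

@dataclass
class Message:
    # Hypothetical stand-in for a Kafka/NATS record
    id: str
    payload: str

processed_ids: set = set()  # in production: Redis set or DB table
results: list = []

def process(msg: Message) -> None:
    results.append(msg.payload)

def handle(msg: Message) -> None:
    """Process each message at most once, even if redelivered."""
    if msg.id in processed_ids:
        return  # duplicate: ack without reprocessing
    process(msg)
    processed_ids.add(msg.id)  # checkpoint only after successful processing

# Simulate an at-least-once stream that redelivers message "a"
for m in [Message("a", "first"), Message("b", "second"), Message("a", "first")]:
    handle(m)

print(results)  # the redelivered "a" was processed only once
```

Checkpointing after `process` (not before) means a crash between the two steps causes a reprocess, never a lost message, which is exactly the trade-off at-least-once delivery implies.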
### Batch Processing

```python
# Pseudocode: accumulate messages before writing to reduce DB load
batch = []
while True:
    msg = consumer.poll(timeout=0.1)
    if msg:
        batch.append(msg.value())
    if batch and (len(batch) >= BATCH_SIZE or timeout_reached):
        write_to_db(batch)
        consumer.commit()  # Commit offsets only after the batch is written
        batch.clear()
```
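A runnable sketch of the same idea, with `write_to_db` and the consumer replaced by stand-ins; the `FLUSH_INTERVAL` check implements the `timeout_reached` condition from the pseudocode:

```python
import time

BATCH_SIZE = 3
FLUSH_INTERVAL = 1.0  # seconds between forced flushes

batch = []
flushed = []  # stand-in for rows written to the DB, one list per flush
last_flush = time.monotonic()

def write_to_db(rows):
    flushed.append(list(rows))  # stand-in for a bulk INSERT

def maybe_flush(force=False):
    global last_flush
    timeout_reached = time.monotonic() - last_flush >= FLUSH_INTERVAL
    if batch and (force or len(batch) >= BATCH_SIZE or timeout_reached):
        write_to_db(batch)
        # In Kafka, commit offsets here, after the batch is durably written
        batch.clear()
        last_flush = time.monotonic()

# Simulate a stream of 7 messages
for i in range(7):
    batch.append(f"msg-{i}")
    maybe_flush()
maybe_flush(force=True)  # flush the trailing partial batch on shutdown

print([len(b) for b in flushed])  # → [3, 3, 1]
```

The final forced flush matters: without it, a partial batch sits unwritten (and uncommitted) until the next size or time trigger.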
### Error Handling (Dead Letter Queue)

```python
# Pseudocode: route poison messages to a DLQ instead of blocking the stream
try:
    process(msg)
except Exception as e:
    if is_retryable(e):
        nack(requeue=True)  # Retry transient failures
    else:
        produce_to_dlq(msg, str(e))  # Park permanent failures
        ack()
```
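The routing logic can be sketched concretely; `RETRYABLE`, `produce_to_dlq`, and the in-memory lists are stand-ins for the broker calls a real consumer would make:

```python
RETRYABLE = (TimeoutError, ConnectionError)  # assumed transient error types

dlq = []       # stand-in for a dead-letter topic
requeued = []  # stand-in for nack/redeliver

def produce_to_dlq(msg, error):
    # In Kafka this would publish the original payload to a *.dlq topic,
    # with the error message carried in a header for later triage
    dlq.append((msg, error))

def handle(msg, process):
    try:
        process(msg)
    except RETRYABLE:
        requeued.append(msg)         # transient: retry later
    except Exception as e:
        produce_to_dlq(msg, str(e))  # permanent: park it and move on

def flaky(msg):
    raise TimeoutError("broker slow")

def broken(msg):
    raise ValueError("bad schema")

handle("m1", flaky)
handle("m2", broken)
print(requeued, dlq)  # → ['m1'] [('m2', 'bad schema')]
```

Classifying exceptions up front is the key design choice: retrying a permanently broken message would block the partition forever, while dead-lettering a transient one loses data unnecessarily.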
### Schema Evolution
- Use Avro/Protobuf with Schema Registry for compatibility
- Evolve schemas additively (new fields optional, old fields preserved)
- Register schemas per topic
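As an illustration of additive evolution, a hypothetical v2 of an Avro record adds one field with a default, which is what keeps old records readable under the new schema:

```python
import json

schema_v1 = json.loads("""
{
  "type": "record",
  "name": "UserActivity",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "event", "type": "string"}
  ]
}
""")

# v2 adds a field; the default lets a v2 reader fill it in
# when decoding records written with v1 (backward compatible)
schema_v2 = json.loads("""
{
  "type": "record",
  "name": "UserActivity",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "event", "type": "string"},
    {"name": "source", "type": ["null", "string"], "default": null}
  ]
}
""")

v1_names = {f["name"] for f in schema_v1["fields"]}
new_fields = [f for f in schema_v2["fields"] if f["name"] not in v1_names]
assert all("default" in f for f in new_fields)  # every added field has a default
```

Removing a field or changing a type is the opposite case: Schema Registry's compatibility check (e.g. `BACKWARD` mode) would reject such a registration for the topic.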
## References
- Apache Kafka Documentation
- confluent-kafka Python Client
- MQTT 5.0 Specification
- NATS JetStream Documentation