websocket-realtime
SKILL.md
WebSocket Real-time Skill
Quick Start
- Read Phase 5 Constitution -
constitution-prompt-phase-5.md - Create WebSocket service - New microservice on port 8005
- Subscribe to Kafka events - Via Dapr pub/sub
- Broadcast to clients - WebSocket connections per user
- Update frontend - Connect to WebSocket service
- Handle reconnection - Automatic reconnect logic
Architecture
┌─────────────┐ ┌───────────────┐ ┌─────────────────┐
│ Backend │────▶│ Kafka │────▶│ WebSocket Svc │
│ (Events) │ │ task-updates │ │ (Port 8005) │
└─────────────┘ └───────────────┘ └────────┬────────┘
│
WebSocket Connections │
┌──────────────────────────────┼──────────────────────────────┐
│ │ │
┌─────▼─────┐ ┌──────▼──────┐ ┌──────▼──────┐
│ Client 1 │ │ Client 2 │ │ Client 3 │
│ (User A) │ │ (User A) │ │ (User B) │
└───────────┘ └─────────────┘ └─────────────┘
WebSocket Service Implementation
Project Structure
services/websocket/
├── src/
│ ├── __init__.py
│ ├── main.py # FastAPI app with WebSocket
│ ├── connection.py # Connection manager
│ ├── events.py # Event handlers
│ └── auth.py # Token validation
├── pyproject.toml
└── Dockerfile
Main Application
# services/websocket/src/main.py
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Query, Depends
from fastapi.middleware.cors import CORSMiddleware
from dapr.ext.fastapi import DaprApp
import json
from .connection import ConnectionManager
from .auth import verify_token
app = FastAPI(title="WebSocket Service")
dapr_app = DaprApp(app)
manager = ConnectionManager()
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
@app.websocket("/ws")
async def websocket_endpoint(
websocket: WebSocket,
token: str = Query(...)
):
"""WebSocket endpoint for real-time updates."""
# Verify JWT token
user_id = await verify_token(token)
if not user_id:
await websocket.close(code=4001)
return
await manager.connect(websocket, user_id)
try:
while True:
# Keep connection alive, handle client messages
data = await websocket.receive_text()
message = json.loads(data)
if message.get("type") == "ping":
await websocket.send_json({"type": "pong"})
except WebSocketDisconnect:
manager.disconnect(websocket, user_id)
# Dapr subscription for task events
@dapr_app.subscribe(pubsub="taskpubsub", topic="task-updates")
async def handle_task_update(event: dict):
"""Handle task update events from Kafka."""
user_id = event.get("user_id")
event_type = event.get("event_type")
task_data = event.get("task")
# Broadcast to all connections for this user
await manager.broadcast_to_user(user_id, {
"type": "task_update",
"event": event_type,
"task": task_data
})
@app.get("/health")
async def health_check():
return {"status": "healthy", "connections": manager.active_connections_count}
Connection Manager
# services/websocket/src/connection.py
from fastapi import WebSocket
from collections import defaultdict
import asyncio
import logging
logger = logging.getLogger(__name__)
class ConnectionManager:
"""Manage WebSocket connections per user."""
def __init__(self):
# user_id -> list of WebSocket connections
self.active_connections: dict[str, list[WebSocket]] = defaultdict(list)
self._lock = asyncio.Lock()
async def connect(self, websocket: WebSocket, user_id: str):
"""Accept and store a new WebSocket connection."""
await websocket.accept()
async with self._lock:
self.active_connections[user_id].append(websocket)
logger.info(f"User {user_id} connected. Total connections: {self.active_connections_count}")
def disconnect(self, websocket: WebSocket, user_id: str):
"""Remove a WebSocket connection."""
if user_id in self.active_connections:
if websocket in self.active_connections[user_id]:
self.active_connections[user_id].remove(websocket)
if not self.active_connections[user_id]:
del self.active_connections[user_id]
logger.info(f"User {user_id} disconnected. Total connections: {self.active_connections_count}")
async def broadcast_to_user(self, user_id: str, message: dict):
"""Send message to all connections for a specific user."""
if user_id not in self.active_connections:
return
disconnected = []
for connection in self.active_connections[user_id]:
try:
await connection.send_json(message)
except Exception as e:
logger.error(f"Failed to send to user {user_id}: {e}")
disconnected.append(connection)
# Clean up disconnected
for conn in disconnected:
self.disconnect(conn, user_id)
async def broadcast_to_all(self, message: dict):
"""Send message to all connected users."""
for user_id in list(self.active_connections.keys()):
await self.broadcast_to_user(user_id, message)
@property
def active_connections_count(self) -> int:
"""Total number of active connections."""
return sum(len(conns) for conns in self.active_connections.values())
Token Verification
# services/websocket/src/auth.py
import jwt
import os
from typing import Optional
SECRET_KEY = os.getenv("BETTER_AUTH_SECRET", "")
ALGORITHM = "HS256"
async def verify_token(token: str) -> Optional[str]:
"""Verify JWT token and return user_id."""
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
return payload.get("sub") # user_id
except jwt.InvalidTokenError:
return None
Frontend WebSocket Client
WebSocket Hook
// frontend/lib/websocket/use-websocket.ts
import { useEffect, useRef, useCallback, useState } from "react";
import { useAuthStore } from "@/stores/auth-store";
import { useTaskStore } from "@/stores/task-store";
interface WebSocketMessage {
type: string;
event?: string;
task?: Task;
}
export function useWebSocket() {
const { token } = useAuthStore();
const { updateTask, addTask, removeTask } = useTaskStore();
const ws = useRef<WebSocket | null>(null);
const [isConnected, setIsConnected] = useState(false);
const reconnectAttempts = useRef(0);
const maxReconnectAttempts = 5;
const connect = useCallback(() => {
if (!token) return;
const wsUrl = `${process.env.NEXT_PUBLIC_WS_URL}/ws?token=${token}`;
ws.current = new WebSocket(wsUrl);
ws.current.onopen = () => {
console.log("WebSocket connected");
setIsConnected(true);
reconnectAttempts.current = 0;
};
ws.current.onmessage = (event) => {
const message: WebSocketMessage = JSON.parse(event.data);
handleMessage(message);
};
ws.current.onclose = () => {
console.log("WebSocket disconnected");
setIsConnected(false);
// Attempt reconnection with exponential backoff
if (reconnectAttempts.current < maxReconnectAttempts) {
const delay = Math.pow(2, reconnectAttempts.current) * 1000;
reconnectAttempts.current++;
setTimeout(connect, delay);
}
};
ws.current.onerror = (error) => {
console.error("WebSocket error:", error);
};
}, [token]);
const handleMessage = (message: WebSocketMessage) => {
if (message.type === "task_update" && message.task) {
switch (message.event) {
case "task.created":
addTask(message.task);
break;
case "task.updated":
updateTask(message.task);
break;
case "task.deleted":
removeTask(message.task.id);
break;
case "task.completed":
updateTask({ ...message.task, status: "completed" });
break;
}
}
};
// Heartbeat to keep connection alive
useEffect(() => {
if (!isConnected) return;
const interval = setInterval(() => {
if (ws.current?.readyState === WebSocket.OPEN) {
ws.current.send(JSON.stringify({ type: "ping" }));
}
}, 30000); // Every 30 seconds
return () => clearInterval(interval);
}, [isConnected]);
// Connect on mount
useEffect(() => {
connect();
return () => {
ws.current?.close();
};
}, [connect]);
return { isConnected };
}
WebSocket Provider
// frontend/providers/websocket-provider.tsx
"use client";
import { createContext, useContext, ReactNode } from "react";
import { useWebSocket } from "@/lib/websocket/use-websocket";
interface WebSocketContextType {
isConnected: boolean;
}
const WebSocketContext = createContext<WebSocketContextType>({
isConnected: false,
});
export function WebSocketProvider({ children }: { children: ReactNode }) {
const { isConnected } = useWebSocket();
return (
<WebSocketContext.Provider value={{ isConnected }}>
{children}
</WebSocketContext.Provider>
);
}
export function useWebSocketStatus() {
return useContext(WebSocketContext);
}
Connection Status Indicator
// frontend/components/websocket/connection-status.tsx
"use client";
import { useWebSocketStatus } from "@/providers/websocket-provider";
import { Wifi, WifiOff } from "lucide-react";
export function ConnectionStatus() {
const { isConnected } = useWebSocketStatus();
return (
<div className="flex items-center gap-2">
{isConnected ? (
<>
<Wifi className="h-4 w-4 text-green-500" />
<span className="text-sm text-green-500">Connected</span>
</>
) : (
<>
<WifiOff className="h-4 w-4 text-red-500" />
<span className="text-sm text-red-500">Disconnected</span>
</>
)}
</div>
);
}
Backend Event Publishing
# backend/src/services/task_service.py
from dapr.clients import DaprClient
async def publish_task_update(event_type: str, task: Task):
"""Publish task update for real-time sync."""
with DaprClient() as client:
client.publish_event(
pubsub_name="taskpubsub",
topic_name="task-updates",
data={
"event_type": event_type,
"user_id": str(task.user_id),
"task": task.model_dump(mode="json")
}
)
Kubernetes Deployment
# k8s/websocket-service.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: websocket-service
namespace: todo-app
spec:
replicas: 2
selector:
matchLabels:
app: websocket-service
template:
metadata:
labels:
app: websocket-service
annotations:
dapr.io/enabled: "true"
dapr.io/app-id: "websocket-service"
dapr.io/app-port: "8005"
spec:
containers:
- name: websocket-service
image: evolution-todo/websocket-service:latest
ports:
- containerPort: 8005
env:
- name: BETTER_AUTH_SECRET
valueFrom:
secretKeyRef:
name: todo-secrets
key: better-auth-secret
---
apiVersion: v1
kind: Service
metadata:
name: websocket-service
namespace: todo-app
spec:
selector:
app: websocket-service
ports:
- port: 8005
targetPort: 8005
Dapr Configuration
# dapr-components/subscription.yaml
apiVersion: dapr.io/v2alpha1
kind: Subscription
metadata:
name: task-updates-subscription
spec:
pubsubname: taskpubsub
topic: task-updates
routes:
default: /task-updates
scopes:
- websocket-service
Verification Checklist
- WebSocket service created on port 8005
- Connection manager handles multiple users
- Token verification working
- Dapr subscription configured
- Frontend WebSocket hook created
- Automatic reconnection working
- Heartbeat/ping-pong implemented
- Connection status indicator displayed
- Real-time updates propagate to all clients
- Service deployed to Kubernetes
Troubleshooting
| Issue | Cause | Solution |
|---|---|---|
| Connection refused | Wrong URL | Check WS_URL env var |
| Auth failed | Invalid token | Verify token format |
| No updates received | Dapr not connected | Check dapr sidecar logs |
| Connection drops | No heartbeat | Implement ping/pong |
| High latency | Too many connections | Scale horizontally |
References
Weekly Installs
3
Repository
maneeshanif/tod…c-drivenFirst Seen
Feb 13, 2026
Security Audits
Installed on
claude-code3
github-copilot3
codex3
kimi-cli3
gemini-cli3
amp3