message-broker
Message Broker Instructions
Guidelines for building TCP-based pub/sub message brokers for inter-process communication between Python applications and AutoHotkey scripts.
Architecture Overview
┌─────────────────────────────────────────┐
│ Message Broker Server │
│ (localhost:15234) │
│ Topic Router | Message Queue | Health │
└─────────────────────────────────────────┘
▲ ▲ ▲
│ TCP │ TCP │ TCP
┌───────┴───────┐ ┌─────┴─────┐ ┌───────┴───────┐
│ Python App │ │ AHK App │ │ Qt App │
│ BrokerClient │ │ TCP Socket│ │QtBrokerClient │
└───────────────┘ └───────────┘ └───────────────┘
Protocol Specification
Transport
- Protocol: TCP
- Default Port: 15234
- Host: localhost
Frame Format
┌─────────────────┬────────────────────────┐
│ Length (4 bytes)│ JSON Payload (UTF-8) │
│ (big-endian) │ │
└─────────────────┴────────────────────────┘
JSON-RPC 2.0 Messages
Request:
{"jsonrpc": "2.0", "method": "subscribe", "params": {"patterns": ["jobs.*"]}, "id": 1}
Response:
{"jsonrpc": "2.0", "result": {"subscribed": ["jobs.*"]}, "id": 1}
Notification (no response):
{"jsonrpc": "2.0", "method": "message", "params": {"topic": "jobs.activated", "payload": {...}, "sender": "app1"}}
Methods
| Method | Description | Params |
|---|---|---|
connect |
Initialize connection | client_id, client_type, version |
disconnect |
Clean disconnect | - |
subscribe |
Subscribe to patterns | patterns[] |
unsubscribe |
Unsubscribe | patterns[] |
publish |
Publish message | topic, payload, qos, ttl |
ping |
Heartbeat | - |
status |
Get broker status | - |
Broker Server
Configuration
from dataclasses import dataclass
@dataclass
class BrokerConfig:
host: str = "localhost"
port: int = 15234
max_connections: int = 100
heartbeat_interval: float = 5.0
heartbeat_timeout: float = 30.0
max_message_size: int = 1024 * 1024 # 1 MB
max_queue_size: int = 10000
Basic Server
import asyncio
from message_broker import MessageBroker, BrokerConfig
async def main():
config = BrokerConfig(host="localhost", port=15234)
broker = MessageBroker(config)
await broker.start()
try:
while True:
await asyncio.sleep(1)
except KeyboardInterrupt:
await broker.stop()
asyncio.run(main())
System Topics
| Topic | Payload | Description |
|---|---|---|
system.broker.ready |
{host, port} |
Broker started |
system.broker.shutdown |
{reason} |
Broker stopping |
system.client.connected |
{client_id} |
Client connected |
system.client.disconnected |
{client_id} |
Client disconnected |
Python Async Client
from message_broker import BrokerClient, Message
async def main():
client = BrokerClient("my_app", host="localhost", port=15234)
await client.connect()
# Subscribe
async def handler(msg: Message):
print(f"{msg.topic}: {msg.payload}")
await client.subscribe(["jobs.*", "scripts.+"], handler)
# Publish
await client.publish("jobs.activated", {"job_id": "123"})
# Request/Response
response = await client.request("scripts.status", {"id": "saw1"}, timeout=5.0)
await client.disconnect()
Client Properties
client.client_id: str
client.is_connected: bool
await client.connect() -> bool
await client.disconnect()
await client.subscribe(patterns, callback) -> list[str]
await client.unsubscribe(patterns) -> list[str]
await client.publish(topic, payload, qos=0, ttl=0) -> str
await client.request(topic, payload, timeout=5.0) -> dict
Qt Integration (QtBrokerClient)
Thread-safe client with Qt signals for PySide6/PyQt applications.
from PySide6.QtWidgets import QMainWindow
from PySide6.QtCore import Slot
from message_broker.qt_client import QtBrokerClient
class MainWindow(QMainWindow):
def __init__(self):
super().__init__()
self._broker = QtBrokerClient("main_app", parent=self)
# Connect signals
self._broker.connected.connect(self._on_connected)
self._broker.disconnected.connect(self._on_disconnected)
self._broker.message_received.connect(self._on_message)
self._broker.error_occurred.connect(self._on_error)
self._broker.connect_to_broker()
@Slot()
def _on_connected(self):
self._broker.subscribe(["jobs.*"])
@Slot(str, dict, str, str)
def _on_message(self, topic, payload, sender, msg_id):
print(f"{topic} from {sender}: {payload}")
def publish_event(self, job_id):
self._broker.publish("jobs.activated", {"job_id": job_id})
def closeEvent(self, event):
self._broker.disconnect_from_broker()
super().closeEvent(event)
QtBrokerClient Signals
| Signal | Parameters | Description |
|---|---|---|
connected |
- | Connected |
disconnected |
- | Disconnected |
connection_failed |
str |
Connection error |
message_received |
str, dict, str, str |
topic, payload, sender, id |
subscribed |
list[str] |
Patterns subscribed |
published |
str |
Message ID |
error_occurred |
str |
Error message |
Topic Patterns
Naming Convention
<domain>.<entity>.<action>
Examples:
jobs.activatedjobs.12345.updatedscripts.saw1.startedsystem.broker.ready
Pattern Matching
| Pattern | Matches | Description |
|---|---|---|
jobs.activated |
Exact match | Single topic |
jobs.* |
jobs.activated, jobs.created |
Single level wildcard |
jobs.+ |
jobs.123, jobs.abc |
Single level (≥1 char) |
scripts.# |
scripts.saw1.started, scripts.saw1.hotkey.pressed |
Multi-level wildcard |
scripts.+.started |
scripts.saw1.started |
Middle wildcard |
Reserved Topics
| Pattern | Purpose |
|---|---|
system.* |
Broker internal events |
_internal.* |
Internal operations |
Common Patterns
Job Activation Flow
# Publisher (main app)
await broker.publish("jobs.activated", {
"job_id": job_id,
"job_number": "ABC123",
"timestamp": datetime.now().isoformat(),
})
# Subscriber (other apps)
async def on_message(msg):
if msg.topic == "jobs.activated":
job_id = msg.payload["job_id"]
update_ui(job_id)
await client.subscribe(["jobs.*"], on_message)
Script Status Updates
# Runner publishes lifecycle
await broker.publish(f"scripts.{script_id}.started", {"pid": pid})
await broker.publish(f"scripts.{script_id}.stopped", {"exit_code": 0})
# Monitor subscribes
await client.subscribe(["scripts.+.started", "scripts.+.stopped"], handler)
Request/Response (RPC)
# Client requests
response = await client.request("scripts.status", {"script_id": "saw1"}, timeout=5.0)
# Service handles
async def handle_request(msg):
script_id = msg.payload["script_id"]
return {"running": script_id in running_scripts}
AHK Integration
Direct TCP (AHK v2)
class BrokerClient {
__New(clientId, host := "localhost", port := 15234) {
this.clientId := clientId
this.host := host
this.port := port
this.requestId := 0
}
Connect() {
this.socket := ComObject("MSWinsock.Winsock.1")
this.socket.RemoteHost := this.host
this.socket.RemotePort := this.port
this.socket.Connect()
; Send connect request
this.requestId++
request := '{"jsonrpc":"2.0","method":"connect","params":{"client_id":"' . this.clientId . '"},"id":' . this.requestId . '}'
this._Send(request)
}
Publish(topic, payload) {
this.requestId++
request := '{"jsonrpc":"2.0","method":"publish","params":{"topic":"' . topic . '","payload":' . payload . '},"id":' . this.requestId . '}'
this._Send(request)
}
_Send(data) {
; Length prefix (4 bytes big-endian)
len := StrLen(data)
header := Chr(len >> 24 & 0xFF) . Chr(len >> 16 & 0xFF) . Chr(len >> 8 & 0xFF) . Chr(len & 0xFF)
this.socket.SendData(header . data)
}
}
Python Bridge (Recommended)
from message_broker import BrokerClient
from message_broker.ahk_bridge import AHKBridge
async def main():
client = BrokerClient("ahk_bridge")
await client.connect()
bridge = AHKBridge(broker_client=client)
await bridge.start()
await bridge.run_script("scripts/hotkeys.ahk")
Error Codes
| Code | Name | Description |
|---|---|---|
| -32700 | PARSE_ERROR | Invalid JSON |
| -32600 | INVALID_REQUEST | Not valid JSON-RPC |
| -32601 | METHOD_NOT_FOUND | Unknown method |
| -32602 | INVALID_PARAMS | Invalid parameters |
| -32603 | INTERNAL_ERROR | Internal broker error |
| -32001 | NOT_CONNECTED | Client not connected |
| -32002 | TIMEOUT | Request timed out |
Testing
Mock Client
from unittest.mock import AsyncMock, MagicMock
@pytest.fixture
def mock_broker():
client = MagicMock()
client.is_connected = True
client.connect = AsyncMock(return_value=True)
client.publish = AsyncMock(return_value="msg-123")
client.subscribe = AsyncMock(return_value=["jobs.*"])
return client
Integration Test
@pytest.fixture
async def broker():
broker = MessageBroker()
await broker.start()
yield broker
await broker.stop()
async def test_pub_sub(broker):
client = BrokerClient("test")
await client.connect()
received = []
await client.subscribe(["test.*"], lambda m: received.append(m))
await client.publish("test.msg", {"data": "hello"})
await asyncio.sleep(0.1)
assert received[0].payload["data"] == "hello"
Troubleshooting
Connection Refused
- Check broker is running
- Verify port is correct
- Check firewall settings
Messages Not Received
- Verify topic matches subscription pattern
- Check subscription was active before publish
- Enable debug logging:
logging.getLogger("message_broker").setLevel(logging.DEBUG)
Qt Client Freezing
- Never use
awaitin Qt main thread - Use
QtBrokerClientwhich handles threading
Debug Tools
# Enable message logging
config = BrokerConfig(log_messages=True)
# Get broker status
status = await client.request("status", {})
print(f"Clients: {status['client_count']}")
# Subscribe to all for debugging
await client.subscribe(["#"], debug_handler)
Configuration Defaults
DEFAULT_HOST = "localhost"
DEFAULT_PORT = 15234
DEFAULT_HEARTBEAT_INTERVAL = 5.0
DEFAULT_HEARTBEAT_TIMEOUT = 30.0
DEFAULT_MAX_MESSAGE_SIZE = 1024 * 1024
References
More from ds-codi/project-memory-mcp
pyside6-mvc
Use this skill when building Python desktop applications using PySide6 with strict MVC architecture where all UI is defined by .ui files. Covers architecture patterns, controller/model/view separation, signal handling, and .ui file workflows.
95pyside6-qml-views
Use this skill when creating QML view files, designing QML component hierarchies, building layouts, styling QML controls, creating reusable QML components, implementing QML navigation / page switching, or working with QML resources. Covers QML file structure, component patterns, Material/Controls styling, resource management, and common QML idioms for desktop applications.
49pyside6-qml-architecture
Use this skill when creating a new PySide6 + QML desktop application with MVC architecture, setting up project structure, implementing the application bootstrap / DI container, or understanding how the MVC layers connect. Covers project scaffolding, entry points, singleton application class, service locator, signal registry, and lifecycle management.
47mvc-architecture
Use this skill when implementing Model-View-Controller architecture. Covers core MVC principles, layer separation, dependency injection, event-driven communication, and patterns for controllers, models, and views.
40pyside6-qml-models-services
Use this skill when creating domain models with Qt signal support, implementing the repository pattern for data persistence, building service classes for external interactions, designing the central signal registry, or working with application state management. Covers BaseModel, model serialization, database repositories, service patterns, signal definitions, and the ApplicationState singleton.
34pyside6-qml-bridge
Use this skill when exposing Python objects to QML, creating bridge classes, defining Qt properties with NOTIFY signals, implementing invokable methods / slots, or connecting QML user actions to Python controllers. Covers the QObject bridge pattern, property decorators, type conversions, context properties, and QML type registration.
32