message-bus
SKILL.md
Message Bus -- Inter-Agent Communication Protocol
File-based message queue enabling workers and board directors to coordinate via shared state.
Directory Structure
conductor/tracks/{track}/.message-bus/
├── queue.jsonl # Append-only message log (all messages)
├── locks.json # Current file locks
├── worker-status.json # Worker heartbeats and states
├── events/ # Signal files for polling
│ ├── TASK_COMPLETE_1.1.event
│ └── FILE_UNLOCK_*.event
└── board/ # Board deliberation sessions
    ├── session-board-{ts}.json # Session metadata
    ├── assessments.json # Director assessments (Phase 1)
    ├── discussion.jsonl # Discussion messages (Phase 2)
    └── votes.json # Final votes (Phase 3)
Message Types
Worker Messages
| Type | Purpose | Payload |
|---|---|---|
| `PROGRESS` | Task progress update | `{ task_id, progress_pct, current_subtask }` |
| `TASK_COMPLETE` | Task finished | `{ task_id, commit_sha, files_modified, unblocks[] }` |
| `TASK_FAILED` | Task failed | `{ task_id, error, stack_trace }` |
| `FILE_LOCK` | Acquire file lock | `{ filepath, lock_type, expires_at }` |
| `FILE_UNLOCK` | Release file lock | `{ filepath }` |
| `BLOCKED` | Waiting on dependency | `{ task_id, waiting_for, resource }` |
Board Messages
| Type | Purpose | Payload |
|---|---|---|
| `BOARD_ASSESS` | Director assessment | `{ director, verdict, score, concerns[], recommendations[] }` |
| `BOARD_DISCUSS` | Discussion message | `{ from, to, type, message, changes_my_verdict }` |
| `BOARD_VOTE` | Final vote | `{ director, final_verdict, confidence, conditions[] }` |
| `BOARD_RESOLVE` | Aggregated decision | `{ verdict, vote_summary, conditions[], dissent[] }` |
Message Format
All messages follow this structure:
{
"id": "msg-{uuid}",
"type": "PROGRESS | TASK_COMPLETE | BOARD_ASSESS | ...",
"source": "worker-1.1-xxx | CA | orchestrator",
"timestamp": "2026-02-01T12:00:00Z",
"payload": { ... }
}
Worker Protocol
Posting Messages
def post_message(bus_path: str, msg_type: str, source: str, payload: dict):
    """Append a message to the bus queue and signal pollers for key events.

    Args:
        bus_path: Path to the track's ``.message-bus`` directory.
        msg_type: Message type, e.g. ``"PROGRESS"`` or ``"TASK_COMPLETE"``.
        source: Identifier of the sender (worker id, director, orchestrator).
        payload: Type-specific, JSON-serializable message body.

    The ``events/`` directory must already exist when ``msg_type`` is one of
    the event-producing types.
    """
    message = {
        "id": f"msg-{uuid4()}",
        "type": msg_type,
        "source": source,
        "timestamp": datetime.utcnow().isoformat() + "Z",
        "payload": payload,
    }
    # Append one JSON document per line. No explicit lock is taken here; the
    # write relies on a single small append per message.
    with open(f"{bus_path}/queue.jsonl", "a") as f:
        f.write(json.dumps(message) + "\n")
    # Create event file for polling
    if msg_type in ("TASK_COMPLETE", "FILE_UNLOCK", "BOARD_RESOLVE"):
        # Payloads without a task_id (e.g. FILE_UNLOCK) use the "all" suffix.
        event_file = f"{bus_path}/events/{msg_type}_{payload.get('task_id', 'all')}.event"
        Path(event_file).touch()
Reading Messages
def read_messages(bus_path: str, since: str = None, msg_type: str = None) -> list:
    """Return messages from the queue, optionally filtered.

    Args:
        bus_path: Path to the track's ``.message-bus`` directory.
        since: If given, drop messages whose ``timestamp`` string sorts before
            this value (plain string comparison of ISO-8601 timestamps).
        msg_type: If given, keep only messages of exactly this type.

    Returns:
        Matching message dicts in queue order.
    """
    messages = []
    with open(f"{bus_path}/queue.jsonl", "r") as f:
        for line in f:
            # Tolerate blank lines instead of failing in json.loads.
            if not line.strip():
                continue
            msg = json.loads(line)
            if since and msg["timestamp"] < since:
                continue
            if msg_type and msg["type"] != msg_type:
                continue
            messages.append(msg)
    return messages
Polling for Events
def wait_for_event(bus_path: str, event_pattern: str, timeout: int = 300) -> bool:
    """Wait for event file to appear. Returns True if found, False if timeout.

    Args:
        bus_path: Path to the track's ``.message-bus`` directory.
        event_pattern: Glob pattern matched inside ``{bus_path}/events/``,
            e.g. ``"TASK_COMPLETE_*.event"``.
        timeout: Maximum number of seconds to wait. ``0`` performs a single
            non-blocking check.
    """
    import glob
    import time

    # Escape the directory part so glob metacharacters in the track path are
    # taken literally; only event_pattern is interpreted as a pattern.
    pattern = f"{glob.escape(bus_path)}/events/{event_pattern}"
    # A monotonic clock keeps the deadline immune to wall-clock adjustments.
    deadline = time.monotonic() + timeout
    while True:
        if glob.glob(pattern):
            return True
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            return False
        # Poll once per second, but never sleep past the deadline.
        time.sleep(min(1, remaining))
File Lock Protocol
Acquiring Locks
def acquire_lock(bus_path: str, filepath: str, worker_id: str) -> bool:
    """Try to take (or refresh) the lock on ``filepath`` for ``worker_id``.

    Args:
        bus_path: Path to the track's ``.message-bus`` directory.
        filepath: File being locked; used as the key in ``locks.json``.
        worker_id: Worker requesting the lock.

    Returns:
        False if another worker holds an unexpired lock; True once the lock
        has been recorded and a FILE_LOCK message posted.

    NOTE: the read-modify-write of ``locks.json`` is not guarded, so two
    workers racing on the same path can both succeed.
    """
    locks_file = f"{bus_path}/locks.json"
    # Read current locks
    if os.path.exists(locks_file):
        with open(locks_file) as f:
            locks = json.load(f)
    else:
        locks = {}
    now = datetime.utcnow()
    # Check if already locked by someone else
    existing = locks.get(filepath)
    if existing is not None and existing["worker_id"] != worker_id:
        # Timestamps are stored as naive-UTC ISO strings with a trailing "Z".
        # Strip it before parsing so the value stays comparable with utcnow()
        # on every Python version.
        raw = existing["expires_at"]
        expires_at = datetime.fromisoformat(raw[:-1] if raw.endswith("Z") else raw)
        if expires_at.tzinfo is not None:
            # Normalise an explicit offset down to naive UTC.
            expires_at = (expires_at - expires_at.utcoffset()).replace(tzinfo=None)
        if expires_at > now:
            return False  # Locked by another worker
    # Acquire lock (30 min timeout)
    locks[filepath] = {
        "worker_id": worker_id,
        "acquired_at": now.isoformat() + "Z",
        "expires_at": (now + timedelta(minutes=30)).isoformat() + "Z",
    }
    with open(locks_file, "w") as f:
        json.dump(locks, f, indent=2)
    # Post lock message
    post_message(bus_path, "FILE_LOCK", worker_id, {"filepath": filepath})
    return True
Releasing Locks
def release_lock(bus_path: str, filepath: str, worker_id: str):
    """Release ``worker_id``'s lock on ``filepath``, if it holds one.

    Args:
        bus_path: Path to the track's ``.message-bus`` directory.
        filepath: File whose lock should be released.
        worker_id: Worker releasing the lock.

    Nothing is written or posted when the lock is absent or is owned by a
    different worker.
    """
    locks_file = f"{bus_path}/locks.json"
    if os.path.exists(locks_file):
        with open(locks_file) as f:
            locks = json.load(f)
    else:
        locks = {}
    if filepath in locks and locks[filepath]["worker_id"] == worker_id:
        del locks[filepath]
        with open(locks_file, "w") as f:
            json.dump(locks, f, indent=2)
        # Post unlock message and event
        post_message(bus_path, "FILE_UNLOCK", worker_id, {"filepath": filepath})
Worker Status Heartbeat
Workers post heartbeats every 5 minutes:
def update_worker_status(bus_path: str, worker_id: str, task_id: str, status: str, progress: int):
    """Record a heartbeat for ``worker_id`` in ``worker-status.json``.

    Args:
        bus_path: Path to the track's ``.message-bus`` directory.
        worker_id: Worker whose entry is created or replaced.
        task_id: Task the worker is currently assigned to.
        status: "RUNNING" | "COMPLETE" | "FAILED" | "BLOCKED".
        progress: Completion percentage stored as ``progress_pct``.

    Entries for other workers are preserved.
    """
    status_file = f"{bus_path}/worker-status.json"
    if os.path.exists(status_file):
        with open(status_file) as f:
            statuses = json.load(f)
    else:
        statuses = {}
    statuses[worker_id] = {
        "task_id": task_id,
        "status": status,
        "progress_pct": progress,
        "last_heartbeat": datetime.utcnow().isoformat() + "Z",
    }
    with open(status_file, "w") as f:
        json.dump(statuses, f, indent=2)
Board Deliberation Protocol
Phase 1: Assessment
Each director posts their assessment:
def post_board_assessment(bus_path: str, director: str, assessment: dict):
    """Store a director's Phase 1 assessment and announce it on the bus.

    Args:
        bus_path: Path to the track's ``.message-bus`` directory.
        director: Director identifier; used as the key in ``assessments.json``.
        assessment: The director's assessment payload. A later call for the
            same director replaces the earlier entry.
    """
    board_path = f"{bus_path}/board"
    # Read existing assessments
    assess_file = f"{board_path}/assessments.json"
    if os.path.exists(assess_file):
        with open(assess_file) as f:
            assessments = json.load(f)
    else:
        assessments = {}
    # Add this director's assessment
    assessments[director] = assessment
    with open(assess_file, "w") as f:
        json.dump(assessments, f, indent=2)
    # Post to main queue too
    post_message(bus_path, "BOARD_ASSESS", director, assessment)
Phase 2: Discussion
Directors respond to each other:
def post_board_discussion(bus_path: str, from_dir: str, to_dir: str,
                          msg_type: str, message: str, changes_verdict: bool):
    """Append a Phase 2 discussion message and mirror it onto the main queue.

    Args:
        bus_path: Path to the track's ``.message-bus`` directory.
        from_dir: Director sending the message.
        to_dir: Addressee (a director id, or "ALL" for a broadcast).
        msg_type: "CHALLENGE" | "AGREE" | "QUESTION" | "CLARIFY".
        message: Free-text body.
        changes_verdict: Whether the sender's verdict changes as a result.
    """
    board_path = f"{bus_path}/board"
    discussion_msg = {
        "from": from_dir,
        "to": to_dir,
        "type": msg_type,
        "message": message,
        "changes_my_verdict": changes_verdict,
        "timestamp": datetime.utcnow().isoformat() + "Z",
    }
    # Append to discussion log
    with open(f"{board_path}/discussion.jsonl", "a") as f:
        f.write(json.dumps(discussion_msg) + "\n")
    # Post to main queue
    post_message(bus_path, "BOARD_DISCUSS", from_dir, discussion_msg)
Phase 3: Voting
Directors cast final votes:
def post_board_vote(bus_path: str, director: str, verdict: str,
                    confidence: float, conditions: list):
    """Record a director's Phase 3 vote and announce it on the bus.

    Args:
        bus_path: Path to the track's ``.message-bus`` directory.
        director: Director casting the vote; key in ``votes.json``.
        verdict: "APPROVE" | "REJECT".
        confidence: 0.0 - 1.0.
        conditions: Conditions the director attaches to the vote.
    """
    board_path = f"{bus_path}/board"
    votes_file = f"{board_path}/votes.json"
    if os.path.exists(votes_file):
        with open(votes_file) as f:
            votes = json.load(f)
    else:
        votes = {}
    votes[director] = {
        "final_verdict": verdict,
        "confidence": confidence,
        "conditions": conditions,
        "timestamp": datetime.utcnow().isoformat() + "Z",
    }
    with open(votes_file, "w") as f:
        json.dump(votes, f, indent=2)
    post_message(bus_path, "BOARD_VOTE", director, votes[director])
Phase 4: Resolution
Orchestrator aggregates votes:
def resolve_board_vote(bus_path: str) -> dict:
    """Aggregate the directors' final votes into a board resolution.

    Args:
        bus_path: Path to the track's ``.message-bus`` directory.

    Returns:
        Dict with ``verdict``, ``vote_summary`` (director -> final verdict),
        ``conditions`` (each suffixed with the director who raised it) and
        ``timestamp``. The same dict is posted as a BOARD_RESOLVE message.

    Raises:
        FileNotFoundError: if ``board/votes.json`` does not exist.
    """
    board_path = f"{bus_path}/board"
    with open(f"{board_path}/votes.json") as f:
        votes = json.load(f)
    approve_count = sum(1 for v in votes.values() if v["final_verdict"] == "APPROVE")
    # Every vote that is not an explicit APPROVE counts as a rejection.
    reject_count = len(votes) - approve_count
    # Determine verdict. Thresholds are absolute counts, not fractions of the
    # votes actually cast.
    if approve_count >= 4:
        verdict = "APPROVED"
    elif approve_count == 3:
        verdict = "APPROVED_WITH_REVIEW"
    elif reject_count >= 3:
        verdict = "REJECTED"
    else:
        verdict = "ESCALATE"
    # Collect conditions
    all_conditions = [
        f"{cond} ({director})"
        for director, vote in votes.items()
        for cond in vote.get("conditions", [])
    ]
    resolution = {
        "verdict": verdict,
        "vote_summary": {d: v["final_verdict"] for d, v in votes.items()},
        "conditions": all_conditions,
        "timestamp": datetime.utcnow().isoformat() + "Z",
    }
    # Post resolution
    post_message(bus_path, "BOARD_RESOLVE", "orchestrator", resolution)
    # Create event file
    Path(f"{bus_path}/events/BOARD_RESOLVE.event").touch()
    return resolution
Deadlock Detection
Monitor for circular waits:
def detect_deadlock(bus_path: str) -> list:
    """Returns list of workers in deadlock cycle, or empty if none.

    The wait-for graph is built from every BLOCKED message in the queue: the
    message ``source`` waits for ``payload["waiting_for"]``. A later BLOCKED
    message from the same source replaces its earlier edge.

    NOTE(review): BLOCKED messages are never retracted here, so an edge from a
    worker that has since resumed still participates - confirm callers
    tolerate stale reports.
    """
    # Build wait-for graph: worker -> worker it's waiting for
    wait_for = {}
    for msg in read_messages(bus_path, msg_type="BLOCKED"):
        blocker = msg["payload"].get("waiting_for")
        if blocker:
            wait_for[msg["source"]] = blocker
    # Each worker has at most one outgoing edge, so walking the chain from a
    # worker either runs off the graph, reaches an already-explored node, or
    # re-enters the current path (a cycle).
    visited = set()
    for worker in wait_for:
        path = []
        position = {}  # node -> index in path, for O(1) "is it on the path?"
        node = worker
        while True:
            if node in position:
                return path[position[node]:]
            if node in visited:
                break
            visited.add(node)
            position[node] = len(path)
            path.append(node)
            if node not in wait_for:
                break
            node = wait_for[node]
    return []
Initialization
Initialize message bus for a track:
def init_message_bus(track_path: str):
    """Create the ``.message-bus`` layout for a track.

    Safe to call on an existing bus: directories and log files are created
    only if missing, and JSON state files are seeded with ``{}`` only when
    they are missing or empty, so live locks, heartbeats, assessments and
    votes are not discarded.

    Args:
        track_path: Track directory under which ``.message-bus`` is created.
    """
    bus_path = f"{track_path}/.message-bus"
    # Create directories
    for directory in (bus_path, f"{bus_path}/events", f"{bus_path}/board"):
        os.makedirs(directory, exist_ok=True)
    # Append-only logs: touch keeps any existing content.
    for log_name in ("queue.jsonl", "board/discussion.jsonl"):
        Path(f"{bus_path}/{log_name}").touch()
    # JSON state files: seed only when absent or empty.
    for state_name in ("locks.json", "worker-status.json",
                       "board/assessments.json", "board/votes.json"):
        state_file = f"{bus_path}/{state_name}"
        if os.path.exists(state_file) and os.path.getsize(state_file) > 0:
            continue
        with open(state_file, "w") as f:
            json.dump({}, f)
Usage in Worker Agents
## Worker Protocol
1. **On Start**:
- Read the message bus for TASK_COMPLETE events of dependencies
- Verify all dependencies are met
- Update worker-status.json with RUNNING
2. **Before Modifying Files**:
- Call acquire_lock() for each file
- If lock fails, post BLOCKED message and wait
3. **During Execution**:
- Post PROGRESS every 5 minutes
- Update worker-status.json heartbeat
4. **On Completion**:
- Release all file locks
- Post TASK_COMPLETE with commit SHA and files modified
- Update worker-status.json with COMPLETE
5. **On Failure**:
- Release all file locks
- Post TASK_FAILED with error details
- Update worker-status.json with FAILED
Usage in Board Deliberation
## Board Protocol
1. **Phase 1 (ASSESS)**:
- All 5 directors read the proposal
- Each posts BOARD_ASSESS to assessments.json
- Wait for all 5 assessments
2. **Phase 2 (DISCUSS)** -- 3 rounds:
- Directors read each other's assessments
- Post BOARD_DISCUSS messages
- Respond to challenges and questions
3. **Phase 3 (VOTE)**:
- Each director posts BOARD_VOTE
- Include confidence level and conditions
4. **Phase 4 (RESOLVE)**:
- Orchestrator calls resolve_board_vote()
- Posts BOARD_RESOLVE
- Creates event file for completion
Board Session Management
Creating a Board Session
def create_board_session(bus_path: str, checkpoint: str, proposal: dict) -> str:
    """Start a fresh board deliberation session and return its id.

    Resets ``assessments.json``, ``votes.json`` and ``discussion.jsonl`` in
    the board directory, then writes the session metadata to
    ``session-{session_id}.json``.

    Args:
        bus_path: Path to the track's ``.message-bus`` directory.
        checkpoint: "EVALUATE_PLAN" | "EVALUATE_EXECUTION" | "PRE_LAUNCH".
        proposal: Proposal data stored verbatim in the session metadata.
    """
    board_dir = Path(f"{bus_path}/board")
    session_id = "board-" + datetime.utcnow().strftime("%Y%m%d%H%M%S")

    phase_table = {
        "assess": {"status": "IN_PROGRESS", "complete": 0, "of": 5},
        "discuss": {"status": "NOT_STARTED", "rounds": 0, "max_rounds": 3},
        "vote": {"status": "NOT_STARTED", "complete": 0, "of": 5},
        "resolve": {"status": "NOT_STARTED"},
    }
    metadata = dict(
        session_id=session_id,
        checkpoint=checkpoint,
        status="ASSESSING",
        proposal=proposal,
        directors=["CA", "CPO", "CSO", "COO", "CXO"],
        started_at=datetime.utcnow().isoformat() + "Z",
        phases=phase_table,
    )

    # Wipe whatever the previous session left behind.
    empty_json = json.dumps({}, indent=2)
    for stale in ("assessments.json", "votes.json"):
        (board_dir / stale).write_text(empty_json)
    (board_dir / "discussion.jsonl").write_text("")

    # Persist the new session's metadata.
    (board_dir / f"session-{session_id}.json").write_text(json.dumps(metadata, indent=2))
    return session_id
Checking Phase Completion
def check_board_phase_complete(bus_path: str, session_id: str) -> dict:
    """Report whether the session's current phase has finished.

    This only inspects state and refreshes the progress counters stored in
    the session file; it does not change ``session["status"]``. Use
    ``advance_board_phase`` to move on.

    Args:
        bus_path: Path to the track's ``.message-bus`` directory.
        session_id: Id returned by ``create_board_session``.

    Returns:
        ``{"phase", "complete", "can_advance"}`` plus ``"next_phase"`` when
        the phase is complete. Phases other than ASSESSING / DISCUSSING /
        VOTING are always reported as not complete.
    """
    board_path = f"{bus_path}/board"
    session_file = f"{board_path}/session-{session_id}.json"
    with open(session_file) as f:
        session = json.load(f)
    with open(f"{board_path}/assessments.json") as f:
        assessments = json.load(f)
    with open(f"{board_path}/votes.json") as f:
        votes = json.load(f)

    status = session["status"]
    phases = session["phases"]
    result = {"phase": status, "complete": False, "can_advance": False}
    next_phase = None
    # Targets come from the session itself; the literals are the historical
    # defaults for sessions that do not carry them.
    if status == "ASSESSING":
        phases["assess"]["complete"] = len(assessments)
        if len(assessments) >= phases["assess"].get("of", 5):
            next_phase = "DISCUSSING"
    elif status == "DISCUSSING":
        if phases["discuss"]["rounds"] >= phases["discuss"].get("max_rounds", 3):
            next_phase = "VOTING"
    elif status == "VOTING":
        phases["vote"]["complete"] = len(votes)
        if len(votes) >= phases["vote"].get("of", 5):
            next_phase = "RESOLVING"
    if next_phase is not None:
        result["complete"] = True
        result["can_advance"] = True
        result["next_phase"] = next_phase
    # Save updated session
    with open(session_file, "w") as f:
        json.dump(session, f, indent=2)
    return result
Advancing Board Phase
def advance_board_phase(bus_path: str, session_id: str) -> str:
    """Advance to next deliberation phase.

    Moves the session status one step along
    ASSESSING -> DISCUSSING -> VOTING -> RESOLVING -> COMPLETE and marks the
    newly entered phase as IN_PROGRESS. A session that is already COMPLETE
    (or has an unrecognised status) keeps its status.

    Args:
        bus_path: Path to the track's ``.message-bus`` directory.
        session_id: Id returned by ``create_board_session``.

    Returns:
        The session status after the transition.
    """
    board_path = f"{bus_path}/board"
    session_file = f"{board_path}/session-{session_id}.json"
    with open(session_file) as f:
        session = json.load(f)
    transitions = {
        "ASSESSING": "DISCUSSING",
        "DISCUSSING": "VOTING",
        "VOTING": "RESOLVING",
        "RESOLVING": "COMPLETE",
    }
    # Status name -> key under session["phases"]. Spelled out because the key
    # cannot be derived by trimming "ing" ("VOTING" would become "vot").
    # COMPLETE has no phase entry of its own.
    phase_keys = {
        "ASSESSING": "assess",
        "DISCUSSING": "discuss",
        "VOTING": "vote",
        "RESOLVING": "resolve",
    }
    current = session["status"]
    next_phase = transitions.get(current, current)
    session["status"] = next_phase
    phase_key = phase_keys.get(next_phase)
    if phase_key is not None:
        session["phases"][phase_key]["status"] = "IN_PROGRESS"
    with open(session_file, "w") as f:
        json.dump(session, f, indent=2)
    return next_phase
Orchestrator Board Integration
Invoking Board from Orchestrator
async def invoke_board_meeting(
    bus_path: str,
    checkpoint: str,
    proposal: str,
    context: dict
) -> dict:
    """
    Full 4-phase board deliberation.
    Called by orchestrator at EVALUATE_PLAN or EVALUATE_EXECUTION checkpoints.

    Args:
        bus_path: Path to the track's ``.message-bus`` directory.
        checkpoint: Checkpoint name recorded in the session metadata.
        proposal: Proposal text embedded in every director prompt.
        context: Extra context stored alongside the proposal.

    Returns:
        Dict with ``session_id``, ``verdict``, ``votes`` and ``conditions``.

    NOTE: the polling loops have no timeout; they wait until the phase is
    reported complete.
    """
    # 1. Create session
    session_id = create_board_session(bus_path, checkpoint, {
        "proposal": proposal,
        "context": context
    })
    # 2. Phase 1: ASSESS -- Dispatch all directors in parallel
    director_prompts = {
        "CA": f"Evaluate technical aspects: {proposal}",
        "CPO": f"Evaluate product value: {proposal}",
        "CSO": f"Evaluate security posture: {proposal}",
        "COO": f"Evaluate operational feasibility: {proposal}",
        "CXO": f"Evaluate user experience: {proposal}"
    }
    # Dispatch via parallel Task calls (see agent-factory). The assessments
    # are read back from the bus, so the return value is not needed here.
    await dispatch_board_directors(director_prompts, bus_path)
    # Wait for all assessments
    while not check_board_phase_complete(bus_path, session_id)["complete"]:
        await asyncio.sleep(5)
    advance_board_phase(bus_path, session_id)
    # 3. Phase 2: DISCUSS -- 3 rounds, then a single transition to VOTING
    for round_num in range(3):
        await run_discussion_round(bus_path, session_id, round_num)
    advance_board_phase(bus_path, session_id)
    # 4. Phase 3: VOTE -- All directors vote
    await dispatch_final_votes(bus_path, session_id)
    while not check_board_phase_complete(bus_path, session_id)["complete"]:
        await asyncio.sleep(5)
    advance_board_phase(bus_path, session_id)
    # 5. Phase 4: RESOLVE
    resolution = resolve_board_vote(bus_path)
    return {
        "session_id": session_id,
        "verdict": resolution["verdict"],
        "votes": resolution["vote_summary"],
        "conditions": resolution["conditions"]
    }
Quick Board Review (No Discussion)
async def invoke_board_review(bus_path: str, proposal: str) -> dict:
    """
    Quick board review -- Phase 1 only, no discussion.
    Used for execution quality checks or low-stakes decisions.

    Args:
        bus_path: Path to the track's ``.message-bus`` directory.
        proposal: Proposal text handed to the directors.

    Returns:
        Dict with ``session_id``, ``verdict`` ("APPROVED" when at least 3
        assessments are APPROVE or CONCERNS, otherwise "REJECTED"), the raw
        ``assessments`` and ``consensus`` (True when at least 4 are
        favourable).
    """
    session_id = create_board_session(bus_path, "QUICK_REVIEW", {
        "proposal": proposal,
        "quick_mode": True
    })
    # Dispatch all directors
    # NOTE(review): the full meeting passes a dict of per-director prompts to
    # dispatch_board_directors, while this passes the bare proposal string -
    # confirm the helper accepts both.
    await dispatch_board_directors(proposal, bus_path)
    # Wait for assessments (no timeout)
    while not check_board_phase_complete(bus_path, session_id)["complete"]:
        await asyncio.sleep(5)
    # Aggregate assessments directly (skip discussion and vote)
    board_path = f"{bus_path}/board"
    with open(f"{board_path}/assessments.json") as f:
        assessments = json.load(f)
    # CONCERNS counts as favourable in quick mode.
    approve_count = sum(1 for a in assessments.values()
                        if a["verdict"] in ("APPROVE", "CONCERNS"))
    return {
        "session_id": session_id,
        "verdict": "APPROVED" if approve_count >= 3 else "REJECTED",
        "assessments": assessments,
        "consensus": approve_count >= 4
    }
Event-Driven Director Polling
Directors can poll for messages addressed to them:
def get_messages_for_director(bus_path: str, director: str) -> list:
    """Collect the discussion messages a director should see.

    Reads ``board/discussion.jsonl`` and returns, in log order, every entry
    whose ``to`` field is either ``director`` or the broadcast address "ALL".
    Blank lines in the log are ignored.
    """
    recipients = (director, "ALL")
    with open(f"{bus_path}/board/discussion.jsonl") as log:
        parsed = (json.loads(raw) for raw in log if raw.strip())
        return [entry for entry in parsed if entry["to"] in recipients]
Board Session Files
.message-bus/board/
├── session-board-20260201120000.json # Active session metadata
├── assessments.json # Phase 1: Director assessments
│ {
│ "CA": { "verdict": "APPROVE", "score": 8, "concerns": [...] },
│ "CPO": { "verdict": "CONCERNS", "score": 7, "concerns": [...] },
│ ...
│ }
├── discussion.jsonl # Phase 2: Discussion log
│ {"from": "CA", "to": "CPO", "type": "CHALLENGE", "message": "..."}
│ {"from": "CPO", "to": "CA", "type": "CLARIFY", "message": "..."}
├── votes.json # Phase 3: Final votes
│ {
│ "CA": { "final_verdict": "APPROVE", "confidence": 0.9 },
│ ...
│ }
└── resolution.md # Phase 4: Board decision
Weekly Installs
4
Repository
ibrahim-3d/cond…erpowers
GitHub Stars
271
First Seen
11 days ago
Security Audits
Installed on
cline4
github-copilot4
codex4
kimi-cli4
gemini-cli4
cursor4