agent-factory

SKILL.md

Agent Factory -- Dynamic Worker Creation

Creates ephemeral worker agents from templates, specializing them based on task type.

Worker Creation Flow

Task from DAG -> Determine Type -> Select Template -> Substitute Placeholders -> Spawn Worker

Template Selection

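| Task Type | Template | Specialization |
|-----------|----------|----------------|
| code | code-worker.template.md | TDD, code patterns, tests |
| ui | ui-worker.template.md | Design system, accessibility |
| integration | integration-worker.template.md | API contracts, error handling |
| test | test-worker.template.md | Coverage targets, test patterns |
| docs | task-worker.template.md | Base template |
| config | task-worker.template.md | Base template |

Any task type not listed above also falls back to `task-worker.template.md`. A task with no `type` is treated as `code`.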

CreateWorkerAgent Procedure

def create_worker_agent(
    task: dict,
    track_id: str,
    message_bus_path: str,
    dag: dict = None,
) -> dict:
    """
    Create a specialized worker agent for a task.

    Renders the worker template selected by the task type, writes the result
    to an ephemeral skill directory, and builds the dispatch prompt.

    Args:
        task: Task node from DAG (id, name, type, files, depends_on, acceptance)
        track_id: Current track identifier
        message_bus_path: Path to message bus directory
        dag: Optional full DAG (dict with a 'nodes' list). When provided, it is
            used to fill the {unblocks} placeholder; when omitted, {unblocks}
            renders as "None".

    Returns:
        dict with worker_id, skill_path, prompt, task_id, task_name, task_type
    """
    import re
    from datetime import timezone

    # 1. Generate unique worker ID (UTC timestamp, one-second resolution)
    timestamp = datetime.now(timezone.utc).strftime("%Y%m%d%H%M%S")
    worker_id = f"worker-{task['id']}-{timestamp}"

    # 2. Select template based on task type; unknown types use the base template
    task_type = task.get('type', 'code')
    template_map = {
        'code': 'code-worker.template.md',
        'ui': 'ui-worker.template.md',
        'integration': 'integration-worker.template.md',
        'test': 'test-worker.template.md',
    }
    template_name = template_map.get(task_type, 'task-worker.template.md')
    template_path = f"${CLAUDE_PLUGIN_ROOT}/skills/worker-templates/{template_name}"

    # 3. Read template
    worker_skill = read_file(template_path)

    # 4. Splice in the shared execution protocol and the task instructions
    #    BEFORE placeholder substitution, so any placeholders inside the
    #    inserted text are filled in step 5 as well.
    if '{base_worker_protocol}' in worker_skill:
        base_protocol = read_file("${CLAUDE_PLUGIN_ROOT}/skills/worker-templates/task-worker.template.md")
        base_protocol_section = extract_section(base_protocol, "## Execution Protocol")
        worker_skill = worker_skill.replace('{base_worker_protocol}', base_protocol_section)

    acceptance = task.get('acceptance')
    if isinstance(acceptance, (list, tuple)):
        # Acceptance criteria given as a list render as bullets, not a Python repr.
        acceptance = format_list(acceptance)

    task_instructions = task.get('task_instructions') or (
        f"Implement: {task['name']}\n\n"
        f"Acceptance: {acceptance if acceptance is not None else 'N/A'}"
    )
    worker_skill = worker_skill.replace('{task_instructions}', str(task_instructions))

    # 5. Fill the remaining placeholders in a single pass. Substituted values
    #    are never re-scanned, so a task name or file list that happens to
    #    contain text like "{files}" is inserted verbatim.
    unblocks = find_unblocked_tasks(task['id'], dag) if dag else []
    substitutions = {
        '{task_id}': task['id'],
        '{task_name}': task['name'],
        '{track_id}': track_id,
        '{phase}': task.get('phase', 1),
        '{files}': format_list(task.get('files', [])),
        '{depends_on}': format_list(task.get('depends_on', [])),
        '{acceptance}': acceptance if acceptance is not None else 'Complete the task as specified',
        '{message_bus_path}': message_bus_path,
        '{timestamp}': timestamp,
        '{worker_id}': worker_id,
        '{unblocks}': format_list(unblocks),
    }
    # str() guards against non-string DAG values (e.g. integer ids or phases).
    substitutions = {key: str(value) for key, value in substitutions.items()}
    placeholder_re = re.compile('|'.join(map(re.escape, substitutions)))
    worker_skill = placeholder_re.sub(lambda match: substitutions[match.group(0)], worker_skill)

    # 6. Create worker skill directory (ephemeral; removed by cleanup_worker)
    worker_skill_path = f"${CLAUDE_PLUGIN_ROOT}/skills/workers/{worker_id}/SKILL.md"
    os.makedirs(os.path.dirname(worker_skill_path), exist_ok=True)
    write_file(worker_skill_path, worker_skill)

    # 7. Generate dispatch prompt
    dispatch_prompt = f"""You are worker agent {worker_id}.

Your task: {task['name']} (Task {task['id']})

MESSAGE BUS: {message_bus_path}

Follow your worker skill instructions at: {worker_skill_path}

Protocol:
1. Check dependencies via message bus
2. Acquire file locks before modifying
3. Post progress every 5 min
4. Post TASK_COMPLETE when done

Execute autonomously. Do NOT wait for user input."""

    return {
        'worker_id': worker_id,
        'skill_path': worker_skill_path,
        'prompt': dispatch_prompt,
        'task_id': task['id'],
        # Included so dispatch_workers can build a meaningful Task description.
        'task_name': task['name'],
        'task_type': task_type
    }

Batch Worker Creation

For parallel groups, create all workers at once:

def create_workers_for_parallel_group(
    parallel_group: dict,
    dag: dict,
    track_id: str,
    message_bus_path: str
) -> list:
    """
    Create workers for all tasks in a parallel group.

    Args:
        parallel_group: Parallel group definition (id, tasks, conflict_free)
        dag: Full DAG with all task nodes
        track_id: Current track identifier
        message_bus_path: Path to message bus

    Returns:
        List of worker definitions ready for dispatch, in the order the tasks
        appear in parallel_group['tasks']. Task IDs that are not present in
        the DAG are skipped with a printed warning.
    """
    # Index the DAG once instead of scanning every node for every task.
    # setdefault keeps the first node for a duplicated id, like a linear search.
    nodes_by_id = {}
    for node in dag.get('nodes', []):
        nodes_by_id.setdefault(node['id'], node)

    needs_coordination = not parallel_group.get('conflict_free', True)

    workers = []
    for task_id in parallel_group['tasks']:
        task = nodes_by_id.get(task_id)
        if not task:
            # No worker is created for a skipped task, so nothing will ever
            # post its TASK_COMPLETE; make the skip visible.
            print(
                f"Warning: task {task_id} in parallel group "
                f"{parallel_group.get('id')} not found in DAG; skipping"
            )
            continue

        worker = create_worker_agent(task, track_id, message_bus_path)

        # Add coordination info if not conflict-free
        if needs_coordination:
            worker['requires_coordination'] = True
            # Copy per worker so they don't share (and mutate) one list object.
            worker['shared_resources'] = list(parallel_group.get('shared_resources') or [])

        workers.append(worker)

    return workers

Worker Dispatch

Dispatch each worker as a background Task call (`run_in_background=True`) so the workers run in parallel:

def dispatch_workers(workers: list) -> list:
    """
    Launch every worker through the Task tool as a background sub-agent.

    All Task call specifications are built first, then each one is invoked.

    Returns list of Task call results, in the same order as ``workers``.
    """

    def _task_call_for(worker):
        # Background execution is what lets the workers run concurrently.
        return {
            'subagent_type': 'general-purpose',
            'description': f"Execute {worker['task_id']}: {worker.get('task_name', 'task')}",
            'prompt': worker['prompt'],
            'run_in_background': True,
        }

    calls = [_task_call_for(w) for w in workers]
    return [Task(**spec) for spec in calls]

Worker Cleanup

After a task completes, clean up the worker's artifacts:

def cleanup_worker(worker_id: str):
    """
    Delete the ephemeral skill directory that was created for ``worker_id``.

    The orchestrator calls this once the worker has reported completion. If the
    directory no longer exists, nothing is removed, but the cleanup message is
    still printed.
    """
    skill_dir = f"${CLAUDE_PLUGIN_ROOT}/skills/workers/{worker_id}"

    dir_present = os.path.exists(skill_dir)
    if dir_present:
        shutil.rmtree(skill_dir)

    # Log cleanup
    print(f"Cleaned up worker: {worker_id}")

Cleanup All Workers

After a parallel group completes, clean up all of its workers:

def cleanup_parallel_group_workers(parallel_group_id: str, workers: list):
    """
    Remove the skill directory of every worker in a finished parallel group,
    then delete the shared workers directory if that left it empty.

    ``parallel_group_id`` is accepted for the caller's convenience; it is not
    used here.
    """
    for worker_id in (w['worker_id'] for w in workers):
        cleanup_worker(worker_id)

    workers_root = "${CLAUDE_PLUGIN_ROOT}/skills/workers"
    if not os.path.exists(workers_root):
        return
    # Only drop the parent once no other worker directories remain in it.
    if not os.listdir(workers_root):
        os.rmdir(workers_root)

Helper Functions

def format_list(items: list) -> str:
    """Render ``items`` as Markdown bullet lines, or the string "None" if empty/falsy."""
    if items:
        return "\n".join(map("- {}".format, items))
    return "None"


def find_unblocked_tasks(task_id: str, dag: "dict | None" = None, completed=None) -> list:
    """
    Find tasks that will be unblocked when task_id completes.

    A node counts as unblocked when it depends on ``task_id`` and every other
    dependency it has is already in ``completed``. Without ``completed``, only
    nodes whose sole dependency is ``task_id`` qualify.

    Args:
        task_id: ID of the task that is about to complete.
        dag: DAG dict with a 'nodes' list; each node has an 'id' and an
            optional 'depends_on' list. If omitted or empty, returns [].
        completed: Optional iterable of task IDs that have already finished.

    Returns:
        IDs of the newly unblocked nodes, in DAG order.
    """
    if not dag:
        return []

    satisfied = set(completed or ())
    unblocked = []
    for node in dag.get('nodes', []):
        # `or []` also tolerates an explicit "depends_on": None.
        deps = node.get('depends_on') or []
        if task_id not in deps:
            continue
        # Unblocked only if every other dependency is already satisfied.
        if all(dep == task_id or dep in satisfied for dep in deps):
            unblocked.append(node['id'])
    return unblocked


def extract_section(content: str, section_header: str) -> str:
    """
    Extract a section from markdown content.

    Returns the text between the first line starting with ``section_header``
    and the next heading of the same or a higher level (for a "## " header,
    the next "#" or "##" heading). Deeper sub-headings such as "### " stay
    inside the section. Lines inside backtick or tilde code fences are never
    treated as headings.

    Args:
        content: Markdown text to search.
        section_header: Heading line prefix to look for, e.g. "## Execution Protocol".

    Returns:
        The section body with surrounding whitespace stripped, or "" if the
        header is not found.
    """

    def _heading_level(line: str) -> int:
        # ATX heading level: count of leading '#' followed by a space or end of line.
        text = line.rstrip()
        hashes = len(text) - len(text.lstrip('#'))
        if hashes and (hashes == len(text) or text[hashes] == ' '):
            return hashes
        return 0

    # A header that is not itself a heading falls back to "## " boundaries.
    target_level = _heading_level(section_header) or 2

    in_section = False
    in_fence = False
    section_lines = []

    # splitlines() also handles CRLF input without leaving stray '\r'.
    for line in content.splitlines():
        is_fence = line.lstrip().startswith(('```', '~~~'))
        if not (in_fence or is_fence):
            if not in_section:
                if line.startswith(section_header):
                    in_section = True
                continue
            level = _heading_level(line)
            if level and level <= target_level:
                break
        if is_fence:
            in_fence = not in_fence
        if in_section:
            section_lines.append(line)

    return '\n'.join(section_lines).strip()

Integration with Orchestrator

The orchestrator calls the agent factory during PARALLEL_EXECUTE:

# In conductor-orchestrator

async def execute_parallel_phase(phase: Phase, dag: dict):
    """
    Run, one after another, every parallel group whose tasks all belong to ``phase``.

    For each group: create workers, dispatch them, wait on the message bus
    until the group completes, then remove the workers' skill directories.
    """
    # NOTE(review): track_id and message_bus_path are not parameters; they are
    # assumed to be defined in the enclosing orchestrator scope — confirm.
    phase_groups = [
        group
        for group in dag.get('parallel_groups', [])
        if all(task_in_phase(member, phase) for member in group['tasks'])
    ]

    for group in phase_groups:
        workers = create_workers_for_parallel_group(
            group, dag, track_id, message_bus_path
        )

        # Workers run in the background; completion is observed via the bus,
        # so the Task results are not needed here.
        dispatch_workers(workers)
        await wait_for_group_completion(group, message_bus_path)

        cleanup_parallel_group_workers(group['id'], workers)

Worker Lifecycle

+---------------------------------------------------------------+
|                      WORKER LIFECYCLE                          |
|                                                                |
|  1. CREATE                                                     |
|     Agent Factory -> Template -> Substitution -> Skill Dir     |
|                                                                |
|  2. DISPATCH                                                   |
|     Orchestrator -> Task(prompt, run_in_background) -> Worker  |
|                                                                |
|  3. EXECUTE                                                    |
|     Worker -> Check Deps -> Lock Files -> Implement -> Commit  |
|                                                                |
|  4. REPORT                                                     |
|     Worker -> Message Bus -> TASK_COMPLETE/TASK_FAILED         |
|                                                                |
|  5. CLEANUP                                                    |
|     Orchestrator -> cleanup_worker() -> Remove Skill Dir       |
|                                                                |
+---------------------------------------------------------------+

Error Handling

def handle_worker_failure(worker: dict, error: str, message_bus_path: str):
    """
    Handle worker failure gracefully.

    1. Post TASK_FAILED to the message bus (the orchestrator watches the bus,
       so this is how it is notified)
    2. Release any held locks
    3. Cleanup worker artifacts

    Steps 2 and 3 run even if posting the failure message raises, so a failed
    bus write cannot leave file locks held or artifacts behind. Any exception
    from an earlier step still propagates after the later steps have run.

    Args:
        worker: Worker definition as returned by create_worker_agent
            (must contain 'worker_id' and 'task_id').
        error: Error description included in the TASK_FAILED payload.
        message_bus_path: Path to the message bus directory.
    """
    worker_id = worker['worker_id']
    try:
        # Post failure message
        post_message(message_bus_path, "TASK_FAILED", worker_id, {
            "task_id": worker['task_id'],
            "error": error
        })
    finally:
        try:
            # Release all locks held by this worker
            release_all_locks_for_worker(message_bus_path, worker_id)
        finally:
            cleanup_worker(worker_id)
Weekly Installs
4
GitHub Stars
271
First Seen
11 days ago
Installed on
cline4
gemini-cli4
github-copilot4
codex4
kimi-cli4
cursor4