Lesson 2 of 46 ~25 min

Agent Teams Architecture

How agent teams work under the hood — the coordination protocol, task distribution, result merging, and the fan-out/fan-in execution model.

Agent teams in Opus 4.6 let you decompose a complex task into independent subtasks, dispatch each to a separate Claude agent running in parallel, and merge the results into a single coherent output. This is not prompt chaining — each agent operates with its own context window, system prompt, and tool access.

The Coordination Protocol

Every agent team execution follows a three-phase protocol:

graph TD
    C[Coordinator] -->|1. Fan-Out| A1[Agent 1]
    C -->|1. Fan-Out| A2[Agent 2]
    C -->|1. Fan-Out| A3[Agent 3]
    A1 -->|2. Execute| R1[Result 1]
    A2 -->|2. Execute| R2[Result 2]
    A3 -->|2. Execute| R3[Result 3]
    R1 -->|3. Fan-In| M[Merger]
    R2 -->|3. Fan-In| M
    R3 -->|3. Fan-In| M
    M --> F[Final Output]

Phase 1 — Fan-Out: The coordinator decomposes the task into subtask definitions. Each subtask includes a system prompt, user message, and optional tool configuration. Subtasks are dispatched concurrently.

Phase 2 — Execute: Each agent processes its subtask independently. Agents have no awareness of each other. They cannot communicate during execution.

Phase 3 — Fan-In: Results from all agents are collected. A merger step — either deterministic or LLM-based — combines them into the final output.
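
The three phases can be sketched without any API calls. In the following minimal illustration, `fake_agent`, `decompose`, `merge`, and `run_team` are hypothetical names introduced here, and the "agent" is a stub that only uppercases its input. It shows the shape of fan-out, isolated execution, and fan-in, not a real implementation.

```python
import asyncio


async def fake_agent(agent_id: str, subtask: str) -> dict:
    """Stand-in for one isolated agent; a real one would call the API."""
    await asyncio.sleep(0.01)  # simulate independent work
    return {"agent_id": agent_id, "content": subtask.upper()}


def decompose(task: str) -> list[str]:
    """Phase 1 (fan-out): split the task into independent subtasks."""
    return [part.strip() for part in task.split(";") if part.strip()]


def merge(results: list[dict]) -> str:
    """Phase 3 (fan-in): deterministic merge by concatenation."""
    return " | ".join(r["content"] for r in results)


async def run_team(task: str) -> str:
    subtasks = decompose(task)
    # Phase 2 (execute): agents run concurrently and share no state.
    results = await asyncio.gather(
        *(fake_agent(f"agent-{i}", st) for i, st in enumerate(subtasks, 1))
    )
    return merge(list(results))


if __name__ == "__main__":
    print(asyncio.run(run_team("check security; check performance; check style")))
```

Because `asyncio.gather` returns results in the order the awaitables were passed, the merger sees outputs in subtask order regardless of which stub finishes first.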

Task Definition Structure

Each subtask is a self-contained unit that tells one agent exactly what to do:

from dataclasses import dataclass, field

@dataclass
class SubTask:
    """Definition of a single agent's work unit."""
    agent_id: str
    system_prompt: str
    user_message: str
    model: str = "claude-opus-4-6-20260205"
    max_tokens: int = 4096
    thinking: dict = field(default_factory=lambda: {"type": "adaptive"})
    tools: list[dict] = field(default_factory=list)
    timeout_seconds: int = 120

@dataclass
class TeamTask:
    """Complete definition of a multi-agent task."""
    task_id: str
    subtasks: list[SubTask]
    merge_strategy: str = "llm"  # "llm", "concatenate", "vote"
    merge_prompt: str | None = None

Launching an Agent Team

The core execution engine dispatches subtasks concurrently using asyncio:

import asyncio
import anthropic
from dataclasses import dataclass

@dataclass
class AgentResult:
    agent_id: str
    content: str
    thinking: str | None
    usage: dict
    latency_ms: int
    error: str | None = None

class AgentTeam:
    def __init__(self):
        self.client = anthropic.AsyncAnthropic()

    async def _run_agent(self, subtask: SubTask) -> AgentResult:
        """Execute a single agent's subtask."""
        import time
        start = time.monotonic()

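        try:
            # Forward the tool configuration only when the subtask defines one.
            # Note: this is a single call; it does not run a tool-use loop.
            extra = {"tools": subtask.tools} if subtask.tools else {}
            response = await asyncio.wait_for(
                self.client.messages.create(
                    model=subtask.model,
                    max_tokens=subtask.max_tokens,
                    thinking=subtask.thinking,
                    system=subtask.system_prompt,
                    messages=[{"role": "user", "content": subtask.user_message}],
                    **extra,
                ),
                timeout=subtask.timeout_seconds,
            )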

            elapsed = int((time.monotonic() - start) * 1000)
            text = next(
                (b.text for b in response.content if b.type == "text"), ""
            )
            thinking = next(
                (b.thinking for b in response.content if b.type == "thinking"),
                None,
            )

            return AgentResult(
                agent_id=subtask.agent_id,
                content=text,
                thinking=thinking,
                usage={
                    "input_tokens": response.usage.input_tokens,
                    "output_tokens": response.usage.output_tokens,
                },
                latency_ms=elapsed,
            )
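        except Exception as e:
            elapsed = int((time.monotonic() - start) * 1000)
            return AgentResult(
                agent_id=subtask.agent_id,
                content="",
                thinking=None,
                usage={},
                latency_ms=elapsed,
                # Include the exception type: str(e) is empty for a timeout,
                # and an empty error string would look like success to
                # callers that check `if r.error`.
                error=f"{type(e).__name__}: {e}",
            )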

    async def run(self, task: TeamTask) -> list[AgentResult]:
        """Execute all subtasks in parallel."""
        results = await asyncio.gather(
            *[self._run_agent(st) for st in task.subtasks]
        )
        return list(results)
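
The `run` method above starts every subtask at once, which can run into API rate limits when a team has many agents. One possible way to cap concurrency is an `asyncio.Semaphore`. The sketch below is an assumption-laden illustration, not part of the lesson's engine: `run_bounded`, `demo`, and the stub `fake_agent` are hypothetical names, and the stub sleeps instead of calling the API.

```python
import asyncio


async def run_bounded(coros, max_concurrent: int = 2) -> list:
    """Run coroutines concurrently, but at most `max_concurrent` at a time."""
    semaphore = asyncio.Semaphore(max_concurrent)

    async def guarded(coro):
        # A coroutine only starts executing once it holds a semaphore slot.
        async with semaphore:
            return await coro

    # gather preserves input order, so results line up with the subtasks.
    return await asyncio.gather(*(guarded(c) for c in coros))


async def demo() -> tuple[list[int], int]:
    """Run five stub agents with a cap of two and record peak concurrency."""
    active = 0
    peak = 0

    async def fake_agent(n: int) -> int:
        nonlocal active, peak
        active += 1
        peak = max(peak, active)
        await asyncio.sleep(0.01)  # simulate an API call
        active -= 1
        return n * n

    results = await run_bounded(
        [fake_agent(n) for n in range(5)], max_concurrent=2
    )
    return results, peak


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

In the `AgentTeam` class, the same idea would mean wrapping each `self._run_agent(st)` call in the guard before passing it to `asyncio.gather`; the right cap depends on the rate limits of your account.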

Result Merging

The merge phase is where raw agent outputs become a unified answer. Three strategies cover most use cases:

class ResultMerger:
    def __init__(self):
        self.client = anthropic.Anthropic()

    def merge(self, results: list[AgentResult], strategy: str,
              merge_prompt: str | None = None) -> str:
        if strategy == "concatenate":
            return self._concatenate(results)
        elif strategy == "vote":
            return self._majority_vote(results)
        elif strategy == "llm":
            return self._llm_merge(results, merge_prompt)
        raise ValueError(f"Unknown strategy: {strategy}")

    def _concatenate(self, results: list[AgentResult]) -> str:
        """Simple concatenation with headers."""
        sections = []
        for r in results:
            if r.error:
                sections.append(f"## {r.agent_id} (FAILED)\n{r.error}")
            else:
                sections.append(f"## {r.agent_id}\n{r.content}")
        return "\n\n".join(sections)

    def _majority_vote(self, results: list[AgentResult]) -> str:
        """Return the most common answer (for classification tasks)."""
        from collections import Counter
        answers = [r.content.strip() for r in results if not r.error]
        if not answers:
            raise RuntimeError("All agents failed")
        winner, count = Counter(answers).most_common(1)[0]
        return f"{winner} ({count}/{len(answers)} agents agreed)"

    def _llm_merge(self, results: list[AgentResult],
                   merge_prompt: str | None) -> str:
        """Use Claude to intelligently merge agent outputs."""
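        successful = [r for r in results if not r.error]
        if not successful:
            # Mirror _majority_vote: there is nothing to merge.
            raise RuntimeError("All agents failed")
        agent_outputs = "\n\n".join(
            f"### {r.agent_id}\n{r.content}" for r in successful
        )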
        prompt = merge_prompt or (
            "Merge the following agent outputs into a single coherent response. "
            "Remove duplicates, resolve conflicts, and present a unified answer."
        )
        response = self.client.messages.create(
            model="claude-opus-4-6-20260205",
            max_tokens=8192,
            thinking={"type": "adaptive"},
            messages=[{
                "role": "user",
                "content": f"{prompt}\n\n{agent_outputs}",
            }],
        )
        # Default to "" so a response without a text block does not raise
        # StopIteration.
        return next((b.text for b in response.content if b.type == "text"), "")
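
The vote strategy compares raw strings, so "Spam" and "spam." would count as different answers. A possible refinement, sketched below under the assumption that answers are short classification labels, is to normalize each answer before counting and to report ties explicitly. `normalize` and `majority_vote` are hypothetical helpers, not part of `ResultMerger`.

```python
from collections import Counter


def normalize(answer: str) -> str:
    """Lowercase a label and drop surrounding whitespace and end punctuation."""
    return answer.strip().strip(".!").lower()


def majority_vote(answers: list[str]) -> tuple[str, int, bool]:
    """Return (winner, votes, tied) for a list of classification labels."""
    if not answers:
        raise ValueError("No answers to vote on")
    ranked = Counter(normalize(a) for a in answers).most_common()
    winner, votes = ranked[0]
    # A tie exists when the runner-up has as many votes as the winner.
    tied = len(ranked) > 1 and ranked[1][1] == votes
    return winner, votes, tied


if __name__ == "__main__":
    print(majority_vote(["Spam", "spam.", "ham"]))
```

When `tied` is true, the winner is just the first label seen among the top candidates, so the caller may prefer to escalate to an LLM merge or to a human instead of trusting the result.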

How Agents Share a Common Task Definition

Agents in a team do not share memory or state. Instead, they share a common task definition — a structured input that the coordinator prepares before fan-out. The coordinator is responsible for:

  1. Partitioning the input: splitting a large codebase into files, a document into sections, or a dataset into chunks.
  2. Specializing the system prompt: each agent gets the same base instructions plus a specialization overlay (e.g., "focus on security" vs. "focus on performance").
  3. Standardizing the output format: all agents must return results in the same structure so the merger can process them uniformly.

def create_review_team(code: str) -> TeamTask:
    """Create a team task where agents share the same code but different focus areas."""
    base_instruction = (
        "You are reviewing the following code. Return findings as a JSON array "
        "with objects containing 'severity', 'line', 'issue', and 'suggestion'."
    )
    focuses = {
        "security-agent": "Focus exclusively on security vulnerabilities.",
        "perf-agent": "Focus exclusively on performance issues.",
        "quality-agent": "Focus exclusively on code quality and maintainability.",
    }

    subtasks = [
        SubTask(
            agent_id=agent_id,
            system_prompt=f"{base_instruction}\n{focus}",
            user_message=code,
        )
        for agent_id, focus in focuses.items()
    ]

    return TeamTask(
        task_id="code-review",
        subtasks=subtasks,
        merge_strategy="llm",
        merge_prompt=(
            "Merge these code review findings. Deduplicate issues found by "
            "multiple agents. Sort by severity (critical > high > medium > low)."
        ),
    )
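
Because every agent is asked for the same JSON structure, the merge could also be done deterministically instead of with an LLM. The following sketch assumes each agent returns a JSON array of objects with the 'severity', 'line', 'issue', and 'suggestion' keys requested above. `merge_findings`, `SEVERITY_ORDER`, and the `reported_by` field are hypothetical additions for illustration.

```python
import json

# Sort order taken from the merge prompt: critical > high > medium > low.
SEVERITY_ORDER = {"critical": 0, "high": 1, "medium": 2, "low": 3}


def merge_findings(raw_outputs: dict[str, str]) -> list[dict]:
    """Deduplicate findings from several agents and sort them by severity."""
    merged: dict[tuple, dict] = {}
    for agent_id, raw in raw_outputs.items():
        try:
            findings = json.loads(raw)
        except json.JSONDecodeError:
            continue  # skip agents that did not return valid JSON
        if not isinstance(findings, list):
            continue
        for finding in findings:
            if not isinstance(finding, dict):
                continue
            # Two findings are treated as duplicates when they point at the
            # same line and describe the same issue (case-insensitive).
            key = (finding.get("line"), str(finding.get("issue", "")).strip().lower())
            entry = merged.setdefault(key, {**finding, "reported_by": []})
            entry["reported_by"].append(agent_id)

    def rank(finding: dict) -> int:
        severity = str(finding.get("severity", "")).lower()
        return SEVERITY_ORDER.get(severity, len(SEVERITY_ORDER))

    return sorted(merged.values(), key=rank)
```

Exact-match deduplication only catches findings that are worded identically; an LLM merge remains the better fit when agents describe the same problem in different words.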

Execution Characteristics

| Property | Value |
| --- | --- |
| Max concurrent agents | Limited by API rate limits, not by the protocol |
| Agent-to-agent communication | None; agents are fully isolated |
| Shared state | None; only the coordinator sees all results |
| Failure isolation | One agent failing does not affect the others |
| Total latency | Latency of the slowest agent + merge time |
| Total cost | Sum of all agent costs + merge cost |

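The key insight: agent teams trade cost for latency. Running three agents in parallel on similar-sized subtasks costs roughly 3× as much as running one agent, plus the cost of the merge step, but finishes in about the wall-clock time of the slowest agent plus the merge rather than the sum of all three.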
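
The trade-off can be made concrete with a small calculation. All numbers below are hypothetical and chosen only to illustrate the formulas (total latency is the slowest agent plus the merge, total cost is the sum of all agents plus the merge); `estimate_team` and `TeamEstimate` are names introduced for this sketch.

```python
from dataclasses import dataclass


@dataclass
class TeamEstimate:
    parallel_latency_s: float    # slowest agent + merge
    sequential_latency_s: float  # same subtasks run one after another + merge
    total_cost: float            # all agents + merge


def estimate_team(agent_latencies_s: list[float], agent_costs: list[float],
                  merge_latency_s: float, merge_cost: float) -> TeamEstimate:
    """Apply the latency and cost formulas to per-agent measurements."""
    return TeamEstimate(
        parallel_latency_s=max(agent_latencies_s) + merge_latency_s,
        sequential_latency_s=sum(agent_latencies_s) + merge_latency_s,
        total_cost=sum(agent_costs) + merge_cost,
    )


if __name__ == "__main__":
    # Hypothetical figures: three agents, costs in arbitrary units.
    estimate = estimate_team(
        agent_latencies_s=[20, 25, 30],
        agent_costs=[1.0, 1.0, 1.0],
        merge_latency_s=10,
        merge_cost=0.5,
    )
    print(estimate)
```

With these made-up inputs the team finishes in 40 seconds instead of 85, at a cost of 3.5 units instead of the 1.0 unit a single agent call would cost.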

In the next lesson, you will learn about different team topologies and when to use each one.