Agent teams in Opus 4.6 let you decompose a complex task into independent subtasks, dispatch each to a separate Claude agent running in parallel, and merge the results into a single coherent output. This is not prompt chaining — each agent operates with its own context window, system prompt, and tool access.
The Coordination Protocol
Every agent team execution follows a three-phase protocol:
graph TD
C[Coordinator] -->|1. Fan-Out| A1[Agent 1]
C -->|1. Fan-Out| A2[Agent 2]
C -->|1. Fan-Out| A3[Agent 3]
A1 -->|2. Execute| R1[Result 1]
A2 -->|2. Execute| R2[Result 2]
A3 -->|2. Execute| R3[Result 3]
R1 -->|3. Fan-In| M[Merger]
R2 -->|3. Fan-In| M
R3 -->|3. Fan-In| M
M --> F[Final Output]
Phase 1 — Fan-Out: The coordinator decomposes the task into subtask definitions. Each subtask includes a system prompt, user message, and optional tool configuration. Subtasks are dispatched concurrently.
Phase 2 — Execute: Each agent processes its subtask independently. Agents have no awareness of each other. They cannot communicate during execution.
Phase 3 — Fan-In: Results from all agents are collected. A merger step — either deterministic or LLM-based — combines them into the final output.
Task Definition Structure
Each subtask is a self-contained unit that tells one agent exactly what to do:
from dataclasses import dataclass, field
@dataclass
class SubTask:
"""Definition of a single agent's work unit."""
agent_id: str
system_prompt: str
user_message: str
model: str = "claude-opus-4-6-20260205"
max_tokens: int = 4096
thinking: dict = field(default_factory=lambda: {"type": "adaptive"})
tools: list[dict] = field(default_factory=list)
timeout_seconds: int = 120
@dataclass
class TeamTask:
"""Complete definition of a multi-agent task."""
task_id: str
subtasks: list[SubTask]
merge_strategy: str = "llm" # "llm", "concatenate", "vote"
merge_prompt: str | None = None
Launching an Agent Team
The core execution engine dispatches subtasks concurrently using asyncio:
import asyncio
import anthropic
from dataclasses import dataclass
@dataclass
class AgentResult:
agent_id: str
content: str
thinking: str | None
usage: dict
latency_ms: int
error: str | None = None
class AgentTeam:
def __init__(self):
self.client = anthropic.AsyncAnthropic()
async def _run_agent(self, subtask: SubTask) -> AgentResult:
"""Execute a single agent's subtask."""
import time
start = time.monotonic()
try:
response = await asyncio.wait_for(
self.client.messages.create(
model=subtask.model,
max_tokens=subtask.max_tokens,
thinking=subtask.thinking,
system=subtask.system_prompt,
messages=[{"role": "user", "content": subtask.user_message}],
),
timeout=subtask.timeout_seconds,
)
elapsed = int((time.monotonic() - start) * 1000)
text = next(
(b.text for b in response.content if b.type == "text"), ""
)
thinking = next(
(b.thinking for b in response.content if b.type == "thinking"),
None,
)
return AgentResult(
agent_id=subtask.agent_id,
content=text,
thinking=thinking,
usage={
"input_tokens": response.usage.input_tokens,
"output_tokens": response.usage.output_tokens,
},
latency_ms=elapsed,
)
except Exception as e:
elapsed = int((time.monotonic() - start) * 1000)
return AgentResult(
agent_id=subtask.agent_id,
content="",
thinking=None,
usage={},
latency_ms=elapsed,
error=str(e),
)
async def run(self, task: TeamTask) -> list[AgentResult]:
"""Execute all subtasks in parallel."""
results = await asyncio.gather(
*[self._run_agent(st) for st in task.subtasks]
)
return list(results)
Result Merging
The merge phase is where raw agent outputs become a unified answer. Three strategies cover most use cases:
class ResultMerger:
def __init__(self):
self.client = anthropic.Anthropic()
def merge(self, results: list[AgentResult], strategy: str,
merge_prompt: str | None = None) -> str:
if strategy == "concatenate":
return self._concatenate(results)
elif strategy == "vote":
return self._majority_vote(results)
elif strategy == "llm":
return self._llm_merge(results, merge_prompt)
raise ValueError(f"Unknown strategy: {strategy}")
def _concatenate(self, results: list[AgentResult]) -> str:
"""Simple concatenation with headers."""
sections = []
for r in results:
if r.error:
sections.append(f"## {r.agent_id} (FAILED)\n{r.error}")
else:
sections.append(f"## {r.agent_id}\n{r.content}")
return "\n\n".join(sections)
def _majority_vote(self, results: list[AgentResult]) -> str:
"""Return the most common answer (for classification tasks)."""
from collections import Counter
answers = [r.content.strip() for r in results if not r.error]
if not answers:
raise RuntimeError("All agents failed")
winner, count = Counter(answers).most_common(1)[0]
return f"{winner} ({count}/{len(answers)} agents agreed)"
def _llm_merge(self, results: list[AgentResult],
merge_prompt: str | None) -> str:
"""Use Claude to intelligently merge agent outputs."""
agent_outputs = "\n\n".join(
f"### {r.agent_id}\n{r.content}"
for r in results if not r.error
)
prompt = merge_prompt or (
"Merge the following agent outputs into a single coherent response. "
"Remove duplicates, resolve conflicts, and present a unified answer."
)
response = self.client.messages.create(
model="claude-opus-4-6-20260205",
max_tokens=8192,
thinking={"type": "adaptive"},
messages=[{
"role": "user",
"content": f"{prompt}\n\n{agent_outputs}",
}],
)
return next(b.text for b in response.content if b.type == "text")
How Agents Share a Common Task Definition
Agents in a team do not share memory or state. Instead, they share a common task definition — a structured input that the coordinator prepares before fan-out. The coordinator is responsible for:
- Partitioning the input — splitting a large codebase into files, a document into sections, or a dataset into chunks.
- Specializing the system prompt — each agent gets the same base instructions plus a specialization overlay (e.g., “focus on security” vs. “focus on performance”).
- Standardizing the output format — all agents must return results in the same structure so the merger can process them uniformly.
def create_review_team(code: str) -> TeamTask:
"""Create a team task where agents share the same code but different focus areas."""
base_instruction = (
"You are reviewing the following code. Return findings as a JSON array "
"with objects containing 'severity', 'line', 'issue', and 'suggestion'."
)
focuses = {
"security-agent": "Focus exclusively on security vulnerabilities.",
"perf-agent": "Focus exclusively on performance issues.",
"quality-agent": "Focus exclusively on code quality and maintainability.",
}
subtasks = [
SubTask(
agent_id=agent_id,
system_prompt=f"{base_instruction}\n{focus}",
user_message=code,
)
for agent_id, focus in focuses.items()
]
return TeamTask(
task_id="code-review",
subtasks=subtasks,
merge_strategy="llm",
merge_prompt=(
"Merge these code review findings. Deduplicate issues found by "
"multiple agents. Sort by severity (critical > high > medium > low)."
),
)
Execution Characteristics
| Property | Value |
|---|---|
| Max concurrent agents | Limited by API rate limits, not protocol |
| Agent-to-agent communication | None — agents are fully isolated |
| Shared state | None — only the coordinator sees all results |
| Failure isolation | One agent failing does not affect others |
| Total latency | Max latency of slowest agent + merge time |
| Total cost | Sum of all agent costs + merge cost |
The key insight: agent teams trade cost for latency. Running three agents in parallel costs 3× as much as running one agent, but completes in roughly the same wall-clock time as a single agent.
In the next lesson, you will learn about different team topologies and when to use each one.