Not every multi-agent problem looks the same. A code review needs parallel independent analysis. A data pipeline needs sequential stages. A safety-critical classification needs multiple agents to agree. Choosing the wrong topology wastes tokens and adds complexity without benefit.
The Four Topologies
graph TB
subgraph FO["Fan-Out / Fan-In"]
FO_C[Coordinator] --> FO_A1[Agent 1]
FO_C --> FO_A2[Agent 2]
FO_C --> FO_A3[Agent 3]
FO_A1 --> FO_M[Merger]
FO_A2 --> FO_M
FO_A3 --> FO_M
end
subgraph Pipeline
P1[Stage 1] --> P2[Stage 2] --> P3[Stage 3] --> P4[Stage 4]
end
subgraph Consensus
CO_I[Input] --> CO_A1[Agent A]
CO_I --> CO_A2[Agent B]
CO_I --> CO_A3[Agent C]
CO_A1 --> CO_V[Vote]
CO_A2 --> CO_V
CO_A3 --> CO_V
end
subgraph Specialist
SP_R[Router] --> SP_S1[Security Expert]
SP_R --> SP_P1[Performance Expert]
SP_R --> SP_D1[Domain Expert]
end
Topology Decision Matrix
| Criterion | Fan-Out/Fan-In | Pipeline | Consensus | Specialist |
|---|---|---|---|---|
| Task independence | Subtasks fully independent | Each stage depends on previous | Same task, independent execution | Different tasks, different expertise |
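| Latency | Slowest agent (parallel), plus merge | Sum of all stages | Slowest agent (parallel), plus adjudication | Router call plus one specialist |
| Cost | N agent calls, plus merge if an agent performs it | N stage calls | N agent calls, plus one adjudicator call | One specialist call, plus a short router call |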
| Best for | Code review, bulk analysis | Document processing, ETL | Safety-critical classification | Multi-domain routing |
| Failure mode | Partial results possible | Pipeline stalls | Tie-breaking needed | Router misclassification |
| Merge complexity | Medium | Low (chained) | Low (voting) | None |
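To make the latency and cost rows concrete, here is a rough back-of-the-envelope estimator. It is a sketch only: the function name `estimate`, the topology labels, and the idea of a single `extra_call_s` for the merge, adjudicator, or router call are assumptions made for illustration, and real latencies depend on model, prompt size, and rate limits.

```python
def estimate(topology: str, agent_latencies_s: list[float],
             extra_call_s: float = 0.0) -> tuple[float, int]:
    """Return (estimated wall-clock seconds, number of model calls).

    agent_latencies_s: expected latency of each agent or stage.
    extra_call_s: latency of the merge, adjudicator, or router call (0 if none).
    """
    n = len(agent_latencies_s)
    extra_calls = 1 if extra_call_s > 0 else 0

    if topology in ("fan_out", "consensus"):
        # Agents run in parallel, so the slowest one sets the pace.
        return max(agent_latencies_s) + extra_call_s, n + extra_calls
    if topology == "pipeline":
        # Stages run one after another, so latencies add up.
        return sum(agent_latencies_s), n
    if topology == "specialist":
        # Only one expert runs; use the slowest expert as a worst case.
        return extra_call_s + max(agent_latencies_s), 1 + extra_calls
    raise ValueError(f"unknown topology: {topology!r}")


latencies = [2.0, 5.0, 3.0]
for name, extra in [("fan_out", 0.0), ("pipeline", 0.0),
                    ("consensus", 1.5), ("specialist", 0.5)]:
    print(name, estimate(name, latencies, extra))
```

Note that parallel topologies are bounded by the slowest agent, not the fastest, which is why a single slow perspective can dominate fan-out latency.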
1. Fan-Out / Fan-In
Each agent works on the same input with a different lens. Results are merged afterward. This is the most common topology.
import asyncio
import anthropic


class FanOutFanIn:
    def __init__(self):
        self.client = anthropic.AsyncAnthropic()

    async def run(self, input_data: str,
                  perspectives: dict[str, str]) -> dict[str, str]:
        """
        Run the same input through multiple agents with different system prompts.

        Args:
            input_data: The shared input all agents receive.
            perspectives: Mapping of agent_id -> system prompt specialization.
        """
        async def _call(agent_id: str, system: str) -> tuple[str, str]:
            response = await self.client.messages.create(
                model="claude-opus-4-6-20260205",
                max_tokens=4096,
                thinking={"type": "adaptive"},
                system=system,
                messages=[{"role": "user", "content": input_data}],
            )
            # Skip any thinking blocks and take the first text block.
            text = next(b.text for b in response.content if b.type == "text")
            return agent_id, text

        # Launch every agent concurrently and wait for all of them.
        tasks = [_call(aid, prompt) for aid, prompt in perspectives.items()]
        results = await asyncio.gather(*tasks)
        return dict(results)


# Usage (source_code is the string under review, defined elsewhere)
perspectives = {
    "security": "You are a security auditor. Report only security issues.",
    "performance": "You are a performance engineer. Report only performance issues.",
    "readability": "You are a code reviewer. Report only readability issues.",
}
results = asyncio.run(
    FanOutFanIn().run(source_code, perspectives)
)
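The decision matrix lists "partial results possible" as the fan-out failure mode. One way to get that behavior is `asyncio.gather(..., return_exceptions=True)`, which returns exceptions as values instead of cancelling the whole batch. The sketch below is a minimal illustration under assumptions: `gather_partial` is a hypothetical helper, and each agent is represented by a zero-argument coroutine function so the example runs without any API calls.

```python
import asyncio
from typing import Awaitable, Callable


async def gather_partial(
    calls: dict[str, Callable[[], Awaitable[str]]],
) -> tuple[dict[str, str], dict[str, str]]:
    """Run all agent calls concurrently; return (results, errors) keyed by agent_id."""
    agent_ids = list(calls)
    outcomes = await asyncio.gather(
        *(calls[agent_id]() for agent_id in agent_ids),
        return_exceptions=True,  # failures come back as exception objects
    )
    results: dict[str, str] = {}
    errors: dict[str, str] = {}
    for agent_id, outcome in zip(agent_ids, outcomes):
        if isinstance(outcome, BaseException):
            errors[agent_id] = repr(outcome)
        else:
            results[agent_id] = outcome
    return results, errors


# Stand-in agents (hypothetical): one succeeds, one fails.
async def _security() -> str:
    return "no issues found"


async def _performance() -> str:
    raise RuntimeError("rate limited")


results, errors = asyncio.run(
    gather_partial({"security": _security, "performance": _performance})
)
print(results)
print(errors)
```

The merge step can then decide whether the surviving perspectives are enough or whether the failed agents should be retried.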
2. Pipeline
Each stage transforms or enriches the output of the previous stage. Use when tasks have natural sequential dependencies.
import asyncio
import anthropic


class Pipeline:
    def __init__(self):
        self.client = anthropic.AsyncAnthropic()

    async def run(self, initial_input: str,
                  stages: list[dict]) -> str:
        """
        Process input through sequential stages.

        Each stage dict has:
            - name: str (label for the stage, not sent to the model)
            - system: str (system prompt for this stage)
            - max_tokens: int (optional, defaults to 4096)
        """
        current_input = initial_input
        for stage in stages:
            response = await self.client.messages.create(
                model="claude-opus-4-6-20260205",
                max_tokens=stage.get("max_tokens", 4096),
                thinking={"type": "adaptive"},
                system=stage["system"],
                messages=[{"role": "user", "content": current_input}],
            )
            # The text output of this stage becomes the input of the next.
            current_input = next(
                b.text for b in response.content if b.type == "text"
            )
        return current_input


# Usage: document processing pipeline
# (raw_document is the source text, defined elsewhere)
stages = [
    {
        "name": "extract",
        "system": "Extract all structured data from this document. "
                  "Return as JSON with fields: entities, dates, amounts, relationships.",
    },
    {
        "name": "validate",
        "system": "Validate the extracted data. Check for inconsistencies, "
                  "missing fields, and logical errors. Return corrected JSON.",
    },
    {
        "name": "enrich",
        "system": "Enrich the validated data with inferred relationships and "
                  "computed fields. Add confidence scores to each field.",
    },
    {
        "name": "summarize",
        "system": "Generate a human-readable summary of the enriched data. "
                  "Highlight key findings and flag any low-confidence items.",
    },
]
result = asyncio.run(Pipeline().run(raw_document, stages))
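Because every stage consumes the previous stage's output, one malformed intermediate result can quietly corrupt everything downstream. A possible guard is to check each stage's output before passing it on and to keep a trace for debugging. The following is a sketch under assumptions: `Stage`, `StageError`, `is_json`, and `run_stages` are hypothetical names, and the stages are plain functions standing in for model calls so the example runs offline.

```python
import json
from typing import Callable, NamedTuple


class StageError(RuntimeError):
    """Raised when a stage produces output that fails its check."""


class Stage(NamedTuple):
    name: str
    run: Callable[[str], str]                       # stand-in for a model call
    is_valid: Callable[[str], bool] = lambda _out: True


def is_json(text: str) -> bool:
    """Check that a stage returned parseable JSON."""
    try:
        json.loads(text)
        return True
    except json.JSONDecodeError:
        return False


def run_stages(initial_input: str,
               stages: list[Stage]) -> tuple[str, dict[str, str]]:
    """Run stages in order, failing fast and naming the stage that broke."""
    current = initial_input
    trace: dict[str, str] = {}
    for stage in stages:
        current = stage.run(current)
        if not stage.is_valid(current):
            raise StageError(f"stage {stage.name!r} produced invalid output")
        trace[stage.name] = current  # keep intermediate outputs for debugging
    return current, trace


# Usage with stub stages in place of real model calls
stages = [
    Stage("extract", lambda doc: '{"amount": 42}', is_json),
    Stage("summarize", lambda data: f"Amount: {json.loads(data)['amount']}"),
]
final, trace = run_stages("raw document text", stages)
print(final)
```

Failing fast with the stage name turns a silent stall into an error that points at the stage to fix or retry.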
3. Consensus
Multiple agents answer the same question independently. Their answers are compared to produce a high-confidence result. Essential for safety-critical decisions.
import asyncio
import anthropic


class Consensus:
    def __init__(self, agent_count: int = 3):
        self.client = anthropic.AsyncAnthropic()
        self.agent_count = agent_count

    async def run(self, question: str, system: str) -> dict:
        """Run multiple agents on the same question, then adjudicate."""
        async def _call(agent_id: int) -> str:
            response = await self.client.messages.create(
                model="claude-opus-4-6-20260205",
                max_tokens=2048,
                thinking={"type": "adaptive"},
                system=system,
                messages=[{"role": "user", "content": question}],
            )
            return next(b.text for b in response.content if b.type == "text")

        answers = await asyncio.gather(
            *[_call(i) for i in range(self.agent_count)]
        )

        # Use a final agent to adjudicate
        adjudication_prompt = (
            f"You received {self.agent_count} independent answers to the same "
            f"question. Determine the consensus answer.\n\n"
            f"Question: {question}\n\n"
            + "\n\n".join(
                f"**Agent {i+1}:** {a}" for i, a in enumerate(answers)
            )
            + "\n\nProvide the consensus answer. If agents disagree, explain "
            "the disagreement and state which answer has the strongest reasoning."
        )
        # Await the async client here; a blocking call would stall the event loop.
        verdict = await self.client.messages.create(
            model="claude-opus-4-6-20260205",
            max_tokens=4096,
            thinking={"type": "adaptive"},
            messages=[{"role": "user", "content": adjudication_prompt}],
        )
        return {
            "individual_answers": list(answers),
            "consensus": next(
                b.text for b in verdict.content if b.type == "text"
            ),
        }


# Usage: safety-critical classification
result = asyncio.run(
    Consensus(agent_count=3).run(
        question="Should this transaction be flagged as potentially fraudulent?",
        system="You are a fraud detection analyst. Respond with FRAUD or LEGITIMATE "
               "followed by your reasoning.",
    )
)
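When agents are asked to lead with a fixed label, as in the fraud prompt above, the vote can also be tallied deterministically, with a model adjudicator reserved for ties or unparseable answers. The sketch below assumes hypothetical helpers `extract_label` and `majority_vote`, and assumes that ties should resolve to the more conservative label; that tie-break policy is an illustration, not a rule from this lesson.

```python
from collections import Counter
from typing import Optional

VALID_LABELS = ("FRAUD", "LEGITIMATE")


def extract_label(answer: str) -> Optional[str]:
    """Read the leading label from an answer such as 'FRAUD: reasoning...'."""
    words = answer.strip().split()
    if not words:
        return None
    first = words[0].strip(".,:;*").upper()
    return first if first in VALID_LABELS else None


def majority_vote(answers: list[str], tie_break: str = "FRAUD") -> dict:
    """Tally labels; fall back to tie_break on a tie or when nothing parses."""
    labels = [extract_label(a) for a in answers]
    counts = Counter(label for label in labels if label is not None)
    if not counts:
        return {"label": tie_break, "unanimous": False, "counts": {}}

    ranked = counts.most_common()
    top_label, top_count = ranked[0]
    tied = len(ranked) > 1 and ranked[1][1] == top_count
    return {
        "label": tie_break if tied else top_label,
        "unanimous": top_count == len(answers),
        "counts": dict(counts),
    }


print(majority_vote([
    "FRAUD: the amount is far outside the account's usual range.",
    "LEGITIMATE. The merchant is a known recurring payee.",
    "Fraud, three similar charges arrived within one minute.",
]))
```

Using an odd `agent_count` avoids most ties for a two-label task, but abstentions or malformed answers can still produce one, so an explicit tie-break policy is worth defining.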
4. Specialist
A router examines the input and dispatches it to the single most qualified agent. Only one specialist runs, so the cost is close to that of a single-agent call plus a short routing call.
import anthropic


class Specialist:
    def __init__(self):
        self.client = anthropic.Anthropic()
        self.specialists = {
            "security": (
                "You are a security specialist. Analyze code for vulnerabilities, "
                "injection risks, authentication issues, and data exposure."
            ),
            "performance": (
                "You are a performance engineer. Analyze code for algorithmic "
                "complexity, memory leaks, unnecessary allocations, and I/O bottlenecks."
            ),
            "database": (
                "You are a database specialist. Analyze queries for N+1 problems, "
                "missing indexes, transaction issues, and schema design flaws."
            ),
            "general": (
                "You are a senior software engineer. Provide a general code review."
            ),
        }

    def route(self, query: str) -> str:
        """Classify the query domain with a short call (first 500 characters, no thinking)."""
        response = self.client.messages.create(
            model="claude-opus-4-6-20260205",
            max_tokens=50,
            thinking={"type": "disabled"},
            system=(
                "Classify the following query into exactly one category: "
                "security, performance, database, general. "
                "Respond with only the category name."
            ),
            messages=[{"role": "user", "content": query[:500]}],
        )
        category = next(
            b.text for b in response.content if b.type == "text"
        ).strip().lower()
        # Fall back to the general reviewer on any unexpected label.
        return category if category in self.specialists else "general"

    def run(self, query: str) -> dict:
        """Route to the right specialist and execute."""
        category = self.route(query)
        system = self.specialists[category]
        response = self.client.messages.create(
            model="claude-opus-4-6-20260205",
            max_tokens=8192,
            thinking={"type": "adaptive"},
            system=system,
            messages=[{"role": "user", "content": query}],
        )
        return {
            "routed_to": category,
            "response": next(
                b.text for b in response.content if b.type == "text"
            ),
        }
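Router misclassification is the failure mode to watch here, and some of it is only formatting: a reply like "Security." or "Category: performance" fails an exact string match and lands in the fallback. A more forgiving parser is sketched below. `normalize_category` is a hypothetical helper, and treating a reply that names several categories as ambiguous (and therefore routing it to the default) is an assumption.

```python
import re

CATEGORIES = ("security", "performance", "database", "general")


def normalize_category(raw: str,
                       categories: tuple[str, ...] = CATEGORIES,
                       default: str = "general") -> str:
    """Map a free-form router reply onto one known category, else the default."""
    tokens = re.findall(r"[a-z]+", raw.lower())
    mentioned = {token for token in tokens if token in categories}
    if len(mentioned) == 1:
        return mentioned.pop()
    # Zero matches or several competing matches: do not guess.
    return default


for reply in ["Security.", "Category: performance\n", "security or database", ""]:
    print(repr(reply), "->", normalize_category(reply))
```

Logging the raw reply next to the normalized category makes it easier to see how often the router actually falls through to the default.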
Choosing the Right Topology
Use this decision tree:
- Can subtasks run independently?
  - Yes → Are you looking for one best answer or combining multiple perspectives?
    - One best answer → Consensus
    - Multiple perspectives → Fan-Out/Fan-In
  - No → Pipeline
- Does the input domain vary widely?
  - Yes → Specialist (with router)
- Is the decision safety-critical?
  - Yes → Consensus (always, regardless of other factors)
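The decision tree can be written down as a small function, which is handy for documenting the choice in code review. This is a sketch: `choose_topology` and its parameter names are hypothetical, and the order of the checks beyond "safety-critical always wins" is an assumption, since the tree does not say which question takes priority when several apply.

```python
def choose_topology(*,
                    independent_subtasks: bool,
                    wants_single_answer: bool = False,
                    domain_varies: bool = False,
                    safety_critical: bool = False) -> str:
    """Return the topology suggested by the decision tree."""
    if safety_critical:
        # Overrides every other factor.
        return "consensus"
    if domain_varies:
        # Assumed to take priority over the independence question.
        return "specialist"
    if not independent_subtasks:
        return "pipeline"
    return "consensus" if wants_single_answer else "fan-out/fan-in"


print(choose_topology(independent_subtasks=True))
print(choose_topology(independent_subtasks=False))
print(choose_topology(independent_subtasks=True, safety_critical=True))
```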
Combining Topologies
Real systems often combine patterns. A common production architecture:
graph TD
Input[Input] --> Router[Specialist Router]
Router -->|Security query| SEC[Security Pipeline]
Router -->|Code review| CR[Fan-Out Review]
Router -->|Classification| CL[Consensus Classifier]
SEC --> S1[Vulnerability Scan] --> S2[Impact Assessment] --> S3[Remediation]
CR --> CR1[Style Agent]
CR --> CR2[Logic Agent]
CR --> CR3[Perf Agent]
CR1 --> Merge[Merge Findings]
CR2 --> Merge
CR3 --> Merge
CL --> V1[Agent A]
CL --> V2[Agent B]
CL --> V3[Agent C]
V1 --> Vote[Adjudicate]
V2 --> Vote
V3 --> Vote
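In code, the combined architecture reduces to a router in front of a dispatch table whose entries are themselves topologies. The sketch below shows only that outer layer, under assumptions: `dispatch` is a hypothetical helper, the route labels are made up for illustration, and the handlers are stubs standing in for the pipeline, fan-out, and consensus implementations.

```python
import asyncio
from typing import Awaitable, Callable

Handler = Callable[[str], Awaitable[str]]


async def dispatch(query: str,
                   classify: Callable[[str], str],
                   handlers: dict[str, Handler],
                   default: str) -> tuple[str, str]:
    """Route a query to one sub-topology and return (label, output)."""
    label = classify(query)
    if label not in handlers:
        label = default  # unknown label: fall back rather than fail
    return label, await handlers[label](query)


# Stub handlers (hypothetical); each would wrap one of the topologies.
async def security_pipeline(query: str) -> str:
    return f"[scan -> impact -> remediation] {query}"


async def fan_out_review(query: str) -> str:
    return f"[style + logic + perf -> merge] {query}"


async def consensus_classifier(query: str) -> str:
    return f"[A, B, C -> adjudicate] {query}"


handlers = {
    "security": security_pipeline,
    "code_review": fan_out_review,
    "classification": consensus_classifier,
}

label, output = asyncio.run(
    dispatch("Audit the login handler", lambda q: "security",
             handlers, default="code_review")
)
print(label, output)
```

Keeping each topology behind the same `Handler` signature lets sub-topologies be swapped or nested without touching the router.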
In the next lesson, you will build a complete multi-agent code review pipeline using the Fan-Out/Fan-In topology.