Lesson 14 of 46 ~25 min

Agent Team Topologies

Four team patterns for multi-agent systems (Fan-Out/Fan-In, Pipeline, Consensus, and Specialist), with a decision matrix and an implementation of each.

Not every multi-agent problem looks the same. A code review needs parallel independent analysis. A data pipeline needs sequential stages. A safety-critical classification needs multiple agents to agree. Choosing the wrong topology wastes tokens and adds complexity without benefit.

The Four Topologies

graph TB
    subgraph Fan-Out / Fan-In
        FO_C[Coordinator] --> FO_A1[Agent 1]
        FO_C --> FO_A2[Agent 2]
        FO_C --> FO_A3[Agent 3]
        FO_A1 --> FO_M[Merger]
        FO_A2 --> FO_M
        FO_A3 --> FO_M
    end

    subgraph Pipeline
        P1[Stage 1] --> P2[Stage 2] --> P3[Stage 3] --> P4[Stage 4]
    end

    subgraph Consensus
        CO_I[Input] --> CO_A1[Agent A]
        CO_I --> CO_A2[Agent B]
        CO_I --> CO_A3[Agent C]
        CO_A1 --> CO_V[Vote]
        CO_A2 --> CO_V
        CO_A3 --> CO_V
    end

    subgraph Specialist
        SP_R[Router] --> SP_S1[Security Expert]
        SP_R --> SP_P1[Performance Expert]
        SP_R --> SP_D1[Domain Expert]
    end

Topology Decision Matrix

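| Criterion | Fan-Out/Fan-In | Pipeline | Consensus | Specialist |
|---|---|---|---|---|
| Task independence | Subtasks fully independent | Each stage depends on the previous one | Same task, independent executions | Different tasks, different expertise |
| Latency | Slowest agent (agents run in parallel) + merge | Sum of all stages | Slowest agent (agents run in parallel) + adjudication | Router call + one specialist |
| Cost | N × single agent (+ merge) | N × single agent (N = number of stages) | N × single agent + adjudicator | ~1× (one specialist runs, plus a small routing call) |
| Best for | Code review, bulk analysis | Document processing, ETL | Safety-critical classification | Multi-domain routing |
| Failure mode | Partial results possible | A failing stage stalls the pipeline | Ties must be broken | Router misclassification |
| Merge complexity | Medium | Low (chained) | Low (voting) | None |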
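The latency and cost rows follow directly from how the calls are arranged: parallel calls cost the sum of their tokens but take only as long as the slowest one, while sequential calls add up in both. A minimal sketch, assuming a hypothetical `CallProfile` record with made-up numbers, and using total tokens as a stand-in for cost:

```python
from dataclasses import dataclass


@dataclass
class CallProfile:
    """Hypothetical measurements for one model call."""
    latency_s: float   # wall-clock seconds
    tokens: int        # total tokens, used here as a proxy for cost


def fan_out_estimate(agents: list[CallProfile],
                     merge: CallProfile) -> tuple[float, int]:
    # Agents run concurrently: latency is set by the slowest one, then the merge.
    # Consensus has the same shape, with the adjudicator as the merge step.
    latency = max(a.latency_s for a in agents) + merge.latency_s
    tokens = sum(a.tokens for a in agents) + merge.tokens
    return latency, tokens


def pipeline_estimate(stages: list[CallProfile]) -> tuple[float, int]:
    # Stages run one after another, so latencies add up
    return sum(s.latency_s for s in stages), sum(s.tokens for s in stages)


def specialist_estimate(router: CallProfile,
                        expert: CallProfile) -> tuple[float, int]:
    # One routing call, then exactly one specialist call
    return router.latency_s + expert.latency_s, router.tokens + expert.tokens


# Made-up numbers for illustration only
calls = [CallProfile(8.0, 3000), CallProfile(12.0, 3500), CallProfile(10.0, 3200)]
print(fan_out_estimate(calls, merge=CallProfile(5.0, 2000)))  # (17.0, 11700)
print(pipeline_estimate(calls))                               # (30.0, 9700)
print(specialist_estimate(CallProfile(1.0, 100), calls[1]))   # (13.0, 3600)
```

With the same three calls, the pipeline takes the full 30 seconds while the parallel topologies finish in the time of the slowest call plus the merge.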

1. Fan-Out / Fan-In

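Each agent receives the same input but examines it through a different lens (here, a different system prompt), and the results are merged afterward. This is one of the most widely used topologies. The class below performs the fan-out and returns each agent's output keyed by agent ID; merging those outputs (the fan-in) is left to the caller.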

import asyncio
import anthropic

class FanOutFanIn:
    def __init__(self):
        self.client = anthropic.AsyncAnthropic()

    async def run(self, input_data: str,
                  perspectives: dict[str, str]) -> dict[str, str]:
        """
        Run the same input through multiple agents with different system prompts.

        Args:
            input_data: The shared input all agents receive.
            perspectives: Mapping of agent_id -> system prompt specialization.
        """
        async def _call(agent_id: str, system: str) -> tuple[str, str]:
            response = await self.client.messages.create(
                model="claude-opus-4-6-20260205",
                max_tokens=4096,
                thinking={"type": "adaptive"},
                system=system,
                messages=[{"role": "user", "content": input_data}],
            )
            text = next(b.text for b in response.content if b.type == "text")
            return agent_id, text

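        # Fan out: one concurrent request per perspective
        tasks = [_call(aid, prompt) for aid, prompt in perspectives.items()]
        # gather returns results in task order; dict() turns the
        # (agent_id, text) pairs into a mapping
        results = await asyncio.gather(*tasks)
        return dict(results)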

# Usage: source_code must hold the code under review as a string
perspectives = {
    "security": "You are a security auditor. Report only security issues.",
    "performance": "You are a performance engineer. Report only performance issues.",
    "readability": "You are a code reviewer. Report only readability issues.",
}
results = asyncio.run(
    FanOutFanIn().run(source_code, perspectives)
)
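The class above stops at the fan-out, and the matrix lists partial results as this topology's failure mode. A minimal sketch of a fan-in step that tolerates a failed agent, using the standard `asyncio.gather(..., return_exceptions=True)`; the `fan_out` and `fan_in` functions and the stub agents are hypothetical names standing in for real model calls:

```python
from __future__ import annotations

import asyncio
from typing import Awaitable, Callable

# Hypothetical agent type: takes the shared input, returns its findings
Agent = Callable[[str], Awaitable[str]]


async def fan_out(input_data: str,
                  agents: dict[str, Agent]) -> dict[str, str | BaseException]:
    # Run every agent concurrently on the same input. return_exceptions=True
    # keeps one failing agent from discarding the other agents' results.
    ids = list(agents)
    outcomes = await asyncio.gather(
        *(agents[aid](input_data) for aid in ids), return_exceptions=True
    )
    return dict(zip(ids, outcomes))


def fan_in(results: dict[str, str | BaseException]) -> str:
    # Merge successful outputs into one report and name the agents that failed
    sections = [f"## {aid}\n{out}" for aid, out in results.items()
                if not isinstance(out, BaseException)]
    failed = [aid for aid, out in results.items()
              if isinstance(out, BaseException)]
    if failed:
        sections.append("## Incomplete\nNo result from: " + ", ".join(failed))
    return "\n\n".join(sections)


# Stub agents standing in for model calls
async def security_agent(code: str) -> str:
    return "No security issues found."


async def performance_agent(code: str) -> str:
    return "Nested loop is O(n^2)."


async def readability_agent(code: str) -> str:
    raise TimeoutError("model call timed out")


results = asyncio.run(fan_out("def f(): ...", {
    "security": security_agent,
    "performance": performance_agent,
    "readability": readability_agent,
}))
print(fan_in(results))
```

A concatenating merge like this is the simplest fan-in; a model call that deduplicates and ranks findings is a common alternative when agents overlap.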

2. Pipeline

Each stage transforms or enriches the output of the previous stage. Use when tasks have natural sequential dependencies.

class Pipeline:
    def __init__(self):
        self.client = anthropic.AsyncAnthropic()

    async def run(self, initial_input: str,
                  stages: list[dict]) -> str:
        """
        Process input through sequential stages.

        Each stage dict has:
            - name: str (a label for the stage; not sent to the model)
            - system: str (system prompt for this stage)
            - max_tokens: int (optional; defaults to 4096)
        """
        current_input = initial_input

        for stage in stages:
            response = await self.client.messages.create(
                model="claude-opus-4-6-20260205",
                max_tokens=stage.get("max_tokens", 4096),
                thinking={"type": "adaptive"},
                system=stage["system"],
                messages=[{"role": "user", "content": current_input}],
            )
            # This stage's text output becomes the next stage's input
            current_input = next(
                b.text for b in response.content if b.type == "text"
            )

        return current_input

# Usage — document processing pipeline
stages = [
    {
        "name": "extract",
        "system": "Extract all structured data from this document. "
                  "Return as JSON with fields: entities, dates, amounts, relationships.",
    },
    {
        "name": "validate",
        "system": "Validate the extracted data. Check for inconsistencies, "
                  "missing fields, and logical errors. Return corrected JSON.",
    },
    {
        "name": "enrich",
        "system": "Enrich the validated data with inferred relationships and "
                  "computed fields. Add confidence scores to each field.",
    },
    {
        "name": "summarize",
        "system": "Generate a human-readable summary of the enriched data. "
                  "Highlight key findings and flag any low-confidence items.",
    },
]

# raw_document must hold the text of the document to process
result = asyncio.run(Pipeline().run(raw_document, stages))
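The matrix names pipeline stalls as this topology's failure mode: if one stage errors or returns nothing, every later stage works on bad input. A synchronous sketch that keeps each intermediate output and stops at the first failing stage; plain functions stand in for model calls, and `run_pipeline`, `Stage`, and `StageFailed` are hypothetical names:

```python
from typing import Callable

# Hypothetical stage type: (name, transform); transforms stand in for model calls
Stage = tuple[str, Callable[[str], str]]


class StageFailed(Exception):
    """Raised when a stage errors or produces empty output."""

    def __init__(self, stage: str, cause: Exception):
        super().__init__(f"stage '{stage}' failed: {cause}")
        self.stage = stage


def run_pipeline(initial_input: str,
                 stages: list[Stage]) -> tuple[str, list[tuple[str, str]]]:
    """Run stages in order, keeping every intermediate output for debugging."""
    trace: list[tuple[str, str]] = []
    current = initial_input
    for name, transform in stages:
        try:
            current = transform(current)
        except Exception as exc:
            # Later stages depend on this output, so stop here and report
            # which stage broke instead of passing bad input downstream
            raise StageFailed(name, exc) from exc
        if not current.strip():
            raise StageFailed(name, ValueError("empty output"))
        trace.append((name, current))
    return current, trace


stages = [
    ("extract", str.upper),
    ("summarize", lambda data: f"summary of {len(data)} chars"),
]
result, trace = run_pipeline("invoice #42, total 10 USD", stages)
print(result)  # summary of 25 chars
```

Keeping the trace makes it possible to see which stage introduced a bad value, which is otherwise hidden once the final stage has rewritten it.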

3. Consensus

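Multiple agents answer the same question independently, and their answers are compared to produce a higher-confidence result. In the implementation below every agent receives the same prompt, so their answers differ only through sampling variation; a final adjudicator call reconciles them. Use this topology for safety-critical decisions, where a single agent's mistake is costly.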

class Consensus:
    def __init__(self, agent_count: int = 3):
        self.client = anthropic.AsyncAnthropic()
        self.agent_count = agent_count

    async def run(self, question: str, system: str) -> dict:
        """Run multiple agents on the same question, then adjudicate."""

        async def _call(agent_id: int) -> str:
            # All agents share one prompt; their answers differ only through
            # sampling, so agent_id is just a label
            response = await self.client.messages.create(
                model="claude-opus-4-6-20260205",
                max_tokens=2048,
                thinking={"type": "adaptive"},
                system=system,
                messages=[{"role": "user", "content": question}],
            )
            return next(b.text for b in response.content if b.type == "text")

        # Fan out: every agent answers concurrently
        answers = await asyncio.gather(
            *[_call(i) for i in range(self.agent_count)]
        )

        # Use a final agent to adjudicate. It needs the original question,
        # not just the answers, to judge which reasoning is strongest.
        adjudication_prompt = (
            f"Question:\n{question}\n\n"
            f"You received {self.agent_count} independent answers to this "
            f"question. Determine the consensus answer.\n\n"
            + "\n\n".join(
                f"**Agent {i+1}:** {a}" for i, a in enumerate(answers)
            )
            + "\n\nProvide the consensus answer. If agents disagree, explain "
            "the disagreement and state which answer has the strongest reasoning."
        )

        # Await the async client: a blocking sync call inside this coroutine
        # would stall the event loop
        verdict = await self.client.messages.create(
            model="claude-opus-4-6-20260205",
            max_tokens=4096,
            thinking={"type": "adaptive"},
            messages=[{"role": "user", "content": adjudication_prompt}],
        )

        return {
            "individual_answers": list(answers),
            "consensus": next(
                b.text for b in verdict.content if b.type == "text"
            ),
        }

# Usage — safety-critical classification
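# transaction_details must hold a text description of the transaction to classify
result = asyncio.run(
    Consensus(agent_count=3).run(
        question="Should this transaction be flagged as potentially fraudulent?\n\n"
                 + transaction_details,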
        system="You are a fraud detection analyst. Respond with FRAUD or LEGITIMATE "
               "followed by your reasoning.",
    )
)
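Because the system prompt asks each agent to start its reply with FRAUD or LEGITIMATE, the votes can also be tallied without a model call. This sketch swaps in a plain majority vote (a different technique from the LLM adjudicator above) and flags ties, which the matrix lists as this topology's failure mode. `majority_vote` is a hypothetical helper, the sample answers are made up, and treating any dissent as a reason for review is a design assumption suited to safety-critical use:

```python
from collections import Counter


def majority_vote(answers: list[str], labels: set[str]) -> dict:
    """Tally each answer's leading label; flag ties and dissent for review."""
    votes = []
    for answer in answers:
        words = answer.split()
        # Take the first word, drop trailing punctuation, compare in upper case
        first = words[0].strip(".:,;").upper() if words else ""
        votes.append(first if first in labels else None)
    tally = Counter(v for v in votes if v is not None)
    ranked = tally.most_common()
    if not ranked:
        # No parseable answers at all
        return {"label": None, "tally": {}, "needs_review": True}
    top_label, top_count = ranked[0]
    tie = len(ranked) > 1 and ranked[1][1] == top_count
    return {
        "label": None if tie else top_label,
        "tally": dict(tally),
        # Escalate unless every agent gave the same parseable label
        "needs_review": tie or top_count < len(answers),
    }


answers = [
    "FRAUD: card used in two countries within an hour.",
    "FRAUD. Amount is far above the account's usual range.",
    "LEGITIMATE: merchant and amount match past behavior.",
]
print(majority_vote(answers, {"FRAUD", "LEGITIMATE"}))
```

A deterministic vote like this can run before the adjudicator, so the more expensive adjudication call is needed only when the agents disagree.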

4. Specialist

A router examines the input and dispatches it to the single most qualified agent. Only one specialist runs, so the cost is roughly one full agent call plus a small routing call.

class Specialist:
    def __init__(self):
        self.client = anthropic.Anthropic()
        self.specialists = {
            "security": (
                "You are a security specialist. Analyze code for vulnerabilities, "
                "injection risks, authentication issues, and data exposure."
            ),
            "performance": (
                "You are a performance engineer. Analyze code for algorithmic "
                "complexity, memory leaks, unnecessary allocations, and I/O bottlenecks."
            ),
            "database": (
                "You are a database specialist. Analyze queries for N+1 problems, "
                "missing indexes, transaction issues, and schema design flaws."
            ),
            "general": (
                "You are a senior software engineer. Provide a general code review."
            ),
        }

    def route(self, query: str) -> str:
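        """Classify the query domain with a short, low-cost call.

        Thinking is disabled and output is capped at 50 tokens; a smaller,
        cheaper model could be substituted here to reduce cost further.
        """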
        response = self.client.messages.create(
            model="claude-opus-4-6-20260205",
            max_tokens=50,
            thinking={"type": "disabled"},
            system=(
                "Classify the following query into exactly one category: "
                "security, performance, database, general. "
                "Respond with only the category name."
            ),
            messages=[{"role": "user", "content": query[:500]}],
        )
        category = next(
            b.text for b in response.content if b.type == "text"
        ).strip().lower()
        return category if category in self.specialists else "general"

    def run(self, query: str) -> dict:
        """Route to the right specialist and execute."""
        category = self.route(query)
        system = self.specialists[category]

        response = self.client.messages.create(
            model="claude-opus-4-6-20260205",
            max_tokens=8192,
            thinking={"type": "adaptive"},
            system=system,
            messages=[{"role": "user", "content": query}],
        )

        return {
            "routed_to": category,
            "response": next(
                b.text for b in response.content if b.type == "text"
            ),
        }
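`route()` accepts the router's reply only if it matches a category exactly after `strip().lower()`, so a reply such as `Security.` silently falls back to `general`, one form of router misclassification. A sketch of a more tolerant parser; `normalize_category` is a hypothetical helper, not part of the Anthropic SDK, and accepting only an unambiguous single match is a design assumption:

```python
import re

CATEGORIES = ("security", "performance", "database", "general")


def normalize_category(raw: str, categories: tuple[str, ...] = CATEGORIES,
                       default: str = "general") -> str:
    """Map a free-form router reply onto exactly one known category."""
    # Lowercase and split into alphabetic words, dropping punctuation
    words = re.findall(r"[a-z]+", raw.lower())
    matches = [c for c in categories if c in words]
    # Accept an unambiguous single match; otherwise fall back to the default
    return matches[0] if len(matches) == 1 else default


for reply in ["Security.", "Category: performance", "security or database", ""]:
    print(repr(reply), "->", normalize_category(reply))
```

Inside `route()`, a helper like this would replace the exact-match check on the reply text; ambiguous replies still land on the general reviewer rather than on an arbitrary specialist.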

Choosing the Right Topology

Use this decision tree, checking the questions in order:

  1. Is the decision safety-critical?

    • Yes → Consensus (always, regardless of other factors)
  2. Does the input domain vary widely, with each input needing only one kind of expertise?

    • Yes → Specialist (with router)
  3. Can subtasks run independently?

    • Yes → Are you looking for one best answer or combining multiple perspectives?
      • One best answer → Consensus
      • Multiple perspectives → Fan-Out/Fan-In
    • No → Pipeline
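The decision tree can be sketched as a single function. It checks the safety question first because the tree applies Consensus regardless of other factors; checking domain variety before task independence is an assumption of this sketch, and `choose_topology` is a hypothetical name:

```python
def choose_topology(safety_critical: bool, domain_varies: bool,
                    independent_subtasks: bool, want_one_best_answer: bool) -> str:
    """Sketch of the topology decision tree as a function."""
    # Safety-critical decisions use Consensus regardless of other factors
    if safety_critical:
        return "Consensus"
    # Widely varying inputs, each needing one kind of expertise (assumed
    # to take precedence over the independence question)
    if domain_varies:
        return "Specialist"
    # Steps that depend on earlier output must run in sequence
    if not independent_subtasks:
        return "Pipeline"
    # Independent work: one best answer vs. several combined perspectives
    return "Consensus" if want_one_best_answer else "Fan-Out/Fan-In"


print(choose_topology(False, False, True, False))  # Fan-Out/Fan-In
```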

Combining Topologies

Real systems often combine patterns. For example, a specialist router can send each request to a sub-team built on a different topology:

graph TD
    Input[Input] --> Router[Specialist Router]
    Router -->|Security query| SEC[Security Pipeline]
    Router -->|Code review| CR[Fan-Out Review]
    Router -->|Classification| CL[Consensus Classifier]

    SEC --> S1[Vulnerability Scan] --> S2[Impact Assessment] --> S3[Remediation]
    CR --> CR1[Style Agent]
    CR --> CR2[Logic Agent]
    CR --> CR3[Perf Agent]
    CR1 --> Merge[Merge Findings]
    CR2 --> Merge
    CR3 --> Merge
    CL --> V1[Agent A]
    CL --> V2[Agent B]
    CL --> V3[Agent C]
    V1 --> Vote[Adjudicate]
    V2 --> Vote
    V3 --> Vote
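The combined architecture can be sketched with stub agents in place of model calls. This assumes the router has already produced a request kind (for example via a call like `Specialist.route`); `handle`, `make_agent`, and `make_classifier` are hypothetical names, and the strict-majority rule in `consensus` replaces the adjudicator call for brevity:

```python
import asyncio
from collections import Counter
from typing import Awaitable, Callable

# Hypothetical agent type: text in, text out (stands in for a model call)
Agent = Callable[[str], Awaitable[str]]


def make_agent(tag: str) -> Agent:
    async def agent(text: str) -> str:
        return f"{tag}({text})"  # records which stub processed the text
    return agent


def make_classifier(label: str) -> Agent:
    async def agent(text: str) -> str:
        return label  # stub that always returns the same label
    return agent


async def pipeline(text: str, stages: list[Agent]) -> str:
    for stage in stages:  # sequential: each output feeds the next stage
        text = await stage(text)
    return text


async def fan_out(text: str, agents: list[Agent]) -> str:
    findings = await asyncio.gather(*(a(text) for a in agents))  # parallel
    return "\n".join(findings)  # fan-in by simple concatenation


async def consensus(text: str, agents: list[Agent]) -> str:
    votes = await asyncio.gather(*(a(text) for a in agents))
    label, count = Counter(votes).most_common(1)[0]
    # Require a strict majority; otherwise report that there is none
    return label if count > len(votes) // 2 else "NO_MAJORITY"


async def handle(kind: str, text: str) -> str:
    # Each request kind maps to a sub-team with its own topology
    teams: dict[str, Callable[[str], Awaitable[str]]] = {
        "security": lambda t: pipeline(
            t, [make_agent("scan"), make_agent("impact"), make_agent("fix")]),
        "review": lambda t: fan_out(
            t, [make_agent("style"), make_agent("logic"), make_agent("perf")]),
        "classify": lambda t: consensus(
            t, [make_classifier("FRAUD"), make_classifier("FRAUD"),
                make_classifier("LEGITIMATE")]),
    }
    return await teams[kind](text)


print(asyncio.run(handle("security", "payload")))  # fix(impact(scan(payload)))
```

Because every sub-team exposes the same text-in, text-out shape, the router does not need to know which topology sits behind each route.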

In the next lesson, you will build a complete multi-agent code review pipeline using the Fan-Out/Fan-In topology.