This lesson builds a production-grade multi-agent code review pipeline from scratch. Three specialist agents analyze the same code concurrently, and a coordinator merges their findings into a single, deduplicated report sorted by severity.
Architecture
graph TD
Input[Source Code] --> Coord[Coordinator]
Coord -->|Fan-Out| SA[Security Agent]
Coord -->|Fan-Out| PA[Performance Agent]
Coord -->|Fan-Out| MA[Maintainability Agent]
SA -->|JSON findings| Collect[Result Collector]
PA -->|JSON findings| Collect
MA -->|JSON findings| Collect
Collect --> Dedup[Deduplication]
Dedup --> Rank[Severity Ranking]
Rank --> Report[Final Report]
Data Model
Define a shared finding format that all agents must produce. This is the contract that makes merging possible.
from dataclasses import dataclass, field
from enum import Enum
import json
class Severity(str, Enum):
    """Severity levels a finding can carry, declared from most to least urgent.

    Lookups by value tolerate case and surrounding whitespace, so model output
    such as ``"High"`` or ``" CRITICAL "`` resolves to the matching member.
    """

    CRITICAL = "critical"
    HIGH = "high"
    MEDIUM = "medium"
    LOW = "low"
    INFO = "info"

    @classmethod
    def _missing_(cls, value):
        """Resolve case/whitespace variants of a valid value; otherwise None.

        Returning None lets Enum raise its usual ValueError for unknown values.
        """
        if isinstance(value, str):
            normalized = value.strip().lower()
            for member in cls:
                if member.value == normalized:
                    return member
        return None
class Category(str, Enum):
    """Review area a finding belongs to.

    The values match the agent ids registered by the pipeline ("security",
    "performance", "maintainability"), which is how an agent id is turned
    into a category via ``Category(agent_id)``.
    """
    SECURITY = "security"
    PERFORMANCE = "performance"
    MAINTAINABILITY = "maintainability"
@dataclass
class Finding:
    """One issue reported by a review agent, in the shared cross-agent format."""

    category: Category
    severity: Severity
    line: int | None  # None when the issue is not tied to a specific line
    title: str
    description: str
    suggestion: str
    agent_id: str  # id of the producing agent, or "coordinator"

    def key(self) -> str:
        """Return the deduplication key: same line plus same normalized title.

        The title is case-folded and runs of whitespace are collapsed, so
        "SQL  Injection " and "sql injection" on the same line are treated
        as the same issue.
        """
        normalized_title = " ".join(self.title.casefold().split())
        return f"{self.line}:{normalized_title}"
@dataclass
class ReviewReport:
    """Merged, deduplicated review output for a single file."""

    file_path: str
    findings: list[Finding] = field(default_factory=list)
    agent_stats: dict = field(default_factory=dict)  # agent_id -> stats dict

    def add_findings(self, new_findings: list[Finding]) -> None:
        """Append each finding whose key() has not been seen yet.

        Insertion order is preserved; duplicates inside ``new_findings``
        are dropped as well.
        """
        known = {existing.key() for existing in self.findings}
        for candidate in new_findings:
            candidate_key = candidate.key()
            if candidate_key in known:
                continue
            known.add(candidate_key)
            self.findings.append(candidate)

    def sorted_findings(self) -> list[Finding]:
        """Return a new list ordered CRITICAL first, INFO last.

        The sort is stable, so findings of equal severity keep insertion order.
        """
        ranking = (
            Severity.CRITICAL,
            Severity.HIGH,
            Severity.MEDIUM,
            Severity.LOW,
            Severity.INFO,
        )
        rank_of = {level: position for position, level in enumerate(ranking)}
        return sorted(self.findings, key=lambda finding: rank_of[finding.severity])
Agent System Prompts
Each agent gets a specialized system prompt and a strict output format contract:
# Output contract appended to every agent's system prompt. Replies are fed to
# json.loads, so the model must emit bare JSON with exactly these keys, and
# line numbers must use one shared convention because dedup keys include them.
OUTPUT_FORMAT_INSTRUCTION = """
Return your findings as a JSON array. Each finding must have:
- "severity": one of "critical", "high", "medium", "low", "info"
- "line": line number (integer, 1-based, counted from the first line of the code you were given) or null if not line-specific
- "title": short title (max 80 chars)
- "description": detailed explanation
- "suggestion": concrete fix recommendation
Return ONLY the JSON array, no other text and no Markdown code fences. If you find no issues, return [].
"""
# System prompt for the "security" agent: restricts it to security issues and
# appends the shared JSON output contract (OUTPUT_FORMAT_INSTRUCTION).
SECURITY_SYSTEM = f"""You are a security auditor performing a code review.
Focus exclusively on security vulnerabilities:
- Injection attacks (SQL, command, XSS, SSTI)
- Authentication and authorization flaws
- Sensitive data exposure (hardcoded secrets, PII leaks)
- Insecure deserialization
- Missing input validation
- Cryptographic weaknesses
- SSRF and path traversal
{OUTPUT_FORMAT_INSTRUCTION}"""
# System prompt for the "performance" agent: restricts it to performance
# issues and appends the shared JSON output contract.
PERFORMANCE_SYSTEM = f"""You are a performance engineer performing a code review.
Focus exclusively on performance issues:
- Algorithmic complexity (O(n²) or worse where O(n) is possible)
- Unnecessary memory allocations and copies
- N+1 query patterns
- Missing caching opportunities
- Blocking I/O in async contexts
- Unbounded data structures
- Inefficient string concatenation in loops
{OUTPUT_FORMAT_INSTRUCTION}"""
# System prompt for the "maintainability" agent: restricts it to long-term
# maintainability issues and appends the shared JSON output contract.
MAINTAINABILITY_SYSTEM = f"""You are a senior engineer reviewing code for long-term maintainability.
Focus exclusively on maintainability issues:
- Functions longer than 50 lines or with cyclomatic complexity > 10
- Missing error handling or bare except clauses
- Tight coupling between modules
- Magic numbers and hardcoded values
- Missing type hints on public interfaces
- Dead code and unused imports
- Naming that obscures intent
{OUTPUT_FORMAT_INSTRUCTION}"""
The Review Engine
import asyncio
import time
import anthropic
class CodeReviewPipeline:
    """Fan source code out to specialist review agents and merge their findings.

    ``self.agents`` maps each agent id to its system prompt. ``review`` runs
    every agent concurrently through the async Anthropic client, parses each
    JSON reply into ``Finding`` objects, and merges them into one
    deduplicated ``ReviewReport``.
    """

    def __init__(self, model: str = "claude-opus-4-6-20260205",
                 max_tokens: int = 8192,
                 thinking: dict | None = None):
        """Create the API clients and register the specialist agents.

        Args:
            model: Model id sent with every agent request.
            max_tokens: Per-request output cap. Thinking output counts toward
                it, so a reply can be cut off before any JSON is produced;
                such replies are reported as unparseable instead of raising.
            thinking: Value for the request's ``thinking`` parameter; None
                selects the original adaptive/deep configuration.
                NOTE(review): confirm the model id and the accepted shape of
                this parameter against the current Messages API reference.
        """
        self.client = anthropic.AsyncAnthropic()
        # Not used inside this class; retained as a public attribute.
        self.sync_client = anthropic.Anthropic()
        self.model = model
        self.max_tokens = max_tokens
        self.thinking = (thinking if thinking is not None
                         else {"type": "adaptive", "effort": "deep"})
        self.agents = {
            "security": SECURITY_SYSTEM,
            "performance": PERFORMANCE_SYSTEM,
            "maintainability": MAINTAINABILITY_SYSTEM,
        }

    async def _run_agent(self, agent_id: str, system: str,
                         code: str) -> tuple[str, list[Finding], dict]:
        """Run a single review agent and parse its findings.

        Returns:
            ``(agent_id, findings, stats)``. ``stats`` holds latency, token
            usage, the number of findings and the response's stop reason.

        Raises:
            Whatever the Anthropic client raises (network/API errors), and
            ValueError if ``agent_id`` is not a ``Category`` value. Malformed
            or truncated model output does not raise; it becomes an INFO
            finding.
        """
        start = time.monotonic()
        response = await self.client.messages.create(
            model=self.model,
            max_tokens=self.max_tokens,
            thinking=self.thinking,
            system=system,
            messages=[{"role": "user", "content": code}],
        )
        elapsed_ms = int((time.monotonic() - start) * 1000)
        # Concatenate all text blocks (thinking blocks are skipped). A reply
        # truncated at max_tokens may contain no text block at all; the empty
        # string then surfaces as a parse-error finding, not an exception.
        text = "".join(b.text for b in response.content if b.type == "text")
        findings = self._parse_findings(agent_id, text, response.stop_reason)
        stats = {
            "latency_ms": elapsed_ms,
            "input_tokens": response.usage.input_tokens,
            "output_tokens": response.usage.output_tokens,
            "findings_count": len(findings),
            "stop_reason": response.stop_reason,
        }
        return agent_id, findings, stats

    @classmethod
    def _parse_findings(cls, agent_id: str, text: str,
                        stop_reason: str | None = None) -> list[Finding]:
        """Convert an agent's raw reply into findings.

        Valid items become findings. Malformed items are skipped and
        summarized in a single INFO finding, so one bad entry does not
        discard the rest of the reply.
        """
        category = Category(agent_id)

        def problem(title: str, detail: str) -> Finding:
            # Build the INFO finding that reports a parsing problem.
            description = f"Parse error: {detail}\nRaw output: {text[:500]}"
            suggestion = "Review agent output manually"
            if stop_reason == "max_tokens":
                description += "\nThe response was truncated (stop_reason=max_tokens)."
                suggestion = "Increase max_tokens or review a smaller chunk of code"
            return Finding(
                category=category,
                severity=Severity.INFO,
                line=None,
                title=title,
                description=description,
                suggestion=suggestion,
                agent_id=agent_id,
            )

        unparseable = f"Agent {agent_id} returned unparseable output"
        try:
            raw_findings = cls._load_json_array(text)
        except json.JSONDecodeError as e:
            return [problem(unparseable, str(e))]
        if not isinstance(raw_findings, list):
            return [problem(
                unparseable,
                f"expected a JSON array, got {type(raw_findings).__name__}",
            )]

        findings = []
        errors = []
        for index, item in enumerate(raw_findings):
            try:
                findings.append(cls._finding_from_item(agent_id, category, item))
            except (TypeError, KeyError, ValueError) as e:
                errors.append(f"item {index}: {type(e).__name__}: {e}")
        if errors:
            findings.append(problem(
                f"Agent {agent_id} returned {len(errors)} malformed finding(s)",
                "; ".join(errors),
            ))
        return findings

    @staticmethod
    def _load_json_array(text: str):
        """Decode the reply, tolerating a Markdown fence or surrounding prose.

        Raises:
            json.JSONDecodeError: no decodable JSON could be found.
        """
        candidate = text.strip()
        if candidate.startswith("```"):
            # Drop the opening fence line (``` or ```json) and a closing fence.
            candidate = candidate.split("\n", 1)[1] if "\n" in candidate else ""
            candidate = candidate.rstrip()
            if candidate.endswith("```"):
                candidate = candidate[:-3]
        try:
            return json.loads(candidate)
        except json.JSONDecodeError:
            # Fall back to the outermost [...] span, e.g. when prose precedes it.
            first, last = candidate.find("["), candidate.rfind("]")
            if first == -1 or last <= first:
                raise
            return json.loads(candidate[first:last + 1])

    @staticmethod
    def _finding_from_item(agent_id: str, category: Category,
                           item: object) -> Finding:
        """Build one Finding from a decoded JSON object.

        Raises:
            TypeError: ``item`` is not a JSON object.
            KeyError: a required key is missing.
            ValueError: ``severity`` is not a known level.
        """
        if not isinstance(item, dict):
            raise TypeError(f"expected a JSON object, got {type(item).__name__}")
        line = item.get("line")
        if line is not None:
            try:
                line = int(line)  # tolerate "42" or 42.0 from the model
            except (TypeError, ValueError, OverflowError):
                line = None  # keep the finding, just without a location
        return Finding(
            category=category,
            severity=Severity(str(item["severity"]).strip().lower()),
            line=line,
            # str() guards Finding.key(), which calls string methods on title.
            title=str(item["title"]),
            description=str(item["description"]),
            suggestion=str(item["suggestion"]),
            agent_id=agent_id,
        )

    async def review(self, file_path: str, code: str) -> ReviewReport:
        """Run all agents in parallel and merge their results into one report.

        An agent that raises is recorded as an INFO finding naming that
        agent, and the other agents' findings are still merged.
        """
        report = ReviewReport(file_path=file_path)
        agent_ids = list(self.agents)
        results = await asyncio.gather(
            *(self._run_agent(agent_id, self.agents[agent_id], code)
              for agent_id in agent_ids),
            return_exceptions=True,
        )
        # gather preserves input order, so results line up with agent_ids.
        for agent_id, result in zip(agent_ids, results):
            # BaseException also covers CancelledError, which gather returns
            # as a result when return_exceptions=True.
            if isinstance(result, BaseException):
                report.add_findings([Finding(
                    category=Category.MAINTAINABILITY,
                    severity=Severity.INFO,
                    line=None,
                    title=f"Agent {agent_id} execution failed",
                    description=f"{type(result).__name__}: {result}",
                    suggestion="Retry the review",
                    agent_id="coordinator",
                )])
                continue
            _, findings, stats = result
            report.add_findings(findings)
            report.agent_stats[agent_id] = stats
        return report
Running a Review
async def main():
    """Review a deliberately flawed sample snippet and print the merged report."""
    pipeline = CodeReviewPipeline()
    # Sample input with planted issues (SQL/command injection, hardcoded
    # password, O(n²) loop, bare except, unused import). The backslash after
    # the opening quotes drops the leading newline, so "import os" is line 1
    # and reported line numbers can be read directly against the snippet.
    code = '''\
import os
import subprocess
def process_user_input(user_data):
    query = f"SELECT * FROM users WHERE name = '{user_data}'"
    result = subprocess.run(f"echo {user_data}", shell=True, capture_output=True)
    items = []
    for i in range(len(user_data)):
        for j in range(len(user_data)):
            items.append(user_data[i] + user_data[j])
    password = "admin123"
    try:
        db.execute(query)
    except:
        pass
    return items
'''
    report = await pipeline.review("app/handlers.py", code)
    severity_icons = {"critical": "🔴", "high": "🟠", "medium": "🟡",
                      "low": "🔵", "info": "⚪"}
    # Print findings sorted by severity
    for f in report.sorted_findings():
        icon = severity_icons.get(f.severity.value, "⚪")
        # Compare against None so a reported line 0 is not shown as N/A.
        line_label = f.line if f.line is not None else "N/A"
        print(f"{icon} [{f.severity.value.upper()}] {f.title}")
        print(f" Category: {f.category.value} | Line: {line_label}")
        print(f" {f.description}")
        print(f" 💡 {f.suggestion}")
        print()
    # Print agent stats (only agents that completed have an entry)
    print("--- Agent Statistics ---")
    for agent_id, stats in report.agent_stats.items():
        print(f"{agent_id}: {stats['findings_count']} findings, "
              f"{stats['latency_ms']}ms, "
              f"{stats['input_tokens'] + stats['output_tokens']} tokens")
# Run the demo only when executed as a script, so importing this module does
# not trigger live API calls.
if __name__ == "__main__":
    asyncio.run(main())
Expected Output
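Model output is nondeterministic, so exact titles and severities vary from run to run, but the pipeline should typically catch at least these issues from the sample code: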
| Agent | Severity | Finding |
|---|---|---|
| Security | Critical | SQL injection via string interpolation |
| Security | Critical | Command injection via subprocess with shell=True |
| Security | High | Hardcoded password |
| Performance | Medium | O(n²) nested loop creating cartesian product |
| Maintainability | Medium | Bare except: pass swallows all errors |
| Maintainability | Low | Unused import os |
Scaling to Multiple Files
Wrap the pipeline in a file-level parallel executor for reviewing entire projects:
async def review_project(file_paths: list[str],
                         max_concurrent: int = 5) -> list[ReviewReport]:
    """Review multiple files with concurrency control.

    At most ``max_concurrent`` files are reviewed at once, and each of them
    fans out to every agent in the pipeline. Files that cannot be read or
    reviewed are left out of the result and logged as warnings.

    Args:
        file_paths: Paths of the files to review (read as UTF-8).
        max_concurrent: Maximum number of files reviewed simultaneously.

    Returns:
        Reports for the files that were reviewed successfully, in input order.

    Raises:
        ValueError: ``max_concurrent`` is less than 1 (a zero-slot semaphore
            would block forever).
    """
    import logging

    if max_concurrent < 1:
        raise ValueError(f"max_concurrent must be >= 1, got {max_concurrent}")
    logger = logging.getLogger(__name__)
    pipeline = CodeReviewPipeline()
    semaphore = asyncio.Semaphore(max_concurrent)

    def _read(path: str) -> str:
        # Read the whole file as text with an explicit, locale-independent encoding.
        with open(path, encoding="utf-8") as f:
            return f.read()

    async def _review_file(path: str) -> ReviewReport:
        # Review one file while holding a concurrency slot.
        async with semaphore:
            # Read in a worker thread so the event loop keeps serving other
            # files' in-flight API calls.
            code = await asyncio.to_thread(_read, path)
            return await pipeline.review(path, code)

    results = await asyncio.gather(
        *(_review_file(p) for p in file_paths),
        return_exceptions=True,
    )
    reports = []
    for path, result in zip(file_paths, results):
        if isinstance(result, ReviewReport):
            reports.append(result)
        else:
            logger.warning("Skipping %s: review failed: %r", path, result)
    return reports
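Note the semaphore: each file fans out to three agents, so reviewing 10 files simultaneously would mean 30 concurrent API calls. The semaphore caps file-level concurrency to stay within rate limits. With the default `max_concurrent=5`, at most 5 files × 3 agents = 15 API calls are in flight at once.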
Cost Analysis
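For a 500-line file reviewed by three agents with deep thinking, rough estimates are shown below. Thinking tokens are billed as output tokens and also count toward each request's `max_tokens`, so these thinking figures assume `max_tokens` is set high enough to hold them. Check current per-model pricing before budgeting.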
| Component | Input Tokens | Output Tokens | Thinking Tokens | Cost |
|---|---|---|---|---|
| Security agent | ~2,000 | ~1,500 | ~15,000 | ~$0.12 |
| Performance agent | ~2,000 | ~1,000 | ~12,000 | ~$0.10 |
| Maintainability agent | ~2,000 | ~1,200 | ~10,000 | ~$0.09 |
| Total per file | ~6,000 | ~3,700 | ~37,000 | ~$0.31 |
For a 100-file project: approximately $31.00 — comparable to a single hour of senior engineer time.
In the next lesson, you will learn advanced coordination patterns for handling conflicts, partial failures, and shared state between agents.