Multi-Agent Code Review Pipeline

Build a complete multi-agent code review system — security, performance, and maintainability agents running in parallel with a coordinator that merges and deduplicates findings.

This lesson builds a production-grade multi-agent code review pipeline from scratch. Three specialist agents analyze the same code concurrently, and a coordinator merges their findings into a single, deduplicated report sorted by severity.

Architecture

graph TD
    Input[Source Code] --> Coord[Coordinator]
    Coord -->|Fan-Out| SA[Security Agent]
    Coord -->|Fan-Out| PA[Performance Agent]
    Coord -->|Fan-Out| MA[Maintainability Agent]
    SA -->|JSON findings| Collect[Result Collector]
    PA -->|JSON findings| Collect
    MA -->|JSON findings| Collect
    Collect --> Dedup[Deduplication]
    Dedup --> Rank[Severity Ranking]
    Rank --> Report[Final Report]
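
You can try the diagram's fan-out/fan-in shape without making any API calls. In this minimal sketch, three stub coroutines play the agents. They are hypothetical stand-ins that use `asyncio.sleep` to simulate latency, and findings are simplified to `(agent, severity, title)` tuples. Because `asyncio.gather` runs the stubs concurrently, total time tracks the slowest agent rather than the sum of all three.

```python
import asyncio
import time

SEVERITY_RANK = {"critical": 0, "high": 1, "medium": 2, "low": 3, "info": 4}

async def stub_agent(name: str, delay: float,
                     findings: list[tuple[str, str]]) -> list[tuple[str, str, str]]:
    """Hypothetical stand-in for an LLM review agent: waits, then returns findings."""
    await asyncio.sleep(delay)  # simulated API latency
    return [(name, severity, title) for severity, title in findings]

async def coordinator() -> tuple[list[tuple[str, str, str]], float]:
    start = time.monotonic()
    # Fan-out: all three agents start at the same time
    results = await asyncio.gather(
        stub_agent("security", 0.3, [("critical", "SQL injection")]),
        stub_agent("performance", 0.2, [("medium", "Nested loop")]),
        stub_agent("maintainability", 0.1, [("low", "Unused import")]),
    )
    elapsed = time.monotonic() - start
    # Fan-in: flatten every agent's list, then rank by severity
    merged = [f for agent_findings in results for f in agent_findings]
    merged.sort(key=lambda f: SEVERITY_RANK[f[1]])
    return merged, elapsed

findings, elapsed = asyncio.run(coordinator())
for agent, severity, title in findings:
    print(f"[{severity}] {title} ({agent})")
print(f"elapsed ~{elapsed:.1f}s (the delays sum to 0.6s)")
```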

Data Model

Define a shared finding format that all agents must produce. This is the contract that makes merging possible.

from dataclasses import dataclass, field
from enum import Enum
import json

class Severity(str, Enum):
    CRITICAL = "critical"
    HIGH = "high"
    MEDIUM = "medium"
    LOW = "low"
    INFO = "info"

class Category(str, Enum):
    SECURITY = "security"
    PERFORMANCE = "performance"
    MAINTAINABILITY = "maintainability"

@dataclass
class Finding:
    category: Category
    severity: Severity
    line: int | None
    title: str
    description: str
    suggestion: str
    agent_id: str

    def key(self) -> str:
        """Deduplication key — same issue on the same line."""
        return f"{self.line}:{self.title.lower().strip()}"

@dataclass
class ReviewReport:
    file_path: str
    findings: list[Finding] = field(default_factory=list)
    agent_stats: dict = field(default_factory=dict)

    def add_findings(self, new_findings: list[Finding]) -> None:
        seen_keys = {f.key() for f in self.findings}
        for f in new_findings:
            if f.key() not in seen_keys:
                self.findings.append(f)
                seen_keys.add(f.key())

    def sorted_findings(self) -> list[Finding]:
        severity_order = {
            Severity.CRITICAL: 0, Severity.HIGH: 1,
            Severity.MEDIUM: 2, Severity.LOW: 3, Severity.INFO: 4,
        }
        return sorted(self.findings, key=lambda f: severity_order[f.severity])
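
`key()` compares titles verbatim, so when two agents describe the same issue in different words, both findings survive deduplication. The sketch below is self-contained and uses a trimmed-down `MiniFinding` type, which is hypothetical and not part of the lesson's model. It contrasts the exact key with a fuzzier same-line match built on the standard library's `difflib.SequenceMatcher`. The 0.6 similarity threshold is an arbitrary assumption that you would need to tune.

```python
from dataclasses import dataclass
from difflib import SequenceMatcher
from typing import Optional

@dataclass
class MiniFinding:
    """Hypothetical, trimmed-down stand-in for the lesson's Finding."""
    line: Optional[int]
    title: str
    agent_id: str

def exact_key(f: MiniFinding) -> str:
    # Same scheme as Finding.key(): line number plus normalized title
    return f"{f.line}:{f.title.lower().strip()}"

def is_near_duplicate(a: MiniFinding, b: MiniFinding, threshold: float = 0.6) -> bool:
    # Only findings on the same line can be duplicates; titles must be similar enough
    if a.line != b.line:
        return False
    ratio = SequenceMatcher(None, a.title.lower(), b.title.lower()).ratio()
    return ratio >= threshold

def dedup_fuzzy(findings: list[MiniFinding]) -> list[MiniFinding]:
    """Keep the first finding of each near-duplicate group (O(n^2), fine for small n)."""
    kept: list[MiniFinding] = []
    for f in findings:
        if not any(is_near_duplicate(f, k) for k in kept):
            kept.append(f)
    return kept

findings = [
    MiniFinding(14, "Hardcoded password", "security"),
    MiniFinding(14, "Hard-coded password", "maintainability"),
    MiniFinding(6, "SQL injection via f-string", "security"),
]
exact = {exact_key(f) for f in findings}
print(len(exact))                  # 3: exact keys miss the near-duplicate
print(len(dedup_fuzzy(findings)))  # 2
```

Fuzzy matching trades a risk of false merges for fewer duplicates. A higher threshold keeps more findings separate.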

Agent System Prompts

Each agent gets a specialized system prompt and a strict output format contract:

OUTPUT_FORMAT_INSTRUCTION = """
Return your findings as a JSON array. Each finding must have:
- "severity": one of "critical", "high", "medium", "low", "info"
- "line": line number (integer) or null if not line-specific
- "title": short title (max 80 chars)
- "description": detailed explanation
- "suggestion": concrete fix recommendation

Return ONLY the JSON array, no other text. If you find no issues, return [].
"""

SECURITY_SYSTEM = f"""You are a security auditor performing a code review.
Focus exclusively on security vulnerabilities:
- Injection attacks (SQL, command, XSS, SSTI)
- Authentication and authorization flaws
- Sensitive data exposure (hardcoded secrets, PII leaks)
- Insecure deserialization
- Missing input validation
- Cryptographic weaknesses
- SSRF and path traversal
{OUTPUT_FORMAT_INSTRUCTION}"""

PERFORMANCE_SYSTEM = f"""You are a performance engineer performing a code review.
Focus exclusively on performance issues:
- Algorithmic complexity (O(n²) or worse where O(n) is possible)
- Unnecessary memory allocations and copies
- N+1 query patterns
- Missing caching opportunities
- Blocking I/O in async contexts
- Unbounded data structures
- Inefficient string concatenation in loops
{OUTPUT_FORMAT_INSTRUCTION}"""

MAINTAINABILITY_SYSTEM = f"""You are a senior engineer reviewing code for long-term maintainability.
Focus exclusively on maintainability issues:
- Functions longer than 50 lines or with cyclomatic complexity > 10
- Missing error handling or bare except clauses
- Tight coupling between modules
- Magic numbers and hardcoded values
- Missing type hints on public interfaces
- Dead code and unused imports
- Naming that obscures intent
{OUTPUT_FORMAT_INSTRUCTION}"""
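
Even when told to return only the JSON array, a model will sometimes wrap the array in a Markdown code fence. The sketch below assumes that failure mode and adds a small pre-parse step. It strips one optional fence, then checks that the result is a list of objects that carry the keys the lesson's parser indexes. `parse_findings`, `strip_fence`, and `REQUIRED_KEYS` are hypothetical helper names. Every failure surfaces as `ValueError`, since `json.JSONDecodeError` is a subclass of `ValueError`.

```python
import json

FENCE = "`" * 3  # a Markdown code-fence marker
# Mirrors what _run_agent indexes directly; "line" is read with .get(), so it stays optional
REQUIRED_KEYS = {"severity", "title", "description", "suggestion"}
VALID_SEVERITIES = {"critical", "high", "medium", "low", "info"}

def strip_fence(text: str) -> str:
    """Remove one optional Markdown fence (with or without a language tag)."""
    text = text.strip()
    if text.startswith(FENCE):
        # Drop the opening fence line, which may carry a tag such as "json"
        text = text.split("\n", 1)[1] if "\n" in text else ""
        text = text.rstrip()
        if text.endswith(FENCE):
            text = text[: -len(FENCE)]
    return text.strip()

def parse_findings(text: str) -> list[dict]:
    """Parse an agent reply into finding dicts; raise ValueError on any contract violation."""
    data = json.loads(strip_fence(text))  # JSONDecodeError is a ValueError subclass
    if not isinstance(data, list):
        raise ValueError(f"expected a JSON array, got {type(data).__name__}")
    for i, item in enumerate(data):
        if not isinstance(item, dict):
            raise ValueError(f"item {i} is not a JSON object")
        missing = REQUIRED_KEYS - set(item)
        if missing:
            raise ValueError(f"item {i} is missing {sorted(missing)}")
        if item["severity"] not in VALID_SEVERITIES:
            raise ValueError(f"item {i} has unknown severity {item['severity']!r}")
    return data

reply = (FENCE + "json\n"
         '[{"severity": "high", "line": 12, "title": "Hardcoded password", '
         '"description": "Credential in source", "suggestion": "Load it from the environment"}]\n'
         + FENCE)
findings = parse_findings(reply)
print(findings[0]["title"], findings[0]["line"])  # Hardcoded password 12
```

In `_run_agent`, `parse_findings(text)` could replace the bare `json.loads(text)` call. Because it only raises `ValueError`, the existing `except` clause still catches every failure.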

The Review Engine

import asyncio
import time
import anthropic

class CodeReviewPipeline:
    def __init__(self):
        self.client = anthropic.AsyncAnthropic()
        self.agents = {
            "security": SECURITY_SYSTEM,
            "performance": PERFORMANCE_SYSTEM,
            "maintainability": MAINTAINABILITY_SYSTEM,
        }

    async def _run_agent(self, agent_id: str, system: str,
                         code: str) -> tuple[str, list[Finding], dict]:
        """Run a single review agent and parse its findings."""
        start = time.monotonic()

        response = await self.client.messages.create(
            model="claude-opus-4-6-20260205",
            max_tokens=8192,
            thinking={"type": "adaptive", "effort": "deep"},
            system=system,
            messages=[{"role": "user", "content": code}],
        )

        elapsed_ms = int((time.monotonic() - start) * 1000)
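        # Default to "" so a reply with no text block (e.g. max_tokens reached while
        # thinking) falls through to the parse-error finding instead of raising
        text = next((b.text for b in response.content if b.type == "text"), "")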

        # Parse JSON findings
        findings = []
        try:
            raw_findings = json.loads(text)
            for rf in raw_findings:
                findings.append(Finding(
                    category=Category(agent_id),
                    severity=Severity(rf["severity"]),
                    line=rf.get("line"),
                    title=rf["title"],
                    description=rf["description"],
                    suggestion=rf["suggestion"],
                    agent_id=agent_id,
                ))
        # TypeError: valid JSON that is not a list of objects (e.g. a dict or a bare string)
        except (json.JSONDecodeError, KeyError, ValueError, TypeError) as e:
            findings.append(Finding(
                category=Category(agent_id),
                severity=Severity.INFO,
                line=None,
                title=f"Agent {agent_id} returned unparseable output",
                description=f"Parse error: {e}\nRaw output: {text[:500]}",
                suggestion="Review agent output manually",
                agent_id=agent_id,
            ))

        stats = {
            "latency_ms": elapsed_ms,
            "input_tokens": response.usage.input_tokens,
            "output_tokens": response.usage.output_tokens,
            "findings_count": len(findings),
        }

        return agent_id, findings, stats

    async def review(self, file_path: str, code: str) -> ReviewReport:
        """Run all agents in parallel and merge results."""
        report = ReviewReport(file_path=file_path)

        tasks = [
            self._run_agent(agent_id, system, code)
            for agent_id, system in self.agents.items()
        ]
        results = await asyncio.gather(*tasks, return_exceptions=True)

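        # gather() returns results in task order, so zip recovers which agent each came from
        for agent_id, result in zip(self.agents, results):
            if isinstance(result, BaseException):
                report.findings.append(Finding(
                    category=Category(agent_id),
                    severity=Severity.INFO,
                    line=None,
                    title=f"Agent {agent_id} execution failed",
                    description=f"{type(result).__name__}: {result}",
                    suggestion="Retry the review",
                    agent_id="coordinator",
                ))
                continue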

            agent_id, findings, stats = result
            report.add_findings(findings)
            report.agent_stats[agent_id] = stats

        return report
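
The output contract asks each agent for line numbers, but the agents receive raw source and have to count lines themselves, which is error-prone on long files. One common mitigation is to prefix every line with its number before sending it. This is an assumption, not part of the lesson's engine. `number_lines` is a hypothetical helper that you would apply to `code` before building the `messages` list.

```python
def number_lines(code: str) -> str:
    """Prefix each line with its 1-based number, right-aligned to a common width."""
    lines = code.splitlines()
    width = len(str(len(lines)))  # digits needed for the last line number
    return "\n".join(f"{n:>{width}} | {line}" for n, line in enumerate(lines, start=1))

sample = "import os\n\ndef f(x):\n    return x * 2\n"
print(number_lines(sample))
```

If you adopt this, the system prompt should tell the agents that the left-hand numbers are line numbers and not part of the code.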

Running a Review

import textwrap

async def main():
    pipeline = CodeReviewPipeline()

    # dedent() strips the 4-space indent so the sample is valid top-level Python;
    # the backslash after ''' drops the leading newline so "import os" is line 1
    code = textwrap.dedent('''\
    import os
    import subprocess

    def process_user_input(user_data):
        query = f"SELECT * FROM users WHERE name = '{user_data}'"
        result = subprocess.run(f"echo {user_data}", shell=True, capture_output=True)

        items = []
        for i in range(len(user_data)):
            for j in range(len(user_data)):
                items.append(user_data[i] + user_data[j])

        password = "admin123"

        try:
            db.execute(query)
        except:
            pass

        return items
    ''')

    report = await pipeline.review("app/handlers.py", code)

    # Print findings sorted by severity
    for f in report.sorted_findings():
        icon = {"critical": "🔴", "high": "🟠", "medium": "🟡",
                "low": "🔵", "info": "⚪"}.get(f.severity.value, "⚪")
        print(f"{icon} [{f.severity.value.upper()}] {f.title}")
        print(f"   Category: {f.category.value} | Line: {f.line or 'N/A'}")
        print(f"   {f.description}")
        print(f"   💡 {f.suggestion}")
        print()

    # Print agent stats
    print("--- Agent Statistics ---")
    for agent_id, stats in report.agent_stats.items():
        print(f"{agent_id}: {stats['findings_count']} findings, "
              f"{stats['latency_ms']}ms, "
              f"{stats['input_tokens'] + stats['output_tokens']} tokens")

if __name__ == "__main__":
    asyncio.run(main())
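
A report can also serve as a pass/fail signal, for example in CI. The sketch below tallies findings by severity with `collections.Counter` and fails when anything at or above a threshold is present. It takes plain severity strings so it runs standalone. In the pipeline, you would pass `[f.severity.value for f in report.findings]`. The names `tally` and `should_fail` and the default `"high"` threshold are hypothetical choices.

```python
from collections import Counter

SEVERITY_ORDER = ["critical", "high", "medium", "low", "info"]

def tally(severities: list[str]) -> dict[str, int]:
    """Count findings per severity, listing every level (0 if absent)."""
    counts = Counter(severities)
    return {level: counts[level] for level in SEVERITY_ORDER}

def should_fail(severities: list[str], threshold: str = "high") -> bool:
    """True if any finding is at or above the threshold (critical ranks highest)."""
    cutoff = SEVERITY_ORDER.index(threshold)
    return any(SEVERITY_ORDER.index(s) <= cutoff for s in severities)

severities = ["critical", "critical", "high", "medium", "medium", "low"]
print(tally(severities))
# {'critical': 2, 'high': 1, 'medium': 2, 'low': 1, 'info': 0}
print(should_fail(severities))                           # True
print(should_fail(["medium", "low"], threshold="high"))  # False
```

In a CI script, `sys.exit(1 if should_fail(...) else 0)` turns this into a failing check.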

Expected Output

Exact titles, severities, and line numbers vary from run to run, but a typical review of the sample code flags at least these issues:

| Agent | Severity | Finding |
|---|---|---|
| Security | Critical | SQL injection via string interpolation |
| Security | Critical | Command injection via subprocess with shell=True |
| Security | High | Hardcoded password |
| Performance | Medium | O(n²) nested loop creating cartesian product |
| Maintainability | Medium | Bare except: pass swallows all errors |
| Maintainability | Low | Unused import os |
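
Titles and severities vary between runs, so an exact-match test against this table would be flaky. A looser smoke test, sketched below, only checks that each expected issue appears somewhere in the right category, matching on lowercase keywords. The keyword lists are assumptions chosen to fit this sample, and you would need to adjust them when prompts or models change. The input is plain `(category, title)` pairs so the sketch runs without the API. In the pipeline, you would build these pairs as `[(f.category.value, f.title) for f in report.findings]`. The sample `run` is an illustrative run with one table row missing.

```python
EXPECTED = {
    "security": ["sql injection", "command injection", "password"],
    "performance": ["nested loop"],
    "maintainability": ["except", "unused import"],
}

def missing_expected(findings: list[tuple[str, str]]) -> list[str]:
    """Return 'category: keyword' for each expected issue with no matching title."""
    missing = []
    for category, keywords in EXPECTED.items():
        titles = [t.lower() for c, t in findings if c == category]
        for kw in keywords:
            if not any(kw in t for t in titles):
                missing.append(f"{category}: {kw}")
    return missing

run = [
    ("security", "SQL injection via string interpolation"),
    ("security", "Command injection via subprocess with shell=True"),
    ("security", "Hardcoded password"),
    ("performance", "O(n²) nested loop creating cartesian product"),
    ("maintainability", "Bare except: pass swallows all errors"),
]
print(missing_expected(run))  # ['maintainability: unused import']
```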

Scaling to Multiple Files

Wrap the pipeline in a file-level parallel executor for reviewing entire projects:

from pathlib import Path

async def review_project(file_paths: list[str],
                         max_concurrent: int = 5) -> list[ReviewReport]:
    """Review multiple files with concurrency control."""
    pipeline = CodeReviewPipeline()
    semaphore = asyncio.Semaphore(max_concurrent)

    async def _review_file(path: str) -> ReviewReport:
        async with semaphore:
            # Read in a worker thread so disk I/O doesn't block the event loop
            code = await asyncio.to_thread(Path(path).read_text, encoding="utf-8")
            return await pipeline.review(path, code)

    reports = await asyncio.gather(
        *[_review_file(p) for p in file_paths],
        return_exceptions=True,
    )

    # Files that raised (unreadable file, unexpected error) are dropped here
    return [r for r in reports if isinstance(r, ReviewReport)]

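Note the semaphore. Each file fans out to three agents, so reviewing 10 files at once would mean 30 concurrent API calls. The semaphore caps how many files are reviewed at the same time. With the default max_concurrent=5, at most 5 × 3 = 15 agent calls are in flight, which helps keep the pipeline within your API rate limits.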
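
To check the concurrency arithmetic, this sketch replaces the API call with a stub and records the peak number of in-flight agent calls. The stub and the `InFlight` tracker are hypothetical scaffolding. The semaphore-per-file structure mirrors `review_project`, with each file holding the semaphore while its three agent calls run.

```python
import asyncio

class InFlight:
    """Tracks how many stub agent calls are running at once."""
    def __init__(self) -> None:
        self.current = 0
        self.peak = 0

async def stub_agent_call(tracker: InFlight) -> None:
    tracker.current += 1
    tracker.peak = max(tracker.peak, tracker.current)
    await asyncio.sleep(0.01)  # simulated API latency
    tracker.current -= 1

async def review_file(sem: asyncio.Semaphore, tracker: InFlight) -> None:
    async with sem:  # file-level cap, as in review_project
        # Each file fans out to three agents
        await asyncio.gather(*(stub_agent_call(tracker) for _ in range(3)))

async def peak_calls(num_files: int, max_concurrent: int) -> int:
    sem = asyncio.Semaphore(max_concurrent)
    tracker = InFlight()
    await asyncio.gather(*(review_file(sem, tracker) for _ in range(num_files)))
    return tracker.peak

print(asyncio.run(peak_calls(num_files=10, max_concurrent=5)))   # 15
print(asyncio.run(peak_calls(num_files=10, max_concurrent=10)))  # 30
```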

Cost Analysis

For a 500-line file reviewed by three agents with deep thinking:

| Component | Input tokens | Output tokens | Thinking tokens | Cost |
|---|---|---|---|---|
| Security agent | ~2,000 | ~1,500 | ~15,000 | ~$0.12 |
| Performance agent | ~2,000 | ~1,000 | ~12,000 | ~$0.10 |
| Maintainability agent | ~2,000 | ~1,200 | ~10,000 | ~$0.09 |
| Total per file | ~6,000 | ~3,700 | ~37,000 | ~$0.31 |

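For a 100-file project, that comes to roughly 100 × $0.31 ≈ $31, about the cost of a single hour of senior engineer time. Treat these figures as rough estimates. Thinking tokens are billed at the output-token rate and count toward max_tokens. With the max_tokens=8192 used in _run_agent, one agent's thinking plus answer is therefore capped at 8,192 tokens, which is below every thinking count in the table. Raise max_tokens if you want that much thinking, and check current pricing for your model before budgeting.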
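
The per-file estimate is token counts times per-token prices, with thinking tokens charged at the output rate. The function below makes that arithmetic explicit, taking prices as parameters rather than quoting rates. The 1.0 and 5.0 dollars-per-million values in the example are arbitrary placeholders, not any model's actual pricing. `AgentUsage`, `call_cost`, and `project_cost` are hypothetical names.

```python
from dataclasses import dataclass

@dataclass
class AgentUsage:
    input_tokens: int
    output_tokens: int    # visible answer tokens
    thinking_tokens: int  # billed at the output rate

def call_cost(u: AgentUsage, input_price_per_m: float, output_price_per_m: float) -> float:
    """Dollar cost of one call; thinking tokens are charged as output tokens."""
    billed_output = u.output_tokens + u.thinking_tokens
    return (u.input_tokens * input_price_per_m + billed_output * output_price_per_m) / 1_000_000

def project_cost(per_file: list[AgentUsage], num_files: int,
                 input_price_per_m: float, output_price_per_m: float) -> float:
    """Cost of one file's agent calls, times the number of files."""
    per_file_cost = sum(call_cost(u, input_price_per_m, output_price_per_m) for u in per_file)
    return per_file_cost * num_files

# Token counts from the table above; prices are PLACEHOLDERS, not real rates
agents = [AgentUsage(2_000, 1_500, 15_000),
          AgentUsage(2_000, 1_000, 12_000),
          AgentUsage(2_000, 1_200, 10_000)]
print(project_cost(agents, num_files=100, input_price_per_m=1.0, output_price_per_m=5.0))

# Calls whose thinking + answer would not fit under the max_tokens=8192 used in _run_agent
MAX_TOKENS = 8192
over_cap = [u for u in agents if u.output_tokens + u.thinking_tokens > MAX_TOKENS]
print(len(over_cap))  # 3
```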

In the next lesson, you will learn advanced coordination patterns for handling conflicts, partial failures, and shared state between agents.