Lesson 38 of 46 ~25 min

Advanced Coordination Patterns

Shared state management, conflict resolution when agents disagree, priority-based merging, and handling partial failures in multi-agent systems.

The previous lessons assumed clean execution: every agent succeeds, outputs parse correctly, and no two agents contradict each other. Production systems are not that polite. This lesson covers what happens when agents disagree, fail partially, or need to share context.

Shared State Without Shared Memory

Agents in a team cannot communicate during execution. But they often need to work from the same facts — a shared configuration, a common knowledge base, or a running accumulator. The solution: externalize state into the coordinator and inject it into each agent’s prompt.

from dataclasses import dataclass, field
import json

@dataclass
class SharedContext:
    """State that every agent in the team receives."""
    project_language: str
    framework: str
    coding_standards: dict
    known_issues: list[str] = field(default_factory=list)
    previous_findings: list[dict] = field(default_factory=list)

    def to_prompt_section(self) -> str:
        return (
            f"## Project Context\n"
            f"- Language: {self.project_language}\n"
            f"- Framework: {self.framework}\n"
            f"- Standards: {json.dumps(self.coding_standards, indent=2)}\n"
            f"- Known issues to ignore: {', '.join(self.known_issues) or 'None'}\n"
            f"- Findings from prior review stages: "
            f"{json.dumps(self.previous_findings) if self.previous_findings else 'None'}\n"
        )

def build_agent_prompt(system_base: str, context: SharedContext) -> str:
    """Inject shared context into an agent's system prompt."""
    return f"{system_base}\n\n{context.to_prompt_section()}"

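This pattern is simple, and its value comes from consistency: every agent receives the same project context, so its findings are calibrated to the actual stack rather than to generic assumptions. Between pipeline stages, the coordinator can update SharedContext, for example by appending one stage's findings to previous_findings so the next stage builds on them instead of rediscovering them.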
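One way the coordinator might update SharedContext between stages is sketched below. It is a sketch under assumptions: findings are plain dicts, and the `stage` tag and the `max_findings` cap are hypothetical choices. The cap exists because previous_findings is serialized into every prompt and would otherwise grow without bound. The class is a trimmed copy of SharedContext so the block runs on its own, and "Python"/"FastAPI" are illustrative values.

```python
from dataclasses import dataclass, field, replace


@dataclass
class SharedContext:
    """Trimmed copy of the lesson's SharedContext; only the fields used here."""
    project_language: str
    framework: str
    previous_findings: list[dict] = field(default_factory=list)


def record_stage(ctx: SharedContext, stage: str, findings: list[dict],
                 max_findings: int = 20) -> SharedContext:
    """Return a new context with this stage's findings appended (hypothetical helper).

    Each finding is tagged with the stage that produced it, and only the
    newest `max_findings` entries are kept so the injected prompt stays bounded.
    """
    tagged = [{**f, "stage": stage} for f in findings]
    combined = ctx.previous_findings + tagged  # builds a new list; ctx is untouched
    # Guard 0 explicitly: combined[-0:] would return the whole list
    kept = combined[-max_findings:] if max_findings > 0 else []
    return replace(ctx, previous_findings=kept)


# Usage: each stage sees the findings recorded by the stages before it
ctx = SharedContext(project_language="Python", framework="FastAPI")
ctx = record_stage(ctx, "security", [{"line": 10, "title": "SQL injection"}])
ctx = record_stage(ctx, "performance", [{"line": 22, "title": "N+1 query"}])
```

Because record_stage returns a new object instead of mutating the old one, each stage's prompt can be reproduced later from the exact context that stage saw.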

Conflict Resolution

When two agents disagree — one says the code is safe, another flags a vulnerability — you need a systematic way to resolve the conflict.

Strategy 1: Severity-Based Priority

The most conservative approach: when findings conflict, the higher severity wins.

from dataclasses import dataclass

@dataclass
class ConflictResolution:
    finding_a: dict
    finding_b: dict
    resolution: str
    chosen: dict

SEVERITY_RANK = {"critical": 0, "high": 1, "medium": 2, "low": 3, "info": 4}

def resolve_by_severity(findings: list[dict]) -> list[dict]:
    """When multiple agents flag the same line, keep the highest severity."""
    by_line: dict[int, list[dict]] = {}
    for f in findings:
        line = f.get("line")
        if line is not None:
            by_line.setdefault(line, []).append(f)

    resolved = []
    for line, group in by_line.items():
        if len(group) == 1:
            resolved.append(group[0])
        else:
            winner = min(group, key=lambda f: SEVERITY_RANK.get(f["severity"], 99))
            resolved.append(winner)

    # Include findings without a line number
    resolved.extend(f for f in findings if f.get("line") is None)
    return resolved
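resolve_by_severity groups by line number alone, so findings on line 42 of two different files would be treated as a conflict. A variant that keys on (file, line) and keeps an audit trail of each decision might look like the sketch below. The `file` key on findings and the `SeverityDecision` record are assumptions for illustration, not part of the lesson's data model.

```python
from dataclasses import dataclass

SEVERITY_RANK = {"critical": 0, "high": 1, "medium": 2, "low": 3, "info": 4}


@dataclass
class SeverityDecision:
    """Hypothetical audit record: which finding won at a location, and over what."""
    location: tuple
    candidates: list[dict]
    chosen: dict


def resolve_by_location(findings: list[dict]) -> tuple[list[dict], list[SeverityDecision]]:
    """Keep the highest-severity finding per (file, line); record every real conflict."""
    groups: dict[tuple, list[dict]] = {}
    unlocated = []
    for f in findings:
        if f.get("line") is None:
            unlocated.append(f)  # Nothing to compare against; keep as-is
        else:
            groups.setdefault((f.get("file"), f["line"]), []).append(f)

    resolved, decisions = [], []
    for location, group in groups.items():
        # Lower rank = more severe; unknown or missing severities sort last
        winner = min(group, key=lambda f: SEVERITY_RANK.get(f.get("severity"), 99))
        resolved.append(winner)
        if len(group) > 1:
            decisions.append(SeverityDecision(location, group, winner))

    return resolved + unlocated, decisions
```

Returning the decisions separately keeps the merged output clean while still letting you log or review why a lower-severity finding was discarded.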

Strategy 2: LLM Adjudication

For complex conflicts, use a separate Claude call to evaluate both sides:

import json
import anthropic

def adjudicate_conflict(finding_a: dict, finding_b: dict,
                        code_snippet: str) -> dict:
    """Use Claude to resolve a conflict between two agent findings."""
    client = anthropic.Anthropic()

    prompt = (
        "Two review agents produced conflicting findings for the same code.\n\n"
        f"**Code:**\n{code_snippet}\n\n"
        f"**Agent A ({finding_a['agent_id']}) says:**\n"
        f"Severity: {finding_a['severity']}\n"
        f"{finding_a['description']}\n\n"
        f"**Agent B ({finding_b['agent_id']}) says:**\n"
        f"Severity: {finding_b['severity']}\n"
        f"{finding_b['description']}\n\n"
        "Determine which agent is correct, or if both have valid points.\n"
        'Respond with JSON: {"winner": "A|B|both", "severity": "<final>", "reasoning": "<explanation>"}'
    )

    response = client.messages.create(
        model="claude-opus-4-6-20260205",
        max_tokens=2048,
        thinking={"type": "adaptive", "effort": "deep"},
        messages=[{"role": "user", "content": prompt}],
    )
    text = next(b.text for b in response.content if b.type == "text")
    return json.loads(text)
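A bare `json.loads(text)` fails if the model wraps its answer in a Markdown code fence or adds a sentence before the JSON. A more defensive parser is sketched below using only the standard library; the accepted `winner` values mirror the prompt above, and `parse_verdict` is a hypothetical helper name.

```python
import json
import re

VALID_WINNERS = {"A", "B", "both"}
VALID_SEVERITIES = {"critical", "high", "medium", "low", "info"}


def parse_verdict(text: str) -> dict:
    """Extract and validate the adjudicator's JSON verdict from raw model text."""
    # Take everything from the first "{" to the last "}", which drops
    # code fences and any prose before or after the object
    match = re.search(r"\{.*\}", text, re.DOTALL)
    if match is None:
        raise ValueError("no JSON object found in adjudicator response")
    verdict = json.loads(match.group(0))

    # Reject verdicts that don't match the schema requested in the prompt
    if verdict.get("winner") not in VALID_WINNERS:
        raise ValueError(f"unexpected winner: {verdict.get('winner')!r}")
    if verdict.get("severity") not in VALID_SEVERITIES:
        raise ValueError(f"unexpected severity: {verdict.get('severity')!r}")
    return verdict
```

In adjudicate_conflict, `return parse_verdict(text)` would replace `return json.loads(text)`. Since `json.JSONDecodeError` is a subclass of `ValueError`, callers can catch `ValueError` for every failure mode.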

Strategy 3: Weighted Voting

Assign each agent a trust weight based on its specialization and the finding category:

AGENT_WEIGHTS = {
    # (agent_id, finding_category) -> weight
    ("security", "security"): 1.0,
    ("security", "performance"): 0.3,
    ("security", "maintainability"): 0.2,
    ("performance", "security"): 0.3,
    ("performance", "performance"): 1.0,
    ("performance", "maintainability"): 0.4,
    ("maintainability", "security"): 0.2,
    ("maintainability", "performance"): 0.4,
    ("maintainability", "maintainability"): 1.0,
}

def weighted_severity(findings: list[dict]) -> str:
    """Compute weighted severity when multiple agents flag the same issue."""
    severity_scores = {"critical": 4, "high": 3, "medium": 2, "low": 1, "info": 0}

    total_weight = 0.0
    weighted_score = 0.0

    for f in findings:
        weight = AGENT_WEIGHTS.get(
            (f["agent_id"], f["category"]), 0.5
        )
        score = severity_scores.get(f["severity"], 0)
        weighted_score += weight * score
        total_weight += weight

    avg_score = weighted_score / total_weight if total_weight > 0 else 0
    score_to_severity = {4: "critical", 3: "high", 2: "medium", 1: "low", 0: "info"}
    return score_to_severity.get(round(avg_score), "medium")
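Python's built-in `round()` rounds exact halves to the nearest even integer, so an average of 2.5 becomes 2 ("medium") while 3.5 becomes 4 ("critical"). If you want ties to break consistently toward the more severe label, a variant can round half up explicitly. This sketch takes the weight table as a parameter instead of reading the global AGENT_WEIGHTS; the function name is hypothetical.

```python
import math

SEVERITY_SCORES = {"critical": 4, "high": 3, "medium": 2, "low": 1, "info": 0}
SCORE_TO_SEVERITY = {score: name for name, score in SEVERITY_SCORES.items()}


def weighted_severity_half_up(findings: list[dict],
                              weights: dict[tuple[str, str], float],
                              default_weight: float = 0.5) -> str:
    """Weighted average severity, with exact halves rounded toward more severe."""
    total_weight = 0.0
    weighted_score = 0.0
    for f in findings:
        w = weights.get((f["agent_id"], f["category"]), default_weight)
        weighted_score += w * SEVERITY_SCORES.get(f["severity"], 0)
        total_weight += w
    if total_weight == 0:
        return "info"  # No votes: same result the original function gives
    avg = weighted_score / total_weight
    # floor(x + 0.5) rounds halves up, unlike round(), which rounds halves to even
    return SCORE_TO_SEVERITY[math.floor(avg + 0.5)]
```

As a worked example, a security agent rating a security finding critical (weight 1.0, score 4) and a performance agent rating it low (weight 0.3, score 1) average to (4.0 + 0.3) / 1.3 ≈ 3.31, which maps to "high" under either rounding rule. The two rules only diverge on exact halves.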

Handling Partial Failures

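When one agent fails, the team should still produce useful results from the agents that succeeded. ResilientTeam gives each agent its own timeout and retry loop and records failures instead of raising them, so a single failing agent cannot abort the whole batch.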

import asyncio
import logging
from dataclasses import dataclass

logger = logging.getLogger(__name__)

@dataclass
class TeamResult:
    successful: dict[str, str]      # agent_id -> output
    failed: dict[str, str]          # agent_id -> error message
    degraded: bool                  # True if any agent failed

    @property
    def is_complete(self) -> bool:
        return len(self.failed) == 0

class ResilientTeam:
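    def __init__(self, client):
        # Must be an async client such as anthropic.AsyncAnthropic(),
        # because messages.create is awaited in run_with_fallback.
        self.client = client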

    async def run_with_fallback(
        self,
        subtasks: dict[str, dict],
        timeout: float = 120.0,
        max_retries: int = 2,
    ) -> TeamResult:
        """Run agents with per-agent timeout, retry, and graceful degradation."""

        async def _run_one(agent_id: str, config: dict) -> tuple[str, str | None, str | None]:
            last_error = None
            for attempt in range(max_retries + 1):
                try:
                    response = await asyncio.wait_for(
                        self.client.messages.create(
                            model=config["model"],
                            max_tokens=config["max_tokens"],
                            system=config["system"],
                            thinking=config.get("thinking", {"type": "adaptive"}),
                            messages=[{"role": "user", "content": config["prompt"]}],
                        ),
                        timeout=timeout,
                    )
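                    text = next(
                        (b.text for b in response.content if b.type == "text"), None
                    )
                    if text is None:
                        # Clearer than the bare StopIteration next() would raise
                        raise ValueError("response contained no text block")
                    return agent_id, text, None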

                except asyncio.TimeoutError:
                    last_error = f"Timeout after {timeout}s (attempt {attempt + 1})"
                    logger.warning(f"Agent {agent_id}: {last_error}")
                except Exception as e:
                    last_error = f"{type(e).__name__}: {e} (attempt {attempt + 1})"
                    logger.warning(f"Agent {agent_id}: {last_error}")

                # Exponential backoff between retries
                if attempt < max_retries:
                    await asyncio.sleep(2 ** attempt)

            return agent_id, None, last_error

        results = await asyncio.gather(
            *[_run_one(aid, cfg) for aid, cfg in subtasks.items()]
        )

        successful = {}
        failed = {}
        for agent_id, output, error in results:
            if error:
                failed[agent_id] = error
            else:
                successful[agent_id] = output

        return TeamResult(
            successful=successful,
            failed=failed,
            degraded=len(failed) > 0,
        )
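ResilientTeam never lets an exception reach `asyncio.gather`, because `_run_one` catches everything and returns an error string. Without that, gather would propagate the first exception to the caller. The same degradation can also come from gather itself with `return_exceptions=True`, which places exceptions in the result list in place of return values. The sketch below uses a hypothetical `fake_agent` stub instead of a real API call so it runs offline, and it does no retrying; `_run_one` adds that on top.

```python
import asyncio


async def fake_agent(agent_id: str, should_fail: bool) -> str:
    """Stand-in for an API call; raises to simulate a failed agent."""
    await asyncio.sleep(0.01)
    if should_fail:
        raise RuntimeError(f"{agent_id} crashed")
    return f"{agent_id}: no issues"


async def run_team(plan: dict[str, bool]) -> tuple[dict[str, str], dict[str, str]]:
    """Run all agents concurrently and split results into successes and failures."""
    agent_ids = list(plan)
    # return_exceptions=True returns exceptions as results
    # instead of propagating the first one to the caller
    outcomes = await asyncio.gather(
        *(fake_agent(aid, plan[aid]) for aid in agent_ids),
        return_exceptions=True,
    )
    successful, failed = {}, {}
    for agent_id, outcome in zip(agent_ids, outcomes):
        if isinstance(outcome, BaseException):
            failed[agent_id] = f"{type(outcome).__name__}: {outcome}"
        else:
            successful[agent_id] = outcome
    return successful, failed


if __name__ == "__main__":
    ok, bad = asyncio.run(run_team({"security": False, "performance": True}))
    print(ok)
    print(bad)
```

gather preserves input order, which is why zipping outcomes back to `agent_ids` is safe. Catching inside each task, as `_run_one` does, is the better fit when you also want per-agent retries and attempt-numbered error messages.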

Priority-Based Merging

When agents produce overlapping findings, use a priority system that considers both severity and agent expertise:

from dataclasses import dataclass

@dataclass
class PrioritizedFinding:
    finding: dict
    priority_score: float

def priority_merge(all_findings: list[dict],
                   agent_priorities: dict[str, float]) -> list[dict]:
    """
    Merge findings with priority-based deduplication.

    Each finding is scored as agent_priority * severity_weight. When two
    findings overlap (same line and same normalized title), only the
    highest-scoring one is kept.
    """
    SEVERITY_WEIGHT = {"critical": 10, "high": 7, "medium": 4, "low": 2, "info": 1}

    scored = []
    for f in all_findings:
        agent_weight = agent_priorities.get(f["agent_id"], 0.5)
        severity_weight = SEVERITY_WEIGHT.get(f["severity"], 1)
        score = agent_weight * severity_weight
        scored.append(PrioritizedFinding(finding=f, priority_score=score))

    # Group by dedup key (line + normalized title)
    groups: dict[str, list[PrioritizedFinding]] = {}
    for pf in scored:
        key = f"{pf.finding.get('line', 'none')}:{pf.finding['title'].lower().strip()}"
        groups.setdefault(key, []).append(pf)

    # Keep the highest-scoring finding from each group
    winners = [max(group, key=lambda pf: pf.priority_score) for group in groups.values()]

    # Sort by priority score, highest first
    winners.sort(key=lambda pf: pf.priority_score, reverse=True)
    return [pf.finding for pf in winners]
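priority_merge only treats titles as duplicates when they match after lowercasing and trimming, so "SQL injection" and "SQL Injection!" land in different groups. A slightly more forgiving key also collapses punctuation and repeated whitespace. The sketch below is a hypothetical `dedup_key` helper, not part of the lesson's code.

```python
import re


def dedup_key(finding: dict) -> str:
    """Build a grouping key from line number and a punctuation-insensitive title."""
    line = finding.get("line", "none")
    title = str(finding.get("title", "")).lower()
    # Collapse every run of non-alphanumeric characters into a single space
    title = re.sub(r"[^a-z0-9]+", " ", title).strip()
    return f"{line}:{title}"
```

Inside priority_merge, the `key = ...` line in the grouping loop would become `key = dedup_key(pf.finding)`. The character class keeps only ASCII letters and digits, so titles written in other scripts would collapse to an empty string; widen it if your agents report in other languages.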

Retry Strategies

Different failure types need different retry approaches:

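| Failure type | Retry strategy | Backoff | Max retries |
|---|---|---|---|
| Rate limit (429) | Exponential backoff with jitter | Exponential (2, 4, 8, ... s) plus random jitter | 5 |
| Timeout | Increase timeout, reduce thinking effort | Linear (1, 2 s) | 2 |
| Parse error | Retry with stricter output instructions | None | 1 |
| Server error (500) | Retry the same request with exponential backoff | Exponential (2, 4, 8 s) | 3 |
| Overloaded (529) | Back off significantly | Exponential, longer base delay (5 s) | 3 |
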
import asyncio
import random

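async def retry_with_strategy(coro_factory, failure_type: str):
    """Retry a coroutine with a failure-type-appropriate strategy.

    coro_factory must be a zero-argument callable that returns a new
    coroutine on each call, because a coroutine object can only be awaited once.
    """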
    strategies = {
        "rate_limit":  {"max_retries": 5, "base_delay": 2.0, "backoff": "exponential_jitter"},
        "timeout":     {"max_retries": 2, "base_delay": 1.0, "backoff": "linear"},
        "parse_error": {"max_retries": 1, "base_delay": 0.0, "backoff": "none"},
        "server_error":{"max_retries": 3, "base_delay": 2.0, "backoff": "exponential"},
        "overloaded":  {"max_retries": 3, "base_delay": 5.0, "backoff": "exponential"},
    }

    strategy = strategies.get(failure_type, strategies["server_error"])

    for attempt in range(strategy["max_retries"] + 1):
        try:
            return await coro_factory()
        except Exception:
            if attempt == strategy["max_retries"]:
                raise  # Out of retries: surface the original error

            # attempt is 0 on the first failure, so delays start at base_delay
            if strategy["backoff"] == "exponential_jitter":
                delay = strategy["base_delay"] * (2 ** attempt) + random.uniform(0, 1)
            elif strategy["backoff"] == "exponential":
                delay = strategy["base_delay"] * (2 ** attempt)
            elif strategy["backoff"] == "linear":
                delay = strategy["base_delay"] * (attempt + 1)
            else:
                delay = 0

            await asyncio.sleep(delay)

Putting It All Together

A coordinator that handles shared context, conflict resolution, partial failures, and priority merging:

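# Hypothetical base system prompts, one per specialist agent. Each agent is
# asked for a JSON array of findings so the coordinator can parse the output.
AGENT_CONFIGS = {
    "security": "You are a security reviewer. Return a JSON array of findings "
                "with keys: line, title, severity, description.",
    "performance": "You are a performance reviewer. Return a JSON array of findings "
                   "with keys: line, title, severity, description.",
    "maintainability": "You are a maintainability reviewer. Return a JSON array of "
                       "findings with keys: line, title, severity, description.",
}

class AdvancedCoordinator: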
    def __init__(self, client):
        self.client = client
        self.team = ResilientTeam(client)

    async def coordinated_review(self, code: str,
                                 context: SharedContext) -> dict:
        subtasks = {}
        for agent_id, system_base in AGENT_CONFIGS.items():
            subtasks[agent_id] = {
                "model": "claude-opus-4-6-20260205",
                "max_tokens": 8192,
                "system": build_agent_prompt(system_base, context),
                "thinking": {"type": "adaptive", "effort": "deep"},
                "prompt": code,
            }

        result = await self.team.run_with_fallback(subtasks)

        # Log degraded state
        if result.degraded:
            for agent_id, error in result.failed.items():
                logger.error(f"Agent {agent_id} failed: {error}")

        # Parse findings from successful agents, skipping malformed output
        all_findings = []
        for agent_id, output in result.successful.items():
            try:
                parsed = json.loads(output)
            except json.JSONDecodeError:
                logger.warning(f"Agent {agent_id} returned non-JSON output")
                continue
            if not isinstance(parsed, list):
                logger.warning(f"Agent {agent_id} returned JSON that is not a list")
                continue
            for f in parsed:
                if isinstance(f, dict):
                    f["agent_id"] = agent_id
                    f["category"] = agent_id  # category = the agent's specialization
                    all_findings.append(f)

        # Priority merge: deduplicate overlapping findings, keeping the highest score
        agent_priorities = {"security": 1.0, "performance": 0.8, "maintainability": 0.7}
        merged = priority_merge(all_findings, agent_priorities)

        return {
            "findings": merged,
            "agents_succeeded": list(result.successful.keys()),
            "agents_failed": list(result.failed.keys()),
            "degraded": result.degraded,
        }
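priority_merge indexes `f["title"]` and `f["severity"]` directly, so a single malformed item from one agent raises a `KeyError` and takes down the whole merge. A small validation pass between parsing and merging can drop such items. The sketch below is a hypothetical `normalize_findings` helper; the required keys are an assumption chosen to match what priority_merge reads.

```python
VALID_SEVERITIES = {"critical", "high", "medium", "low", "info"}


def normalize_findings(parsed: object, agent_id: str) -> list[dict]:
    """Keep only well-formed findings and tag each with its source agent."""
    if not isinstance(parsed, list):
        return []
    clean = []
    for item in parsed:
        if not isinstance(item, dict):
            continue
        title = item.get("title")
        severity = str(item.get("severity", "")).lower()  # accept "High" as "high"
        if not isinstance(title, str) or not title.strip():
            continue  # priority_merge needs a title to build its dedup key
        if severity not in VALID_SEVERITIES:
            continue  # unknown labels would silently get the lowest weight
        clean.append({**item, "severity": severity,
                      "agent_id": agent_id, "category": agent_id})
    return clean
```

In coordinated_review, the per-agent loop over parsed items could then collapse to `all_findings.extend(normalize_findings(parsed, agent_id))`.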

In the next lesson, you will learn how to deploy agent teams in production with monitoring, cost tracking, and observability.