The previous lessons assumed clean execution: every agent succeeds, outputs parse correctly, and no two agents contradict each other. Production systems are not that polite. This lesson covers what happens when agents disagree, fail partially, or need to share context.
Shared State Without Shared Memory
Agents in a team cannot communicate during execution. But they often need to work from the same facts — a shared configuration, a common knowledge base, or a running accumulator. The solution: externalize state into the coordinator and inject it into each agent’s prompt.
from dataclasses import dataclass, field
import json
@dataclass
class SharedContext:
    """Project facts the coordinator injects into every agent's prompt."""

    project_language: str
    framework: str
    coding_standards: dict
    known_issues: list[str] = field(default_factory=list)
    previous_findings: list[dict] = field(default_factory=list)

    def to_prompt_section(self) -> str:
        """Render the context as a "## Project Context" bullet list.

        The coding standards are embedded as indented JSON. Empty
        ``known_issues`` and empty ``previous_findings`` are both rendered
        as the literal text ``None``. The result ends with a newline.
        """
        standards_json = json.dumps(self.coding_standards, indent=2)
        ignored_issues = ", ".join(self.known_issues) or "None"
        if self.previous_findings:
            prior_findings = json.dumps(self.previous_findings)
        else:
            prior_findings = "None"

        parts = [
            "## Project Context\n",
            f"- Language: {self.project_language}\n",
            f"- Framework: {self.framework}\n",
            f"- Standards: {standards_json}\n",
            f"- Known issues to ignore: {ignored_issues}\n",
            f"- Findings from prior review stages: {prior_findings}\n",
        ]
        return "".join(parts)
def build_agent_prompt(system_base: str, context: SharedContext) -> str:
    """Return the agent's base system prompt with the shared context appended.

    The two parts are separated by one blank line; the context text comes
    from ``context.to_prompt_section()``.
    """
    context_section = context.to_prompt_section()
    return f"{system_base}\n\n{context_section}"
This pattern is simple but powerful. Each agent sees the same project context, so its findings are calibrated to the actual stack. The coordinator updates SharedContext between pipeline stages if needed.
Conflict Resolution
When two agents disagree — one says the code is safe, another flags a vulnerability — you need a systematic way to resolve the conflict.
Strategy 1: Severity-Based Priority
The most conservative approach: when several agents report findings on the same line, the finding with the highest severity wins.
from dataclasses import dataclass
@dataclass
class ConflictResolution:
    """Container describing one conflict between two findings and its outcome.

    NOTE(review): nothing in the visible file constructs or reads this class;
    the field meanings below are inferred from the names — confirm with the
    intended caller.
    """
    # The two findings being compared (finding dicts).
    finding_a: dict
    finding_b: dict
    # Presumably a description of how the conflict was settled — TODO confirm.
    resolution: str
    # Presumably the finding that was kept — TODO confirm.
    chosen: dict
SEVERITY_RANK = {"critical": 0, "high": 1, "medium": 2, "low": 3, "info": 4}

# Rank used for a missing or unrecognized severity, so that any finding with a
# known severity wins over it.
_UNKNOWN_SEVERITY_RANK = 99


def resolve_by_severity(findings: list[dict]) -> list[dict]:
    """When multiple agents flag the same line, keep the highest severity.

    Args:
        findings: Finding dicts. ``"line"`` is optional; ``"severity"`` is
            looked up in ``SEVERITY_RANK``.

    Returns:
        One finding per distinct line (in order of first appearance of the
        line), followed by every finding whose line is missing or ``None``
        in their original order. When several findings on a line share the
        best rank, the first one in ``findings`` is kept.

    A finding whose severity is missing, unrecognized, or not a string is
    ranked below every known severity instead of raising.
    """
    by_line: dict[int, list[dict]] = {}
    unlocated: list[dict] = []
    for finding in findings:
        line = finding.get("line")
        if line is None:
            unlocated.append(finding)
        else:
            by_line.setdefault(line, []).append(finding)

    def _rank(finding: dict) -> int:
        severity = finding.get("severity")
        # The isinstance guard also protects the lookup from unhashable values.
        if isinstance(severity, str):
            return SEVERITY_RANK.get(severity, _UNKNOWN_SEVERITY_RANK)
        return _UNKNOWN_SEVERITY_RANK

    # min() returns the first of several equally ranked findings.
    resolved = [min(group, key=_rank) for group in by_line.values()]
    # Findings without a line number cannot conflict by location; keep them all.
    resolved.extend(unlocated)
    return resolved
Strategy 2: LLM Adjudication
For complex conflicts, use a separate Claude call to evaluate both sides:
import anthropic
def _parse_adjudication_json(text: str):
    """Parse the model's reply as JSON, tolerating code fences or surrounding prose."""
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        # Fall back to the outermost {...} span, e.g. inside a ```json fence.
        start = text.find("{")
        end = text.rfind("}")
        if start == -1 or end <= start:
            raise
        return json.loads(text[start:end + 1])


def adjudicate_conflict(finding_a: dict, finding_b: dict,
                        code_snippet: str) -> dict:
    """Use Claude to resolve a conflict between two agent findings.

    Args:
        finding_a: First finding; must contain ``"agent_id"``,
            ``"severity"`` and ``"description"``.
        finding_b: Second finding, same required keys.
        code_snippet: The code both findings refer to; embedded verbatim in
            the prompt.

    Returns:
        The JSON value parsed from the model's text reply. The prompt asks
        for an object with ``"winner"``, ``"severity"`` and ``"reasoning"``;
        the shape is not validated here.

    Raises:
        KeyError: If a finding lacks one of the required keys.
        ValueError: If the response has no text block, or its text does not
            contain parseable JSON (``json.JSONDecodeError`` is a
            ``ValueError`` subclass).
    """
    client = anthropic.Anthropic()
    prompt = (
        "Two review agents produced conflicting findings for the same code.\n\n"
        f"**Code:**\n{code_snippet}\n\n"
        f"**Agent A ({finding_a['agent_id']}) says:**\n"
        f"Severity: {finding_a['severity']}\n"
        f"{finding_a['description']}\n\n"
        f"**Agent B ({finding_b['agent_id']}) says:**\n"
        f"Severity: {finding_b['severity']}\n"
        f"{finding_b['description']}\n\n"
        "Determine which agent is correct, or if both have valid points.\n"
        'Respond with JSON: {"winner": "A|B|both", "severity": "<final>", "reasoning": "<explanation>"}'
    )
    # NOTE(review): confirm the model id and the thinking options below against
    # the current Messages API reference before relying on them.
    response = client.messages.create(
        model="claude-opus-4-6-20260205",
        max_tokens=2048,
        thinking={"type": "adaptive", "effort": "deep"},
        messages=[{"role": "user", "content": prompt}],
    )
    # The content list may hold non-text blocks; take the first text block
    # and fail with a clear error instead of a bare StopIteration.
    text = next((b.text for b in response.content if b.type == "text"), None)
    if text is None:
        raise ValueError("Adjudication response contained no text block")
    return _parse_adjudication_json(text)
Strategy 3: Weighted Voting
Assign each agent a trust weight based on its specialization and the finding category:
AGENT_WEIGHTS = {
    # (agent_id, finding_category) -> weight
    ("security", "security"): 1.0,
    ("security", "performance"): 0.3,
    ("security", "maintainability"): 0.2,
    ("performance", "security"): 0.3,
    ("performance", "performance"): 1.0,
    ("performance", "maintainability"): 0.4,
    ("maintainability", "security"): 0.2,
    ("maintainability", "performance"): 0.4,
    ("maintainability", "maintainability"): 1.0,
}


def weighted_severity(findings: list[dict]) -> str:
    """Compute weighted severity when multiple agents flag the same issue.

    Each finding contributes its severity score (critical=4 ... info=0)
    weighted by ``AGENT_WEIGHTS[(agent_id, category)]``; pairs not listed
    there get weight 0.5, and unrecognized severities score 0.

    Args:
        findings: Finding dicts with ``"agent_id"``, ``"category"`` and
            ``"severity"`` keys.

    Returns:
        The severity label nearest to the weighted average score. An
        average exactly halfway between two levels resolves to the more
        severe one. Returns ``"info"`` when there are no findings.

    Raises:
        KeyError: If a finding lacks one of the required keys.
    """
    severity_scores = {"critical": 4, "high": 3, "medium": 2, "low": 1, "info": 0}
    score_to_severity = {score: name for name, score in severity_scores.items()}

    total_weight = 0.0
    weighted_score = 0.0
    for finding in findings:
        weight = AGENT_WEIGHTS.get(
            (finding["agent_id"], finding["category"]), 0.5
        )
        weighted_score += weight * severity_scores.get(finding["severity"], 0)
        total_weight += weight

    if total_weight <= 0:
        return "info"

    avg_score = weighted_score / total_weight
    # Round half up. The built-in round() rounds halves to the nearest even
    # integer, so 2.5 would become "medium" while 3.5 becomes "critical";
    # ties should consistently favor the more severe label.
    level = int(avg_score + 0.5)
    return score_to_severity.get(level, "medium")
Handling Partial Failures
When one agent fails, the team should still produce useful results from the agents that succeeded.
import asyncio
import logging
from dataclasses import dataclass
logger = logging.getLogger(__name__)
@dataclass
class TeamResult:
    """Outcome of a team run, split into agents that succeeded and agents that failed."""

    successful: dict[str, str]  # agent_id -> output
    failed: dict[str, str]  # agent_id -> error message
    degraded: bool  # True if any agent failed

    @property
    def is_complete(self) -> bool:
        """Whether no agent is recorded in ``failed``."""
        return not self.failed
class ResilientTeam:
    """Runs several agents concurrently and keeps going when some of them fail."""

    def __init__(self, client):
        # Client whose ``messages.create(...)`` returns an awaitable.
        self.client = client

    async def run_with_fallback(
        self,
        subtasks: dict[str, dict],
        timeout: float = 120.0,
        max_retries: int = 2,
    ) -> TeamResult:
        """Run agents with per-agent timeout, retry, and graceful degradation.

        Args:
            subtasks: Maps agent_id to a config dict with ``"model"``,
                ``"max_tokens"``, ``"system"`` and ``"prompt"`` keys, plus an
                optional ``"thinking"`` entry (default ``{"type": "adaptive"}``).
            timeout: Seconds allowed for each individual attempt.
            max_retries: Extra attempts after the first one. Negative values
                are treated as 0, so every agent is attempted at least once.

        Returns:
            A ``TeamResult`` holding the first text block of each agent that
            succeeded, the last error message of each agent that did not,
            and ``degraded=True`` if any agent failed.
        """
        attempts = max(max_retries, 0) + 1

        async def _run_one(agent_id: str, config: dict) -> tuple[str, str | None, str | None]:
            last_error = None
            for attempt in range(attempts):
                try:
                    response = await asyncio.wait_for(
                        self.client.messages.create(
                            model=config["model"],
                            max_tokens=config["max_tokens"],
                            system=config["system"],
                            thinking=config.get("thinking", {"type": "adaptive"}),
                            messages=[{"role": "user", "content": config["prompt"]}],
                        ),
                        timeout=timeout,
                    )
                    text = next(
                        (b.text for b in response.content if b.type == "text"),
                        None,
                    )
                    if text is not None:
                        return agent_id, text, None
                    # A reply without any text block is unusable; treat it as
                    # a failed attempt with a readable message.
                    last_error = f"No text block in response (attempt {attempt + 1})"
                    logger.warning("Agent %s: %s", agent_id, last_error)
                except asyncio.TimeoutError:
                    last_error = f"Timeout after {timeout}s (attempt {attempt + 1})"
                    logger.warning("Agent %s: %s", agent_id, last_error)
                except Exception as e:
                    # Deliberately broad: one agent's failure must not abort
                    # the whole team, so every error is recorded and retried.
                    last_error = f"{type(e).__name__}: {e} (attempt {attempt + 1})"
                    logger.warning("Agent %s: %s", agent_id, last_error)
                # Exponential backoff between retries (1s, 2s, 4s, ...)
                if attempt < attempts - 1:
                    await asyncio.sleep(2 ** attempt)
            return agent_id, None, last_error

        results = await asyncio.gather(
            *[_run_one(aid, cfg) for aid, cfg in subtasks.items()]
        )

        successful = {}
        failed = {}
        for agent_id, output, error in results:
            if error is not None:
                failed[agent_id] = error
            else:
                successful[agent_id] = output
        return TeamResult(
            successful=successful,
            failed=failed,
            degraded=bool(failed),
        )
Priority-Based Merging
When agents produce overlapping findings, use a priority system that considers both severity and agent expertise:
from dataclasses import dataclass
@dataclass
class PrioritizedFinding:
    """A finding paired with the priority score computed for it."""
    finding: dict
    priority_score: float


def priority_merge(all_findings: list[dict],
                   agent_priorities: dict[str, float]) -> list[dict]:
    """
    Merge findings with priority-based deduplication.

    Each finding is scored as ``agent priority * severity weight``. Agents
    missing from ``agent_priorities`` get priority 0.5 and unrecognized
    severities get weight 1. Findings overlap when they share the same line
    and the same title after lowercasing and stripping whitespace; a missing
    line and an explicit ``None`` line count as the same location. Within
    each overlapping group the highest-scoring finding is kept (the first
    one on a tie).

    Args:
        all_findings: Finding dicts with ``"agent_id"``, ``"severity"`` and
            ``"title"`` keys; ``"line"`` is optional.
        agent_priorities: Maps agent_id to a priority multiplier.

    Returns:
        The surviving finding dicts (the same objects that were passed in),
        sorted by score, highest first.

    Raises:
        KeyError: If a finding lacks ``"agent_id"``, ``"severity"`` or
            ``"title"``.
    """
    SEVERITY_WEIGHT = {"critical": 10, "high": 7, "medium": 4, "low": 2, "info": 1}

    # Group by dedup key (line + normalized title)
    groups: dict[str, list[PrioritizedFinding]] = {}
    for f in all_findings:
        agent_weight = agent_priorities.get(f["agent_id"], 0.5)
        severity_weight = SEVERITY_WEIGHT.get(f["severity"], 1)
        scored = PrioritizedFinding(finding=f, priority_score=agent_weight * severity_weight)

        line = f.get("line")
        location = "none" if line is None else line
        key = f"{location}:{f['title'].lower().strip()}"
        groups.setdefault(key, []).append(scored)

    # Keep highest priority from each group
    winners = [
        max(group, key=lambda pf: pf.priority_score) for group in groups.values()
    ]
    # Sort by priority score descending; the sort is stable, so equal scores
    # keep their first-seen order.
    winners.sort(key=lambda pf: pf.priority_score, reverse=True)
    return [pf.finding for pf in winners]
Retry Strategies
Different failure types need different retry approaches:
| Failure Type | Retry Strategy | Backoff | Max Retries |
|---|---|---|---|
| Rate limit (429) | Exponential backoff with jitter | 2^n seconds + random jitter (n = attempt number, starting at 1) | 5 |
| Timeout | Increase timeout, reduce thinking effort | Linear (n seconds) | 2 |
| Parse error | Retry with stricter output instructions | None | 1 |
| Server error (500) | Exponential backoff | 2^n seconds | 3 |
| Overloaded (529) | Back off significantly | 5^n seconds | 3 |
import asyncio
import random
async def retry_with_strategy(coro_factory, failure_type: str):
    """Await ``coro_factory()`` and retry on failure using a per-type policy.

    ``failure_type`` selects the retry budget and backoff shape; unknown
    types fall back to the ``"server_error"`` policy. Before retry number
    ``n`` (counting from 1) the wait is:

    - ``exponential``: ``base_delay ** n`` seconds
    - ``exponential_jitter``: the same plus ``random.uniform(0, 1)``
    - ``linear``: ``base_delay * n`` seconds
    - anything else: no wait

    Returns whatever the first successful call returns. Once the retry
    budget is used up, the last exception is re-raised unchanged.
    """
    strategies = {
        "rate_limit": {"max_retries": 5, "base_delay": 2.0, "backoff": "exponential_jitter"},
        "timeout": {"max_retries": 2, "base_delay": 1.0, "backoff": "linear"},
        "parse_error": {"max_retries": 1, "base_delay": 0.0, "backoff": "none"},
        "server_error": {"max_retries": 3, "base_delay": 2.0, "backoff": "exponential"},
        "overloaded": {"max_retries": 3, "base_delay": 5.0, "backoff": "exponential"},
    }
    policy = strategies.get(failure_type, strategies["server_error"])
    retry_budget = policy["max_retries"]
    base = policy["base_delay"]
    mode = policy["backoff"]

    attempt = 0
    while True:
        try:
            return await coro_factory()
        except Exception:
            if attempt == retry_budget:
                raise
            step = attempt + 1
            if mode == "linear":
                delay = base * step
            elif mode in ("exponential", "exponential_jitter"):
                delay = base ** step
                if mode == "exponential_jitter":
                    delay = delay + random.uniform(0, 1)
            else:
                delay = 0
            await asyncio.sleep(delay)
        attempt += 1
Putting It All Together
A coordinator that handles shared context, conflict resolution, partial failures, and priority merging:
class AdvancedCoordinator:
    """Coordinator combining shared context, resilient execution and priority merging."""

    def __init__(self, client):
        self.client = client
        self.team = ResilientTeam(client)

    async def coordinated_review(self, code: str,
                                 context: SharedContext) -> dict:
        """Review ``code`` with every configured agent and merge their findings.

        Each agent in ``AGENT_CONFIGS`` (agent_id -> base system prompt,
        defined outside this section) gets the shared context appended to its
        system prompt and receives ``code`` as the user message. Agents that
        fail are logged and reported, not raised. Each successful agent's
        output is expected to be a JSON list of finding objects; output that
        is not JSON, is not a list, or contains non-object entries is skipped
        with a warning instead of aborting the review.

        Args:
            code: Source code to review.
            context: Shared project context injected into every agent prompt.

        Returns:
            A dict with ``"findings"`` (merged by ``priority_merge``),
            ``"agents_succeeded"``, ``"agents_failed"`` and ``"degraded"``.
        """
        # NOTE(review): confirm the model id and thinking options against the
        # current Messages API reference.
        subtasks = {
            agent_id: {
                "model": "claude-opus-4-6-20260205",
                "max_tokens": 8192,
                "system": build_agent_prompt(system_base, context),
                "thinking": {"type": "adaptive", "effort": "deep"},
                "prompt": code,
            }
            for agent_id, system_base in AGENT_CONFIGS.items()
        }
        result = await self.team.run_with_fallback(subtasks)

        # Log degraded state
        if result.degraded:
            for agent_id, error in result.failed.items():
                logger.error("Agent %s failed: %s", agent_id, error)

        # Parse and merge findings from successful agents
        all_findings = []
        for agent_id, output in result.successful.items():
            try:
                parsed = json.loads(output)
            except (json.JSONDecodeError, TypeError):
                # TypeError covers an output that is not str/bytes (e.g. None).
                logger.warning("Agent %s returned non-JSON output", agent_id)
                continue
            if not isinstance(parsed, list):
                # e.g. a wrapping object such as {"findings": [...]}
                logger.warning(
                    "Agent %s returned JSON that is not a list of findings", agent_id
                )
                continue
            for finding in parsed:
                if not isinstance(finding, dict):
                    logger.warning(
                        "Agent %s returned a finding that is not an object; skipping it",
                        agent_id,
                    )
                    continue
                # Tag every finding with its origin; the agent id doubles as
                # the finding category.
                finding["agent_id"] = agent_id
                finding["category"] = agent_id
                all_findings.append(finding)

        # Priority merge with conflict resolution
        agent_priorities = {"security": 1.0, "performance": 0.8, "maintainability": 0.7}
        merged = priority_merge(all_findings, agent_priorities)
        return {
            "findings": merged,
            "agents_succeeded": list(result.successful.keys()),
            "agents_failed": list(result.failed.keys()),
            "degraded": result.degraded,
        }
In the next lesson, you will learn how to deploy agent teams in production with monitoring, cost tracking, and observability.