Running agent teams in development is straightforward. Running them in production — where you need cost controls, latency SLAs, failure recovery, and audit trails — requires infrastructure that most teams build too late. This lesson covers everything you need before deploying your first multi-agent workflow.
Production Architecture
graph TD
Request[Incoming Request] --> Gateway[API Gateway]
Gateway --> Coord[Coordinator]
Coord --> A1[Agent 1]
Coord --> A2[Agent 2]
Coord --> A3[Agent 3]
A1 --> Collector[Result Collector]
A2 --> Collector
A3 --> Collector
Collector --> Merger[Merge & Validate]
Merger --> Response[Response]
A1 -.-> Metrics[Metrics Store]
A2 -.-> Metrics
A3 -.-> Metrics
Coord -.-> Metrics
Merger -.-> Logs[Structured Logs]
Coord -.-> Logs
Metrics -.-> Dashboard[Monitoring Dashboard]
Logs -.-> Dashboard
Per-Agent Cost Tracking
Track cost at the individual agent level to identify which agents are expensive and whether they justify their cost:
import time
from dataclasses import dataclass, field
from datetime import datetime, timezone
# Example pricing in USD per 1M tokens -- verify against the current
# Anthropic pricing page for the model you deploy before using it for budgets.
# Extended-thinking tokens are billed at the output-token rate, so "thinking"
# must match "output". (Anthropic's Messages API already counts thinking tokens
# in usage.output_tokens; a separate "thinking" count only applies to providers
# that report thinking tokens outside output_tokens.)
PRICING = {
    "input": 15.00,
    "output": 75.00,
    "thinking": 75.00,
}
@dataclass
class AgentCostRecord:
    """Token usage and outcome of one agent call, with its derived USD cost."""

    agent_id: str
    task_id: str
    timestamp: datetime  # moment the call was recorded
    input_tokens: int
    output_tokens: int
    thinking_tokens: int
    latency_ms: int
    success: bool

    @property
    def cost_usd(self) -> float:
        """Price each token class with the per-million rates in ``PRICING``."""
        per_million = 1_000_000
        input_cost = self.input_tokens * PRICING["input"] / per_million
        output_cost = self.output_tokens * PRICING["output"] / per_million
        thinking_cost = self.thinking_tokens * PRICING["thinking"] / per_million
        return input_cost + output_cost + thinking_cost
class CostTracker:
    """In-memory ledger of per-agent LLM call costs.

    Each call to ``record`` appends one ``AgentCostRecord``; the summary and
    budget helpers aggregate over everything recorded so far.
    """

    def __init__(self):
        # Grows by one entry per recorded call; nothing in this class prunes it.
        self.records: list[AgentCostRecord] = []

    def record(self, agent_id: str, task_id: str, response,
               latency_ms: int, success: bool) -> AgentCostRecord:
        """Store the token usage of an API ``response`` and return the record.

        ``response.usage`` must expose ``input_tokens`` and ``output_tokens``.
        A missing or ``None`` ``thinking_tokens`` attribute counts as 0.

        NOTE(review): prompt-caching counts (``cache_creation_input_tokens`` /
        ``cache_read_input_tokens`` on Anthropic usage) are not captured, so
        cached calls are under-costed -- confirm whether that matters here.
        """
        usage = response.usage
        rec = AgentCostRecord(
            agent_id=agent_id,
            task_id=task_id,
            timestamp=datetime.now(timezone.utc),
            input_tokens=usage.input_tokens,
            output_tokens=usage.output_tokens,
            # Anthropic's usage object has no thinking_tokens field (thinking is
            # included in output_tokens); also guard against a None value.
            thinking_tokens=getattr(usage, "thinking_tokens", None) or 0,
            latency_ms=latency_ms,
            success=success,
        )
        self.records.append(rec)
        return rec

    def summary_by_agent(self, task_id: str | None = None) -> dict[str, dict]:
        """Aggregate cost, call count, latency and success rate per agent.

        Args:
            task_id: when given, only records for that task are summarized;
                the default (None) summarizes every recorded call.

        Returns:
            ``{agent_id: {"total_cost_usd", "call_count", "avg_cost_per_call",
            "avg_latency_ms", "success_rate"}}``.
        """
        by_agent: dict[str, list[AgentCostRecord]] = {}
        for r in self.records:
            if task_id is None or r.task_id == task_id:
                by_agent.setdefault(r.agent_id, []).append(r)
        summary = {}
        for agent_id, recs in by_agent.items():
            # recs is never empty: an agent only appears after one append.
            total_cost = sum(r.cost_usd for r in recs)
            avg_latency = sum(r.latency_ms for r in recs) / len(recs)
            success_rate = sum(1 for r in recs if r.success) / len(recs)
            summary[agent_id] = {
                "total_cost_usd": round(total_cost, 4),
                "call_count": len(recs),
                "avg_cost_per_call": round(total_cost / len(recs), 4),
                "avg_latency_ms": round(avg_latency),
                "success_rate": round(success_rate, 3),
            }
        return summary

    def check_budget(self, daily_budget_usd: float) -> bool:
        """Return False if today's (UTC) recorded spend exceeds the budget."""
        today = datetime.now(timezone.utc).date()
        today_cost = sum(
            r.cost_usd for r in self.records
            if r.timestamp.date() == today
        )
        return today_cost <= daily_budget_usd
Timeout Management
Different agents have different latency profiles. A security audit agent legitimately needs more time than a formatting agent. Use per-agent timeout configuration:
from dataclasses import dataclass
@dataclass
class TimeoutConfig:
agent_timeout_seconds: float = 120.0
team_timeout_seconds: float = 180.0
merge_timeout_seconds: float = 60.0
# Per-agent overrides
agent_overrides: dict[str, float] = None
def __post_init__(self):
self.agent_overrides = self.agent_overrides or {}
def get_agent_timeout(self, agent_id: str) -> float:
return self.agent_overrides.get(agent_id, self.agent_timeout_seconds)
# Shared timeout configuration: global defaults plus per-agent overrides (seconds).
# NOTE(review): the security override equals the team timeout, so the team-level
# cap can fire before the security agent's own timeout does.
TIMEOUT_CONFIG = TimeoutConfig(
    team_timeout_seconds=180.0,
    agent_timeout_seconds=120.0,
    agent_overrides=dict(
        security=180.0,         # security analysis legitimately runs longer
        performance=120.0,
        maintainability=90.0,   # typically finishes quickly
    ),
)
Implement a team-level timeout that caps total execution time regardless of individual agent timeouts:
import asyncio
import anthropic
class TimedTeam:
    """Run agent subtasks concurrently under per-agent and team-level timeouts."""

    def __init__(self, config: TimeoutConfig):
        self.config = config
        self.client = anthropic.AsyncAnthropic()

    @staticmethod
    def _elapsed_ms(started: float) -> int:
        """Milliseconds elapsed since a ``time.monotonic()`` reading."""
        return int((time.monotonic() - started) * 1000)

    @staticmethod
    def _token_usage(response) -> dict | None:
        """Return ``{"input", "output"}`` token counts, or None without usage."""
        usage = getattr(response, "usage", None)
        if usage is None:
            return None
        return {
            "input": getattr(usage, "input_tokens", None),
            "output": getattr(usage, "output_tokens", None),
        }

    async def run(self, subtasks: dict[str, dict]) -> dict:
        """Execute every subtask concurrently and return per-agent results.

        Args:
            subtasks: agent id -> config dict with ``model``, ``max_tokens``,
                ``system``, ``prompt`` and an optional ``thinking`` setting.

        Returns:
            ``{agent_id: result}`` for every agent in ``subtasks``. Each result
            has ``status`` ("success" / "timeout" / "error"), ``output``,
            ``response``, ``latency_ms`` and ``tokens``. If the team-level
            timeout fires, agents that already finished keep their results and
            the rest are cancelled and reported with status "timeout", so the
            return shape is the same in every case.
        """
        async def _run_agent(agent_id: str, cfg: dict) -> dict:
            timeout = self.config.get_agent_timeout(agent_id)
            started = time.monotonic()
            try:
                response = await asyncio.wait_for(
                    self.client.messages.create(
                        model=cfg["model"],
                        max_tokens=cfg["max_tokens"],
                        system=cfg["system"],
                        thinking=cfg.get("thinking", {"type": "adaptive"}),
                        messages=[{"role": "user", "content": cfg["prompt"]}],
                    ),
                    timeout=timeout,
                )
            except asyncio.TimeoutError:
                return {"status": "timeout", "output": None, "response": None,
                        "latency_ms": self._elapsed_ms(started), "tokens": None}
            except Exception as e:
                return {"status": "error", "output": str(e), "response": None,
                        "latency_ms": self._elapsed_ms(started), "tokens": None}

            latency_ms = self._elapsed_ms(started)
            tokens = self._token_usage(response)
            text = next(
                (b.text for b in (getattr(response, "content", None) or [])
                 if getattr(b, "type", None) == "text"),
                None,
            )
            if text is None:
                # The call was billed even without a text block (e.g. if
                # max_tokens ran out first), so keep ``response`` for costing.
                return {"status": "error",
                        "output": "Response contained no text block",
                        "response": response, "latency_ms": latency_ms,
                        "tokens": tokens}
            return {"status": "success", "output": text, "response": response,
                    "latency_ms": latency_ms, "tokens": tokens}

        if not subtasks:
            return {}

        team_started = time.monotonic()
        tasks = {
            aid: asyncio.ensure_future(_run_agent(aid, cfg))
            for aid, cfg in subtasks.items()
        }
        try:
            # Team-level timeout caps the whole fan-out; unlike wait_for around
            # gather, asyncio.wait keeps the results of agents that finished.
            done, _ = await asyncio.wait(
                tasks.values(), timeout=self.config.team_timeout_seconds
            )
        finally:
            # Cancel stragglers (also when run() itself is cancelled) so no
            # API call outlives the team.
            stragglers = [t for t in tasks.values() if not t.done()]
            for t in stragglers:
                t.cancel()
            if stragglers:
                await asyncio.gather(*stragglers, return_exceptions=True)

        results = {}
        for aid, task in tasks.items():
            if task in done:
                results[aid] = task.result()
            else:
                results[aid] = {"status": "timeout", "output": None,
                                "response": None,
                                "latency_ms": self._elapsed_ms(team_started),
                                "tokens": None}
        return results
Graceful Degradation
When one agent fails, the system should still return useful results. Define degradation levels based on which agents succeed:
from dataclasses import dataclass
from enum import Enum
class DegradationLevel(str, Enum):
    """Quality tier of a team result, listed from best to worst.

    Mixing in ``str`` lets members compare equal to their plain string values.
    """

    FULL = "full"        # every agent succeeded
    PARTIAL = "partial"  # some agents succeeded
    MINIMAL = "minimal"  # only one agent succeeded
    FAILED = "failed"    # no agent succeeded
@dataclass
class DegradedResponse:
    """Outcome of a team run after graceful-degradation classification."""

    level: DegradationLevel           # quality tier the run reached
    result: dict                      # agent id -> output, successful agents only
    missing_perspectives: list[str]   # agent ids that produced no successful result
    warning: str | None               # caveat for the caller; None when nothing is missing
class GracefulDegradation:
    """Classify a team run by how many of its expected agents succeeded."""

    # Minimum successful agents for PARTIAL and MINIMAL. FULL is decided by
    # "no expected agent is missing", so it is correct for teams of any size;
    # its entry is kept so existing readers of this mapping keep working.
    REQUIRED_AGENTS = {
        DegradationLevel.FULL: 3,
        DegradationLevel.PARTIAL: 2,
        DegradationLevel.MINIMAL: 1,
    }

    def evaluate(self, results: dict[str, dict],
                 all_agent_ids: list[str]) -> DegradedResponse:
        """Build a ``DegradedResponse`` from per-agent results.

        Args:
            results: agent id -> result dict; an entry counts as a success
                only if it is a dict whose ``status`` is ``"success"``.
                Malformed entries count as failures rather than raising.
            all_agent_ids: every agent the team was expected to run; ids
                without a successful result are reported as missing.

        Returns:
            The level, successful outputs, missing agent ids and a warning
            (None only for FULL).
        """
        succeeded = {
            aid: res for aid, res in results.items()
            if isinstance(res, dict) and res.get("status") == "success"
        }
        failed_ids = [aid for aid in all_agent_ids if aid not in succeeded]
        count = len(succeeded)

        if count == 0 or count < self.REQUIRED_AGENTS[DegradationLevel.MINIMAL]:
            level = DegradationLevel.FAILED
            warning = "All agents failed. No results available."
        elif not failed_ids:
            level = DegradationLevel.FULL
            warning = None
        elif count >= self.REQUIRED_AGENTS[DegradationLevel.PARTIAL]:
            level = DegradationLevel.PARTIAL
            warning = f"Missing analysis from: {', '.join(failed_ids)}"
        else:
            level = DegradationLevel.MINIMAL
            warning = (
                f"Only {', '.join(succeeded)} agent succeeded. "
                f"Results may be incomplete."
            )

        return DegradedResponse(
            level=level,
            result={aid: res["output"] for aid, res in succeeded.items()},
            missing_perspectives=failed_ids,
            warning=warning,
        )
Structured Logging
Every agent team execution should produce a structured log entry that captures the full lifecycle:
import json
import logging
from dataclasses import dataclass, asdict
from datetime import datetime, timezone
# Module logger; structured team-execution entries are emitted through it.
logger = logging.getLogger(name="agent_teams")
@dataclass
class TeamExecutionLog:
    """One structured log record summarizing a complete team execution."""

    task_id: str
    timestamp: str               # ISO-8601 text, not a datetime object
    total_latency_ms: int
    total_cost_usd: float
    degradation_level: str       # plain string label rather than an enum member
    agents: list[dict]           # one summary dict per agent
    merge_latency_ms: int | None = None
    error: str | None = None
def log_team_execution(task_id: str, agent_results: dict,
                       cost_tracker: CostTracker,
                       start_time: float, merge_latency_ms: int | None,
                       degradation_level: str | None = None) -> None:
    """Emit one structured JSON log line summarizing a team execution.

    Args:
        task_id: identifier of the run; also selects this run's cost records
            from ``cost_tracker`` (the tracker holds records from every run).
        agent_results: agent id -> result dict. ``status`` is read, and
            ``latency_ms`` / ``tokens`` are logged when present.
        cost_tracker: tracker whose ``records`` include this run's calls.
        start_time: ``time.monotonic()`` reading taken when the run started.
        merge_latency_ms: time spent merging agent outputs, if measured.
        degradation_level: level already computed by the caller. When None it
            is derived from the success count using the ``DegradationLevel``
            labels ("full", "partial", "minimal", "failed").
    """
    import time

    total_latency = int((time.monotonic() - start_time) * 1000)

    agents_log = []
    statuses = []
    for agent_id, result in agent_results.items():
        # Tolerate malformed entries so logging never raises on bad results.
        info = result if isinstance(result, dict) else {}
        statuses.append(info.get("status"))
        agents_log.append({
            "agent_id": agent_id,
            "status": info.get("status"),
            "latency_ms": info.get("latency_ms"),
            "tokens": info.get("tokens"),
        })

    # Only this task's calls: summing every record would report the
    # cumulative spend of all runs as this run's cost.
    total_cost = sum(
        r.cost_usd for r in cost_tracker.records if r.task_id == task_id
    )

    level = degradation_level
    if level is None:
        succeeded = statuses.count("success")
        if succeeded == 0:
            level = "failed"
        elif succeeded == len(statuses):
            level = "full"
        elif succeeded == 1:
            level = "minimal"
        else:
            level = "partial"

    entry = TeamExecutionLog(
        task_id=task_id,
        timestamp=datetime.now(timezone.utc).isoformat(),
        total_latency_ms=total_latency,
        total_cost_usd=round(total_cost, 4),
        degradation_level=level,
        agents=agents_log,
        merge_latency_ms=merge_latency_ms,
    )
    logger.info(json.dumps(asdict(entry)))
Monitoring Dashboard Metrics
Export these metrics from your agent team system. They are the minimum set needed to operate confidently:
| Metric | Type | Alert Threshold | Purpose |
|---|---|---|---|
| `agent_team.latency_p99` | Histogram | > 2× baseline | Detect slow agents |
| `agent_team.cost_per_request` | Gauge | > budget / expected_requests | Cost control |
| `agent_team.agent_success_rate` | Gauge per agent | < 95% | Reliability |
| `agent_team.degradation_rate` | Counter | > 5% of requests | Quality impact |
| `agent_team.merge_failures` | Counter | Any increase | Integration issues |
| `agent_team.thinking_tokens_total` | Counter | > 2× budget | Cost anomaly |
| `agent_team.concurrent_teams` | Gauge | > rate_limit / 3 | Capacity planning |
import time
class MetricsExporter:
"""Export agent team metrics to your observability stack."""
def __init__(self):
self._metrics: dict[str, list] = {}
def emit(self, name: str, value: float, tags: dict | None = None) -> None:
"""Emit a metric data point."""
point = {
"name": name,
"value": value,
"timestamp": time.time(),
"tags": tags or {},
}
self._metrics.setdefault(name, []).append(point)
# In production, forward to Datadog / Prometheus / CloudWatch
def record_agent_execution(self, agent_id: str, latency_ms: int,
cost: float, success: bool) -> None:
self.emit("agent_team.agent_latency_ms", latency_ms,
{"agent_id": agent_id})
self.emit("agent_team.agent_cost_usd", cost,
{"agent_id": agent_id})
self.emit("agent_team.agent_success", 1.0 if success else 0.0,
{"agent_id": agent_id})
def record_team_execution(self, task_id: str, total_latency_ms: int,
total_cost: float, degradation_level: str) -> None:
self.emit("agent_team.team_latency_ms", total_latency_ms)
self.emit("agent_team.team_cost_usd", total_cost)
self.emit("agent_team.degradation",
0.0 if degradation_level == "full" else 1.0)
Production Checklist
Before shipping an agent team to production, verify each item:
| Category | Check | Status |
|---|---|---|
| Cost | Daily budget cap implemented | ☐ |
| Cost | Per-agent cost visible in dashboard | ☐ |
| Cost | Alert on cost anomalies (>2× expected) | ☐ |
| Reliability | Per-agent timeout configured | ☐ |
| Reliability | Team-level timeout configured | ☐ |
| Reliability | Graceful degradation tested | ☐ |
| Reliability | Retry strategy per failure type | ☐ |
| Observability | Structured logs for every execution | ☐ |
| Observability | Latency histogram per agent | ☐ |
| Observability | Success rate per agent | ☐ |
| Security | Agent system prompts do not leak user data | ☐ |
| Security | Rate limiting on team invocations | ☐ |
| Testing | Failure injection tested (kill one agent) | ☐ |
| Testing | Cost under load measured | ☐ |
End-to-End Production Wrapper
Bring together cost tracking, timeouts, degradation, and logging into a single production-ready coordinator:
import asyncio
import time
import uuid
import anthropic
class ProductionTeamCoordinator:
    """Run an agent team behind a budget gate, with timeouts, degradation and telemetry."""

    def __init__(self, daily_budget_usd: float = 100.0):
        self.cost_tracker = CostTracker()
        self.metrics = MetricsExporter()
        self.degradation = GracefulDegradation()
        self.timeout_config = TIMEOUT_CONFIG
        self.daily_budget = daily_budget_usd
        # Build the team once and share its AsyncAnthropic client, instead of
        # creating a fresh TimedTeam (and client) on every execute() call.
        self._team = TimedTeam(self.timeout_config)
        self.client = self._team.client

    @staticmethod
    def _normalize_results(raw: dict, subtasks: dict[str, dict]) -> dict[str, dict]:
        """Return exactly one result dict (with a ``status``) per subtask.

        Entries that are not per-agent result dicts -- e.g. a team-level
        ``{"error": ..., "partial_results": ...}`` payload -- are dropped, and
        agents left without a result are marked as timed out.
        """
        results = {
            aid: res for aid, res in raw.items()
            if isinstance(res, dict) and "status" in res
        }
        for aid in subtasks:
            results.setdefault(
                aid, {"status": "timeout", "output": None, "response": None}
            )
        return results

    def _record_agent_costs(self, task_id: str, results: dict[str, dict],
                            start: float) -> None:
        """Record billed cost and per-agent metrics for every agent result."""
        fallback_latency = int((time.monotonic() - start) * 1000)
        for agent_id, result in results.items():
            success = result["status"] == "success"
            # Prefer the agent's own latency; elapsed team time is only a
            # fallback for results that do not report one.
            latency = result.get("latency_ms")
            if latency is None:
                latency = fallback_latency
            cost = 0.0
            if result.get("response") is not None:
                record = self.cost_tracker.record(
                    agent_id, task_id, result["response"], latency, success,
                )
                cost = record.cost_usd
            # Emit for every agent (even without a response) so the success
            # rate metric also counts timeouts and errors.
            self.metrics.record_agent_execution(agent_id, latency, cost, success)

    def _task_cost(self, task_id: str) -> float:
        """Total cost of the records belonging to ``task_id`` only."""
        return sum(
            r.cost_usd for r in self.cost_tracker.records if r.task_id == task_id
        )

    async def execute(self, subtasks: dict[str, dict]) -> dict:
        """Run ``subtasks`` as a team and return the merged, annotated result.

        Returns ``{"error", "task_id"}`` when the daily budget is already
        exhausted. Otherwise returns the merged markdown output plus
        degradation level, warning, missing agents, this task's cost and
        total latency.
        """
        # Full UUID: task_id attributes cost records, so short ids that can
        # collide would mix the costs of different runs.
        task_id = str(uuid.uuid4())
        start = time.monotonic()

        # Budget gate
        if not self.cost_tracker.check_budget(self.daily_budget):
            return {"error": "Daily budget exceeded", "task_id": task_id}

        # Run agents with timeouts; pick up any config change made after init.
        self._team.config = self.timeout_config
        raw_results = self._normalize_results(
            await self._team.run(subtasks), subtasks
        )

        self._record_agent_costs(task_id, raw_results, start)

        degraded = self.degradation.evaluate(raw_results, list(subtasks.keys()))

        # Merge results from successful agents
        merge_start = time.monotonic()
        merged_output = "\n\n".join(
            f"## {aid}\n{output}"
            for aid, output in degraded.result.items()
        )
        merge_latency = int((time.monotonic() - merge_start) * 1000)

        total_latency = int((time.monotonic() - start) * 1000)
        total_cost = self._task_cost(task_id)

        # Team-level metrics and structured log
        self.metrics.record_team_execution(
            task_id, total_latency, total_cost, degraded.level.value,
        )
        log_team_execution(task_id, raw_results, self.cost_tracker,
                           start, merge_latency)

        return {
            "task_id": task_id,
            "output": merged_output,
            "degradation_level": degraded.level.value,
            "warning": degraded.warning,
            "missing_perspectives": degraded.missing_perspectives,
            "total_cost_usd": round(total_cost, 4),
            "total_latency_ms": total_latency,
        }
You now have the full toolkit for building, coordinating, and operating multi-agent systems in production. The patterns in this module — fan-out/fan-in execution, conflict resolution, graceful degradation, and structured observability — apply to any agent team architecture, not just code review.