Lesson 46 of 46 ~25 min
Course progress
0%

Agent Teams in Production

Production deployment of agent teams — monitoring, per-agent cost tracking, timeout management, graceful degradation, and structured logging for observability.

Running agent teams in development is straightforward. Running them in production — where you need cost controls, latency SLAs, failure recovery, and audit trails — requires infrastructure that most teams build too late. This lesson covers everything you need before deploying your first multi-agent workflow.

Production Architecture

graph TD
    Request[Incoming Request] --> Gateway[API Gateway]
    Gateway --> Coord[Coordinator]
    Coord --> A1[Agent 1]
    Coord --> A2[Agent 2]
    Coord --> A3[Agent 3]
    A1 --> Collector[Result Collector]
    A2 --> Collector
    A3 --> Collector
    Collector --> Merger[Merge & Validate]
    Merger --> Response[Response]

    A1 -.-> Metrics[Metrics Store]
    A2 -.-> Metrics
    A3 -.-> Metrics
    Coord -.-> Metrics
    Merger -.-> Logs[Structured Logs]
    Coord -.-> Logs
    Metrics -.-> Dashboard[Monitoring Dashboard]
    Logs -.-> Dashboard

Per-Agent Cost Tracking

Track cost at the individual agent level to identify which agents are expensive and whether they justify their cost:

import time
from dataclasses import dataclass, field
from datetime import datetime, timezone

# USD price per one million tokens, keyed by token category.
# Originally annotated as Opus 4.6 pricing.
# NOTE(review): confirm these rates against the current published price list.
PRICING = {
    "input": 15.00,
    "output": 75.00,
    "thinking": 5.00,
}


@dataclass
class AgentCostRecord:
    """Token usage, latency and outcome of a single agent call."""

    agent_id: str
    task_id: str
    timestamp: datetime
    input_tokens: int
    output_tokens: int
    thinking_tokens: int
    latency_ms: int
    success: bool

    @property
    def cost_usd(self) -> float:
        """Return the dollar cost of this call, priced from ``PRICING``."""
        tokens_per_price_unit = 1_000_000
        input_cost = self.input_tokens * PRICING["input"] / tokens_per_price_unit
        output_cost = self.output_tokens * PRICING["output"] / tokens_per_price_unit
        thinking_cost = (
            self.thinking_tokens * PRICING["thinking"] / tokens_per_price_unit
        )
        return input_cost + output_cost + thinking_cost

class CostTracker:
    def __init__(self):
        self.records: list[AgentCostRecord] = []

    def record(self, agent_id: str, task_id: str, response,
               latency_ms: int, success: bool) -> AgentCostRecord:
        rec = AgentCostRecord(
            agent_id=agent_id,
            task_id=task_id,
            timestamp=datetime.now(timezone.utc),
            input_tokens=response.usage.input_tokens,
            output_tokens=response.usage.output_tokens,
            thinking_tokens=getattr(response.usage, "thinking_tokens", 0),
            latency_ms=latency_ms,
            success=success,
        )
        self.records.append(rec)
        return rec

    def summary_by_agent(self) -> dict[str, dict]:
        by_agent: dict[str, list[AgentCostRecord]] = {}
        for r in self.records:
            by_agent.setdefault(r.agent_id, []).append(r)

        summary = {}
        for agent_id, recs in by_agent.items():
            total_cost = sum(r.cost_usd for r in recs)
            avg_latency = sum(r.latency_ms for r in recs) / len(recs)
            success_rate = sum(1 for r in recs if r.success) / len(recs)
            summary[agent_id] = {
                "total_cost_usd": round(total_cost, 4),
                "call_count": len(recs),
                "avg_cost_per_call": round(total_cost / len(recs), 4),
                "avg_latency_ms": round(avg_latency),
                "success_rate": round(success_rate, 3),
            }
        return summary

    def check_budget(self, daily_budget_usd: float) -> bool:
        """Return False if daily budget is exceeded."""
        today = datetime.now(timezone.utc).date()
        today_cost = sum(
            r.cost_usd for r in self.records
            if r.timestamp.date() == today
        )
        return today_cost <= daily_budget_usd

Timeout Management

Different agents have different latency profiles. A security audit agent legitimately needs more time than a formatting agent. Use per-agent timeout configuration:

from dataclasses import dataclass

@dataclass
class TimeoutConfig:
    agent_timeout_seconds: float = 120.0
    team_timeout_seconds: float = 180.0
    merge_timeout_seconds: float = 60.0

    # Per-agent overrides
    agent_overrides: dict[str, float] = None

    def __post_init__(self):
        self.agent_overrides = self.agent_overrides or {}

    def get_agent_timeout(self, agent_id: str) -> float:
        return self.agent_overrides.get(agent_id, self.agent_timeout_seconds)

# Configuration
# Module-level timeout settings. merge_timeout_seconds is not passed here, so
# the TimeoutConfig default applies.
# NOTE(review): the "security" override equals team_timeout_seconds, so the
# team-level limit can expire at the same moment as that agent's own limit --
# confirm this is intended.
TIMEOUT_CONFIG = TimeoutConfig(
    agent_timeout_seconds=120.0,
    team_timeout_seconds=180.0,
    agent_overrides={
        "security": 180.0,       # Security analysis needs more time
        "performance": 120.0,
        "maintainability": 90.0, # Usually faster
    },
)

Implement a team-level timeout that caps total execution time regardless of individual agent timeouts:

import asyncio
import anthropic

class TimedTeam:
    """Run one model call per agent concurrently, bounded by two timeouts.

    Each agent call is limited by its own timeout from the config, and the
    whole fan-out is limited by ``config.team_timeout_seconds``.
    """

    def __init__(self, config: "TimeoutConfig"):
        self.config = config
        self.client = anthropic.AsyncAnthropic()

    async def run(self, subtasks: dict[str, dict]) -> dict:
        """Execute every subtask in parallel and collect per-agent results.

        Args:
            subtasks: Maps agent id to a config dict with ``model``,
                ``max_tokens``, ``system`` and ``prompt``; ``thinking`` is
                optional and defaults to ``{"type": "adaptive"}``.

        Returns:
            If every agent finished before the team timeout: a dict mapping
            agent id to ``{"status", "output", "response", "latency_ms"}``,
            where ``status`` is ``"success"``, ``"timeout"`` or ``"error"``.

            If the team timeout fired:
            ``{"error": "Team timeout exceeded", "partial_results": {...}}``.
            ``partial_results`` uses the same per-agent shape: agents that had
            already finished keep their real result, and agents that were
            still running are cancelled and reported with status
            ``"timeout"``.

            An empty ``subtasks`` yields ``{}``.
        """
        if not subtasks:
            # asyncio.wait() rejects an empty collection of tasks.
            return {}

        # The event loop's monotonic clock; avoids depending on another module.
        clock = asyncio.get_running_loop().time
        team_started = clock()

        def _elapsed_ms(since: float) -> int:
            return int((clock() - since) * 1000)

        async def _run_agent(agent_id: str, cfg: dict) -> dict:
            timeout = self.config.get_agent_timeout(agent_id)
            started = clock()
            try:
                response = await asyncio.wait_for(
                    self.client.messages.create(
                        model=cfg["model"],
                        max_tokens=cfg["max_tokens"],
                        system=cfg["system"],
                        thinking=cfg.get("thinking", {"type": "adaptive"}),
                        messages=[{"role": "user", "content": cfg["prompt"]}],
                    ),
                    timeout=timeout,
                )
                text = next(
                    (b.text for b in response.content if b.type == "text"),
                    None,
                )
                if text is None:
                    # Keep the response so its token usage can still be
                    # accounted for, and give the failure a readable reason.
                    result = {
                        "status": "error",
                        "output": "Response contained no text block",
                        "response": response,
                    }
                else:
                    result = {"status": "success", "output": text, "response": response}
            except asyncio.TimeoutError:
                result = {"status": "timeout", "output": None, "response": None}
            except Exception as e:
                result = {"status": "error", "output": str(e), "response": None}
            result["latency_ms"] = _elapsed_ms(started)
            return result

        tasks = {
            agent_id: asyncio.create_task(_run_agent(agent_id, cfg))
            for agent_id, cfg in subtasks.items()
        }

        # Team-level timeout wraps the entire parallel execution. Unlike
        # wait_for(gather(...)), wait() leaves finished results readable when
        # the deadline passes.
        try:
            await asyncio.wait(
                tasks.values(), timeout=self.config.team_timeout_seconds
            )
        finally:
            # Also runs if run() itself is cancelled: never leave agent calls
            # running in the background.
            unfinished = [t for t in tasks.values() if not t.done()]
            for task in unfinished:
                task.cancel()

        if unfinished:
            # Let the cancellations settle before reading task state.
            await asyncio.gather(*unfinished, return_exceptions=True)

        results = {}
        for agent_id, task in tasks.items():
            if task.cancelled():
                results[agent_id] = {
                    "status": "timeout",
                    "output": None,
                    "response": None,
                    "latency_ms": _elapsed_ms(team_started),
                }
            else:
                results[agent_id] = task.result()

        if unfinished:
            return {"error": "Team timeout exceeded", "partial_results": results}
        return results

Graceful Degradation

When one agent fails, the system should still return useful results. Define degradation levels based on which agents succeed:

from dataclasses import dataclass
from enum import Enum

class DegradationLevel(str, Enum):
    FULL = "full"           # All agents succeeded
    PARTIAL = "partial"     # Some agents succeeded
    MINIMAL = "minimal"     # Only one agent succeeded
    FAILED = "failed"       # No agents succeeded

@dataclass
class DegradedResponse:
    level: DegradationLevel
    result: dict
    missing_perspectives: list[str]
    warning: str | None

class GracefulDegradation:
    # Minimum agents needed for each quality level
    REQUIRED_AGENTS = {
        DegradationLevel.FULL: 3,
        DegradationLevel.PARTIAL: 2,
        DegradationLevel.MINIMAL: 1,
    }

    def evaluate(self, results: dict[str, dict],
                 all_agent_ids: list[str]) -> DegradedResponse:
        succeeded = {
            aid: res for aid, res in results.items()
            if res["status"] == "success"
        }
        failed_ids = [aid for aid in all_agent_ids if aid not in succeeded]

        count = len(succeeded)
        if count >= self.REQUIRED_AGENTS[DegradationLevel.FULL]:
            level = DegradationLevel.FULL
            warning = None
        elif count >= self.REQUIRED_AGENTS[DegradationLevel.PARTIAL]:
            level = DegradationLevel.PARTIAL
            warning = f"Missing analysis from: {', '.join(failed_ids)}"
        elif count >= self.REQUIRED_AGENTS[DegradationLevel.MINIMAL]:
            level = DegradationLevel.MINIMAL
            warning = (
                f"Only {list(succeeded.keys())[0]} agent succeeded. "
                f"Results may be incomplete."
            )
        else:
            level = DegradationLevel.FAILED
            warning = "All agents failed. No results available."

        return DegradedResponse(
            level=level,
            result={aid: res["output"] for aid, res in succeeded.items()},
            missing_perspectives=failed_ids,
            warning=warning,
        )

Structured Logging

Every agent team execution should produce a structured log entry that captures the full lifecycle:

import json
import logging
from dataclasses import dataclass, asdict
from datetime import datetime, timezone

logger = logging.getLogger("agent_teams")

@dataclass
class TeamExecutionLog:
    task_id: str
    timestamp: str
    total_latency_ms: int
    total_cost_usd: float
    degradation_level: str
    agents: list[dict]
    merge_latency_ms: int | None = None
    error: str | None = None

def log_team_execution(task_id: str, agent_results: dict,
                       cost_tracker: CostTracker,
                       start_time: float, merge_latency_ms: int | None) -> None:
    """Emit a structured log entry for a team execution."""
    import time

    total_latency = int((time.monotonic() - start_time) * 1000)
    agents_log = []

    for agent_id, result in agent_results.items():
        agents_log.append({
            "agent_id": agent_id,
            "status": result["status"],
            "latency_ms": result.get("latency_ms"),
            "tokens": result.get("tokens"),
        })

    summary = cost_tracker.summary_by_agent()
    total_cost = sum(s["total_cost_usd"] for s in summary.values())

    succeeded = sum(1 for r in agent_results.values() if r["status"] == "success")
    total = len(agent_results)
    if succeeded == total:
        level = "full"
    elif succeeded > 0:
        level = "partial"
    else:
        level = "failed"

    entry = TeamExecutionLog(
        task_id=task_id,
        timestamp=datetime.now(timezone.utc).isoformat(),
        total_latency_ms=total_latency,
        total_cost_usd=round(total_cost, 4),
        degradation_level=level,
        agents=agents_log,
        merge_latency_ms=merge_latency_ms,
    )

    logger.info(json.dumps(asdict(entry)))

Monitoring Dashboard Metrics

Export these metrics from your agent team system. They are the minimum set needed to operate confidently:

| Metric | Type | Alert Threshold | Purpose |
| --- | --- | --- | --- |
| agent_team.latency_p99 | Histogram | > 2× baseline | Detect slow agents |
| agent_team.cost_per_request | Gauge | > budget / expected_requests | Cost control |
| agent_team.agent_success_rate | Gauge per agent | < 95% | Reliability |
| agent_team.degradation_rate | Counter | > 5% of requests | Quality impact |
| agent_team.merge_failures | Counter | Any increase | Integration issues |
| agent_team.thinking_tokens_total | Counter | > 2× budget | Cost anomaly |
| agent_team.concurrent_teams | Gauge | > rate_limit / 3 | Capacity planning |

import time

class MetricsExporter:
    """Export agent team metrics to your observability stack."""

    def __init__(self):
        self._metrics: dict[str, list] = {}

    def emit(self, name: str, value: float, tags: dict | None = None) -> None:
        """Emit a metric data point."""
        point = {
            "name": name,
            "value": value,
            "timestamp": time.time(),
            "tags": tags or {},
        }
        self._metrics.setdefault(name, []).append(point)
        # In production, forward to Datadog / Prometheus / CloudWatch

    def record_agent_execution(self, agent_id: str, latency_ms: int,
                               cost: float, success: bool) -> None:
        self.emit("agent_team.agent_latency_ms", latency_ms,
                  {"agent_id": agent_id})
        self.emit("agent_team.agent_cost_usd", cost,
                  {"agent_id": agent_id})
        self.emit("agent_team.agent_success", 1.0 if success else 0.0,
                  {"agent_id": agent_id})

    def record_team_execution(self, task_id: str, total_latency_ms: int,
                              total_cost: float, degradation_level: str) -> None:
        self.emit("agent_team.team_latency_ms", total_latency_ms)
        self.emit("agent_team.team_cost_usd", total_cost)
        self.emit("agent_team.degradation",
                  0.0 if degradation_level == "full" else 1.0)

Production Checklist

Before shipping an agent team to production, verify each item:

| Category | Check | Status |
| --- | --- | --- |
| Cost | Daily budget cap implemented | |
| Cost | Per-agent cost visible in dashboard | |
| Cost | Alert on cost anomalies (>2× expected) | |
| Reliability | Per-agent timeout configured | |
| Reliability | Team-level timeout configured | |
| Reliability | Graceful degradation tested | |
| Reliability | Retry strategy per failure type | |
| Observability | Structured logs for every execution | |
| Observability | Latency histogram per agent | |
| Observability | Success rate per agent | |
| Security | Agent system prompts do not leak user data | |
| Security | Rate limiting on team invocations | |
| Testing | Failure injection tested (kill one agent) | |
| Testing | Cost under load measured | |

End-to-End Production Wrapper

Bring together cost tracking, timeouts, degradation, and logging into a single production-ready coordinator:

import asyncio
import time
import uuid
import anthropic

class ProductionTeamCoordinator:
    """Run an agent team with budget gating, timeouts, degradation and logging."""

    def __init__(self, daily_budget_usd: float = 100.0):
        # NOTE: execute() does not use this client; TimedTeam builds its own.
        # It is kept so the attribute stays available to existing callers.
        self.client = anthropic.AsyncAnthropic()
        self.cost_tracker = CostTracker()
        self.metrics = MetricsExporter()
        self.degradation = GracefulDegradation()
        self.timeout_config = TIMEOUT_CONFIG
        self.daily_budget = daily_budget_usd

    @staticmethod
    def _normalize_results(raw_results: dict, subtasks: dict[str, dict]) -> dict:
        """Return a per-agent result dict even when the team timed out.

        On a team timeout TimedTeam.run returns
        ``{"error": <str>, "partial_results": {...}}`` instead of per-agent
        results. That shape is unwrapped here: agents present in
        ``partial_results`` keep their result, every other expected agent is
        reported as timed out. Any other input is returned unchanged.
        """
        if not isinstance(raw_results.get("error"), str):
            return raw_results
        partial = raw_results.get("partial_results") or {}
        return {
            agent_id: partial.get(
                agent_id,
                {"status": "timeout", "output": None, "response": None},
            )
            for agent_id in subtasks
        }

    async def execute(self, subtasks: dict[str, dict]) -> dict:
        """Run all subtasks and return the merged, annotated result.

        Args:
            subtasks: Maps agent id to the per-agent call configuration that
                is handed to ``TimedTeam.run``.

        Returns:
            ``{"error": "Daily budget exceeded", "task_id": ...}`` when the
            budget gate rejects the request. Otherwise a dict with
            ``task_id``, ``output``, ``degradation_level``, ``warning``,
            ``missing_perspectives``, ``total_cost_usd`` (this task only) and
            ``total_latency_ms``.
        """
        task_id = str(uuid.uuid4())[:8]
        start = time.monotonic()

        # Budget gate
        if not self.cost_tracker.check_budget(self.daily_budget):
            return {"error": "Daily budget exceeded", "task_id": task_id}

        # Run agents with timeout
        timed_team = TimedTeam(self.timeout_config)
        raw_results = self._normalize_results(
            await timed_team.run(subtasks), subtasks
        )

        # Track cost where a response (and therefore token usage) exists, and
        # emit metrics for every agent so failures show up in success rates.
        task_cost = 0.0
        for agent_id, result in raw_results.items():
            succeeded = result.get("status") == "success"
            # Prefer a latency measured by the runner; otherwise fall back to
            # the time elapsed since this execution started.
            latency = result.get("latency_ms")
            if latency is None:
                latency = int((time.monotonic() - start) * 1000)

            agent_cost = 0.0
            response = result.get("response")
            if response is not None:
                record = self.cost_tracker.record(
                    agent_id, task_id, response, latency, succeeded,
                )
                agent_cost = record.cost_usd
                task_cost += agent_cost

            self.metrics.record_agent_execution(
                agent_id, latency, agent_cost, succeeded,
            )

        # Evaluate degradation
        degraded = self.degradation.evaluate(
            raw_results, list(subtasks.keys())
        )

        # Merge results from successful agents
        merge_start = time.monotonic()
        merged_output = "\n\n".join(
            f"## {aid}\n{output}"
            for aid, output in degraded.result.items()
        )
        merge_latency = int((time.monotonic() - merge_start) * 1000)

        total_latency = int((time.monotonic() - start) * 1000)
        # Cost of this execution only; the tracker's summary spans every
        # task it has ever recorded.
        total_cost = task_cost

        # Emit team-level metrics
        self.metrics.record_team_execution(
            task_id, total_latency, total_cost, degraded.level.value,
        )

        # Structured log
        log_team_execution(task_id, raw_results, self.cost_tracker,
                           start, merge_latency)

        return {
            "task_id": task_id,
            "output": merged_output,
            "degradation_level": degraded.level.value,
            "warning": degraded.warning,
            "missing_perspectives": degraded.missing_perspectives,
            "total_cost_usd": round(total_cost, 4),
            "total_latency_ms": total_latency,
        }

You now have the full toolkit for building, coordinating, and operating multi-agent systems in production. The patterns in this module — fan-out/fan-in execution, conflict resolution, graceful degradation, and structured observability — apply to any agent team architecture, not just code review.