You cannot fix what you cannot see. LLM systems have unique monitoring requirements — token usage, thinking time, model degradation, and cost anomalies do not exist in traditional application monitoring. This lesson builds an observability stack designed for AI-powered systems.
Metrics to Track
| Category | Metric | Alert Threshold | Purpose |
|---|---|---|---|
| Latency | p50, p95, p99 response time | p95 > 30s | User experience |
| Tokens | Input/output/thinking per request | Output > 2× average | Cost control |
| Cost | Per-request, daily, monthly | Daily > budget × 1.5 | Budget management |
| Errors | Error rate by type | > 5% | Reliability |
| Quality | Output length variance | Sudden drop > 30% | Model degradation |
| Fallbacks | Fallback activation rate | > 10% | Primary model health |
| Cache | Cache hit rate | Drop below 50% | Cost efficiency |
| Circuit breaker | State changes | Any OPEN event | System health |
Metrics Collector
import time
import statistics
from collections import defaultdict
from dataclasses import dataclass, field
from datetime import datetime, timedelta
@dataclass
class RequestMetric:
    """One recorded LLM request.

    Field order is part of the positional constructor signature, so it
    must stay as declared here.
    """

    # Compared against ``time.time()`` when the collector prunes old entries.
    timestamp: float
    # Name of the model that served the request.
    model: str

    # Token counts for the request.
    input_tokens: int
    output_tokens: int
    thinking_tokens: int

    latency_ms: int
    cost_usd: float

    # One of "success", "error", "fallback".
    status: str

    # Optional details; both default to "nothing to report".
    error_type: str = ""
    cache_hit: bool = False
class MetricsCollector:
    """Collect LLM request metrics and aggregate them over a time window.

    Entries whose ``timestamp`` is older than ``window_minutes`` are dropped
    every time a new metric is recorded. All aggregate methods return an
    empty dict (or ``0.0`` for rates) when there is nothing to aggregate.
    """

    def __init__(self, window_minutes: int = 60):
        self.metrics: list[RequestMetric] = []
        self.window = timedelta(minutes=window_minutes)

    def record(self, metric: RequestMetric):
        """Store *metric*, then discard entries that fell out of the window."""
        self.metrics.append(metric)
        self._cleanup_old_metrics()

    def _cleanup_old_metrics(self):
        """Keep only metrics newer than ``now - window``."""
        cutoff = time.time() - self.window.total_seconds()
        self.metrics = [m for m in self.metrics if m.timestamp > cutoff]

    @staticmethod
    def _nearest_rank(sorted_values: list, percent: int):
        """Return the nearest-rank percentile of a non-empty sorted list.

        The rank is ``ceil(n * percent / 100)``, computed with integers so
        that float rounding can never push the rank up by one.
        """
        rank = (len(sorted_values) * percent + 99) // 100
        return sorted_values[max(rank, 1) - 1]

    def _status_rate(self, status: str) -> float:
        """Return the fraction of recorded metrics whose status is *status*."""
        if not self.metrics:
            return 0.0
        matching = sum(1 for m in self.metrics if m.status == status)
        return matching / len(self.metrics)

    def latency_percentiles(self) -> dict:
        """Return p50/p95/p99/max latency (ms) of successful requests.

        Percentiles use the nearest-rank method, so every value is an
        observed latency and pX is the smallest sample that at least X% of
        samples do not exceed. The previous ``int(n * p)`` indexing was one
        rank too high whenever ``n * p`` was an integer (for example, the
        p50 of two samples was the maximum).
        """
        latencies = sorted(
            m.latency_ms for m in self.metrics if m.status == "success"
        )
        if not latencies:
            return {}
        return {
            "p50": self._nearest_rank(latencies, 50),
            "p95": self._nearest_rank(latencies, 95),
            "p99": self._nearest_rank(latencies, 99),
            "max": latencies[-1],
        }

    def error_rate(self) -> float:
        """Return the fraction of requests with status ``"error"``."""
        return self._status_rate("error")

    def fallback_rate(self) -> float:
        """Return the fraction of requests with status ``"fallback"``."""
        return self._status_rate("fallback")

    def cost_summary(self) -> dict:
        """Return total/average/median/max cost and the request count."""
        if not self.metrics:
            return {}
        costs = [m.cost_usd for m in self.metrics]
        return {
            "total": sum(costs),
            "average": statistics.mean(costs),
            "median": statistics.median(costs),
            "max": max(costs),
            "count": len(costs),
        }

    def token_summary(self) -> dict:
        """Return token totals plus average input/output tokens per request."""
        if not self.metrics:
            return {}
        input_tokens = [m.input_tokens for m in self.metrics]
        output_tokens = [m.output_tokens for m in self.metrics]
        return {
            "total_input": sum(input_tokens),
            "total_output": sum(output_tokens),
            "total_thinking": sum(m.thinking_tokens for m in self.metrics),
            "avg_input": statistics.mean(input_tokens),
            "avg_output": statistics.mean(output_tokens),
        }

    def cache_hit_rate(self) -> float:
        """Return the fraction of requests that were served from cache."""
        if not self.metrics:
            return 0.0
        hits = sum(1 for m in self.metrics if m.cache_hit)
        return hits / len(self.metrics)
Anomaly Detection
class AnomalyDetector:
    """Flag requests whose latency, output length or cost deviates from baseline.

    Baselines (mean and sample standard deviation) are computed from the
    collector's current metrics by ``update_baselines``; ``check`` compares
    one request against them using z-scores and ``sensitivity`` as the
    threshold in standard deviations.
    """

    def __init__(self, collector: MetricsCollector,
                 sensitivity: float = 2.0):
        self.collector = collector
        self.sensitivity = sensitivity  # Standard deviations
        self.baselines: dict = {}

    def update_baselines(self):
        """Recalculate baselines from the collector's recent metrics.

        Does nothing (keeping any previous baselines) when fewer than 20
        metrics are available, or when fewer than two of them succeeded:
        ``statistics.stdev`` needs at least two samples, and an outage with
        almost no successes must not crash the monitoring path.
        """
        metrics = self.collector.metrics
        if len(metrics) < 20:
            return  # Need minimum data for baselines

        success_latencies = [
            m.latency_ms for m in metrics if m.status == "success"
        ]
        if len(success_latencies) < 2:
            return  # Not enough successful requests for a latency baseline

        output_tokens = [m.output_tokens for m in metrics]
        costs = [m.cost_usd for m in metrics]
        self.baselines = {
            "latency_mean": statistics.mean(success_latencies),
            "latency_stdev": statistics.stdev(success_latencies),
            "output_tokens_mean": statistics.mean(output_tokens),
            "output_tokens_stdev": statistics.stdev(output_tokens),
            "cost_mean": statistics.mean(costs),
            "cost_stdev": statistics.stdev(costs),
        }

    @staticmethod
    def _z_score(value, mean, stdev, floor):
        """Return ``(value - mean) / stdev`` with *stdev* floored at *floor*.

        The floor prevents division by zero when every baseline sample was
        identical.
        """
        return (value - mean) / max(stdev, floor)

    def check(self, metric: RequestMetric) -> list[dict]:
        """Check a single metric for anomalies.

        Returns a list of alert dicts (possibly empty). No alerts are
        produced until baselines have been calculated.
        """
        if not self.baselines:
            return []

        base = self.baselines
        alerts = []

        # Latency anomaly: only meaningful for successful requests, and only
        # slower-than-baseline responses are reported.
        if metric.status == "success":
            z_latency = self._z_score(
                metric.latency_ms,
                base["latency_mean"],
                base["latency_stdev"],
                1,
            )
            if z_latency > self.sensitivity:
                alerts.append({
                    "type": "latency_spike",
                    "severity": "warning" if z_latency < 3 else "critical",
                    "value": metric.latency_ms,
                    "baseline": base["latency_mean"],
                    "z_score": round(z_latency, 2),
                })

        # Output length anomaly (potential model degradation): reported in
        # both directions, skipped for requests that produced no output.
        if metric.output_tokens > 0:
            z_output = self._z_score(
                metric.output_tokens,
                base["output_tokens_mean"],
                base["output_tokens_stdev"],
                1,
            )
            if abs(z_output) > self.sensitivity:
                alerts.append({
                    "type": "output_length_anomaly",
                    "severity": "warning",
                    "value": metric.output_tokens,
                    "baseline": base["output_tokens_mean"],
                    "z_score": round(z_output, 2),
                    "direction": "high" if z_output > 0 else "low",
                })

        # Cost anomaly: only more-expensive-than-baseline requests are reported.
        z_cost = self._z_score(
            metric.cost_usd,
            base["cost_mean"],
            base["cost_stdev"],
            0.001,
        )
        if z_cost > self.sensitivity:
            alerts.append({
                "type": "cost_spike",
                "severity": "warning" if z_cost < 3 else "critical",
                "value": metric.cost_usd,
                "baseline": base["cost_mean"],
                "z_score": round(z_cost, 2),
            })

        return alerts
Dashboard Output
class Dashboard:
    """Text dashboard summarising the collector's current metrics."""

    def __init__(self, collector: MetricsCollector,
                 anomaly_detector: AnomalyDetector):
        # The detector is only stored here; render() reads the collector alone.
        self.detector = anomaly_detector
        self.collector = collector

    def render(self) -> str:
        """Return the dashboard as a multi-line box-drawing string.

        Missing latency figures are shown as ``N/A``; missing cost and token
        figures are shown as zero.
        """
        # Aggregates that are referenced more than once in the template are
        # fetched up front; the rates are evaluated inline by the template.
        latency = self.collector.latency_percentiles()
        costs = self.collector.cost_summary()
        tokens = self.collector.token_summary()

        # The template lines start at column 0 on purpose: their leading
        # whitespace is part of the rendered output.
        return f"""
╔══════════════════════════════════════════════════╗
║ LLM System Dashboard ║
║ {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} ║
╠══════════════════════════════════════════════════╣
║ LATENCY ║
║ p50: {latency.get('p50', 'N/A'):>6}ms │ p95: {latency.get('p95', 'N/A'):>6}ms ║
║ p99: {latency.get('p99', 'N/A'):>6}ms │ max: {latency.get('max', 'N/A'):>6}ms ║
╠══════════════════════════════════════════════════╣
║ RELIABILITY ║
║ Error rate: {self.collector.error_rate():>6.1%} ║
║ Fallback rate: {self.collector.fallback_rate():>6.1%} ║
║ Cache hit rate:{self.collector.cache_hit_rate():>6.1%} ║
╠══════════════════════════════════════════════════╣
║ COST (current window) ║
║ Total: ${costs.get('total', 0):>8.2f} ║
║ Average: ${costs.get('average', 0):>8.4f}/req ║
║ Requests:{costs.get('count', 0):>6} ║
╠══════════════════════════════════════════════════╣
║ TOKENS (current window) ║
║ Input: {tokens.get('total_input', 0):>10,} ║
║ Output: {tokens.get('total_output', 0):>10,} ║
║ Thinking: {tokens.get('total_thinking', 0):>10,} ║
╚══════════════════════════════════════════════════╝
"""
Alerting Integration
class AlertManager:
    """Route alert dicts to the handlers registered for their severity.

    An alert without a ``"severity"`` key is treated as ``"warning"``, both
    when routing and when formatting.
    """

    def __init__(self):
        self.handlers: dict[str, list[callable]] = {
            "warning": [],
            "critical": [],
        }

    def register_handler(self, severity: str, handler: callable):
        """Register *handler* to be called for alerts of *severity*.

        Any severity name is accepted. ``alert`` already routes arbitrary
        severities, so registration must not be limited to the two
        built-in levels.
        """
        self.handlers.setdefault(severity, []).append(handler)

    def alert(self, alert: dict):
        """Call every handler registered for the alert's severity, in order.

        Alerts whose severity has no handlers are dropped silently.
        """
        severity = alert.get("severity", "warning")
        for handler in self.handlers.get(severity, []):
            handler(alert)

    @staticmethod
    def format_alert(alert: dict) -> str:
        """Return a one-line, human-readable description of *alert*.

        Requires the ``type``, ``value``, ``baseline`` and ``z_score`` keys;
        ``severity`` falls back to ``"warning"`` to match ``alert``.
        """
        severity = alert.get("severity", "warning")
        return (
            f"[{severity.upper()}] {alert['type']}: "
            f"value={alert['value']}, baseline={alert['baseline']}, "
            f"z_score={alert['z_score']}"
        )
# Example handlers
def slack_handler(alert: dict):
    """Report *alert* on the Slack channel (stubbed out with ``print``)."""
    # A real integration would post the formatted text to the webhook:
    #   requests.post(SLACK_WEBHOOK, json={"text": text})
    text = AlertManager.format_alert(alert)
    print(f"📢 Slack: {text}")
def pagerduty_handler(alert: dict):
    """Raise a PagerDuty event for *alert* (stubbed out with ``print``)."""
    # A real integration would send the formatted text to the events endpoint:
    #   requests.post(PD_ENDPOINT, json={"event": text})
    text = AlertManager.format_alert(alert)
    print(f"🚨 PagerDuty: {text}")
# Setup: warnings go to Slack only; critical alerts go to Slack first and
# then page the on-call via PagerDuty (handlers run in registration order).
alert_mgr = AlertManager()
alert_mgr.register_handler(severity="warning", handler=slack_handler)
alert_mgr.register_handler(severity="critical", handler=slack_handler)
alert_mgr.register_handler(severity="critical", handler=pagerduty_handler)
Monitoring tells you what is happening. In the next lesson, you will learn how to diagnose why things fail — systematic troubleshooting for the most common LLM failure modes.