Lesson 23 of 46 ~25 min
Course progress
0%

Monitoring & Observability

Build monitoring for LLM-powered systems — latency tracking, token usage dashboards, quality metrics, and alerting on anomalies.

You cannot fix what you cannot see. LLM systems have unique monitoring requirements — token usage, thinking time, model degradation, and cost anomalies do not exist in traditional application monitoring. This lesson builds an observability stack designed for AI-powered systems.

Metrics to Track

| Category | Metric | Alert Threshold | Purpose |
|---|---|---|---|
| Latency | p50, p95, p99 response time | p95 > 30s | User experience |
| Tokens | Input/output/thinking per request | Output > 2× average | Cost control |
| Cost | Per-request, daily, monthly | Daily > budget × 1.5 | Budget management |
| Errors | Error rate by type | > 5% | Reliability |
| Quality | Output length variance | Sudden drop > 30% | Model degradation |
| Fallbacks | Fallback activation rate | > 10% | Primary model health |
| Cache | Cache hit rate | Drop below 50% | Cost efficiency |
| Circuit breaker | State changes | Any OPEN event | System health |

Metrics Collector

import time
import statistics
from collections import defaultdict
from dataclasses import dataclass, field
from datetime import datetime, timedelta

@dataclass
class RequestMetric:
    """A single recorded LLM request.

    Field order is part of the positional constructor signature; the two
    trailing fields are optional.
    """

    # When and against which model the request ran.
    timestamp: float
    model: str

    # Token accounting for the request.
    input_tokens: int
    output_tokens: int
    thinking_tokens: int

    # Performance and spend.
    latency_ms: int
    cost_usd: float

    # Outcome: one of "success", "error", "fallback".
    status: str
    error_type: str = ""
    cache_hit: bool = False

class MetricsCollector:
    """Collect LLM request metrics and aggregate them over a sliding window.

    Metrics whose ``timestamp`` is older than the window are discarded when
    a new metric is recorded *and* before every aggregate is computed, so
    the aggregates never report stale data after traffic has stopped.
    """

    def __init__(self, window_minutes: int = 60):
        """Create an empty collector keeping ``window_minutes`` of history."""
        self.metrics: list[RequestMetric] = []
        self.window = timedelta(minutes=window_minutes)

    def record(self, metric: RequestMetric):
        """Store ``metric`` and drop entries that fell out of the window."""
        self.metrics.append(metric)
        self._cleanup_old_metrics()

    def _cleanup_old_metrics(self):
        """Keep only metrics with a timestamp newer than now minus the window."""
        cutoff = time.time() - self.window.total_seconds()
        self.metrics = [m for m in self.metrics if m.timestamp > cutoff]

    def _current(self) -> list:
        """Return the in-window metrics, pruning expired ones first.

        Without this, aggregates would keep reporting old requests for as
        long as no new request triggers the cleanup in ``record``.
        """
        self._cleanup_old_metrics()
        return self.metrics

    def latency_percentiles(self) -> dict:
        """Return p50/p95/p99/max latency (ms) of successful requests.

        Returns an empty dict when there is no successful request in the
        window. Percentiles are read at index ``int(n * q)`` of the sorted
        latencies; p99 falls back to the maximum until more than 100
        samples are available.
        """
        latencies = sorted(
            m.latency_ms for m in self._current() if m.status == "success"
        )
        if not latencies:
            return {}
        n = len(latencies)
        return {
            "p50": latencies[int(n * 0.5)],
            "p95": latencies[int(n * 0.95)],
            "p99": latencies[int(n * 0.99)] if n > 100 else latencies[-1],
            "max": latencies[-1],
        }

    def _status_rate(self, status: str) -> float:
        """Return the fraction of in-window metrics with the given status."""
        metrics = self._current()
        if not metrics:
            return 0.0
        return sum(1 for m in metrics if m.status == status) / len(metrics)

    def error_rate(self) -> float:
        """Return the fraction of requests with status ``"error"`` (0.0 if empty)."""
        return self._status_rate("error")

    def fallback_rate(self) -> float:
        """Return the fraction of requests with status ``"fallback"`` (0.0 if empty)."""
        return self._status_rate("fallback")

    def cost_summary(self) -> dict:
        """Return total/average/median/max cost and request count.

        Returns an empty dict when the window holds no metrics.
        """
        metrics = self._current()
        if not metrics:
            return {}
        costs = [m.cost_usd for m in metrics]
        return {
            "total": sum(costs),
            "average": statistics.mean(costs),
            "median": statistics.median(costs),
            "max": max(costs),
            "count": len(costs),
        }

    def token_summary(self) -> dict:
        """Return token totals and per-request input/output averages.

        Returns an empty dict when the window holds no metrics.
        """
        metrics = self._current()
        if not metrics:
            return {}
        return {
            "total_input": sum(m.input_tokens for m in metrics),
            "total_output": sum(m.output_tokens for m in metrics),
            "total_thinking": sum(m.thinking_tokens for m in metrics),
            "avg_input": statistics.mean(m.input_tokens for m in metrics),
            "avg_output": statistics.mean(m.output_tokens for m in metrics),
        }

    def cache_hit_rate(self) -> float:
        """Return the fraction of requests served from cache (0.0 if empty)."""
        metrics = self._current()
        if not metrics:
            return 0.0
        return sum(1 for m in metrics if m.cache_hit) / len(metrics)

Anomaly Detection

class AnomalyDetector:
    """Detect anomalies in LLM metrics using z-scores against baselines.

    Baselines (mean and standard deviation of latency, output tokens and
    cost) are derived from the collector's current metrics by
    ``update_baselines``; ``check`` then scores individual requests.
    """

    def __init__(self, collector: MetricsCollector,
                 sensitivity: float = 2.0):
        """Bind to ``collector``; ``sensitivity`` is the z-score threshold."""
        self.collector = collector
        self.sensitivity = sensitivity  # Standard deviations
        self.baselines: dict = {}

    def update_baselines(self):
        """Recalculate baselines from the collector's recent metrics.

        The baselines are left unchanged when there are fewer than 20
        metrics, or fewer than two successful requests: latency statistics
        use successes only, and ``statistics.mean``/``stdev`` raise
        ``StatisticsError`` on too few data points (e.g. during an outage
        where every request failed).
        """
        metrics = self.collector.metrics
        if len(metrics) < 20:
            return  # Need minimum data for baselines

        latencies = [m.latency_ms for m in metrics if m.status == "success"]
        if len(latencies) < 2:
            return  # stdev needs at least two successful samples

        output_tokens = [m.output_tokens for m in metrics]
        costs = [m.cost_usd for m in metrics]

        self.baselines = {
            "latency_mean": statistics.mean(latencies),
            "latency_stdev": statistics.stdev(latencies),
            "output_tokens_mean": statistics.mean(output_tokens),
            "output_tokens_stdev": statistics.stdev(output_tokens),
            "cost_mean": statistics.mean(costs),
            "cost_stdev": statistics.stdev(costs),
        }

    def check(self, metric: RequestMetric) -> list[dict]:
        """Return the list of alerts raised by a single request metric.

        Returns an empty list while no baselines are available. Each alert
        is a dict with ``type``, ``severity``, ``value``, ``baseline`` and
        ``z_score`` (output-length alerts also carry ``direction``).
        """
        if not self.baselines:
            return []

        alerts = []

        # Latency anomaly: only successful requests, only spikes upward.
        if metric.status == "success":
            z_latency = (
                (metric.latency_ms - self.baselines["latency_mean"])
                # Floor the deviation so a flat baseline cannot divide by 0.
                / max(self.baselines["latency_stdev"], 1)
            )
            if z_latency > self.sensitivity:
                alerts.append({
                    "type": "latency_spike",
                    "severity": "warning" if z_latency < 3 else "critical",
                    "value": metric.latency_ms,
                    "baseline": self.baselines["latency_mean"],
                    "z_score": round(z_latency, 2),
                })

        # Output length anomaly (potential model degradation); flagged in
        # both directions, skipped for requests that produced no output.
        if metric.output_tokens > 0:
            z_output = (
                (metric.output_tokens - self.baselines["output_tokens_mean"])
                / max(self.baselines["output_tokens_stdev"], 1)
            )
            if abs(z_output) > self.sensitivity:
                alerts.append({
                    "type": "output_length_anomaly",
                    "severity": "warning",
                    "value": metric.output_tokens,
                    "baseline": self.baselines["output_tokens_mean"],
                    "z_score": round(z_output, 2),
                    "direction": "high" if z_output > 0 else "low",
                })

        # Cost anomaly: checked for every request, only spikes upward.
        z_cost = (
            (metric.cost_usd - self.baselines["cost_mean"])
            / max(self.baselines["cost_stdev"], 0.001)
        )
        if z_cost > self.sensitivity:
            alerts.append({
                "type": "cost_spike",
                "severity": "warning" if z_cost < 3 else "critical",
                "value": metric.cost_usd,
                "baseline": self.baselines["cost_mean"],
                "z_score": round(z_cost, 2),
            })

        return alerts

Dashboard Output

class Dashboard:
    """Real-time, text-based monitoring dashboard for LLM systems.

    Renders the collector's aggregates inside a fixed-width box; every row
    is padded to the same width so the borders line up.
    """

    # Number of characters between the left and right box borders.
    _INNER_WIDTH = 50

    def __init__(self, collector: MetricsCollector,
                 anomaly_detector: AnomalyDetector):
        """Keep references to the metrics source and the anomaly detector."""
        self.collector = collector
        self.detector = anomaly_detector

    @classmethod
    def _row(cls, text: str = "") -> str:
        """Return ``text`` left-aligned and padded between two side borders."""
        return f"║{text:<{cls._INNER_WIDTH}}║"

    @classmethod
    def _rule(cls, left: str, right: str) -> str:
        """Return a horizontal rule with the given corner/junction characters."""
        return f"{left}{'═' * cls._INNER_WIDTH}{right}"

    @staticmethod
    def _ms(latency: dict, key: str) -> str:
        """Format one latency value with its unit, or ``N/A`` when missing."""
        value = latency.get(key)
        return "N/A" if value is None else f"{value}ms"

    def render(self) -> str:
        """Render a text-based dashboard.

        Returns:
            A multi-line string that starts and ends with a newline and
            contains the latency, reliability, cost and token sections for
            the collector's current window.
        """
        latency = self.collector.latency_percentiles()
        costs = self.collector.cost_summary()
        tokens = self.collector.token_summary()
        stamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        divider = self._rule("╠", "╣")

        lines = [
            self._rule("╔", "╗"),
            self._row("           LLM System Dashboard"),
            self._row(f"           {stamp}"),
            divider,
            self._row(" LATENCY"),
            self._row(
                f"   p50: {self._ms(latency, 'p50'):>8}"
                f"  │  p95: {self._ms(latency, 'p95'):>8}"
            ),
            self._row(
                f"   p99: {self._ms(latency, 'p99'):>8}"
                f"  │  max: {self._ms(latency, 'max'):>8}"
            ),
            divider,
            self._row(" RELIABILITY"),
            self._row(f"   Error rate:    {self.collector.error_rate():>6.1%}"),
            self._row(f"   Fallback rate: {self.collector.fallback_rate():>6.1%}"),
            self._row(f"   Cache hit rate:{self.collector.cache_hit_rate():>6.1%}"),
            divider,
            self._row(" COST (current window)"),
            self._row(f"   Total:   ${costs.get('total', 0):>8.2f}"),
            self._row(f"   Average: ${costs.get('average', 0):>8.4f}/req"),
            self._row(f"   Requests:{costs.get('count', 0):>6}"),
            divider,
            self._row(" TOKENS (current window)"),
            self._row(f"   Input:    {tokens.get('total_input', 0):>10,}"),
            self._row(f"   Output:   {tokens.get('total_output', 0):>10,}"),
            self._row(f"   Thinking: {tokens.get('total_thinking', 0):>10,}"),
            self._rule("╚", "╝"),
        ]
        # Leading and trailing newline keep the box on its own lines.
        return "\n" + "\n".join(lines) + "\n"

Alerting Integration

class AlertManager:
    """Route alerts to the handlers registered for their severity."""

    # Severity assumed for alerts that do not carry one.
    _DEFAULT_SEVERITY = "warning"

    def __init__(self):
        """Start with empty handler lists for the built-in severities."""
        self.handlers: dict[str, list[callable]] = {
            "warning": [],
            "critical": [],
        }

    def register_handler(self, severity: str, handler: callable):
        """Add ``handler`` to the handlers called for ``severity``.

        Severities other than the built-in ones are accepted and get their
        own handler list, matching ``alert``, which already tolerates
        unknown severities.
        """
        self.handlers.setdefault(severity, []).append(handler)

    def alert(self, alert: dict):
        """Call every handler registered for the alert's severity.

        Alerts without a ``severity`` key are treated as warnings; alerts
        whose severity has no handlers are dropped silently.
        """
        severity = alert.get("severity", self._DEFAULT_SEVERITY)
        for handler in self.handlers.get(severity, []):
            handler(alert)

    @staticmethod
    def format_alert(alert: dict) -> str:
        """Return a one-line, human-readable summary of ``alert``.

        A missing ``severity`` is rendered as a warning, consistent with
        how ``alert`` routes such alerts. ``type``, ``value``, ``baseline``
        and ``z_score`` are required keys.
        """
        severity = alert.get("severity", AlertManager._DEFAULT_SEVERITY)
        return (
            f"[{severity.upper()}] {alert['type']}: "
            f"value={alert['value']}, baseline={alert['baseline']}, "
            f"z_score={alert['z_score']}"
        )

# Example handlers
def slack_handler(alert: dict):
    """Deliver an alert to Slack (stubbed here: prints the formatted alert)."""
    text = AlertManager.format_alert(alert)
    # A real integration would POST the text to the webhook instead, e.g.
    # requests.post(SLACK_WEBHOOK, json={"text": text})
    print(f"📢 Slack: {text}")

def pagerduty_handler(alert: dict):
    """Page on-call via PagerDuty (stubbed here: prints the formatted alert)."""
    text = AlertManager.format_alert(alert)
    # A real integration would POST the event to PagerDuty instead, e.g.
    # requests.post(PD_ENDPOINT, json={"event": text})
    print(f"🚨 PagerDuty: {text}")

# Setup: critical alerts go to Slack and then PagerDuty; warnings go to
# Slack only.
alert_mgr = AlertManager()
alert_mgr.register_handler("critical", slack_handler)
alert_mgr.register_handler("critical", pagerduty_handler)
alert_mgr.register_handler("warning", slack_handler)

Monitoring tells you what is happening. In the next lesson, you will learn how to diagnose why things fail — systematic troubleshooting for the most common LLM failure modes.