Lesson 35 of 46 ~25 min
Course progress
0%

Common Failure Modes & Diagnosis

Systematically diagnose common LLM failures — rate limits, timeout errors, degraded output quality, context overflow, and API changes.

LLM systems fail in ways that traditional software does not. A REST API either returns a response or an error code. An LLM can return a wrong response with a 200 status code — and that failure is invisible to your monitoring unless you know what to look for.

Failure Mode Taxonomy

LLM Failures
├── Infrastructure Failures (visible)
│   ├── Rate limits (429)
│   ├── Timeouts (504/408)
│   ├── Server errors (500/502/503)
│   └── Authentication errors (401/403)

├── Quality Failures (invisible to HTTP)
│   ├── Hallucination
│   ├── Instruction non-compliance
│   ├── Output truncation
│   ├── Sycophantic responses
│   └── Context window overflow

└── Cost Failures
    ├── Thinking token explosion
    ├── Prompt caching failures
    └── Model routing errors

Failure 1: Rate Limits

Diagnosis

class RateLimitDiagnostics:
    """Diagnose and prevent rate limit issues.

    Keeps a rolling log of request timestamps (``time.time()`` values) so
    the recent request rate can be reported when a rate-limit error occurs.
    """

    # Timestamps older than this many seconds are pruned on every record.
    _HISTORY_WINDOW_SECONDS = 300

    def __init__(self):
        # Wall-clock timestamps of recorded requests, oldest first.
        self.request_timestamps: list[float] = []

    def record_request(self):
        """Record a request made now and drop entries older than 5 minutes."""
        # Read the clock once so the appended stamp and the cutoff agree.
        now = time.time()
        cutoff = now - self._HISTORY_WINDOW_SECONDS
        self.request_timestamps.append(now)
        self.request_timestamps = [
            t for t in self.request_timestamps if t > cutoff
        ]

    def current_rpm(self) -> float:
        """Return the number of requests recorded in the last 60 seconds."""
        one_minute_ago = time.time() - 60
        recent = [t for t in self.request_timestamps if t > one_minute_ago]
        return len(recent)

    @staticmethod
    def _parse_retry_after(raw):
        """Convert a ``Retry-After`` header value to seconds.

        Accepts either a number of seconds or an HTTP-date. Returns a
        non-negative float, or ``None`` when the header is absent or cannot
        be interpreted.
        """
        if raw is None:
            return None

        # Local imports keep this class self-contained in the file.
        import math
        from datetime import timezone
        from email.utils import parsedate_to_datetime

        try:
            seconds = float(raw)
        except (TypeError, ValueError):
            seconds = None
        if seconds is not None:
            # Reject nan/inf; clamp negatives so callers can sleep() safely.
            return max(0.0, seconds) if math.isfinite(seconds) else None

        try:
            when = parsedate_to_datetime(raw)
        except (TypeError, ValueError):
            return None
        if when.tzinfo is None:
            # A date without zone information is treated as UTC.
            when = when.replace(tzinfo=timezone.utc)
        return max(0.0, when.timestamp() - time.time())

    def diagnose(self, error) -> dict:
        """Diagnose a rate limit error.

        Args:
            error: The exception raised by the client. If it has a
                non-``None`` ``response`` attribute, that response's
                ``retry-after`` header is read.

        Returns:
            A dict with ``cause``, ``current_rpm``, ``retry_after_seconds``
            (float seconds, or ``None`` when unavailable or unparseable),
            and a list of ``recommendations``.
        """
        rpm = self.current_rpm()
        retry_after = None
        response = getattr(error, "response", None)
        if response is not None:
            retry_after = self._parse_retry_after(
                response.headers.get('retry-after')
            )

        return {
            "cause": "rate_limit",
            "current_rpm": rpm,
            "retry_after_seconds": retry_after,
            "recommendations": [
                "Implement request queuing with rate limiting",
                f"Current RPM ({rpm}) may exceed tier limit",
                "Consider upgrading API tier for higher limits",
                "Batch requests where possible to reduce call count",
            ]
        }

Failure 2: Timeouts

import time

class TimeoutDiagnostics:
    """Diagnose timeout issues."""

    def diagnose(self, prompt: str, timeout_seconds: float,
                 model: str) -> dict:
        """Diagnose why a request timed out.

        Args:
            prompt: The prompt text that was sent; its length is used for a
                rough token estimate (about 4 characters per token).
            timeout_seconds: The timeout the request was configured with.
            model: The model identifier the request targeted.

        Returns:
            A dict with ``cause``, ``estimated_input_tokens``,
            ``timeout_seconds``, ``model`` and a list of
            ``recommendations``. The streaming recommendation is always
            included as the last entry.
        """
        estimated_input_tokens = len(prompt) // 4
        recommendations = []

        # Large input → slow processing
        if estimated_input_tokens > 100_000:
            recommendations.append(
                f"Input is very large (~{estimated_input_tokens:,} tokens). "
                f"Consider chunking or using RAG instead of full context."
            )

        # Thinking mode can cause long processing
        if "opus" in model.lower():
            recommendations.append(
                "Opus with adaptive thinking can take 30-120s for complex "
                "tasks. Set thinking effort to 'standard' or 'quick' for "
                "faster responses."
            )

        # Timeout too aggressive. Name the model that was actually used:
        # this branch fires for every model, not only Opus.
        if timeout_seconds < 30:
            recommendations.append(
                f"Timeout of {timeout_seconds}s is aggressive for {model}. "
                f"Increase to 120s for standard tasks, 300s for deep thinking."
            )

        recommendations.append(
            "Use streaming to get partial results and avoid full timeouts."
        )

        return {
            "cause": "timeout",
            "estimated_input_tokens": estimated_input_tokens,
            "timeout_seconds": timeout_seconds,
            "model": model,
            "recommendations": recommendations,
        }

Failure 3: Degraded Output Quality

This is the hardest failure to detect — the model returns a response, but it is wrong, incomplete, or low quality.

class QualityDiagnostics:
    """Heuristic checks for spotting low-quality model output over time."""

    def __init__(self):
        from anthropic import Anthropic
        self.client = Anthropic()
        # One entry per assess_quality() call, oldest first.
        self.quality_history: list[dict] = []

    def assess_quality(self, prompt: str, output: str,
                       expected_properties: list[str]) -> dict:
        """Score *output* with a set of cheap textual heuristics.

        The returned dict is also appended to ``quality_history``. It holds
        the individual check results plus ``quality_score``, a value that
        starts at 1.0, loses a fixed penalty per failed check and is floored
        at 0.0. ``prompt`` is accepted but not inspected.
        """
        report = {}
        lowered_output = output.lower()

        # Very short answers are flagged (key is only present when flagged).
        if len(output.strip()) < 50:
            report["suspiciously_short"] = True

        # Case-sensitive substring match against common refusal openers.
        refusal_markers = (
            "I cannot", "I'm unable to", "I don't have access",
            "I apologize, but", "I'm not able to",
        )
        report["possible_refusal"] = any(
            marker in output for marker in refusal_markers
        )

        # Share of duplicate '. '-separated fragments; only for 6+ fragments.
        fragments = output.split('. ')
        fragment_count = len(fragments)
        if fragment_count > 5:
            distinct_count = len(set(fragments))
            report["repetition_ratio"] = 1 - (distinct_count / fragment_count)
        else:
            report["repetition_ratio"] = 0.0

        # Long text that does not end on a closing token looks cut off.
        closing_tokens = ('.', '!', '?', '```', '"', ')')
        report["possibly_truncated"] = (
            len(output) > 100
            and not output.rstrip().endswith(closing_tokens)
        )

        # Expected properties are matched case-insensitively as substrings.
        absent = [
            wanted for wanted in expected_properties
            if wanted.lower() not in lowered_output
        ]
        report["missing_properties"] = absent

        # Apply penalties one at a time, in a fixed order.
        penalties = (
            (report.get("suspiciously_short"), 0.3),
            (report.get("possible_refusal"), 0.5),
            (report.get("repetition_ratio", 0) > 0.3, 0.2),
            (report.get("possibly_truncated"), 0.2),
        )
        score = 1.0
        for triggered, penalty in penalties:
            if triggered:
                score -= penalty
        if absent:
            score -= 0.1 * len(absent)

        report["quality_score"] = max(0.0, score)

        self.quality_history.append(report)
        return report

    def detect_quality_trend(self) -> dict:
        """Compare the last 10 scores with the 10 before them.

        Returns ``{"trend": "insufficient_data"}`` with fewer than 10
        recorded assessments. With 10 to 19 there is no older window, so the
        delta is 0 and the trend is reported as "stable".
        """
        history = self.quality_history
        if len(history) < 10:
            return {"trend": "insufficient_data"}

        def mean_score(entries):
            total = sum(entry.get("quality_score", 0) for entry in entries)
            return total / len(entries)

        recent_avg = mean_score(history[-10:])
        if len(history) >= 20:
            delta = recent_avg - mean_score(history[-20:-10])
        else:
            delta = 0

        if delta < -0.1:
            trend = "declining"
        elif delta > 0.1:
            trend = "improving"
        else:
            trend = "stable"

        return {
            "trend": trend,
            "recent_avg_quality": round(recent_avg, 2),
            "quality_delta": round(delta, 2),
        }

Failure 4: Context Window Overflow

class ContextOverflowDiagnostics:
    """Estimate whether a request fits inside a model's context window."""

    # Context window size, in tokens, per known model identifier.
    MODEL_LIMITS = {
        "claude-opus-4-6-20260205": 1_000_000,
        "claude-sonnet-4-5-20241022": 200_000,
        "claude-haiku-4-5-20241022": 200_000,
    }

    def check_context_budget(self, model: str, messages: list[dict],
                              system: str = "",
                              max_output_tokens: int = 4096) -> dict:
        """Report the estimated token budget for a request.

        Token counts are approximated as four characters per token, taken
        separately for the system prompt and for the string form of each
        message's ``content``. Models missing from ``MODEL_LIMITS`` are
        assumed to have a 200,000-token window. When the remaining room is
        smaller than ``max_output_tokens`` the result also carries a
        ``recommendations`` list.
        """
        # Rough estimate: ~4 characters per token, floored per item.
        used_tokens = len(system) // 4
        for message in messages:
            used_tokens += len(str(message.get("content", ""))) // 4

        limit = self.MODEL_LIMITS.get(model, 200_000)
        headroom = limit - used_tokens
        overflow = headroom < max_output_tokens

        report = {
            "model": model,
            "model_limit": limit,
            "estimated_input_tokens": used_tokens,
            "requested_output_tokens": max_output_tokens,
            "available_for_output": max(0, headroom),
            "will_overflow": overflow,
            "utilization": used_tokens / limit,
        }
        if not overflow:
            return report

        summary = (
            f"Input ({used_tokens:,}) + output ({max_output_tokens:,}) "
            f"exceeds limit ({limit:,})"
        )
        report["recommendations"] = [
            summary,
            "Reduce input by summarizing older messages",
            "Enable compaction API for automatic management",
            "Use RAG to load only relevant context",
        ]
        return report

Failure 5: API Changes

class APIChangeDetector:
    """Flag differences between a response's attributes and a baseline."""

    def __init__(self):
        # Baseline set of public attribute names, keyed by model.
        self.response_schemas: dict[str, set] = {}

    def record_response_shape(self, model: str, response):
        """Compare *response* against the baseline stored for *model*.

        The first response seen for a model becomes its baseline. Later
        responses are compared against that baseline, which is never
        updated. Returns a status dict; when the attribute sets differ it
        also lists the added and removed names.
        """
        observed = self._extract_shape(response)

        try:
            baseline = self.response_schemas[model]
        except KeyError:
            self.response_schemas[model] = observed
            return {"status": "baseline_recorded"}

        added = observed - baseline
        removed = baseline - observed
        if not (added or removed):
            return {"status": "no_changes"}

        advice = (
            "API response schema has changed. Review Anthropic "
            "changelog and update your response parsing code."
        )
        return {
            "status": "schema_change_detected",
            "new_fields": list(added),
            "missing_fields": list(removed),
            "recommendation": advice,
        }

    def _extract_shape(self, response) -> set:
        """Return the names from ``dir(response)`` not starting with ``_``."""
        return {name for name in dir(response) if not name.startswith('_')}

Unified Diagnostics Runner

class DiagnosticsRunner:
    """Run all diagnostics and produce a health report."""

    def __init__(self):
        self.rate_limit = RateLimitDiagnostics()
        self.timeout = TimeoutDiagnostics()
        self.quality = QualityDiagnostics()
        self.context = ContextOverflowDiagnostics()
        self.api_change = APIChangeDetector()

    def full_health_check(self, recent_metrics: list) -> dict:
        """Run a comprehensive health check.

        Args:
            recent_metrics: Dicts describing recent requests. Entries whose
                ``status`` is ``"error"`` are tallied by ``error_type``
                (``"unknown"`` when that key is absent).

        Returns:
            A dict with an ISO-format ``timestamp``, ``total_requests``, the
            list of ``issues`` and a ``healthy`` flag that is true only when
            no issues were found. An error type becomes an issue once it
            occurs more than 5 times, and is "critical" above 20.
        """
        # Imported here because the file has no module-level import for
        # these names; without them this method raises NameError.
        from collections import Counter
        from datetime import datetime

        # Tally errors by type, keeping first-seen order for the report.
        errors_by_type = Counter(
            m.get("error_type", "unknown")
            for m in recent_metrics
            if m.get("status") == "error"
        )

        issues = [
            {
                "type": error_type,
                "count": count,
                "severity": "critical" if count > 20 else "warning",
            }
            for error_type, count in errors_by_type.items()
            if count > 5
        ]

        # Check quality trend
        quality_trend = self.quality.detect_quality_trend()
        if quality_trend.get("trend") == "declining":
            issues.append({
                "type": "quality_degradation",
                "details": quality_trend,
                "severity": "warning",
            })

        return {
            "timestamp": datetime.now().isoformat(),
            "total_requests": len(recent_metrics),
            "issues": issues,
            "healthy": not issues,
        }

Understanding failure modes is defensive knowledge. In the next and final lesson, you will learn how to migrate from Opus 4.5 to 4.6 without downtime — feature flags, A/B testing, and rollback procedures.