LLM systems fail in ways that traditional software does not. A REST API either returns a response or an error code. An LLM can return a wrong response with a 200 status code — and that failure is invisible to your monitoring unless you know what to look for.
Failure Mode Taxonomy
LLM Failures
├── Infrastructure Failures (visible)
│   ├── Rate limits (429)
│   ├── Timeouts (504/408)
│   ├── Server errors (500/502/503)
│   └── Authentication errors (401/403)
│
├── Quality Failures (invisible to HTTP)
│   ├── Hallucination
│   ├── Instruction non-compliance
│   ├── Output truncation
│   ├── Sycophantic responses
│   └── Context window overflow
│
└── Cost Failures
    ├── Thinking token explosion
    ├── Prompt caching failures
    └── Model routing errors
Failure 1: Rate Limits
Diagnosis
class RateLimitDiagnostics:
    """Diagnose and prevent rate limit issues.

    Keeps a five-minute sliding window of request timestamps and turns a
    rate-limit error into a report containing the current request rate, the
    server's retry hint, and a list of recommendations.
    """

    def __init__(self):
        # Wall-clock timestamps (from time.time()) of recorded requests.
        self.request_timestamps: list[float] = []

    def record_request(self):
        """Record a request at the current time and prune old entries."""
        # Read the clock once so the appended timestamp and the pruning
        # cutoff are derived from the same instant.
        now = time.time()
        self.request_timestamps.append(now)
        # Keep only the last 5 minutes.
        cutoff = now - 300
        self.request_timestamps = [
            t for t in self.request_timestamps if t > cutoff
        ]

    def current_rpm(self) -> float:
        """Return the number of requests recorded in the last 60 seconds."""
        one_minute_ago = time.time() - 60
        return sum(1 for t in self.request_timestamps if t > one_minute_ago)

    @staticmethod
    def _parse_retry_after(value):
        """Convert a ``retry-after`` header value to seconds when numeric.

        Returns ``None`` when the header is absent, a ``float`` when the value
        is a number of seconds, and the raw value unchanged when it cannot be
        parsed as a number (the header may also carry an HTTP-date).
        """
        if value is None:
            return None
        try:
            return float(value)
        except (TypeError, ValueError):
            return value

    def diagnose(self, error) -> dict:
        """Diagnose a rate limit error.

        Args:
            error: The raised error. If it has a non-None ``response``
                attribute, that object's ``headers.get('retry-after')`` is
                used as the retry hint.

        Returns:
            A dict with ``cause``, ``current_rpm``, ``retry_after_seconds``
            (``None`` when no hint is available) and ``recommendations``.
        """
        rpm = self.current_rpm()
        retry_after = None
        response = getattr(error, 'response', None)
        if response is not None:
            retry_after = self._parse_retry_after(
                response.headers.get('retry-after')
            )
        return {
            "cause": "rate_limit",
            "current_rpm": rpm,
            "retry_after_seconds": retry_after,
            "recommendations": [
                "Implement request queuing with rate limiting",
                f"Current RPM ({rpm}) may exceed tier limit",
                "Consider upgrading API tier for higher limits",
                "Batch requests where possible to reduce call count",
            ],
        }
Failure 2: Timeouts
import time
class TimeoutDiagnostics:
    """Explain likely causes of a request timeout and suggest mitigations."""

    def diagnose(self, prompt: str, timeout_seconds: float,
                 model: str) -> dict:
        """Build a timeout report for a single request.

        Args:
            prompt: Prompt text that was sent; its length drives the token
                estimate.
            timeout_seconds: Timeout that was in effect for the request.
            model: Model identifier; checked case-insensitively for "opus".

        Returns:
            A dict with ``cause``, ``estimated_input_tokens``,
            ``timeout_seconds``, ``model`` and ``recommendations``.
        """
        # Token estimate: character count divided by four.
        approx_tokens = len(prompt) // 4

        input_is_huge = approx_tokens > 100_000
        uses_opus = "opus" in model.lower()
        timeout_is_tight = timeout_seconds < 30

        advice = []
        if input_is_huge:
            # Large input -> slow processing.
            advice.append(
                f"Input is very large (~{approx_tokens:,} tokens). "
                f"Consider chunking or using RAG instead of full context."
            )
        if uses_opus:
            # Thinking mode can cause long processing.
            advice.append(
                "Opus with adaptive thinking can take 30-120s for complex "
                "tasks. Set thinking effort to 'standard' or 'quick' for "
                "faster responses."
            )
        if timeout_is_tight:
            # Timeout too aggressive.
            advice.append(
                f"Timeout of {timeout_seconds}s is aggressive for Opus. "
                f"Increase to 120s for standard tasks, 300s for deep thinking."
            )
        # Streaming is suggested unconditionally.
        advice.append(
            "Use streaming to get partial results and avoid full timeouts."
        )

        report = {"cause": "timeout"}
        report["estimated_input_tokens"] = approx_tokens
        report["timeout_seconds"] = timeout_seconds
        report["model"] = model
        report["recommendations"] = advice
        return report
Failure 3: Degraded Output Quality
This is the hardest failure to detect — the model returns a response, but it is wrong, incomplete, or low-quality.
class QualityDiagnostics:
    """Heuristic output-quality checks plus a trend over past assessments."""

    def __init__(self):
        from anthropic import Anthropic

        self.client = Anthropic()
        # One entry per assess_quality() call, in call order.
        self.quality_history: list[dict] = []

    def assess_quality(self, prompt: str, output: str,
                       expected_properties: list[str]) -> dict:
        """Score a model output with string heuristics and record the result.

        Args:
            prompt: The prompt that produced ``output`` (not used by the
                checks).
            output: Model output text to inspect.
            expected_properties: Substrings expected to appear in the output,
                compared case-insensitively.

        Returns:
            The report dict, which is also appended to ``quality_history``.
            ``suspiciously_short`` is present only when that check trips.
        """
        report = {}

        # Length: the key is only added when the output is short.
        if len(output.strip()) < 50:
            report["suspiciously_short"] = True

        # Refusal: case-sensitive substring match on stock phrases.
        refusal_markers = (
            "I cannot", "I'm unable to", "I don't have access",
            "I apologize, but", "I'm not able to",
        )
        report["possible_refusal"] = any(
            marker in output for marker in refusal_markers
        )

        # Repetition: share of duplicate '. '-separated segments, computed
        # only when there are more than five segments.
        segments = output.split('. ')
        if len(segments) > 5:
            report["repetition_ratio"] = 1 - (len(set(segments)) / len(segments))
        else:
            report["repetition_ratio"] = 0.0

        # Truncation: output over 100 chars that does not end on a closer.
        closers = ('.', '!', '?', '```', '"', ')')
        ends_cleanly = output.rstrip().endswith(closers)
        report["possibly_truncated"] = len(output) > 100 and not ends_cleanly

        # Expected properties absent from the output.
        lowered_output = output.lower()
        absent = [
            prop for prop in expected_properties
            if prop.lower() not in lowered_output
        ]
        report["missing_properties"] = absent

        # Overall score: subtract each triggered penalty in a fixed order,
        # then floor at zero.
        penalties = (
            (report.get("suspiciously_short"), 0.3),
            (report["possible_refusal"], 0.5),
            (report["repetition_ratio"] > 0.3, 0.2),
            (report["possibly_truncated"], 0.2),
            (absent, 0.1 * len(absent)),
        )
        score = 1.0
        for triggered, penalty in penalties:
            if triggered:
                score -= penalty
        report["quality_score"] = max(0.0, score)

        self.quality_history.append(report)
        return report

    def detect_quality_trend(self) -> dict:
        """Compare the last 10 assessments with the 10 before them.

        Returns:
            ``{"trend": "insufficient_data"}`` with fewer than 10 entries;
            otherwise a dict with ``trend`` ("declining", "improving" or
            "stable"), ``recent_avg_quality`` and ``quality_delta``. The
            delta is 0 when fewer than 20 entries exist.
        """
        history = self.quality_history
        if len(history) < 10:
            return {"trend": "insufficient_data"}

        def mean_score(entries):
            total = sum(entry.get("quality_score", 0) for entry in entries)
            return total / len(entries)

        latest = history[-10:]
        previous = history[-20:-10] if len(history) >= 20 else []

        recent_avg = mean_score(latest)
        delta = recent_avg - mean_score(previous) if previous else 0

        if delta < -0.1:
            trend = "declining"
        elif delta > 0.1:
            trend = "improving"
        else:
            trend = "stable"

        return {
            "trend": trend,
            "recent_avg_quality": round(recent_avg, 2),
            "quality_delta": round(delta, 2),
        }
Failure 4: Context Window Overflow
class ContextOverflowDiagnostics:
    """Estimate whether a request fits inside a model's context window."""

    # Context limit in tokens per model id; models not listed here fall back
    # to 200_000 in check_context_budget.
    MODEL_LIMITS = {
        "claude-opus-4-6-20260205": 1_000_000,
        "claude-sonnet-4-5-20241022": 200_000,
        "claude-haiku-4-5-20241022": 200_000,
    }

    def check_context_budget(self, model: str, messages: list[dict],
                             system: str = "",
                             max_output_tokens: int = 4096) -> dict:
        """Check if a request will exceed context limits.

        Args:
            model: Model id used to look up the limit in ``MODEL_LIMITS``.
            messages: Message dicts; each one's ``content`` is stringified
                and measured (a missing ``content`` counts as empty).
            system: System prompt text.
            max_output_tokens: Output tokens the request asks for.

        Returns:
            A dict describing the budget. ``recommendations`` is included
            only when ``will_overflow`` is true.
        """
        def approx_tokens(text: str) -> int:
            # Rough estimate: character count divided by four.
            return len(text) // 4

        input_estimate = approx_tokens(system) + sum(
            approx_tokens(str(message.get("content", "")))
            for message in messages
        )
        limit = self.MODEL_LIMITS.get(model, 200_000)
        headroom = limit - input_estimate
        overflow = headroom < max_output_tokens

        report = {
            "model": model,
            "model_limit": limit,
            "estimated_input_tokens": input_estimate,
            "requested_output_tokens": max_output_tokens,
            # Reported headroom is clamped at zero.
            "available_for_output": max(0, headroom),
            "will_overflow": overflow,
            "utilization": input_estimate / limit,
        }
        if not overflow:
            return report

        report["recommendations"] = [
            f"Input ({input_estimate:,}) + output ({max_output_tokens:,}) "
            f"exceeds limit ({limit:,})",
            "Reduce input by summarizing older messages",
            "Enable compaction API for automatic management",
            "Use RAG to load only relevant context",
        ]
        return report
Failure 5: API Changes
class APIChangeDetector:
    """Detect unexpected API behavior changes.

    The first response seen for a model becomes that model's baseline shape;
    each later response is compared against it.
    """

    def __init__(self):
        # Baseline set of public attribute names, keyed by model.
        self.response_schemas: dict[str, set] = {}

    def record_response_shape(self, model: str, response) -> dict:
        """Record the shape of API responses to detect changes.

        Args:
            model: Model identifier the response came from.
            response: Response object whose public attribute names are
                inspected.

        Returns:
            ``{"status": "baseline_recorded"}`` for the first response of a
            model, ``{"status": "no_changes"}`` when the shape matches the
            baseline, or a ``schema_change_detected`` dict listing
            ``new_fields`` and ``missing_fields`` in sorted order. The stored
            baseline is not updated when a change is detected.
        """
        shape = self._extract_shape(response)
        if model not in self.response_schemas:
            self.response_schemas[model] = shape
            return {"status": "baseline_recorded"}

        expected = self.response_schemas[model]
        new_fields = shape - expected
        missing_fields = expected - shape
        if not (new_fields or missing_fields):
            return {"status": "no_changes"}

        return {
            "status": "schema_change_detected",
            # Sorted so repeated reports of the same change are identical;
            # set iteration order is not stable across runs.
            "new_fields": sorted(new_fields),
            "missing_fields": sorted(missing_fields),
            "recommendation": (
                "API response schema has changed. Review Anthropic "
                "changelog and update your response parsing code."
            ),
        }

    def _extract_shape(self, response) -> set:
        """Return the names from ``dir(response)`` that do not start with '_'."""
        return {attr for attr in dir(response) if not attr.startswith('_')}
Unified Diagnostics Runner
class DiagnosticsRunner:
    """Run all diagnostics and produce a health report."""

    def __init__(self):
        self.rate_limit = RateLimitDiagnostics()
        self.timeout = TimeoutDiagnostics()
        self.quality = QualityDiagnostics()
        self.context = ContextOverflowDiagnostics()
        self.api_change = APIChangeDetector()

    def full_health_check(self, recent_metrics: list) -> dict:
        """Run a comprehensive health check.

        Args:
            recent_metrics: Metric dicts. Entries whose ``status`` is
                ``"error"`` are tallied by ``error_type`` (``"unknown"``
                when the key is absent).

        Returns:
            A dict with an ISO-format ``timestamp``, ``total_requests``,
            the list of ``issues``, and ``healthy`` (true when no issues
            were found).
        """
        # Imported here so the method does not depend on module-level
        # imports being present.
        from collections import Counter
        from datetime import datetime

        # Tally errors by type, in first-seen order.
        error_counts = Counter(
            metric.get("error_type", "unknown")
            for metric in recent_metrics
            if metric.get("status") == "error"
        )

        # An error type is reported once it occurs more than 5 times and is
        # critical above 20 occurrences.
        issues = [
            {
                "type": error_type,
                "count": count,
                "severity": "critical" if count > 20 else "warning",
            }
            for error_type, count in error_counts.items()
            if count > 5
        ]

        # Check quality trend.
        quality_trend = self.quality.detect_quality_trend()
        if quality_trend.get("trend") == "declining":
            issues.append({
                "type": "quality_degradation",
                "details": quality_trend,
                "severity": "warning",
            })

        return {
            "timestamp": datetime.now().isoformat(),
            "total_requests": len(recent_metrics),
            "issues": issues,
            "healthy": not issues,
        }
Understanding failure modes is defensive knowledge. In the next and final lesson, you will learn how to migrate from Opus 4.5 to 4.6 without downtime — feature flags, A/B testing, and rollback procedures.