The gap between a working prototype and a production system is measured in error handling. LLM APIs fail — rate limits, timeouts, server errors, degraded quality. This lesson implements the patterns that keep your system running when things go wrong.
Pattern 1: Retry with Exponential Backoff
import random
import time
from dataclasses import dataclass

from anthropic import (
    Anthropic,
    APIConnectionError,
    APIError,
    APITimeoutError,
    RateLimitError,
)
class RetryableClient:
    """Anthropic client wrapper that retries transient failures.

    Rate-limit, timeout and connection errors, plus API errors that carry a
    5xx status code, are retried with exponential backoff and jitter. Every
    other ``APIError`` (400, 401, 403, ...) is re-raised immediately.
    """

    # Exception types that are always treated as transient.
    RETRYABLE_ERRORS = (
        RateLimitError,
        APITimeoutError,
        APIConnectionError,
    )

    def __init__(self, max_retries: int = 3, base_delay: float = 1.0,
                 max_delay: float = 60.0):
        """Configure the retry policy.

        Args:
            max_retries: Retries allowed after the first attempt, so one call
                issues at most ``max_retries + 1`` requests.
            base_delay: Backoff in seconds before the first retry; it doubles
                on every further retry.
            max_delay: Upper bound in seconds for the computed backoff.
        """
        # NOTE(review): the SDK client may apply its own retry policy as
        # well; confirm whether the two layers are meant to stack.
        self.client = Anthropic()
        self.max_retries = max_retries
        self.base_delay = base_delay
        self.max_delay = max_delay

    def messages_create(self, **kwargs):
        """Create a message, retrying transient failures.

        Args:
            **kwargs: Forwarded unchanged to ``client.messages.create``.

        Returns:
            Whatever ``client.messages.create`` returns on the first
            successful attempt.

        Raises:
            APIError: Immediately for non-transient API errors, or the last
                transient error once every attempt has failed.
        """
        # Always make at least one request, even for max_retries < 0, so the
        # final ``raise`` never runs with nothing to raise.
        total_attempts = max(1, self.max_retries + 1)
        last_error = None
        for attempt in range(total_attempts):
            try:
                return self.client.messages.create(**kwargs)
            except self.RETRYABLE_ERRORS as e:
                last_error = e
            except APIError as e:
                if not self._is_server_error(e):
                    # Non-retryable API errors (400, 401, 403, etc.)
                    raise
                last_error = e

            if attempt == total_attempts - 1:
                break

            delay = self._next_delay(attempt, last_error)
            print(f"⚠️ Attempt {attempt + 1}/{total_attempts} failed: "
                  f"{type(last_error).__name__}. Retrying in {delay:.1f}s...")
            time.sleep(delay)
        raise last_error

    @staticmethod
    def _is_server_error(error) -> bool:
        """Return True when *error* carries an HTTP status code of 500+."""
        status = getattr(error, "status_code", None)
        return isinstance(status, int) and status >= 500

    def _next_delay(self, attempt: int, error) -> float:
        """Return the number of seconds to wait before the next attempt."""
        # Exponential backoff with jitter, capped at max_delay.
        delay = min(
            self.base_delay * (2 ** attempt) + random.uniform(0, 1),
            self.max_delay,
        )
        # Respect Retry-After header if present. A server-provided wait may
        # exceed max_delay on purpose: retrying sooner would fail again.
        response = getattr(error, "response", None)
        if response is not None:
            retry_after = response.headers.get("retry-after")
            if retry_after:
                try:
                    delay = max(delay, float(retry_after))
                except (TypeError, ValueError):
                    # Non-numeric value (e.g. an HTTP date): keep the
                    # computed backoff instead of aborting the retry loop.
                    pass
        return delay
Pattern 2: Circuit Breaker
Prevent cascading failures by stopping requests when the API is consistently failing:
from datetime import datetime, timedelta
from enum import Enum
class CircuitState(Enum):
    """Operating states of a CircuitBreaker."""

    CLOSED = "closed"        # Normal operation
    OPEN = "open"            # Blocking requests
    HALF_OPEN = "half_open"  # Testing if API recovered


class CircuitBreaker:
    """Circuit breaker for LLM API calls.

    Callers ask ``can_execute()`` before a request and report the outcome via
    ``record_success()`` / ``record_failure()``. The class itself performs no
    locking.
    """

    def __init__(self, failure_threshold: int = 5,
                 recovery_timeout: int = 60,
                 success_threshold: int = 3):
        """Create a breaker in the CLOSED state.

        Args:
            failure_threshold: Failure count at which the breaker opens.
            recovery_timeout: Seconds that must pass after the last recorded
                failure before an OPEN breaker lets a probe request through.
            success_threshold: Successes required while HALF_OPEN before the
                breaker closes again.
        """
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout  # seconds
        self.success_threshold = success_threshold
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.success_count = 0
        self.last_failure_time = None

    def can_execute(self) -> bool:
        """Check if requests are allowed.

        An OPEN breaker whose recovery timeout has expired moves to HALF_OPEN
        as a side effect and allows the request.
        """
        if self.state == CircuitState.CLOSED:
            return True
        if self.state == CircuitState.OPEN:
            if self._recovery_timeout_expired():
                self.state = CircuitState.HALF_OPEN
                self.success_count = 0
                return True
            return False
        # HALF_OPEN: every caller is let through until the probes either
        # close the breaker or re-open it.
        return True

    def record_success(self):
        """Record a successful API call."""
        if self.state == CircuitState.HALF_OPEN:
            self.success_count += 1
            if self.success_count >= self.success_threshold:
                self.state = CircuitState.CLOSED
                self.failure_count = 0
                print("✅ Circuit breaker closed — API recovered")
        else:
            # Outside HALF_OPEN a success pays off one earlier failure
            # instead of clearing the whole count.
            self.failure_count = max(0, self.failure_count - 1)

    def record_failure(self):
        """Record a failed API call and open the breaker when warranted."""
        self.failure_count += 1
        self.last_failure_time = datetime.now()
        # The HALF_OPEN check has to come first: failure_count is still at or
        # above the threshold during a recovery probe, so testing the
        # threshold first would make this branch unreachable and log the
        # wrong reason for re-opening.
        if self.state == CircuitState.HALF_OPEN:
            self.state = CircuitState.OPEN
            print("🔴 Circuit breaker re-opened — recovery test failed")
        elif self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN
            print(f"🔴 Circuit breaker OPEN — {self.failure_count} "
                  f"consecutive failures")

    def _recovery_timeout_expired(self) -> bool:
        """Return True once recovery_timeout seconds passed since the last failure."""
        if self.last_failure_time is None:
            return True
        elapsed = datetime.now() - self.last_failure_time
        return elapsed > timedelta(seconds=self.recovery_timeout)
Pattern 3: Fallback to Weaker Models
When the primary model is unavailable, fall back gracefully:
@dataclass
class FallbackConfig:
    """Model tiers for a fallback-capable client.

    ``primary_model`` is tried first; ``fallback_chain`` lists the models to
    try afterwards, in order.
    """

    # NOTE(review): confirm this ID against the provider's current model list.
    primary_model: str = "claude-opus-4-6-20260205"
    # None means "use the built-in chain". It is resolved in __post_init__ so
    # that every instance owns its own list.
    fallback_chain: list = None

    def __post_init__(self):
        """Fill in the default fallback chain when none was supplied."""
        if self.fallback_chain is None:
            # The snapshot date in each ID must belong to that model; an
            # unknown ID is rejected by the API with a non-retryable error,
            # which would defeat the fallback. TODO confirm these IDs.
            self.fallback_chain = [
                "claude-sonnet-4-5-20250929",
                "claude-haiku-4-5-20251001",
            ]
class FallbackClient:
    """Client that gracefully degrades through model tiers.

    Each model has its own circuit breaker. A request walks the configured
    models in order, skips those whose breaker is open, and returns the first
    successful response.
    """

    def __init__(self, config: FallbackConfig = None):
        """Create the client.

        Args:
            config: Model tiers to use; a default ``FallbackConfig`` is
                created when omitted.
        """
        self.client = Anthropic()
        self.config = config or FallbackConfig()
        self.circuit_breakers = {
            model: CircuitBreaker() for model in self._models()
        }

    def _models(self) -> list:
        """Return the primary model followed by the fallback chain."""
        return [self.config.primary_model, *self.config.fallback_chain]

    @staticmethod
    def _is_transient(error) -> bool:
        """Return True for failures that justify trying the next model."""
        if isinstance(error, (RateLimitError, APITimeoutError,
                              APIConnectionError)):
            return True
        # Server-side failures (HTTP 5xx) mean this model is unavailable
        # right now, which is exactly the case fallback exists for.
        status = getattr(error, "status_code", None)
        return isinstance(status, int) and status >= 500

    def messages_create(self, **kwargs) -> dict:
        """Create a message with automatic fallback.

        Args:
            **kwargs: Forwarded to ``client.messages.create``. A ``model``
                entry is ignored because the model is chosen here.

        Returns:
            A dict with keys ``response``, ``model_used``, ``is_fallback``
            and ``errors_before_success``.

        Raises:
            APIError: Immediately for non-transient API errors.
            RuntimeError: When every model was skipped or failed; chained to
                the last transient error, if there was one.
        """
        request = {k: v for k, v in kwargs.items() if k != 'model'}
        errors = []
        last_exception = None

        for model in self._models():
            breaker = self.circuit_breakers.get(model)
            if breaker is None:
                # The config gained a model after construction.
                breaker = self.circuit_breakers[model] = CircuitBreaker()

            if not breaker.can_execute():
                errors.append({
                    "model": model,
                    "error": "Circuit breaker open"
                })
                continue

            try:
                response = self.client.messages.create(model=model, **request)
            except APIError as e:
                if not self._is_transient(e):
                    raise  # Non-retryable errors should propagate
                breaker.record_failure()
                errors.append({
                    "model": model,
                    "error": f"{type(e).__name__}: {str(e)[:200]}"
                })
                last_exception = e
                continue

            breaker.record_success()
            is_fallback = model != self.config.primary_model
            if is_fallback:
                print(f"⚠️ Using fallback model: {model}")
            return {
                "response": response,
                "model_used": model,
                "is_fallback": is_fallback,
                "errors_before_success": errors,
            }

        raise RuntimeError(
            f"All models failed. Errors: {errors}"
        ) from last_exception
Pattern 4: Request Queue with Priority
import heapq
import threading
from queue import PriorityQueue
@dataclass
class QueuedRequest:
    """A pending API request, ordered by priority and then by age."""

    priority: int  # Lower = higher priority
    request_id: str
    kwargs: dict
    callback: callable
    created_at: float = 0

    def __post_init__(self):
        """Stamp the request with the current time unless one was given."""
        if not self.created_at:
            self.created_at = time.time()

    def __lt__(self, other):
        """Order by priority, breaking ties by creation time.

        Without the tie-break, requests of equal priority would leave a heap
        in arbitrary order; with it, the older request goes first.
        """
        if not isinstance(other, QueuedRequest):
            return NotImplemented
        return ((self.priority, self.created_at)
                < (other.priority, other.created_at))
class RequestQueue:
    """Priority queue for LLM API requests.

    Worker threads take requests off a priority queue, send them through the
    client and hand the outcome to each request's callback as
    ``callback(request_id, result, error)``.
    """

    def __init__(self, max_concurrent: int = 5,
                 rate_limit_rpm: int = 60, client=None):
        """Create a stopped queue.

        Args:
            max_concurrent: Number of worker threads started by ``start()``.
            rate_limit_rpm: Maximum requests per minute across all workers.
            client: Object exposing ``messages_create(**kwargs)``; a
                ``FallbackClient`` is created when omitted.

        Raises:
            ValueError: If ``rate_limit_rpm`` is not positive.
        """
        if rate_limit_rpm <= 0:
            raise ValueError("rate_limit_rpm must be positive")
        self.client = client if client is not None else FallbackClient()
        self.queue = PriorityQueue()
        self.max_concurrent = max_concurrent
        self.rate_limit_rpm = rate_limit_rpm
        self.active_count = 0
        self.lock = threading.Lock()
        self._running = False
        self._workers = []
        # Earliest monotonic time at which the next request may be sent.
        self._next_slot = 0.0

    def enqueue(self, priority: int, request_id: str,
                callback: callable, **kwargs):
        """Add a request to the queue.

        Args:
            priority: Lower values are served first.
            request_id: Identifier passed back to ``callback``.
            callback: Called as ``callback(request_id, result, error)``.
            **kwargs: Arguments for the client's ``messages_create``.
        """
        request = QueuedRequest(
            priority=priority,
            request_id=request_id,
            kwargs=kwargs,
            callback=callback,
        )
        self.queue.put(request)

    def start(self):
        """Start processing the queue.

        Only the workers missing from the pool are started, so calling this
        more than once does not multiply the worker count.
        """
        self._running = True
        self._workers = [w for w in self._workers if w.is_alive()]
        for _ in range(self.max_concurrent - len(self._workers)):
            thread = threading.Thread(target=self._worker, daemon=True)
            thread.start()
            self._workers.append(thread)

    def stop(self):
        """Stop processing; workers exit after their current iteration."""
        self._running = False

    def _wait_for_rate_slot(self):
        """Block until this worker may send a request.

        Slots are handed out under the shared lock so the limit holds across
        all workers rather than per worker.
        """
        interval = 60 / self.rate_limit_rpm
        with self.lock:
            now = time.monotonic()
            slot = max(now, self._next_slot)
            self._next_slot = slot + interval
        wait = slot - now
        if wait > 0:
            time.sleep(wait)

    def _process(self, request):
        """Send one request and deliver its outcome to the callback."""
        result = None
        error = None
        try:
            result = self.client.messages_create(**request.kwargs)
        except Exception as e:
            # Worker boundary: the failure is handed to the callback.
            error = e
        try:
            request.callback(request.request_id, result, error)
        except Exception as e:
            # A faulty callback must neither be invoked a second time nor
            # take the worker thread down with it.
            print(f"⚠️ Callback for request {request.request_id} raised "
                  f"{type(e).__name__}: {e}")

    def _worker(self):
        """Worker loop: fetch, rate-limit, process, repeat until stopped."""
        from queue import Empty

        while self._running:
            try:
                request = self.queue.get(timeout=1)
            except Empty:
                # Nothing queued; loop to re-check the stop flag.
                continue
            with self.lock:
                self.active_count += 1
            try:
                self._wait_for_rate_slot()
                self._process(request)
            finally:
                with self.lock:
                    self.active_count -= 1
                # Lets queue.join() return once all work is finished.
                self.queue.task_done()
Combining All Patterns
class ProductionLLMClient:
    """Production-grade LLM client combining all resilience patterns.

    The primary model is called through a retrying client guarded by a
    circuit breaker; when the breaker is open or the primary call fails for a
    transient reason, the request is handed to the fallback client.
    """

    def __init__(self):
        """Create the retrying client, fallback client and circuit breaker."""
        self.fallback_client = FallbackClient()
        self.retry_client = RetryableClient(max_retries=3)
        self.circuit_breaker = CircuitBreaker(
            failure_threshold=5,
            recovery_timeout=60
        )

    def _should_fail_over(self, error) -> bool:
        """Return True when *error* is transient and fallback is worthwhile."""
        if isinstance(error, self.retry_client.RETRYABLE_ERRORS):
            return True
        status = getattr(error, "status_code", None)
        return isinstance(status, int) and status >= 500

    def query(self, prompt: str, system: str = "",
              max_tokens: int = 4096,
              priority: str = "normal") -> dict:
        """Execute a query with full production resilience.

        Args:
            prompt: Text sent as the single user message.
            system: System prompt passed with the request.
            max_tokens: Token limit passed with the request.
            priority: Accepted for interface compatibility; not used here.

        Returns:
            A dict with keys ``response``, ``model_used``, ``is_fallback``
            and ``errors_before_success``.

        Raises:
            APIError: For non-transient API errors from the primary call.
                These are not counted against the circuit breaker, and the
                same request would fail the same way on another model.
            Exception: Whatever the fallback client raises when it cannot
                serve the request either.
        """
        request = {
            "max_tokens": max_tokens,
            "system": system,
            "messages": [{"role": "user", "content": prompt}],
        }

        # Check circuit breaker
        if not self.circuit_breaker.can_execute():
            # Hand over to the fallback client, which walks its own model
            # chain and skips models whose breaker is open.
            return self.fallback_client.messages_create(**request)

        # Read the model from the fallback config so both paths agree on
        # what "primary" means.
        primary_model = self.fallback_client.config.primary_model
        try:
            # Try primary model with retries
            response = self.retry_client.messages_create(
                model=primary_model,
                **request,
            )
        except APIError as e:
            if not self._should_fail_over(e):
                raise
            self.circuit_breaker.record_failure()
            # Attempt fallback
            return self.fallback_client.messages_create(**request)

        self.circuit_breaker.record_success()
        return {
            "response": response,
            "model_used": primary_model,
            "is_fallback": False,
            # Same key set as the fallback client's result.
            "errors_before_success": [],
        }
These patterns are the foundation of reliable AI systems. In the next lesson, you will build the monitoring and observability layer that tells you when these patterns are activating — and why.