Lesson 11 of 46 ~25 min
Course progress
0%

Production Architecture Patterns

Implement production-grade patterns for LLM systems — retry with exponential backoff, circuit breaker, fallback to weaker models, and request queuing.

The gap between a working prototype and a production system is measured in error handling. LLM APIs fail — rate limits, timeouts, server errors, degraded quality. This lesson implements the patterns that keep your system running when things go wrong.

Pattern 1: Retry with Exponential Backoff

import random
import time
from dataclasses import dataclass

from anthropic import (
    Anthropic, APIError, RateLimitError, APITimeoutError,
    APIConnectionError
)

class RetryableClient:
    """Anthropic client wrapper that retries transient failures.

    Errors listed in ``RETRYABLE_ERRORS`` and any ``APIError`` carrying a
    5xx ``status_code`` are retried with exponential backoff plus jitter.
    Every other ``APIError`` is re-raised immediately.
    """

    RETRYABLE_ERRORS = (
        RateLimitError,
        APITimeoutError,
        APIConnectionError,
    )

    def __init__(self, max_retries: int = 3, base_delay: float = 1.0,
                 max_delay: float = 60.0):
        """Configure the retry policy.

        Args:
            max_retries: Number of retries after the first attempt, so up
                to ``max_retries + 1`` calls are made in total.
            base_delay: Backoff base in seconds; doubled on every retry.
            max_delay: Upper bound in seconds for the computed backoff.
        """
        # NOTE(review): the SDK client may apply its own internal retries;
        # confirm its settings so attempts are not multiplied unintentionally.
        self.client = Anthropic()
        self.max_retries = max_retries
        self.base_delay = base_delay
        self.max_delay = max_delay

    def messages_create(self, **kwargs):
        """Create a message, retrying transient failures.

        Args:
            **kwargs: Forwarded unchanged to ``client.messages.create``.

        Returns:
            Whatever ``client.messages.create`` returns.

        Raises:
            APIError: Immediately for non-retryable errors, or the last
                retryable error once every attempt has been used.
            ValueError: If ``max_retries`` is negative, so no attempt
                could be made.
        """
        last_error = None
        total_attempts = self.max_retries + 1

        for attempt in range(total_attempts):
            try:
                return self.client.messages.create(**kwargs)
            except self.RETRYABLE_ERRORS as e:
                last_error = e
            except APIError as e:
                # 4xx-style errors would fail identically on a retry.
                if not self._is_server_error(e):
                    raise
                last_error = e

            if attempt == self.max_retries:
                break

            delay = self._compute_delay(attempt, last_error)
            print(f"⚠️ Attempt {attempt + 1}/{total_attempts} failed: "
                  f"{type(last_error).__name__}. Retrying in {delay:.1f}s...")
            time.sleep(delay)

        if last_error is None:
            # Only reachable when the loop body never ran.
            raise ValueError("max_retries must be >= 0")
        raise last_error

    @staticmethod
    def _is_server_error(error) -> bool:
        """Return True when *error* carries a 5xx ``status_code``."""
        status = getattr(error, "status_code", None)
        return status is not None and status >= 500

    def _compute_delay(self, attempt: int, error) -> float:
        """Return the number of seconds to wait before the next attempt."""
        # Exponential backoff with jitter, capped at max_delay.
        delay = min(
            self.base_delay * (2 ** attempt) + random.uniform(0, 1),
            self.max_delay
        )

        # Respect Retry-After if present; a server-provided wait may exceed
        # max_delay on purpose.
        response = getattr(error, "response", None)
        if response is not None:
            retry_after = response.headers.get('retry-after')
            if retry_after:
                try:
                    delay = max(delay, float(retry_after))
                except ValueError:
                    # Header is not a plain number of seconds (e.g. an
                    # HTTP-date); keep the computed backoff instead of
                    # crashing the retry loop.
                    pass

        return delay

Pattern 2: Circuit Breaker

Prevent cascading failures by stopping requests when the API is consistently failing:

from datetime import datetime, timedelta
from enum import Enum

class CircuitState(Enum):
    """States of a :class:`CircuitBreaker`."""

    CLOSED = "closed"       # Normal operation
    OPEN = "open"           # Blocking requests
    HALF_OPEN = "half_open" # Testing if API recovered


class CircuitBreaker:
    """Circuit breaker for LLM API calls.

    State transitions:
      * CLOSED -> OPEN once ``failure_count`` reaches ``failure_threshold``.
      * OPEN -> HALF_OPEN when ``can_execute`` is called after more than
        ``recovery_timeout`` seconds have passed since the last failure.
      * HALF_OPEN -> CLOSED after ``success_threshold`` recorded successes.
      * HALF_OPEN -> OPEN on any recorded failure.
    """

    def __init__(self, failure_threshold: int = 5,
                 recovery_timeout: int = 60,
                 success_threshold: int = 3):
        """Create a breaker in the CLOSED state.

        Args:
            failure_threshold: Failure count at which the breaker opens.
            recovery_timeout: Seconds to stay OPEN before probing again.
            success_threshold: Successes needed in HALF_OPEN to close.
        """
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout  # seconds
        self.success_threshold = success_threshold

        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.success_count = 0
        self.last_failure_time = None

    def can_execute(self) -> bool:
        """Return True if a request may be sent now.

        Side effect: an OPEN breaker whose recovery timeout has expired is
        moved to HALF_OPEN and its success counter is reset.
        """
        if self.state == CircuitState.CLOSED:
            return True

        if self.state == CircuitState.OPEN:
            if self._recovery_timeout_expired():
                self.state = CircuitState.HALF_OPEN
                self.success_count = 0
                return True
            return False

        # HALF_OPEN: requests are let through to test recovery
        return True

    def record_success(self):
        """Record a successful API call."""
        if self.state == CircuitState.HALF_OPEN:
            self.success_count += 1
            if self.success_count >= self.success_threshold:
                self.state = CircuitState.CLOSED
                self.failure_count = 0
                print("✅ Circuit breaker closed — API recovered")
        else:
            # Outside HALF_OPEN a success pays down one earlier failure.
            self.failure_count = max(0, self.failure_count - 1)

    def record_failure(self):
        """Record a failed API call and open the breaker when warranted."""
        self.failure_count += 1
        self.last_failure_time = datetime.now()

        # HALF_OPEN must be checked first: failure_count is still at or
        # above the threshold while probing, so testing the threshold first
        # would make this branch unreachable.
        if self.state == CircuitState.HALF_OPEN:
            self.state = CircuitState.OPEN
            print("🔴 Circuit breaker re-opened — recovery test failed")
        elif self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN
            print(f"🔴 Circuit breaker OPEN — {self.failure_count} "
                  f"consecutive failures")

    def _recovery_timeout_expired(self) -> bool:
        """Return True once ``recovery_timeout`` has elapsed since the last failure."""
        if self.last_failure_time is None:
            return True
        return (datetime.now() - self.last_failure_time >
                timedelta(seconds=self.recovery_timeout))

Pattern 3: Fallback to Weaker Models

When the primary model is unavailable, fall back gracefully:

@dataclass
class FallbackConfig:
    """Model tiers: a primary model plus an ordered list of fallbacks."""

    # NOTE(review): confirm these model IDs against the provider's current
    # model list before relying on them.
    primary_model: str = "claude-opus-4-6-20260205"
    # None is a sentinel meaning "use the default chain"; a fresh list is
    # built per instance in __post_init__.
    fallback_chain: list = None

    def __post_init__(self):
        """Install the default fallback chain when none was supplied."""
        if self.fallback_chain is not None:
            return
        default_chain = ["claude-sonnet-4-5-20241022"]
        default_chain.append("claude-haiku-4-5-20241022")
        self.fallback_chain = default_chain

class FallbackClient:
    """Client that gracefully degrades through model tiers.

    Each model has its own ``CircuitBreaker``. Models are tried in order
    (primary first, then the fallback chain); a model whose breaker is
    open is skipped.
    """

    def __init__(self, config: FallbackConfig = None):
        """Build the client and one circuit breaker per configured model.

        Args:
            config: Model tiers to use; a default ``FallbackConfig`` is
                created when omitted.
        """
        self.client = Anthropic()
        self.config = config or FallbackConfig()
        self.circuit_breakers = {
            model: CircuitBreaker()
            for model in [self.config.primary_model]
                         + self.config.fallback_chain
        }

    def messages_create(self, **kwargs) -> dict:
        """Create a message, falling back to the next model on failure.

        Args:
            **kwargs: Arguments for ``client.messages.create``. A ``model``
                key is ignored because the model comes from the config.

        Returns:
            A dict with keys ``response``, ``model_used``, ``is_fallback``
            and ``errors_before_success``.

        Raises:
            APIError: Immediately, for API errors that are neither
                rate-limit/timeout/connection errors nor 5xx responses.
            RuntimeError: If every model failed or was skipped; chained
                from the last underlying error when there is one.
        """
        models = [self.config.primary_model] + self.config.fallback_chain
        # The model is chosen per iteration, so strip it once up front.
        request = {k: v for k, v in kwargs.items() if k != 'model'}
        errors = []
        last_exception = None

        for model in models:
            breaker = self.circuit_breakers[model]

            if not breaker.can_execute():
                errors.append({
                    "model": model,
                    "error": "Circuit breaker open"
                })
                continue

            try:
                response = self.client.messages.create(model=model, **request)
            except (RateLimitError, APITimeoutError,
                    APIConnectionError) as e:
                failure = e
            except APIError as e:
                # A 5xx status means this model's backend is failing, so
                # the next tier is worth trying. Anything else would fail
                # identically on every model and must propagate.
                status = getattr(e, "status_code", None)
                if status is None or status < 500:
                    raise
                failure = e
            else:
                breaker.record_success()

                is_fallback = model != self.config.primary_model
                if is_fallback:
                    print(f"⚠️ Using fallback model: {model}")

                return {
                    "response": response,
                    "model_used": model,
                    "is_fallback": is_fallback,
                    "errors_before_success": errors,
                }

            breaker.record_failure()
            errors.append({
                "model": model,
                "error": f"{type(failure).__name__}: {str(failure)[:200]}"
            })
            last_exception = failure

        # Keep the underlying cause attached for debugging.
        raise RuntimeError(
            f"All models failed. Errors: {errors}"
        ) from last_exception

Pattern 4: Request Queue with Priority

import heapq
import threading
from queue import PriorityQueue

@dataclass
class QueuedRequest:
    """A pending API request, ordered by priority and then by age."""

    priority: int          # Lower = higher priority
    request_id: str
    kwargs: dict           # Keyword arguments for the API call
    callback: callable     # Called as callback(request_id, result, error)
    created_at: float = 0  # time.time() timestamp; filled in when falsy

    def __post_init__(self):
        """Stamp the creation time unless the caller supplied one."""
        if not self.created_at:
            self.created_at = time.time()

    def __lt__(self, other):
        """Order by priority, breaking ties by creation time (FIFO).

        Without the tie-break, equal-priority requests come out of a heap
        in arbitrary order and older requests can starve.
        """
        if not isinstance(other, QueuedRequest):
            return NotImplemented
        return ((self.priority, self.created_at)
                < (other.priority, other.created_at))

class RequestQueue:
    """Priority queue for LLM API requests.

    Worker threads pull requests from a ``PriorityQueue``, send them
    through ``self.client`` and report the outcome to each request's
    callback as ``callback(request_id, result, error)``.
    """

    def __init__(self, max_concurrent: int = 5,
                 rate_limit_rpm: int = 60, client=None):
        """Create an idle queue; call ``start`` to begin processing.

        Args:
            max_concurrent: Number of worker threads started by ``start``.
            rate_limit_rpm: Maximum requests per minute across ALL workers.
            client: Object exposing ``messages_create(**kwargs)``. A
                ``FallbackClient`` is created when omitted.

        Raises:
            ValueError: If ``rate_limit_rpm`` is not positive.
        """
        if rate_limit_rpm <= 0:
            raise ValueError("rate_limit_rpm must be positive")

        self.client = client if client is not None else FallbackClient()
        self.queue = PriorityQueue()
        self.max_concurrent = max_concurrent
        self.rate_limit_rpm = rate_limit_rpm
        self.active_count = 0
        self.lock = threading.Lock()
        self._running = False

        # Shared pacing state: the earliest monotonic time at which the
        # next request may start, guarded by its own lock.
        self._rate_lock = threading.Lock()
        self._next_slot = 0.0

    def enqueue(self, priority: int, request_id: str,
                callback: callable, **kwargs):
        """Add a request to the queue.

        Args:
            priority: Lower values are served first.
            request_id: Identifier handed back to ``callback``.
            callback: Called as ``callback(request_id, result, error)``.
            **kwargs: Arguments passed to the client's ``messages_create``.
        """
        request = QueuedRequest(
            priority=priority,
            request_id=request_id,
            kwargs=kwargs,
            callback=callback,
        )
        self.queue.put(request)

    def start(self):
        """Start the worker threads; does nothing if already running."""
        if self._running:
            return  # Avoid spawning a second set of workers.
        self._running = True
        for _ in range(self.max_concurrent):
            thread = threading.Thread(target=self._worker, daemon=True)
            thread.start()

    def stop(self):
        """Ask workers to exit after their current request or poll."""
        self._running = False

    def _wait_for_rate_slot(self):
        """Block until this worker may start a request.

        Slots are handed out from shared state so the limit applies to
        the whole queue rather than to each worker separately.
        """
        interval = 60 / self.rate_limit_rpm
        with self._rate_lock:
            now = time.monotonic()
            slot = max(now, self._next_slot)
            self._next_slot = slot + interval
        pause = slot - now
        if pause > 0:
            time.sleep(pause)  # Sleep outside the lock.

    def _notify(self, request, result, error):
        """Invoke the request's callback exactly once, containing its errors."""
        try:
            request.callback(request.request_id, result, error)
        except Exception as callback_error:
            # A faulty callback must not kill the worker thread.
            print(f"⚠️ Callback for request {request.request_id} raised "
                  f"{type(callback_error).__name__}: {callback_error}")

    def _worker(self):
        """Worker loop: fetch, rate-limit, execute, report."""
        from queue import Empty

        while self._running:
            try:
                # Time out so the loop can notice stop().
                request = self.queue.get(timeout=1)
            except Empty:
                continue

            try:
                self._wait_for_rate_slot()

                with self.lock:
                    self.active_count += 1

                result, error = None, None
                try:
                    result = self.client.messages_create(**request.kwargs)
                except Exception as e:
                    # Deliver the failure to the callback instead of
                    # letting it end the worker.
                    error = e
                finally:
                    with self.lock:
                        self.active_count -= 1

                self._notify(request, result, error)
            finally:
                # Keeps queue.join() usable.
                self.queue.task_done()

Combining All Patterns

class ProductionLLMClient:
    """Production-grade LLM client combining all resilience patterns.

    The primary model is called through a retrying client, guarded by a
    circuit breaker. When the breaker blocks the call, or the primary
    attempt fails with a transient error, the request is handed to the
    fallback client.
    """

    # 4xx statuses treated as transient rather than as a bad request.
    _TRANSIENT_CLIENT_STATUSES = frozenset({408, 409, 429})

    def __init__(self):
        """Wire up the fallback client, retry client and circuit breaker."""
        self.fallback_client = FallbackClient()
        self.retry_client = RetryableClient(max_retries=3)
        self.circuit_breaker = CircuitBreaker(
            failure_threshold=5,
            recovery_timeout=60
        )

    def query(self, prompt: str, system: str = "",
              max_tokens: int = 4096,
              priority: str = "normal") -> dict:
        """Execute a query with full production resilience.

        Args:
            prompt: User message content.
            system: System prompt forwarded to the API.
            max_tokens: Completion token limit forwarded to the API.
            priority: Accepted for interface compatibility; not used here.

        Returns:
            A dict with keys ``response``, ``model_used``, ``is_fallback``
            and ``errors_before_success``.

        Raises:
            APIError: For non-transient client errors (4xx other than
                408/409/429), which the fallback path cannot fix.
            RuntimeError: From the fallback client when every model fails.
        """
        request = {
            "max_tokens": max_tokens,
            "system": system,
            "messages": [{"role": "user", "content": prompt}],
        }

        # Breaker open: skip the retrying primary path. The fallback
        # client walks its own model list with per-model breakers.
        if not self.circuit_breaker.can_execute():
            return self.fallback_client.messages_create(**request)

        # One source of truth for the primary model ID.
        primary_model = self.fallback_client.config.primary_model

        try:
            # Try primary model with retries
            response = self.retry_client.messages_create(
                model=primary_model,
                **request,
            )
        except APIError as e:
            # A bad request or auth failure is not an outage: do not count
            # it against the breaker or re-send a request that will fail
            # the same way.
            status = getattr(e, "status_code", None)
            if (status is not None and 400 <= status < 500
                    and status not in self._TRANSIENT_CLIENT_STATUSES):
                raise

            self.circuit_breaker.record_failure()

            # Attempt fallback
            return self.fallback_client.messages_create(**request)

        self.circuit_breaker.record_success()

        return {
            "response": response,
            "model_used": primary_model,
            "is_fallback": False,
            # Same shape as the fallback client's result.
            "errors_before_success": [],
        }

These patterns are the foundation of reliable AI systems. In the next lesson, you will build the monitoring and observability layer that tells you when these patterns are activating — and why.