Lesson 25 of 30 ~15 min
Course progress
0%

Error Handling in Agents

Robustní error handling pro agentní systémy – retry strategie, graceful degradation, recovery.

Agentní systémy musí být odolné vůči chybám. Naučte se budovat robustní error handling.

Typy chyb v agentních systémech

1. API Errors      → Rate limits, timeouts, network issues
2. Tool Failures   → Tool returns error, invalid output
3. Logic Errors    → Agent makes wrong decision
4. Context Errors  → Lost context, corrupted state
5. External Errors → Third-party service failures

Retry strategie

Exponential Backoff

import asyncio
from functools import wraps

def with_retry(max_retries=3, base_delay=1, max_delay=60):
    """Build a decorator that retries an async callable on transient errors.

    Rate-limit errors back off exponentially (``base_delay * 2 ** attempt``);
    timeouts and network errors back off linearly
    (``base_delay * (attempt + 1)``). Every delay is capped at ``max_delay``.
    Any other exception propagates immediately without a retry.

    Args:
        max_retries: Total number of attempts, including the first call.
        base_delay: Base delay in seconds used by both backoff formulas.
        max_delay: Upper bound in seconds for a single delay.

    Returns:
        A decorator that wraps a coroutine function with the retry loop.

    Raises:
        ValueError: If ``max_retries`` is smaller than 1 (the wrapped
            function could never be called).
    """
    if max_retries < 1:
        raise ValueError("max_retries must be at least 1")

    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            final_attempt = max_retries - 1

            for attempt in range(max_retries):
                try:
                    return await func(*args, **kwargs)
                except RateLimitError:
                    if attempt == final_attempt:
                        # Nothing left to retry: re-raise right away instead
                        # of sleeping first.
                        raise
                    delay = min(base_delay * (2 ** attempt), max_delay)
                    print(f"Rate limited. Retrying in {delay}s...")
                # asyncio.TimeoutError is a separate class before Python 3.11,
                # so it is listed next to the builtin TimeoutError.
                except (TimeoutError, asyncio.TimeoutError, NetworkError):
                    if attempt == final_attempt:
                        raise
                    delay = min(base_delay * (attempt + 1), max_delay)
                    print(f"Network error. Retrying in {delay}s...")

                await asyncio.sleep(delay)
        return wrapper
    return decorator

# Usage
@with_retry(max_retries=5)
async def call_claude(prompt: str):
    """Placeholder client call; up to five attempts are made by ``with_retry``."""
    response = await client.messages.create(...)
    return response

Circuit Breaker

from dataclasses import dataclass
from datetime import datetime, timedelta

@dataclass
class CircuitBreaker:
    """Circuit breaker with closed, open and half-open states.

    The breaker opens once ``failure_threshold`` failures have been recorded
    without an intervening success. After ``reset_timeout`` has passed since
    the last failure it moves to half-open and admits a single trial call;
    a success closes the breaker, a failure opens it again.
    """

    # Number of recorded failures that opens the breaker.
    failure_threshold: int = 5
    # How long the breaker stays open before a trial call is allowed.
    reset_timeout: timedelta = timedelta(minutes=1)

    # Failures recorded since the last success.
    _failures: int = 0
    # Time of the most recent failure; None until one is recorded.
    _last_failure: "datetime | None" = None
    _state: str = "closed"  # closed, open, half-open
    # When the current half-open trial call was admitted, if any.
    _probe_started: "datetime | None" = None

    def record_failure(self):
        """Count a failure and open the breaker when warranted."""
        self._failures += 1
        self._last_failure = datetime.now()
        self._probe_started = None

        trial_failed = self._state == "half-open"
        if trial_failed or self._failures >= self.failure_threshold:
            if self._state != "open":
                print("Circuit breaker opened!")
            self._state = "open"

    def record_success(self):
        """Reset the failure count and close the breaker."""
        self._failures = 0
        self._probe_started = None
        self._state = "closed"

    def can_execute(self) -> bool:
        """Return True when a call may be attempted right now.

        In the half-open state only one trial call is admitted at a time.
        A trial that has not reported back within ``reset_timeout`` is
        treated as lost, so a cancelled call cannot wedge the breaker.
        """
        if self._state == "closed":
            return True

        now = datetime.now()

        if self._state == "open":
            # A missing timestamp is treated as "timeout already elapsed"
            # rather than crashing on None arithmetic.
            if (
                self._last_failure is None
                or now - self._last_failure > self.reset_timeout
            ):
                self._state = "half-open"
                self._probe_started = now
                return True
            return False

        # half-open: allow one try
        if (
            self._probe_started is None
            or now - self._probe_started > self.reset_timeout
        ):
            self._probe_started = now
            return True
        return False

# Usage
breaker = CircuitBreaker()


async def safe_call(prompt: str):
    """Run ``call_claude`` behind the module-level circuit breaker.

    Raises ``CircuitOpenError`` when the breaker refuses the call; any
    exception from the call itself is recorded as a failure and re-raised.
    """
    allowed = breaker.can_execute()
    if not allowed:
        raise CircuitOpenError("Service temporarily unavailable")

    try:
        response = await call_claude(prompt)
        breaker.record_success()
    except Exception:
        breaker.record_failure()
        raise
    return response

Tool Failure Handling

class RobustToolExecutor:
    """Run tools and turn failures into structured result dictionaries.

    ``primary_execute`` is not defined here; it is expected to be supplied
    by a subclass or mixin as an awaitable taking ``(tool_name, input_data)``.
    """

    def __init__(self):
        # Maps tool name -> async fallback callable.
        self.fallbacks = {}

    def register_fallback(self, tool_name: str, fallback_fn):
        """Register ``fallback_fn`` for ``tool_name``.

        The fallback is awaited with the tool's input dict when the primary
        execution fails with an unexpected error.
        """
        self.fallbacks[tool_name] = fallback_fn

    async def execute(self, tool_name: str, input_data: dict):
        """Execute a tool and always return a result dict instead of raising.

        Returns:
            ``{"success": True, "data": ...}`` on success (with
            ``"used_fallback": True`` when the fallback produced the data),
            otherwise ``{"success": False, "error": ..., ...}``. When a
            registered fallback also fails, its message is reported under
            ``"fallback_error"``.
        """
        try:
            result = await self.primary_execute(tool_name, input_data)
            return {"success": True, "data": result}

        except ToolNotFoundError:
            return {
                "success": False,
                "error": "tool_not_found",
                "suggestion": f"Tool '{tool_name}' is not available"
            }

        except ValidationError as e:
            return {
                "success": False,
                "error": "invalid_input",
                "details": str(e),
                "suggestion": "Please check the input parameters"
            }

        except Exception as e:
            failure = {
                "success": False,
                "error": "execution_failed",
                "details": str(e)
            }

            fallback = self.fallbacks.get(tool_name)
            if fallback is None:
                return failure

            try:
                result = await fallback(input_data)
            except Exception as fallback_error:
                # Only ordinary errors are absorbed; cancellation and
                # interpreter-exit signals still propagate. The fallback's
                # own error is kept so the failure can be diagnosed.
                failure["fallback_error"] = str(fallback_error)
                return failure

            return {"success": True, "data": result, "used_fallback": True}

Graceful Degradation

class DegradableAgent:
    """Agent that can keep working with a reduced set of capabilities."""

    def __init__(self):
        # Every capability starts out enabled.
        self.capabilities = dict.fromkeys(
            ("web_search", "code_execution", "file_access"), True
        )

    def disable_capability(self, name: str):
        """Switch the named capability off and announce it on stdout."""
        self.capabilities[name] = False
        print(f"Capability '{name}' disabled")

    async def process(self, task: str):
        """Complete ``task``, adapting to whichever capabilities are enabled."""
        # Without web search, a fixed note stands in for the search result.
        if not self.capabilities["web_search"]:
            context = "Note: Web search unavailable. Using cached knowledge."
        else:
            context = await self.search_web(task)

        can_verify = bool(self.capabilities["code_execution"])
        if not can_verify:
            context += "\nNote: Cannot verify code by running it."

        # Hand over whatever context could be gathered.
        return await self.complete_task(task, context, can_verify)

# On failure
agent = DegradableAgent()

# NOTE(review): `await` is only valid inside a coroutine, and `query` is not
# defined in this fragment — presumably this runs inside an async function
# that supplies it; confirm before reusing the snippet as-is.
try:
    result = await agent.search_web(query)
except ServiceUnavailableError:
    # Turn the capability off so later process() calls use the
    # "web search unavailable" note instead of calling search_web again.
    agent.disable_capability("web_search")
    # Agent continues with limited functionality

State Recovery

class RecoverableWorkflow:
    """Step-based workflow that checkpoints its progress to a JSON file.

    ``step1``, ``step2`` and ``step3`` are not defined here; they are
    expected to be supplied by a subclass as coroutines taking the state
    dict and returning a JSON-serializable result.
    """

    def __init__(self, state_file: str):
        self.state_file = state_file

    def save_state(self, state: dict):
        """Write ``state`` as JSON, replacing the old checkpoint atomically.

        The data goes to a sibling ``.tmp`` file first and is then renamed
        over the real file, so an interrupted write never leaves a
        half-written checkpoint behind.
        """
        # Function-scope imports keep this snippet self-contained.
        import json
        import os

        tmp_path = f"{self.state_file}.tmp"
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(state, f)
        os.replace(tmp_path, self.state_file)

    def load_state(self) -> "dict | None":
        """Return the saved state, or None when no checkpoint file exists."""
        import json

        # Open directly instead of checking existence first, so the file
        # cannot disappear between the check and the read.
        try:
            with open(self.state_file, encoding="utf-8") as f:
                return json.load(f)
        except FileNotFoundError:
            return None

    async def run(self, task: str):
        """Run all steps for ``task``, resuming after the last completed one.

        A checkpoint is reused only when it was saved for the same ``task``.

        Returns:
            Dict mapping ``"step_<index>"`` to each step's result.

        Raises:
            Exception: Whatever a step raises, re-raised after the current
                progress has been saved.
        """
        # Attempt recovery
        state = self.load_state()
        if state and state.get("task") == task:
            print(f"Recovering from step: {state['last_completed']}")
            start_step = state["last_completed"] + 1
        else:
            state = {"task": task, "last_completed": -1, "results": {}}
            start_step = 0

        steps = [self.step1, self.step2, self.step3]

        for i, step in enumerate(steps[start_step:], start=start_step):
            try:
                result = await step(state)
                state["results"][f"step_{i}"] = result
                state["last_completed"] = i
                self.save_state(state)  # Checkpoint after each step
            except Exception as e:
                print(f"Step {i} failed: {e}")
                self.save_state(state)  # Save progress
                raise

        return state["results"]

Logging pro debugging

import logging
import json
from datetime import datetime

class AgentLogger:
    """Emit one JSON document per agent step or error via ``logging``."""

    def __init__(self, name: str):
        self.logger = logging.getLogger(name)
        # Identifies this logger instance's records; second resolution.
        self.session_id = datetime.now().strftime("%Y%m%d_%H%M%S")

    def _emit(self, level: int, fields: dict) -> None:
        """Serialize ``fields`` with session id and timestamp, then log it."""
        payload = {
            "session": self.session_id,
            **fields,
            "timestamp": datetime.now().isoformat(),
        }
        # default=str is deliberate: a value that JSON cannot encode is
        # logged as its str() form instead of raising inside the logger.
        self.logger.log(level, json.dumps(payload, default=str))

    def log_step(self, step: str, input_data: dict, output: object, duration: float):
        """Log a completed step at INFO level.

        Args:
            step: Name of the step.
            input_data: Input passed to the step.
            output: Step result; logged as ``str(output)`` cut to 500 chars.
            duration: Elapsed time, multiplied by 1000 for ``duration_ms``.
        """
        self._emit(logging.INFO, {
            "step": step,
            "input": input_data,
            "output": str(output)[:500],  # Truncate
            "duration_ms": duration * 1000,
        })

    def log_error(self, step: str, error: Exception, context: dict):
        """Log a failed step at ERROR level with the error type and message."""
        self._emit(logging.ERROR, {
            "step": step,
            "error_type": type(error).__name__,
            "error_message": str(error),
            "context": context,
        })

Robustní error handling je to, co odlišuje produkční agenty od prototypů.