Agentní systémy musí být odolné vůči chybám. Naučte se budovat robustní error handling.
## Typy chyb v agentních systémech
1. API Errors → Rate limits, timeouts, network issues
2. Tool Failures → Tool returns error, invalid output
3. Logic Errors → Agent makes wrong decision
4. Context Errors → Lost context, corrupted state
5. External Errors → Third-party service failures
## Retry strategie

### Exponential Backoff
import asyncio
from functools import wraps
def with_retry(max_retries=3, base_delay=1, max_delay=60):
    """Build a decorator that retries an async callable on transient errors.

    Rate-limit errors back off exponentially (``base_delay * 2 ** attempt``,
    capped at ``max_delay``). Timeouts and network errors back off linearly
    (``base_delay * (attempt + 1)``). Any other exception propagates
    immediately without a retry.

    ``RateLimitError`` and ``NetworkError`` are not defined in this block;
    they are expected to be in scope where the decorator is used.

    Args:
        max_retries: Total number of attempts, including the first call.
        base_delay: Base delay in seconds for both back-off schemes.
        max_delay: Upper bound in seconds for the exponential delay.

    Returns:
        A decorator for coroutine functions.

    Raises:
        ValueError: If ``max_retries`` is less than 1 (the wrapped function
            would never be called).
    """
    if max_retries < 1:
        raise ValueError("max_retries must be at least 1")

    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            final_attempt = max_retries - 1
            for attempt in range(max_retries):
                try:
                    return await func(*args, **kwargs)
                except RateLimitError:
                    if attempt == final_attempt:
                        # Out of attempts: fail now instead of sleeping first.
                        raise
                    delay = min(base_delay * (2 ** attempt), max_delay)
                    print(f"Rate limited. Retrying in {delay}s...")
                except (TimeoutError, NetworkError):
                    if attempt == final_attempt:
                        raise
                    delay = base_delay * (attempt + 1)
                    print(f"Network error. Retrying in {delay}s...")
                # Only reached after a retryable error with attempts left.
                await asyncio.sleep(delay)

        return wrapper

    return decorator
# Usage
@with_retry(max_retries=5)
async def call_claude(prompt: str):
    """Call ``client.messages.create`` with the retry policy applied.

    NOTE(review): the request arguments are elided (``...``) in this example
    and ``prompt`` is not forwarded yet -- fill them in when adapting it.
    """
    response = await client.messages.create(...)
    return response
### Circuit Breaker
from dataclasses import dataclass
from datetime import datetime, timedelta
@dataclass
class CircuitBreaker:
    """Failure counter that blocks calls after repeated failures.

    The state is kept as a plain string in ``_state``:

    * ``"closed"``    -- calls are allowed.
    * ``"open"``      -- calls are rejected until more than ``reset_timeout``
      has passed since the last recorded failure.
    * ``"half-open"`` -- calls are allowed again; a recorded success closes
      the breaker, and a recorded failure re-opens it whenever the failure
      count is at or above ``failure_threshold``.
    """

    failure_threshold: int = 5
    reset_timeout: timedelta = timedelta(minutes=1)
    _failures: int = 0
    _last_failure: datetime = None  # stays None until a failure is recorded
    _state: str = "closed"  # one of: closed, open, half-open

    def record_failure(self):
        """Count one failure and open the breaker once the threshold is hit."""
        self._last_failure = datetime.now()
        self._failures += 1
        if self._failures < self.failure_threshold:
            return
        self._state = "open"
        print("Circuit breaker opened!")

    def record_success(self):
        """Clear the failure count and close the breaker."""
        self._state = "closed"
        self._failures = 0

    def can_execute(self) -> bool:
        """Return whether a call may go through right now.

        An open breaker switches to ``"half-open"`` (and allows the call)
        once the time since the last failure exceeds ``reset_timeout``.
        """
        if self._state != "open":
            # Closed and half-open both let calls through.
            return True
        since_failure = datetime.now() - self._last_failure
        if since_failure > self.reset_timeout:
            self._state = "half-open"
            return True
        return False
# Usage
breaker = CircuitBreaker()


async def safe_call(prompt: str):
    """Call ``call_claude`` behind the module-level circuit breaker.

    Raises ``CircuitOpenError`` without calling anything when the breaker
    refuses the call. Otherwise the outcome is recorded on the breaker and
    any exception from ``call_claude`` is re-raised unchanged.
    """
    if breaker.can_execute():
        try:
            response = await call_claude(prompt)
            breaker.record_success()
            return response
        except Exception:
            breaker.record_failure()
            raise
    raise CircuitOpenError("Service temporarily unavailable")
## Tool Failure Handling
class RobustToolExecutor:
    """Run tools and report the outcome as a result dict instead of raising.

    ``primary_execute`` is not defined in this class; it is expected to be
    provided elsewhere (e.g. by a subclass) as an async method taking
    ``(tool_name, input_data)``. ``ToolNotFoundError`` and ``ValidationError``
    are likewise expected to be in scope.
    """

    def __init__(self):
        # Maps tool name -> async fallback callable taking ``input_data``.
        self.fallbacks = {}

    def register_fallback(self, tool_name: str, fallback_fn):
        """Register ``fallback_fn`` to be tried when ``tool_name`` fails."""
        self.fallbacks[tool_name] = fallback_fn

    async def execute(self, tool_name: str, input_data: dict):
        """Execute a tool, falling back to a registered alternative on error.

        Returns:
            A dict with ``"success"``. On success it carries ``"data"`` (and
            ``"used_fallback": True`` when the fallback produced it). On
            failure it carries ``"error"`` plus ``"details"`` and/or
            ``"suggestion"``; when a fallback was tried and also failed, its
            message is reported under ``"fallback_error"``.
        """
        try:
            result = await self.primary_execute(tool_name, input_data)
        except ToolNotFoundError:
            return {
                "success": False,
                "error": "tool_not_found",
                "suggestion": f"Tool '{tool_name}' is not available",
            }
        except ValidationError as e:
            return {
                "success": False,
                "error": "invalid_input",
                "details": str(e),
                "suggestion": "Please check the input parameters",
            }
        except Exception as e:
            failure = {
                "success": False,
                "error": "execution_failed",
                "details": str(e),
            }
            fallback = self.fallbacks.get(tool_name)
            if fallback is None:
                return failure
            try:
                result = await fallback(input_data)
            except Exception as fallback_error:
                # ``Exception`` rather than a bare ``except`` so cancellation
                # and KeyboardInterrupt still propagate; the fallback's own
                # error is reported instead of being dropped.
                failure["fallback_error"] = str(fallback_error)
                return failure
            return {"success": True, "data": result, "used_fallback": True}
        return {"success": True, "data": result}
## Graceful Degradation
class DegradableAgent:
    """Agent that can keep working with reduced capabilities."""

    def __init__(self):
        # Every capability starts out enabled.
        self.capabilities = dict.fromkeys(
            ("web_search", "code_execution", "file_access"), True
        )

    def disable_capability(self, name: str):
        """Switch the named capability off and announce it."""
        self.capabilities[name] = False
        print(f"Capability '{name}' disabled")

    async def process(self, task: str):
        """Handle ``task`` using whichever capabilities are still enabled.

        Without web search a fixed notice is used as the context; without
        code execution a second notice is appended and ``can_verify`` is
        passed to ``complete_task`` as ``False``.
        """
        if not self.capabilities["web_search"]:
            context = "Note: Web search unavailable. Using cached knowledge."
        else:
            context = await self.search_web(task)

        can_verify = bool(self.capabilities["code_execution"])
        if not can_verify:
            context += "\nNote: Cannot verify code by running it."

        # Carry on with whatever capabilities remain.
        return await self.complete_task(task, context, can_verify)
# Při chybě
agent = DegradableAgent()
try:
result = await agent.search_web(query)
except ServiceUnavailableError:
agent.disable_capability("web_search")
# Agent continues with limited functionality
## State Recovery
class RecoverableWorkflow:
    """Run a fixed sequence of async steps with a JSON checkpoint after each.

    The checkpoint is a dict with the keys ``task``, ``last_completed``
    (index of the last finished step, ``-1`` when none has finished) and
    ``results`` (``"step_<i>"`` -> that step's return value). Because the
    checkpoint is stored as JSON, step results must be JSON-serializable.

    ``step1``, ``step2`` and ``step3`` are not defined here; they are expected
    to be provided (e.g. by a subclass) as async methods taking the state
    dict.
    """

    def __init__(self, state_file: str):
        self.state_file = state_file

    def save_state(self, state: dict):
        """Write ``state`` to the checkpoint file without corrupting it.

        The state is serialized before any file is touched and then swapped
        in with ``os.replace``, so a serialization error or a crash mid-write
        leaves the previous checkpoint intact.

        Raises:
            TypeError: If ``state`` is not JSON-serializable.
            OSError: If the file cannot be written or replaced.
        """
        import json
        import os

        payload = json.dumps(state)
        tmp_path = f"{self.state_file}.tmp"
        with open(tmp_path, "w", encoding="utf-8") as f:
            f.write(payload)
        os.replace(tmp_path, self.state_file)

    def load_state(self) -> "dict | None":
        """Return the saved checkpoint, or ``None`` if no file exists."""
        import json

        try:
            with open(self.state_file, encoding="utf-8") as f:
                return json.load(f)
        except FileNotFoundError:
            return None

    async def run(self, task: str):
        """Run the steps for ``task``, resuming from a matching checkpoint.

        A checkpoint is reused only when it was saved for the same ``task``;
        otherwise the workflow starts from the first step. If a step raises,
        progress so far is saved and the exception is re-raised.

        Returns:
            The ``results`` dict of the final state.
        """
        # Attempt recovery
        state = self.load_state()
        if isinstance(state, dict) and state.get("task") == task:
            print(f"Recovering from step: {state['last_completed']}")
            start_step = state["last_completed"] + 1
        else:
            state = {"task": task, "last_completed": -1, "results": {}}
            start_step = 0

        steps = [self.step1, self.step2, self.step3]
        for i, step in enumerate(steps[start_step:], start=start_step):
            # Only the step itself is guarded, so a checkpoint write error
            # is not misreported as a step failure.
            try:
                result = await step(state)
            except Exception as e:
                print(f"Step {i} failed: {e}")
                self.save_state(state)  # Save progress
                raise
            state["results"][f"step_{i}"] = result
            state["last_completed"] = i
            self.save_state(state)  # Checkpoint after each step
        return state["results"]
## Logging pro debugging
import logging
import json
from datetime import datetime
class AgentLogger:
    """Emit one JSON document per agent step or error through ``logging``.

    Every record carries a ``session`` id derived from the creation time of
    this logger (second resolution) and an ISO-format ``timestamp``.
    """

    def __init__(self, name: str):
        self.logger = logging.getLogger(name)
        self.session_id = datetime.now().strftime("%Y%m%d_%H%M%S")

    def log_step(self, step: str, input_data: dict, output: object, duration: float):
        """Log a completed step at INFO level.

        Args:
            step: Name of the step.
            input_data: Input passed to the step.
            output: Step output; logged as ``str(output)`` cut to 500 chars.
            duration: Duration in seconds; logged as milliseconds.
        """
        self.logger.info(self._to_json({
            "session": self.session_id,
            "step": step,
            "input": input_data,
            "output": str(output)[:500],  # Truncate
            "duration_ms": duration * 1000,
            "timestamp": datetime.now().isoformat(),
        }))

    def log_error(self, step: str, error: Exception, context: dict):
        """Log a failed step at ERROR level with the error type and message."""
        self.logger.error(self._to_json({
            "session": self.session_id,
            "step": step,
            "error_type": type(error).__name__,
            "error_message": str(error),
            "context": context,
            "timestamp": datetime.now().isoformat(),
        }))

    @staticmethod
    def _to_json(record: dict) -> str:
        """Serialize a log record, stringifying values JSON cannot encode."""
        # default=str is deliberate: logging must not raise just because the
        # input or context holds a datetime, an exception, or another object.
        return json.dumps(record, default=str)
Robustní error handling je to, co odlišuje produkční agenty od prototypů.