Komplexní úlohy vyžadují orchestraci více kroků. Naučte se navrhovat robustní multi-step agenty.
Orchestration patterns
Pattern 1: Sequential Pipeline
class SequentialAgent:
def __init__(self, steps: list):
self.steps = steps
self.context = {}
async def run(self, initial_input: str):
current_input = initial_input
for step in self.steps:
print(f"Executing step: {step.name}")
result = await step.execute(current_input, self.context)
self.context[step.name] = result
current_input = result
return current_input
# Použití
agent = SequentialAgent([
AnalyzeRequirements(),
GenerateCode(),
WriteTests(),
RunTests(),
RefactorIfNeeded()
])
result = await agent.run("Build a REST API for user management")
Pattern 2: Decision Tree
class DecisionAgent:
def __init__(self):
self.client = Anthropic()
async def run(self, task: str):
# Krok 1: Analýza a rozhodnutí
decision = await self.analyze(task)
# Krok 2: Branch based on decision
if decision == "simple_query":
return await self.handle_simple(task)
elif decision == "code_generation":
return await self.handle_code(task)
elif decision == "research_needed":
return await self.handle_research(task)
else:
return await self.handle_complex(task)
async def analyze(self, task: str) -> str:
response = self.client.messages.create(
model="claude-opus-4-5-20250101",
max_tokens=100,
messages=[{
"role": "user",
"content": f"""Classify this task:
Task: {task}
Categories:
- simple_query: Can be answered directly
- code_generation: Needs to write code
- research_needed: Needs to gather information first
- complex: Multiple steps required
Return only the category name."""
}]
)
return response.content[0].text.strip()
Pattern 3: Iterative Refinement
class RefinementAgent:
def __init__(self, max_iterations: int = 5):
self.max_iterations = max_iterations
self.client = Anthropic()
async def run(self, task: str, quality_threshold: float = 0.8):
result = await self.initial_attempt(task)
for i in range(self.max_iterations):
# Evaluate quality
score = await self.evaluate(result, task)
print(f"Iteration {i+1}: Quality score {score}")
if score >= quality_threshold:
return result
# Improve
feedback = await self.get_feedback(result, task)
result = await self.improve(result, feedback)
return result # Best effort
async def evaluate(self, result: str, task: str) -> float:
response = self.client.messages.create(
model="claude-opus-4-5-20250101",
max_tokens=100,
messages=[{
"role": "user",
"content": f"""Rate this result from 0 to 1:
Task: {task}
Result: {result}
Return only a number between 0 and 1."""
}]
)
return float(response.content[0].text.strip())
State Management
from dataclasses import dataclass, field
from typing import Any
from enum import Enum
class StepStatus(Enum):
PENDING = "pending"
RUNNING = "running"
COMPLETED = "completed"
FAILED = "failed"
@dataclass
class WorkflowState:
task: str
steps: dict = field(default_factory=dict)
context: dict = field(default_factory=dict)
current_step: str = None
def update_step(self, name: str, status: StepStatus, result: Any = None):
self.steps[name] = {
"status": status,
"result": result,
"timestamp": datetime.now().isoformat()
}
def get_completed_results(self) -> dict:
return {
name: step["result"]
for name, step in self.steps.items()
if step["status"] == StepStatus.COMPLETED
}
def to_context_string(self) -> str:
"""For injecting into prompts"""
completed = self.get_completed_results()
return "\n".join([
f"## {name}\n{result}"
for name, result in completed.items()
])
Komplexní orchestration příklad
class CodeProjectAgent:
"""Agent pro vytvoření kompletního projektu"""
def __init__(self):
self.client = Anthropic()
self.tools = self.setup_tools()
async def create_project(self, requirements: str):
state = WorkflowState(task=requirements)
# Step 1: Analyze requirements
state.current_step = "requirements"
analysis = await self.analyze_requirements(requirements)
state.update_step("requirements", StepStatus.COMPLETED, analysis)
# Step 2: Design architecture
state.current_step = "architecture"
architecture = await self.design_architecture(analysis)
state.update_step("architecture", StepStatus.COMPLETED, architecture)
# Step 3: Generate code (parallel for each component)
state.current_step = "code_generation"
components = architecture["components"]
code_tasks = [
self.generate_component(comp, state.to_context_string())
for comp in components
]
code_results = await asyncio.gather(*code_tasks)
state.update_step("code_generation", StepStatus.COMPLETED, code_results)
# Step 4: Write tests
state.current_step = "tests"
tests = await self.generate_tests(code_results)
state.update_step("tests", StepStatus.COMPLETED, tests)
# Step 5: Run tests and fix if needed
state.current_step = "validation"
for attempt in range(3):
test_result = await self.run_tests()
if test_result["passed"]:
break
# Fix failures
fixes = await self.fix_failures(test_result["failures"], state)
await self.apply_fixes(fixes)
state.update_step("validation", StepStatus.COMPLETED, test_result)
return state
Checkpointing
Pro dlouhé workflow je důležité ukládat průběh:
import json
from pathlib import Path
class CheckpointedAgent:
def __init__(self, checkpoint_dir: str = ".checkpoints"):
self.checkpoint_dir = Path(checkpoint_dir)
self.checkpoint_dir.mkdir(exist_ok=True)
def save_checkpoint(self, state: WorkflowState, name: str):
path = self.checkpoint_dir / f"{name}.json"
with open(path, 'w') as f:
json.dump(state.__dict__, f, default=str)
def load_checkpoint(self, name: str) -> WorkflowState:
path = self.checkpoint_dir / f"{name}.json"
if path.exists():
with open(path) as f:
data = json.load(f)
return WorkflowState(**data)
return None
async def run_with_checkpoints(self, task: str):
# Try to resume
state = self.load_checkpoint(task_hash(task))
if state:
print("Resuming from checkpoint...")
else:
state = WorkflowState(task=task)
# Continue from last completed step
# ...
Multi-step orchestration je klíčem k budování skutečně užitečných AI agentů.