Lesson 17 of 30 ~15 min
Course progress
0%

Multi-Step Orchestration

Budujte komplexní agentní workflow s více kroky a rozhodovací logikou.

Komplexní úlohy vyžadují orchestraci více kroků. Naučte se navrhovat robustní multi-step agenty.

Orchestration patterns

Pattern 1: Sequential Pipeline

class SequentialAgent:
    """Pipeline that runs its steps strictly one after another.

    Every step must expose a ``name`` attribute and an awaitable
    ``execute(input, context)``. The output of one step is the input of the
    next, and each output is also stored in ``context`` under the step's name.
    """

    def __init__(self, steps: list):
        # Shared scratch space handed to every step; it is never reset, so
        # results from an earlier run() remain visible to later runs.
        self.context = {}
        self.steps = steps

    async def run(self, initial_input: str):
        """Feed ``initial_input`` through all steps and return the last output.

        With no steps configured the input is returned unchanged.
        """
        payload = initial_input

        for stage in self.steps:
            print(f"Executing step: {stage.name}")
            # The step sees the results of all previous steps via the context.
            payload = await stage.execute(payload, self.context)
            self.context[stage.name] = payload

        return payload

# Usage
# NOTE(review): `await` at module level only works where top-level await is
# available (e.g. a notebook or `python -m asyncio`); in a plain script wrap
# these lines in an async function and drive it with asyncio.run().
# The step classes below are not defined in this snippet; SequentialAgent.run
# needs each of them to expose `.name` and an awaitable `.execute(input, context)`.
agent = SequentialAgent([
    AnalyzeRequirements(),
    GenerateCode(),
    WriteTests(),
    RunTests(),
    RefactorIfNeeded()
])

# Runs the five steps in order; `result` is the output of the last step.
result = await agent.run("Build a REST API for user management")

Pattern 2: Decision Tree

class DecisionAgent:
    """Route a task to a specialised handler based on an LLM classification.

    ``analyze`` asks the model for one of four category names and ``run``
    dispatches on it. The ``handle_*`` coroutines are not part of this
    snippet and must be supplied by a subclass or the surrounding code.
    """

    def __init__(self):
        self.client = Anthropic()

    async def run(self, task: str):
        """Classify ``task`` and return the result of the matching handler.

        Any label other than the three explicit ones (including ``complex``
        and unexpected model output) is routed to ``handle_complex``.
        """
        # Step 1: analysis and decision
        decision = await self.analyze(task)

        # Step 2: branch based on the decision
        if decision == "simple_query":
            return await self.handle_simple(task)
        elif decision == "code_generation":
            return await self.handle_code(task)
        elif decision == "research_needed":
            return await self.handle_research(task)
        else:
            return await self.handle_complex(task)

    async def analyze(self, task: str) -> str:
        """Return the normalised category name the model assigns to ``task``.

        The reply is stripped of surrounding whitespace, quotes, backticks and
        full stops and lower-cased, so answers such as ``"Code_Generation."``
        still match the branch labels used by ``run``.
        """
        # Function-scope import: asyncio is not imported in the visible snippet.
        import asyncio

        prompt = f"""Classify this task:
Task: {task}

Categories:
- simple_query: Can be answered directly
- code_generation: Needs to write code
- research_needed: Needs to gather information first
- complex: Multiple steps required

Return only the category name."""

        # The client call is synchronous; running it in a worker thread keeps
        # the event loop free for other coroutines while the request is in flight.
        # NOTE(review): confirm the model id against the current Anthropic model list.
        response = await asyncio.to_thread(
            self.client.messages.create,
            model="claude-opus-4-5-20250101",
            max_tokens=100,
            messages=[{"role": "user", "content": prompt}],
        )
        label = response.content[0].text.strip()
        return label.strip("\"'`.").lower()

Pattern 3: Iterative Refinement

class RefinementAgent:
    """Improve a result in a score / feedback / rewrite loop.

    ``initial_attempt``, ``get_feedback`` and ``improve`` are not part of this
    snippet and must be supplied by a subclass or the surrounding code.
    """

    def __init__(self, max_iterations: int = 5):
        self.max_iterations = max_iterations
        self.client = Anthropic()

    async def run(self, task: str, quality_threshold: float = 0.8):
        """Return the first result scoring at least ``quality_threshold``.

        At most ``max_iterations`` evaluations are made. If none reaches the
        threshold, the most recently improved result is returned; that final
        version is not scored again.
        """
        result = await self.initial_attempt(task)

        for i in range(self.max_iterations):
            # Evaluate quality
            score = await self.evaluate(result, task)
            print(f"Iteration {i+1}: Quality score {score}")

            if score >= quality_threshold:
                return result

            # Improve
            feedback = await self.get_feedback(result, task)
            result = await self.improve(result, feedback)

        return result  # Best effort

    async def evaluate(self, result: str, task: str) -> float:
        """Ask the model to rate ``result`` for ``task``; return a score in [0, 1].

        The reply is parsed leniently: when it is not a bare number, the first
        numeric token in it is used. The score is clamped to the 0..1 range.

        Raises:
            ValueError: if the reply contains no number at all.
        """
        # Function-scope imports: neither module is imported in the visible snippet.
        import asyncio
        import re

        prompt = f"""Rate this result from 0 to 1:

Task: {task}
Result: {result}

Return only a number between 0 and 1."""

        # The client call is synchronous; running it in a worker thread keeps
        # the event loop free for other coroutines while the request is in flight.
        # NOTE(review): confirm the model id against the current Anthropic model list.
        response = await asyncio.to_thread(
            self.client.messages.create,
            model="claude-opus-4-5-20250101",
            max_tokens=100,
            messages=[{"role": "user", "content": prompt}],
        )
        reply = response.content[0].text.strip()

        try:
            score = float(reply)
        except ValueError:
            # Models often wrap the number ("Score: 0.8", "0.85."); take the
            # first numeric token instead of aborting the refinement loop.
            found = re.search(r"[-+]?(?:\d+\.?\d*|\.\d+)", reply)
            if found is None:
                raise ValueError(
                    f"Evaluator reply contains no numeric score: {reply!r}"
                ) from None
            score = float(found.group())

        return min(1.0, max(0.0, score))

State Management

from dataclasses import dataclass, field
from typing import Any
from enum import Enum

class StepStatus(Enum):
    """Status values recorded for a single workflow step.

    Each member's value is a lowercase string, which is the form to use when
    a status has to be written out as text.
    """

    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"

@dataclass
class WorkflowState:
    """Mutable record of one workflow run.

    ``steps`` maps a step name to a dict with the keys ``status``, ``result``
    and ``timestamp``, as written by ``update_step``.
    """

    task: str
    steps: dict = field(default_factory=dict)
    context: dict = field(default_factory=dict)
    # Name of the step currently being worked on; None until one is set.
    current_step: "str | None" = None

    def update_step(self, name: str, status: StepStatus, result: Any = None):
        """Record (or overwrite) the status and result of step ``name``.

        The entry is stamped with the current local time in ISO 8601 format.
        """
        # Function-scope import: the snippet's import lines do not bring in
        # datetime, so the bare name would raise NameError here.
        from datetime import datetime

        self.steps[name] = {
            "status": status,
            "result": result,
            "timestamp": datetime.now().isoformat()
        }

    def get_completed_results(self) -> dict:
        """Return ``{step name: result}`` for every step marked COMPLETED."""
        return {
            name: step["result"]
            for name, step in self.steps.items()
            if step["status"] == StepStatus.COMPLETED
        }

    def to_context_string(self) -> str:
        """Render completed results as ``## name`` sections for injecting into prompts."""
        completed = self.get_completed_results()
        return "\n".join(
            f"## {name}\n{result}"
            for name, result in completed.items()
        )

Komplexní příklad orchestrace

class CodeProjectAgent:
    """Agent that builds a complete project through a fixed five-step workflow.

    The step coroutines (``analyze_requirements``, ``design_architecture``,
    ``generate_component``, ``generate_tests``, ``run_tests``, ``fix_failures``,
    ``apply_fixes``) and ``setup_tools`` are not part of this snippet.
    """

    def __init__(self):
        self.client = Anthropic()
        self.tools = self.setup_tools()

    async def create_project(self, requirements: str, max_fix_rounds: int = 3):
        """Run the whole workflow and return the resulting ``WorkflowState``.

        Args:
            requirements: Free-text description of the project to build.
            max_fix_rounds: Upper bound on fix-and-retest rounds in the
                validation step.

        Returns:
            The state with one entry per step. The ``validation`` entry is
            COMPLETED only if the last test run passed, otherwise FAILED.
        """
        # Function-scope import: asyncio is not imported in the visible snippet.
        import asyncio

        state = WorkflowState(task=requirements)

        # Step 1: Analyze requirements
        state.current_step = "requirements"
        analysis = await self.analyze_requirements(requirements)
        state.update_step("requirements", StepStatus.COMPLETED, analysis)

        # Step 2: Design architecture
        state.current_step = "architecture"
        architecture = await self.design_architecture(analysis)
        state.update_step("architecture", StepStatus.COMPLETED, architecture)

        # Step 3: Generate code (parallel for each component)
        state.current_step = "code_generation"
        # Rendered once: every component gets the same snapshot of prior results.
        prior_results = state.to_context_string()
        code_results = await asyncio.gather(*(
            self.generate_component(comp, prior_results)
            for comp in architecture["components"]
        ))
        state.update_step("code_generation", StepStatus.COMPLETED, code_results)

        # Step 4: Write tests
        state.current_step = "tests"
        tests = await self.generate_tests(code_results)
        state.update_step("tests", StepStatus.COMPLETED, tests)

        # Step 5: Run tests and fix if needed
        state.current_step = "validation"
        test_result = await self.run_tests()
        for _ in range(max_fix_rounds):
            if test_result["passed"]:
                break
            # Fix failures, then re-run so the recorded result always reflects
            # the code as it stands after the last applied fix.
            fixes = await self.fix_failures(test_result["failures"], state)
            await self.apply_fixes(fixes)
            test_result = await self.run_tests()

        # Do not report success when the tests are still failing.
        if test_result["passed"]:
            validation_status = StepStatus.COMPLETED
        else:
            validation_status = StepStatus.FAILED
        state.update_step("validation", validation_status, test_result)

        return state

Checkpointing

Pro dlouhé workflow je důležité ukládat průběh:

import json
from pathlib import Path

class CheckpointedAgent:
    """Save and restore ``WorkflowState`` snapshots as JSON files."""

    def __init__(self, checkpoint_dir: str = ".checkpoints"):
        self.checkpoint_dir = Path(checkpoint_dir)
        # parents=True so a nested checkpoint path can be created on first use.
        self.checkpoint_dir.mkdir(parents=True, exist_ok=True)

    @staticmethod
    def _json_default(value):
        """Serialise enum members by value; stringify anything else as before."""
        from enum import Enum

        if isinstance(value, Enum):
            return value.value
        return str(value)

    @staticmethod
    def _restore_status(raw):
        """Turn a stored status back into a ``StepStatus`` member when possible.

        Accepts the value form ("completed") and the ``str()`` form
        ("StepStatus.COMPLETED") that older checkpoints contain. Unrecognised
        values are returned unchanged.
        """
        if isinstance(raw, StepStatus):
            return raw
        text = str(raw)
        try:
            if text.startswith("StepStatus."):
                return StepStatus[text.split(".", 1)[1]]
            return StepStatus(text)
        except (KeyError, ValueError):
            return raw

    def save_checkpoint(self, state: WorkflowState, name: str):
        """Write ``state`` to ``<checkpoint_dir>/<name>.json``."""
        path = self.checkpoint_dir / f"{name}.json"
        with open(path, "w", encoding="utf-8") as f:
            json.dump(state.__dict__, f, default=self._json_default)

    def load_checkpoint(self, name: str) -> "WorkflowState | None":
        """Load the checkpoint called ``name``; return None if it does not exist.

        Step statuses are converted back to ``StepStatus`` members so that
        ``WorkflowState.get_completed_results`` works on a resumed state.
        """
        path = self.checkpoint_dir / f"{name}.json"
        if not path.exists():
            return None

        with open(path, encoding="utf-8") as f:
            data = json.load(f)

        state = WorkflowState(**data)
        for step in state.steps.values():
            if isinstance(step, dict) and "status" in step:
                step["status"] = self._restore_status(step["status"])
        return state

    async def run_with_checkpoints(self, task: str):
        """Resume ``task`` from its checkpoint, or start with a fresh state."""
        # Try to resume
        # NOTE(review): task_hash is not defined in this snippet; it presumably
        # returns a stable file-name-safe digest of the task text — confirm.
        # The built-in hash() is unsuitable: for str it changes between runs.
        state = self.load_checkpoint(task_hash(task))
        if state:
            print("Resuming from checkpoint...")
        else:
            state = WorkflowState(task=task)

        # Continue from last completed step
        # ...

Multi-step orchestration je klíčem k budování skutečně užitečných AI agentů.