教程目录

TUTORIAL_SUMMARY

architecture-overview

初级

未知
未知作者
更新于 2025-06-14

OpenHands架构概览

项目简介

OpenHands(原OpenDevin)是一个开源的AI软件工程师项目,旨在创建能够执行复杂软件开发任务的AI Agent。该项目展示了如何构建大规模、生产级的LLM应用系统。

核心架构组件

1. Agent架构设计

OpenHands采用了模块化的Agent架构,支持多种Agent类型。下面用一个简化的示意实现(并非OpenHands源码)说明其核心思路:

PYTHON
import time
from abc import ABC, abstractmethod
from dataclasses import dataclass
from enum import Enum
from typing import Dict, List, Any, Optional
 
class ActionType(Enum):
    """Kinds of action an agent may take; each value is its wire-format string."""

    RUN = "run"        # execute a shell command
    WRITE = "write"    # write a file
    READ = "read"      # read a file
    BROWSE = "browse"  # browse a web page
    THINK = "think"    # internal reasoning / planning step
    FINISH = "finish"  # declare the task complete
 
@dataclass
class Action:
    """A single action chosen by an agent.

    Attributes:
        type: The kind of action.
        content: The action payload (e.g. a shell command or file text).
        args: Optional extra arguments; ``None`` when not supplied.
        timestamp: Creation time as returned by ``time.time()``, or ``None``.
    """

    type: ActionType
    content: str
    # Optional[...] because the defaults are None (implicit Optional is not
    # permitted by PEP 484 type checkers).
    args: Optional[Dict[str, Any]] = None
    timestamp: Optional[float] = None
 
@dataclass
class Observation:
    """Something the agent observed from its environment or the user.

    Attributes:
        type: Free-form category, e.g. ``"user_message"``.
        content: The observed text.
        success: Whether the underlying operation succeeded (default True).
        metadata: Optional extra data; ``None`` when not supplied.
    """

    type: str
    content: str
    success: bool = True
    # Optional[...] because the default is None (no implicit Optional).
    metadata: Optional[Dict[str, Any]] = None
 
class BaseAgent(ABC):
    """Abstract agent that keeps action/observation history and a state label.

    Subclasses implement :meth:`step` (one reasoning step) and :meth:`reset`.
    """

    def __init__(self, name: str, llm_config: Dict[str, Any]):
        self.name = name
        self.llm_config = llm_config
        self.action_history: List[Action] = []
        self.observation_history: List[Observation] = []
        self.state = "idle"

    @abstractmethod
    async def step(self, observation: Observation) -> Action:
        """Consume *observation* and return the next action."""

    @abstractmethod
    def reset(self):
        """Return the agent to its initial state."""

    def add_action(self, action: Action):
        """Append *action* to the action history."""
        self.action_history.append(action)

    def add_observation(self, observation: Observation):
        """Append *observation* to the observation history."""
        self.observation_history.append(observation)

    def get_context(self, max_history: int = 10) -> str:
        """Render recent history as text, oldest entry first.

        Takes up to *max_history* actions from the end of ``action_history``.
        Each one is paired with the observation at the same offset from the end
        of ``observation_history``, if that list is long enough. Each pair is
        emitted as an ``Action:`` line followed by an ``Observation:`` line.
        """
        # NOTE(review): CodeActAgent.step stores each action together with the
        # observation that *prompted* it, so the Observation line printed after
        # an Action is that action's input, not its result. Confirm the
        # intended ordering.
        window = min(len(self.action_history), max_history)
        obs_count = len(self.observation_history)

        lines = []
        # Offsets run from the oldest item in the window (largest) to the newest (1).
        for offset in range(window, 0, -1):
            act = self.action_history[-offset]
            lines.append(f"Action: {act.type.value} - {act.content}")
            obs = self.observation_history[-offset] if offset <= obs_count else None
            if obs:
                lines.append(f"Observation: {obs.content}")

        return "\n".join(lines)
 
class CodeActAgent(BaseAgent):
    """Agent that asks an LLM for its next action and parses the JSON reply.

    Each :meth:`step` builds a prompt from the system prompt, recent history
    and the latest observation. It then expects a JSON object of the form
    ``{"type": ..., "content": ..., "args": {...}}`` back.
    """

    def __init__(self, name: str, llm_config: Dict[str, Any]):
        super().__init__(name, llm_config)
        self.system_prompt = """
你是一个AI软件工程师,能够执行各种软件开发任务。
你可以:
1. 运行shell命令
2. 读写文件
3. 浏览网页
4. 思考和规划
 
请根据用户的需求,逐步完成任务。
"""

    async def step(self, observation: Observation) -> Action:
        """Run one reasoning step for *observation* and return the chosen action.

        Builds the prompt, calls the LLM, and parses the reply. It then records
        both the action and the observation in history.
        """
        context = self.get_context()

        prompt = f"""
{self.system_prompt}
 
当前任务上下文:
{context}
 
最新观察:
{observation.content}
 
请选择下一步动作。可用动作类型:
- run: 执行shell命令
- write: 写入文件
- read: 读取文件
- browse: 浏览网页
- think: 思考和规划
- finish: 完成任务
 
请以JSON格式返回动作:
{{"type": "动作类型", "content": "动作内容", "args": {{"key": "value"}}}}
"""

        response = await self._call_llm(prompt)
        action = self._parse_action(response)

        # History is recorded only after the LLM call, so the context used
        # above does not yet contain this observation.
        self.add_action(action)
        self.add_observation(observation)

        return action

    async def _call_llm(self, prompt: str) -> str:
        """Send *prompt* to the LLM and return its raw text reply.

        Placeholder stub: it always returns a fixed THINK action. A real
        implementation would presumably call the API described by
        ``self.llm_config``.
        """
        return '{"type": "think", "content": "正在分析任务..."}'

    def _parse_action(self, response: str) -> Action:
        """Parse the LLM's JSON *response* into an :class:`Action`.

        ``args`` is optional; a missing or null value becomes ``{}``. If the
        reply is not valid JSON, is not a JSON object, lacks ``type`` or
        ``content``, or names an unknown action type, a THINK action carrying
        the parse error is returned instead of raising.
        """
        import json

        try:
            data = json.loads(response)
            action_type = ActionType(data["type"])
            content = data["content"]
            args = data.get("args") or {}
        except (ValueError, KeyError, TypeError) as e:
            # ValueError: invalid JSON (JSONDecodeError) or unknown enum value.
            # KeyError: missing "type"/"content".
            # TypeError: the JSON top level is not an object (list, str, null...).
            # Other exceptions are genuine bugs and are allowed to propagate.
            return Action(
                type=ActionType.THINK,
                content=f"解析错误: {str(e)}",
                args={},
                timestamp=time.time(),
            )

        return Action(
            type=action_type,
            content=content,
            args=args,
            timestamp=time.time(),
        )

    def reset(self):
        """Clear both histories and return to the ``"idle"`` state."""
        self.action_history = []
        self.observation_history = []
        self.state = "idle"
 
# Usage example
agent = CodeActAgent("codeact_agent", {"model": "gpt-4", "temperature": 0.1})

# Simulate one step: the user's request arrives as an observation.
observation = Observation(
    type="user_message",
    content="请创建一个Python脚本来计算斐波那契数列"
)

# `step` is a coroutine. A bare top-level `await` is a SyntaxError in a
# regular script (it only works in notebooks / async REPLs), so drive it
# with an event loop instead:
# import asyncio
# action = asyncio.run(agent.step(observation))
# print(f"Agent动作: {action.type.value} - {action.content}")

2. 事件驱动系统

OpenHands使用事件驱动架构来处理异步操作。下面的简化示例通过事件总线(EventBus)将用户消息、环境更新与Agent动作解耦(代码沿用上一节定义的类与导入):

PYTHON
import asyncio
from typing import Callable, Dict, List
from dataclasses import dataclass
from enum import Enum
 
class EventType(Enum):
    """Categories of events carried by the event bus."""

    AGENT_ACTION = "agent_action"              # the agent chose an action
    ENVIRONMENT_UPDATE = "environment_update"  # the environment changed
    USER_MESSAGE = "user_message"              # the user sent a message
    TASK_COMPLETE = "task_complete"            # the task finished
    ERROR_OCCURRED = "error_occurred"          # something went wrong
 
@dataclass
class Event:
    """An event published on the event bus.

    Attributes:
        type: Event category.
        data: Event payload.
        source: Identifier of the publisher (e.g. ``"user"``).
        timestamp: Creation time, as produced by ``time.time()`` in callers.
        id: Optional event identifier; ``None`` when not supplied.
    """

    type: EventType
    data: Dict[str, Any]
    source: str
    timestamp: float
    # Optional[...] because the default is None (no implicit Optional).
    id: Optional[str] = None
 
class EventBus:
    """In-process publish/subscribe hub for :class:`Event` objects.

    Handlers are registered per :class:`EventType` and must be coroutine
    functions: :meth:`publish` wraps each handler call in an asyncio task and
    waits for all of them.
    """

    def __init__(self):
        self.subscribers: Dict[EventType, List[Callable]] = {}
        # Every published event, in publish order (grows without bound).
        self.event_history: List[Event] = []
        self.running = False

    def subscribe(self, event_type: EventType, handler: Callable):
        """Register *handler* to be awaited for every event of *event_type*."""
        self.subscribers.setdefault(event_type, []).append(handler)

    def unsubscribe(self, event_type: EventType, handler: Callable):
        """Remove *handler* for *event_type*; a no-op if it is not registered."""
        handlers = self.subscribers.get(event_type)
        if handlers and handler in handlers:
            handlers.remove(handler)

    async def publish(self, event: Event):
        """Record *event*, then run every handler subscribed to its type.

        Handlers run concurrently and one failing handler does not stop the
        others. Failures are reported on stdout instead of being silently
        dropped.
        """
        self.event_history.append(event)

        # Snapshot the handler list: handlers may (un)subscribe while they run,
        # and the snapshot is zipped with the gather() results below.
        handlers = list(self.subscribers.get(event.type, ()))
        if not handlers:
            return

        tasks = [asyncio.create_task(handler(event)) for handler in handlers]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        for handler, result in zip(handlers, results):
            if isinstance(result, BaseException):
                name = getattr(handler, "__qualname__", repr(handler))
                print(f"事件处理器 {name} 出错 ({event.type.value}): {result!r}")

    def get_event_history(self, event_type: Optional[EventType] = None, limit: int = 100) -> List[Event]:
        """Return up to *limit* of the most recent events, optionally of one type.

        A non-positive *limit* returns an empty list. (Slicing with ``[-0:]``
        would otherwise return the entire history.)
        """
        if limit <= 0:
            return []
        if event_type is None:
            events = self.event_history
        else:
            events = [e for e in self.event_history if e.type == event_type]
        return events[-limit:]
 
class AgentController:
    """Connects an agent to an :class:`EventBus`.

    Subscribes to USER_MESSAGE and ENVIRONMENT_UPDATE events and turns each
    into an :class:`Observation`. It then runs one agent step and publishes
    the resulting action as an AGENT_ACTION event. Events are ignored while
    the controller is not running.
    """

    def __init__(self, agent: BaseAgent, event_bus: EventBus):
        self.agent = agent
        self.event_bus = event_bus
        self.running = False

        # Handlers stay subscribed for the controller's lifetime; the
        # `running` flag gates whether they do anything.
        self.event_bus.subscribe(EventType.USER_MESSAGE, self.handle_user_message)
        self.event_bus.subscribe(EventType.ENVIRONMENT_UPDATE, self.handle_environment_update)

    async def start(self):
        """Begin reacting to subscribed events."""
        self.running = True
        print(f"Agent {self.agent.name} 已启动")

    async def stop(self):
        """Stop reacting to events (handlers remain subscribed)."""
        self.running = False
        print(f"Agent {self.agent.name} 已停止")

    async def handle_user_message(self, event: Event):
        """Feed a user message to the agent and publish the resulting action."""
        if not self.running:
            return

        observation = Observation(
            type="user_message",
            content=event.data.get("message", ""),
            metadata={"event_id": event.id},
        )
        await self._step_and_publish(observation, event)

    async def handle_environment_update(self, event: Event):
        """Feed an environment update to the agent if it requests a response.

        The agent runs only when ``event.data["requires_response"]`` is
        truthy. The published action records the triggering event id under
        ``"triggered_by"``.
        """
        if not self.running:
            return

        update_data = event.data
        if not update_data.get("requires_response", False):
            return

        observation = Observation(
            type="environment_update",
            content=str(update_data),
            success=update_data.get("success", True),
            metadata=update_data,
        )
        await self._step_and_publish(observation, event, triggered_by=event.id)

    async def _step_and_publish(self, observation: Observation, trigger: Event, **extra_data: Any):
        """Run one agent step on *observation* and publish the outcome.

        On success, publishes an AGENT_ACTION event whose payload is
        ``{"action", "agent_name"}`` plus *extra_data*. If the agent raises,
        publishes an ERROR_OCCURRED event describing the failure instead.
        """
        import uuid

        source = f"agent_{self.agent.name}"
        try:
            action = await self.agent.step(observation)
        except Exception as exc:
            # Otherwise the exception would only end up in the bus's
            # gather() results; publish it so subscribers can react.
            await self.event_bus.publish(Event(
                type=EventType.ERROR_OCCURRED,
                data={
                    "error": repr(exc),
                    "agent_name": self.agent.name,
                    "triggered_by": trigger.id,
                },
                source=source,
                timestamp=time.time(),
                id=f"error_{uuid.uuid4().hex}",
            ))
            return

        await self.event_bus.publish(Event(
            type=EventType.AGENT_ACTION,
            data={"action": action, "agent_name": self.agent.name, **extra_data},
            source=source,
            timestamp=time.time(),
            # Every action event gets an id. A uuid cannot collide the way a
            # clock-derived id can within one timer tick.
            id=f"action_{uuid.uuid4().hex}",
        ))
 
# Usage example
event_bus = EventBus()
agent = CodeActAgent("main_agent", {"model": "gpt-4"})
controller = AgentController(agent, event_bus)


async def demo_event_system():
    """Publish one user message through the bus and report the event count."""
    await controller.start()
    try:
        user_event = Event(
            type=EventType.USER_MESSAGE,
            data={"message": "请帮我创建一个简单的Web服务器"},
            source="user",
            timestamp=time.time(),
            id="msg_001"
        )

        # publish() awaits every subscribed handler, including the
        # AGENT_ACTION event the controller publishes in turn. So the history
        # is complete as soon as it returns; no sleep is needed.
        await event_bus.publish(user_event)

        history = event_bus.get_event_history()
        print(f"处理了 {len(history)} 个事件")
    finally:
        # Stop the controller even if publishing fails.
        await controller.stop()

# asyncio.run(demo_event_system())

3. 状态管理系统

状态管理器集中保存会话状态,在状态变化时通知观察者,并支持把状态保存到JSON文件或从文件加载(代码沿用前两节的导入):

PYTHON
from typing import Any, Dict, Optional
import json
import threading
 
class StateManager:
    """Key/value state store that notifies observers on every change.

    Observers are callables ``observer(key, old_value, new_value)``. A deleted
    key is reported with ``new_value=None``. State reads and writes are
    guarded by a re-entrant lock. Observers are invoked while it is held, so
    an observer on the same thread may read or write the state itself.
    """

    def __init__(self):
        self.state: Dict[str, Any] = {}
        self.lock = threading.RLock()
        self.observers: List[Callable] = []

    def get_state(self, key: Optional[str] = None) -> Any:
        """Return the value for *key* (None if absent).

        If *key* is None, return a shallow copy of the whole state instead.
        """
        with self.lock:
            if key is None:
                return self.state.copy()
            return self.state.get(key)

    def set_state(self, key: str, value: Any):
        """Set *key* to *value* and notify observers of the change."""
        with self.lock:
            old_value = self.state.get(key)
            self.state[key] = value
            self._notify_observers(key, old_value, value)

    def update_state(self, updates: Dict[str, Any]):
        """Apply all *updates*, then notify observers once per key."""
        with self.lock:
            changes = []
            for key, value in updates.items():
                changes.append((key, self.state.get(key), value))
                self.state[key] = value

            # Notify only after every key is written, so observers see the
            # fully updated state.
            for key, old_value, new_value in changes:
                self._notify_observers(key, old_value, new_value)

    def delete_state(self, key: str) -> bool:
        """Delete *key*; return True if it existed, False otherwise."""
        with self.lock:
            if key in self.state:
                old_value = self.state.pop(key)
                self._notify_observers(key, old_value, None)
                return True
            return False

    def add_observer(self, observer: Callable):
        """Register *observer* for all future state changes."""
        self.observers.append(observer)

    def remove_observer(self, observer: Callable):
        """Unregister *observer*; a no-op if it is not registered."""
        if observer in self.observers:
            self.observers.remove(observer)

    def _notify_observers(self, key: str, old_value: Any, new_value: Any):
        """Call every observer. Errors are printed and never propagated."""
        # Iterate over a snapshot: an observer that removes itself (or adds
        # another) during notification must not cause others to be skipped.
        for observer in list(self.observers):
            try:
                observer(key, old_value, new_value)
            except Exception as e:
                # One faulty observer must not break the mutation or the others.
                print(f"状态观察者错误: {e}")

    def save_to_file(self, filepath: str):
        """Write the state to *filepath* as indented UTF-8 JSON.

        The JSON is built before the file is opened. So a non-serialisable
        value raises (e.g. TypeError) without truncating an existing file.
        """
        with self.lock:
            payload = json.dumps(self.state, ensure_ascii=False, indent=2)
        with open(filepath, 'w', encoding='utf-8') as f:
            f.write(payload)

    def load_from_file(self, filepath: str):
        """Replace the state with the JSON object stored in *filepath*.

        Best-effort: if the file is missing, cannot be read or parsed, or its
        top level is not a JSON object, the problem is printed and the
        current state is kept. Observers are not notified of the replacement.
        """
        try:
            with open(filepath, 'r', encoding='utf-8') as f:
                loaded_state = json.load(f)
            if not isinstance(loaded_state, dict):
                raise ValueError(f"顶层JSON必须是对象, 实际为 {type(loaded_state).__name__}")
        except FileNotFoundError:
            print(f"状态文件 {filepath} 不存在")
            return
        except Exception as e:
            print(f"加载状态文件错误: {e}")
            return

        with self.lock:
            self.state = loaded_state
 
class SessionManager:
    """Registry of per-session :class:`StateManager` objects.

    At most one session is marked as active at a time.
    """

    def __init__(self):
        self.sessions: Dict[str, StateManager] = {}
        self.active_session: Optional[str] = None

    def create_session(self, session_id: str) -> StateManager:
        """Create, register and return the StateManager for a new session.

        The state is seeded with ``session_id``, ``created_at`` (``time.time()``)
        and ``status="active"``. The active session is not changed.

        Raises:
            ValueError: if *session_id* already exists.
        """
        if session_id in self.sessions:
            raise ValueError(f"会话 {session_id} 已存在")

        state_manager = StateManager()
        self.sessions[session_id] = state_manager

        state_manager.set_state("session_id", session_id)
        state_manager.set_state("created_at", time.time())
        state_manager.set_state("status", "active")

        return state_manager

    def get_session(self, session_id: str) -> Optional[StateManager]:
        """Return the session's StateManager, or None if unknown."""
        return self.sessions.get(session_id)

    def set_active_session(self, session_id: str):
        """Mark *session_id* as the active session.

        Raises:
            ValueError: if the session does not exist.
        """
        if session_id not in self.sessions:
            raise ValueError(f"会话 {session_id} 不存在")
        self.active_session = session_id

    def get_active_session(self) -> Optional[StateManager]:
        """Return the active session's StateManager, or None if none is active."""
        # Compare against None explicitly: "" is a valid session id.
        if self.active_session is None:
            return None
        return self.sessions.get(self.active_session)

    def close_session(self, session_id: str) -> bool:
        """Mark *session_id* closed and clear it as the active session.

        The session stays registered and retrievable. Returns True if the
        session existed, False otherwise.
        """
        session = self.sessions.get(session_id)
        if session is None:
            return False

        session.set_state("status", "closed")
        session.set_state("closed_at", time.time())

        if self.active_session == session_id:
            self.active_session = None
        return True

    def list_sessions(self) -> List[str]:
        """Return all registered session ids, in creation order."""
        return list(self.sessions.keys())
 
# Usage example
session_manager = SessionManager()

# Create a session and make it the active one.
session = session_manager.create_session("session_001")
session_manager.set_active_session("session_001")

# Seed the session's state in one batch. No observer is attached yet, so
# nothing is printed for these keys.
session.update_state({
    "current_task": "创建Web服务器",
    "agent_state": "thinking",
    "progress": 0.1,
})


def state_observer(key: str, old_value: Any, new_value: Any):
    """Print each state change as ``key = old -> new``."""
    print(f"状态变化: {key} = {old_value} -> {new_value}")


session.add_observer(state_observer)

# This batch update triggers one observer call per key.
session.update_state({
    "progress": 0.3,
    "agent_state": "executing",
    "current_action": "writing_code",
})

print(f"当前状态: {session.get_state()}")

小结

在本章中,我们分析了OpenHands的核心架构:

  1. Agent架构:模块化的Agent设计,支持多种Agent类型
  2. 事件系统:异步事件驱动架构,实现组件间解耦
  3. 状态管理:集中式状态管理,支持会话和持久化
  4. 控制器模式:Agent控制器负责协调Agent和环境交互

OpenHands的架构设计体现了大型LLM应用的最佳实践,为我们提供了宝贵的参考。

思考题

  1. 为什么OpenHands选择事件驱动架构而不是同步调用?
  2. 如何设计一个支持多Agent协作的控制器?
  3. 状态管理系统如何处理并发访问问题?
  4. 如何扩展Agent架构以支持新的Agent类型?

下一章我们将深入分析OpenHands的Agent实现细节。