architecture-overview
初级
未知
未知作者
更新于 2025-06-14
OpenHands架构概览
项目简介
OpenHands(原 OpenDevin)是一个开源的 AI 软件工程师项目,旨在创建能够执行复杂软件开发任务的 AI Agent。该项目展示了如何构建大规模、生产级的 LLM 应用系统。
核心架构组件
1. Agent架构设计
OpenHands采用了模块化的Agent架构,支持多种Agent类型:
PYTHON
import json
import time
from abc import ABC, abstractmethod
from dataclasses import dataclass
from enum import Enum
from typing import Dict, List, Any, Optional
class ActionType(Enum):
    """Kinds of action an agent can choose.

    The string values are the names the LLM is asked to return in the
    "type" field of its JSON reply.
    """
    RUN = "run"  # execute a shell command
    WRITE = "write"  # write a file
    READ = "read"  # read a file
    BROWSE = "browse"  # browse a web page
    THINK = "think"  # think / plan
    FINISH = "finish"  # finish the task
@dataclass
class Action:
    """A single action produced by an agent."""
    type: ActionType  # which kind of action this is
    content: str  # the action's text payload
    args: Optional[Dict[str, Any]] = None  # extra arguments; None when not supplied
    timestamp: Optional[float] = None  # creation time; None when not supplied
@dataclass
class Observation:
    """Something the agent observed from its environment or from the user."""
    type: str  # free-form category label, e.g. "user_message"
    content: str  # the observed text
    success: bool = True  # whether the observed operation succeeded
    metadata: Optional[Dict[str, Any]] = None  # extra details; None when not supplied
class BaseAgent(ABC):
    """Abstract base class for agents.

    Keeps the agent's name, its LLM configuration, a state label and two
    histories (actions and observations). Subclasses must implement
    `step` and `reset`.
    """

    def __init__(self, name: str, llm_config: Dict[str, Any]):
        """Store name and LLM config; start idle with empty histories."""
        self.name = name
        self.llm_config = llm_config
        self.state = "idle"
        self.action_history = []
        self.observation_history = []

    @abstractmethod
    async def step(self, observation: Observation) -> Action:
        """Run one reasoning step for `observation` and return the chosen action."""
        pass

    @abstractmethod
    def reset(self):
        """Reset the agent's state."""
        pass

    def add_action(self, action: Action):
        """Append `action` to the action history."""
        self.action_history.append(action)

    def add_observation(self, observation: Observation):
        """Append `observation` to the observation history."""
        self.observation_history.append(observation)

    def get_context(self, max_history: int = 10) -> str:
        """Render the most recent history entries as newline-joined text.

        At most `max_history` actions are included, oldest first. Actions
        and observations are paired by their distance from the end of
        their respective lists; an action with no observation at that
        distance (or with a falsy one) is rendered alone.
        """
        window = min(len(self.action_history), max_history)
        observation_count = len(self.observation_history)
        lines = []
        # Count down so the output runs from the oldest entry to the newest.
        for back in range(window, 0, -1):
            past_action = self.action_history[-back]
            lines.append(f"Action: {past_action.type.value} - {past_action.content}")
            paired = self.observation_history[-back] if back <= observation_count else None
            if paired:
                lines.append(f"Observation: {paired.content}")
        return "\n".join(lines)
class CodeActAgent(BaseAgent):
    """Agent that asks an LLM for the next action as a JSON object.

    NOTE: the multi-line string literals below are kept flush-left on
    purpose so the text sent to the LLM is unchanged.
    """

    def __init__(self, name: str, llm_config: Dict[str, Any]):
        """Initialise the base agent and set the system prompt."""
        super().__init__(name, llm_config)
        self.system_prompt = """
你是一个AI软件工程师,能够执行各种软件开发任务。
你可以:
1. 运行shell命令
2. 读写文件
3. 浏览网页
4. 思考和规划
请根据用户的需求,逐步完成任务。
"""

    async def step(self, observation: Observation) -> Action:
        """Build a prompt from history plus `observation`, query the LLM,
        parse the reply and record both the action and the observation.

        Returns:
            The parsed action, or a THINK action describing the parse
            error when the reply cannot be parsed.
        """
        # The context is built before this step's action/observation are recorded.
        context = self.get_context()
        prompt = f"""
{self.system_prompt}
当前任务上下文:
{context}
最新观察:
{observation.content}
请选择下一步动作。可用动作类型:
- run: 执行shell命令
- write: 写入文件
- read: 读取文件
- browse: 浏览网页
- think: 思考和规划
- finish: 完成任务
请以JSON格式返回动作:
{{"type": "动作类型", "content": "动作内容", "args": {{"key": "value"}}}}
"""
        response = await self._call_llm(prompt)
        action = self._parse_action(response)
        self.add_action(action)
        self.add_observation(observation)
        return action

    async def _call_llm(self, prompt: str) -> str:
        """Placeholder for a real LLM call; always returns a fixed JSON "think" action."""
        # A real implementation would call the LLM API here.
        return '{"type": "think", "content": "正在分析任务..."}'

    def _parse_action(self, response: str) -> Action:
        """Convert the LLM's JSON reply into an `Action`.

        Any reply that is not valid JSON, is not an object with "type" and
        "content" keys, or names an unknown action type is turned into a
        THINK action whose content carries the error message.
        """
        # Only the parsing steps sit inside the try, so unrelated errors
        # (for example a missing import) are not mistaken for bad LLM output.
        try:
            data = json.loads(response)
            action_type = ActionType(data["type"])
            content = data["content"]
            args = data.get("args", {})
        except (KeyError, TypeError, ValueError, RecursionError) as e:
            # json.JSONDecodeError is a ValueError; an unknown action type
            # raises ValueError; a non-object reply raises TypeError.
            return Action(
                type=ActionType.THINK,
                content=f"解析错误: {str(e)}",
                timestamp=time.time()
            )
        return Action(
            type=action_type,
            content=content,
            args=args,
            timestamp=time.time()
        )

    def reset(self):
        """Clear both histories and return to the "idle" state."""
        self.action_history = []
        self.observation_history = []
        self.state = "idle"
# Usage example
agent = CodeActAgent("codeact_agent", {"model": "gpt-4", "temperature": 0.1})
# Build the observation that a step would be run on
observation = Observation(
    type="user_message",
    content="请创建一个Python脚本来计算斐波那契数列"
)
# step() is a coroutine, so the two lines below must run inside an async function:
# action = await agent.step(observation)
# print(f"Agent动作: {action.type.value} - {action.content}")
2. 事件驱动系统
OpenHands使用事件驱动架构来处理异步操作:
PYTHON
import asyncio
from typing import Callable, Dict, List
from dataclasses import dataclass
from enum import Enum
class EventType(Enum):
    """Categories of event carried by the event bus."""
    AGENT_ACTION = "agent_action"  # an agent produced an action
    ENVIRONMENT_UPDATE = "environment_update"  # the environment changed
    USER_MESSAGE = "user_message"  # the user sent a message
    TASK_COMPLETE = "task_complete"  # a task finished
    ERROR_OCCURRED = "error_occurred"  # an error happened
@dataclass
class Event:
    """Payload passed between publishers and subscribers."""
    type: EventType  # category used to route the event to handlers
    data: Dict[str, Any]  # event-specific payload
    source: str  # label identifying who published the event
    timestamp: float  # time the event was created
    id: Optional[str] = None  # optional identifier; None when not supplied
class EventBus:
    """In-process publish/subscribe hub.

    Handlers are registered per `EventType`; every published event is
    appended to `event_history` and then passed to each handler
    registered for its type.
    """

    def __init__(self):
        """Start with no subscribers and an empty history."""
        self.subscribers: Dict[EventType, List[Callable]] = {}
        self.event_history: List[Event] = []
        self.running = False

    def subscribe(self, event_type: EventType, handler: Callable):
        """Register `handler` to be called with events of `event_type`.

        `publish` wraps `handler(event)` in a task, so the handler must
        return a coroutine (i.e. be an `async def`).
        """
        self.subscribers.setdefault(event_type, []).append(handler)

    def unsubscribe(self, event_type: EventType, handler: Callable):
        """Remove one registration of `handler` for `event_type`.

        Unknown event types and handlers that were never subscribed are
        both ignored.
        """
        handlers = self.subscribers.get(event_type)
        if handlers and handler in handlers:
            handlers.remove(handler)

    async def publish(self, event: Event):
        """Record `event`, run all handlers for its type concurrently and
        wait for them to finish.

        A failing handler does not stop the others and does not raise
        here; its exception is printed instead.
        """
        self.event_history.append(event)
        handlers = self.subscribers.get(event.type, [])
        if not handlers:
            return
        tasks = [asyncio.create_task(handler(event)) for handler in handlers]
        # return_exceptions=True hands failures back as results instead of
        # raising, so they are reported explicitly below.
        results = await asyncio.gather(*tasks, return_exceptions=True)
        for result in results:
            if isinstance(result, BaseException):
                print(f"事件处理器错误: {result!r}")

    def get_event_history(self, event_type: Optional[EventType] = None, limit: int = 100) -> List[Event]:
        """Return up to `limit` of the most recent events, oldest first.

        Args:
            event_type: when given, only events of this type are returned.
            limit: maximum number of events; zero or a negative value
                yields an empty list.
        """
        # Guard first: a slice of [-0:] would return the whole list.
        if limit <= 0:
            return []
        if event_type is not None:
            selected = [e for e in self.event_history if e.type == event_type]
        else:
            selected = self.event_history
        return selected[-limit:]
class AgentController:
    """Connects one agent to an event bus.

    Subscribes to USER_MESSAGE and ENVIRONMENT_UPDATE events, feeds them
    to the agent as observations and publishes the agent's answer as an
    AGENT_ACTION event. Events are ignored while the controller is not
    running.
    """

    def __init__(self, agent: BaseAgent, event_bus: EventBus):
        """Store the collaborators and subscribe the two handlers."""
        self.agent = agent
        self.event_bus = event_bus
        self.running = False
        self.event_bus.subscribe(EventType.USER_MESSAGE, self.handle_user_message)
        self.event_bus.subscribe(EventType.ENVIRONMENT_UPDATE, self.handle_environment_update)

    async def start(self):
        """Start reacting to events."""
        self.running = True
        print(f"Agent {self.agent.name} 已启动")

    async def stop(self):
        """Stop reacting to events (the subscriptions stay registered)."""
        self.running = False
        print(f"Agent {self.agent.name} 已停止")

    async def handle_user_message(self, event: Event):
        """Run one agent step on the event's "message" and publish the action."""
        if not self.running:
            return
        observation = Observation(
            type="user_message",
            content=event.data.get("message", ""),
            metadata={"event_id": event.id}
        )
        action = await self.agent.step(observation)
        await self._publish_action(action)

    async def handle_environment_update(self, event: Event):
        """Run one agent step on an environment update, but only when the
        event data has a truthy "requires_response" entry.

        The published action event carries the id of the triggering event
        under "triggered_by".
        """
        if not self.running:
            return
        update_data = event.data
        if not update_data.get("requires_response", False):
            return
        observation = Observation(
            type="environment_update",
            content=str(update_data),
            success=update_data.get("success", True),
            metadata=update_data
        )
        action = await self.agent.step(observation)
        await self._publish_action(action, {"triggered_by": event.id})

    async def _publish_action(self, action: Action, extra_data: Optional[Dict[str, Any]] = None):
        """Publish `action` as an AGENT_ACTION event; `extra_data` is merged into the payload."""
        payload = {"action": action, "agent_name": self.agent.name}
        if extra_data:
            payload.update(extra_data)
        # One clock reading, so the timestamp and the id derived from it agree.
        now = time.time()
        await self.event_bus.publish(Event(
            type=EventType.AGENT_ACTION,
            data=payload,
            source=f"agent_{self.agent.name}",
            timestamp=now,
            id=f"action_{int(now * 1000000)}"
        ))
# Usage example: connect one agent to an event bus through a controller
event_bus = EventBus()
agent = CodeActAgent("main_agent", {"model": "gpt-4"})
controller = AgentController(agent, event_bus)
async def demo_event_system():
    """Demonstrate the event system: start the controller, publish one
    user message, print how many events were recorded, then stop.

    Uses the module-level `controller` and `event_bus`. The controller is
    stopped even if publishing raises.
    """
    await controller.start()
    try:
        user_event = Event(
            type=EventType.USER_MESSAGE,
            data={"message": "请帮我创建一个简单的Web服务器"},
            source="user",
            timestamp=time.time(),
            id="msg_001"
        )
        await event_bus.publish(user_event)
        # Give any remaining work a moment to finish
        await asyncio.sleep(1)
        history = event_bus.get_event_history()
        print(f"处理了 {len(history)} 个事件")
    finally:
        await controller.stop()
# asyncio.run(demo_event_system())
3. 状态管理系统
PYTHON
from typing import Any, Dict, Optional
import json
import threading
class StateManager:
    """Key/value state store with change observers and JSON persistence.

    Reads and writes of `state` are guarded by a re-entrant lock.
    Observers are plain callables invoked as
    `observer(key, old_value, new_value)`.
    """

    def __init__(self):
        """Start with empty state and no observers."""
        self.state: Dict[str, Any] = {}
        self.lock = threading.RLock()
        self.observers: List[Callable] = []

    def get_state(self, key: Optional[str] = None) -> Any:
        """Return the value stored under `key` (None if absent), or a
        shallow copy of the whole state when `key` is None."""
        with self.lock:
            if key is None:
                return self.state.copy()
            return self.state.get(key)

    def set_state(self, key: str, value: Any):
        """Store `value` under `key` and notify observers of the change."""
        with self.lock:
            old_value = self.state.get(key)
            self.state[key] = value
            # The lock is an RLock, so an observer may call back into this
            # manager from the same thread.
            self._notify_observers(key, old_value, value)

    def update_state(self, updates: Dict[str, Any]):
        """Apply every entry of `updates`, then notify observers once per key."""
        with self.lock:
            changes = []
            for key, value in updates.items():
                changes.append((key, self.state.get(key), value))
                self.state[key] = value
            # Notify only after all values are in place.
            for key, old_value, new_value in changes:
                self._notify_observers(key, old_value, new_value)

    def delete_state(self, key: str) -> bool:
        """Remove `key`; observers see a new value of None.

        Returns:
            True when the key existed, False otherwise.
        """
        with self.lock:
            if key not in self.state:
                return False
            old_value = self.state.pop(key)
            self._notify_observers(key, old_value, None)
            return True

    def add_observer(self, observer: Callable):
        """Register `observer` to be called on every state change."""
        self.observers.append(observer)

    def remove_observer(self, observer: Callable):
        """Unregister `observer`; does nothing if it is not registered."""
        if observer in self.observers:
            self.observers.remove(observer)

    def _notify_observers(self, key: str, old_value: Any, new_value: Any):
        """Call every observer; an observer's exception is printed and
        does not prevent the remaining observers from running."""
        # Iterate over a copy: an observer may add or remove observers
        # (including itself), which would otherwise skip entries.
        for observer in list(self.observers):
            try:
                observer(key, old_value, new_value)
            except Exception as e:
                print(f"状态观察者错误: {e}")

    def save_to_file(self, filepath: str):
        """Write the state to `filepath` as UTF-8 JSON.

        Raises:
            TypeError: if the state holds a value JSON cannot represent.
                The file is left untouched in that case.
        """
        # Serialise before opening, so a failure cannot leave a truncated file.
        with self.lock:
            payload = json.dumps(self.state, ensure_ascii=False, indent=2)
        with open(filepath, 'w', encoding='utf-8') as f:
            f.write(payload)

    def load_from_file(self, filepath: str):
        """Replace the state with the JSON object stored in `filepath`.

        A missing file, an unreadable file, invalid JSON or JSON whose top
        level is not an object is reported with `print` and leaves the
        current state unchanged. Observers are not notified.
        """
        try:
            with open(filepath, 'r', encoding='utf-8') as f:
                loaded_state = json.load(f)
        except FileNotFoundError:
            print(f"状态文件 {filepath} 不存在")
            return
        except (OSError, ValueError) as e:
            # json.JSONDecodeError and UnicodeDecodeError are both ValueErrors.
            print(f"加载状态文件错误: {e}")
            return
        if not isinstance(loaded_state, dict):
            # Every other method relies on `state` being a dict.
            print(f"加载状态文件错误: 期望JSON对象, 实际为 {type(loaded_state).__name__}")
            return
        with self.lock:
            self.state = loaded_state
class SessionManager:
    """Registry of sessions, each backed by its own `StateManager`,
    with at most one session marked as active."""

    def __init__(self):
        """Start with no sessions and no active session."""
        self.sessions: Dict[str, StateManager] = {}
        self.active_session: Optional[str] = None

    def create_session(self, session_id: str) -> StateManager:
        """Create and register a new session.

        The new state is seeded with "session_id", "created_at" (current
        time) and "status" = "active".

        Raises:
            ValueError: if `session_id` is already registered.
        """
        if session_id in self.sessions:
            raise ValueError(f"会话 {session_id} 已存在")
        state_manager = StateManager()
        self.sessions[session_id] = state_manager
        state_manager.update_state({
            "session_id": session_id,
            "created_at": time.time(),
            "status": "active",
        })
        return state_manager

    def get_session(self, session_id: str) -> Optional[StateManager]:
        """Return the session's state manager, or None if unknown."""
        return self.sessions.get(session_id)

    def set_active_session(self, session_id: str):
        """Mark `session_id` as the active session.

        Raises:
            ValueError: if `session_id` is not registered.
        """
        if session_id not in self.sessions:
            raise ValueError(f"会话 {session_id} 不存在")
        self.active_session = session_id

    def get_active_session(self) -> Optional[StateManager]:
        """Return the active session's state manager, or None if there is none."""
        # Compare against None rather than truthiness, so a session whose
        # id is the empty string is still found.
        if self.active_session is None:
            return None
        return self.sessions.get(self.active_session)

    def close_session(self, session_id: str):
        """Mark a session as closed and clear it as the active session.

        The session stays registered; its "status" becomes "closed" and
        "closed_at" is set to the current time. Unknown ids are ignored.
        """
        session = self.sessions.get(session_id)
        if session is None:
            return
        session.set_state("status", "closed")
        session.set_state("closed_at", time.time())
        if self.active_session == session_id:
            self.active_session = None

    def list_sessions(self) -> List[str]:
        """Return the ids of all registered sessions, including closed ones."""
        return list(self.sessions)
# Usage example
session_manager = SessionManager()
# Create a session and make it the active one
session = session_manager.create_session("session_001")
session_manager.set_active_session("session_001")
# Set some session state
session.set_state("current_task", "创建Web服务器")
session.set_state("agent_state", "thinking")
session.set_state("progress", 0.1)
# Register a state observer
def state_observer(key: str, old_value: Any, new_value: Any) -> None:
    """Print every state change it is notified about."""
    print(f"状态变化: {key} = {old_value} -> {new_value}")
session.add_observer(state_observer)
# Update several keys at once
session.update_state({
    "progress": 0.3,
    "agent_state": "executing",
    "current_action": "writing_code"
})
print(f"当前状态: {session.get_state()}")
小结
在本章中,我们分析了OpenHands的核心架构:
- Agent架构:模块化的Agent设计,支持多种Agent类型
- 事件系统:异步事件驱动架构,实现组件间解耦
- 状态管理:集中式状态管理,支持会话和持久化
- 控制器模式:Agent控制器负责协调Agent和环境交互
OpenHands的架构设计体现了大型LLM应用的最佳实践,为我们提供了宝贵的参考。
思考题
- 为什么OpenHands选择事件驱动架构而不是同步调用?
- 如何设计一个支持多Agent协作的控制器?
- 状态管理系统如何处理并发访问问题?
- 如何扩展Agent架构以支持新的Agent类型?
下一章我们将深入分析OpenHands的Agent实现细节。