lessons-learned
初级
未知
未知作者
更新于 2025-06-14
OpenHands项目经验总结
架构设计经验
通过分析OpenHands项目,我们可以总结出构建大型LLM应用的重要经验和最佳实践。
核心设计原则
1. 模块化与可扩展性
OpenHands采用高度模块化的设计,每个组件都有明确的职责:
PYTHON
from abc import ABC, abstractmethod
from typing import Dict, Any, List, Optional
class Component(ABC):
    """Abstract base class for pluggable components.

    Every component carries a name, a configuration mapping, a list of
    dependency names and a status string. Concrete subclasses must
    implement ``start``, ``stop`` and ``health_check``.
    """

    def __init__(self, name: str, config: Dict[str, Any]):
        """Store identity and configuration; begin in the "initialized" state.

        Args:
            name: Identifier of the component.
            config: Arbitrary configuration values for the component.
        """
        # A fresh component has not been started yet.
        self.status = "initialized"
        # Names of the components this one depends on (empty by default).
        self.dependencies: List[str] = []
        self.config = config
        self.name = name

    @abstractmethod
    async def start(self):
        """Start the component."""

    @abstractmethod
    async def stop(self):
        """Stop the component."""

    @abstractmethod
    def health_check(self) -> bool:
        """Report whether the component is healthy."""
class PluginManager:
    """Registry and lifecycle manager for plugins.

    Plugin classes are registered under a name, instantiated on demand with
    ``load_plugin`` and then started/stopped together.
    """

    def __init__(self):
        # Loaded plugin instances, keyed by plugin name.
        self.plugins: Dict[str, "Component"] = {}
        # Registered plugin classes, keyed by plugin name.
        self.plugin_registry: Dict[str, type] = {}

    def register_plugin(self, name: str, plugin_class: type):
        """Register ``plugin_class`` under ``name`` (replaces any earlier entry)."""
        self.plugin_registry[name] = plugin_class

    def load_plugin(self, name: str, config: Dict[str, Any]) -> "Component":
        """Instantiate the plugin registered as ``name`` and remember the instance.

        Args:
            name: Name the plugin class was registered under.
            config: Configuration passed to the plugin constructor.

        Returns:
            The newly created plugin instance.

        Raises:
            ValueError: If no plugin class is registered under ``name``.
        """
        if name not in self.plugin_registry:
            raise ValueError(f"未注册的插件: {name}")
        plugin_class = self.plugin_registry[name]
        plugin_instance = plugin_class(name, config)
        self.plugins[name] = plugin_instance
        return plugin_instance

    async def start_all(self):
        """Start every loaded plugin, dependencies first.

        Plugins are started in passes: a plugin is started once all names in
        its ``dependencies`` list have been started.

        Raises:
            RuntimeError: If a full pass starts nothing, i.e. the remaining
                plugins depend on a plugin that is not loaded or on each
                other in a cycle. Plugins started before that point stay
                started.
        """
        started = set()
        while len(started) < len(self.plugins):
            progressed = False
            for name, plugin in self.plugins.items():
                if name in started:
                    continue
                # Only start a plugin whose dependencies are all running.
                if all(dep in started for dep in plugin.dependencies):
                    await plugin.start()
                    started.add(name)
                    progressed = True
                    print(f"插件 {name} 已启动")
            if not progressed:
                # Without this guard the loop would spin forever on a
                # missing or circular dependency.
                blocked = sorted(set(self.plugins) - started)
                raise RuntimeError(
                    f"无法启动插件(依赖缺失或存在循环依赖): {', '.join(blocked)}"
                )

    async def stop_all(self):
        """Stop every loaded plugin in load order."""
        for name, plugin in self.plugins.items():
            await plugin.stop()
            print(f"插件 {name} 已停止")

    def get_plugin(self, name: str) -> Optional["Component"]:
        """Return the loaded plugin named ``name``, or ``None`` if not loaded."""
        return self.plugins.get(name)
# Example plugin implementations
class LLMPlugin(Component):
    """Plugin that wraps an LLM client (the client is a placeholder here)."""

    def __init__(self, name: str, config: Dict[str, Any]):
        """Read the model name from ``config`` (key ``"model"``)."""
        super().__init__(name, config)
        # No client exists until start() is called.
        self.client = None
        self.model_name = config.get("model", "gpt-3.5-turbo")

    async def start(self):
        """Create the client placeholder and mark the plugin as running."""
        # Simplified: a string stands in for a real LLM client object.
        self.client = "LLM_CLIENT_INSTANCE"
        self.status = "running"
        print(f"LLM插件已启动,模型: {self.model_name}")

    async def stop(self):
        """Drop the client and mark the plugin as stopped."""
        self.status = "stopped"
        self.client = None

    def health_check(self) -> bool:
        """Return True only when a client exists and the status is "running"."""
        if self.client is None:
            return False
        return self.status == "running"

    async def generate(self, prompt: str) -> str:
        """Return a simulated completion for ``prompt``.

        Raises:
            RuntimeError: If the plugin fails its health check.
        """
        if not self.health_check():
            raise RuntimeError("LLM服务未就绪")
        # Simulated generation: echo at most the first 50 characters.
        preview = prompt[:50]
        return f"Generated response for: {preview}..."
class SandboxPlugin(Component):
    """Plugin that hands out sandbox container ids up to a configured limit."""

    def __init__(self, name: str, config: Dict[str, Any]):
        """Read the container limit from ``config`` (key ``"container_limit"``)."""
        super().__init__(name, config)
        self.container_limit = config.get("container_limit", 10)
        self.active_containers = 0
        # Ids currently handed out, so destroy_container can ignore unknown ids.
        self._container_ids = set()
        # Monotonic sequence number: ids are never reused after a destroy.
        self._next_container_seq = 0

    async def start(self):
        """Mark the sandbox service as running."""
        self.status = "running"
        print(f"沙箱插件已启动,容器限制: {self.container_limit}")

    async def stop(self):
        """Mark the sandbox service as stopped."""
        self.status = "stopped"

    def health_check(self) -> bool:
        """Return True while running and below the container limit."""
        return self.status == "running" and self.active_containers < self.container_limit

    async def create_container(self) -> str:
        """Allocate a new container id.

        Returns:
            A unique id of the form ``container_<n>``.

        Raises:
            RuntimeError: If the number of active containers has reached
                ``container_limit``.
        """
        if self.active_containers >= self.container_limit:
            raise RuntimeError("容器数量已达上限")
        # Deriving the id from the live count would hand out the same id
        # again after a destroy; a separate sequence keeps ids unique.
        self._next_container_seq += 1
        container_id = f"container_{self._next_container_seq}"
        self._container_ids.add(container_id)
        self.active_containers += 1
        return container_id

    async def destroy_container(self, container_id: str):
        """Release ``container_id``; ids that are not active are ignored."""
        if container_id not in self._container_ids:
            # Unknown or already destroyed: must not free someone else's slot.
            return
        self._container_ids.discard(container_id)
        self.active_containers = max(0, self.active_containers - 1)
# Usage example
plugin_manager = PluginManager()
# Register the plugin classes under the names used to load them
plugin_manager.register_plugin("llm", LLMPlugin)
plugin_manager.register_plugin("sandbox", SandboxPlugin)
# Load (instantiate) the plugins with their configuration
llm_plugin = plugin_manager.load_plugin("llm", {"model": "gpt-4"})
sandbox_plugin = plugin_manager.load_plugin("sandbox", {"container_limit": 5})
# Start all plugins; start_all() is a coroutine, so it has to be awaited
# from async code:
# await plugin_manager.start_all()
2. 错误处理与恢复
OpenHands实现了完善的错误处理和恢复机制:
PYTHON
import asyncio
import time
import traceback
from enum import Enum
from typing import Any, Callable, Dict, List, Optional
class ErrorSeverity(Enum):
    """Severity levels for errors, listed from least to most severe."""

    # The values are the lowercase level names.
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"
class ErrorHandler:
    """Dispatches errors to registered handlers and keeps an error history.

    Handlers are registered per exception type, recovery strategies per
    exception type *name*. Every handled error is appended to
    ``error_history``.
    """

    def __init__(self):
        # exception type -> async handler(error, error_info) -> bool
        self.error_handlers: Dict[type, Callable] = {}
        # exception type name -> async strategy(error, error_info) -> bool
        self.recovery_strategies: Dict[str, Callable] = {}
        # One info dict per handled error, oldest first.
        self.error_history: List[Dict] = []

    def register_handler(self, exception_type: type, handler: Callable):
        """Register an async ``handler`` for ``exception_type`` and its subclasses."""
        self.error_handlers[exception_type] = handler

    def register_recovery_strategy(self, error_type: str, strategy: Callable):
        """Register an async ``strategy`` for errors whose type name is ``error_type``."""
        self.recovery_strategies[error_type] = strategy

    def _find_handler(self, exception_type: type) -> Optional[Callable]:
        """Return the handler for the most specific registered base of ``exception_type``."""
        for klass in exception_type.__mro__:
            handler = self.error_handlers.get(klass)
            if handler:
                return handler
        return None

    async def handle_error(self, error: Exception, context: Optional[Dict[str, Any]] = None) -> bool:
        """Record ``error`` and run the first handler that deals with it.

        Order of attempts: a handler registered for the error's type (or the
        nearest registered base class), then a recovery strategy registered
        for the exact type name, then the default severity-based handling.
        A handler or strategy that itself raises is reported and skipped.

        Args:
            error: The exception to handle.
            context: Optional extra information stored with the error.

        Returns:
            The bool returned by whichever handler ran.
        """
        error_info = {
            "type": type(error).__name__,
            "message": str(error),
            # Format the traceback of *this* exception. format_exc() would
            # describe whatever exception is being handled at call time,
            # which is nothing when called outside an ``except`` block.
            "traceback": "".join(
                traceback.format_exception(type(error), error, error.__traceback__)
            ),
            "context": context or {},
            "timestamp": time.time(),
            "severity": self._assess_severity(error),
        }
        self.error_history.append(error_info)

        # Type-specific handler; walks the MRO so a handler registered for
        # a base class also covers its subclasses.
        handler = self._find_handler(type(error))
        if handler:
            try:
                return await handler(error, error_info)
            except Exception as e:
                print(f"错误处理器失败: {e}")

        # Recovery strategy keyed by the exact exception type name.
        error_type = type(error).__name__
        if error_type in self.recovery_strategies:
            try:
                strategy = self.recovery_strategies[error_type]
                return await strategy(error, error_info)
            except Exception as e:
                print(f"恢复策略失败: {e}")

        # Fallback: decide from the assessed severity.
        return await self._default_error_handling(error, error_info)

    def _assess_severity(self, error: Exception) -> ErrorSeverity:
        """Map an exception to a severity level based on its type."""
        if isinstance(error, (SystemExit, KeyboardInterrupt)):
            return ErrorSeverity.CRITICAL
        elif isinstance(error, (MemoryError, OSError)):
            return ErrorSeverity.HIGH
        elif isinstance(error, (ValueError, TypeError)):
            return ErrorSeverity.MEDIUM
        else:
            return ErrorSeverity.LOW

    async def _default_error_handling(self, error: Exception, error_info: Dict) -> bool:
        """Print a message for ``error``; return False only for CRITICAL severity."""
        severity = error_info["severity"]
        if severity == ErrorSeverity.CRITICAL:
            print(f"严重错误,系统即将关闭: {error}")
            return False
        elif severity == ErrorSeverity.HIGH:
            print(f"高级错误,尝试重启组件: {error}")
            # Placeholder: component restart logic would go here.
            return True
        else:
            print(f"一般错误,继续运行: {error}")
            return True

    def get_error_statistics(self) -> Dict[str, Any]:
        """Summarise the error history.

        Returns:
            ``{"total_errors": 0}`` when nothing was recorded; otherwise the
            total, counts per error type name, counts per severity value and
            the ten most recent error records.
        """
        if not self.error_history:
            return {"total_errors": 0}
        error_counts = {}
        severity_counts = {}
        for record in self.error_history:
            error_type = record["type"]
            severity = record["severity"].value
            error_counts[error_type] = error_counts.get(error_type, 0) + 1
            severity_counts[severity] = severity_counts.get(severity, 0) + 1
        return {
            "total_errors": len(self.error_history),
            "error_types": error_counts,
            "severity_distribution": severity_counts,
            "recent_errors": self.error_history[-10:],
        }
class ResilientAgent:
    """Agent that retries failing async tasks with exponential backoff."""

    def __init__(
        self,
        name: str,
        error_handler: "ErrorHandler",
        max_retries: int = 3,
        base_delay: float = 1.0,
    ):
        """Create the agent.

        Args:
            name: Agent name, reported in the error context.
            error_handler: Object whose async ``handle_error(error, context)``
                decides whether a failed attempt may be retried.
            max_retries: Retries allowed after the first attempt.
            base_delay: Seconds to wait before the first retry; the wait
                doubles with every further attempt.
        """
        self.name = name
        self.error_handler = error_handler
        self.retry_count = 0
        self.max_retries = max_retries
        self.base_delay = base_delay
        self.state = "idle"

    async def execute_with_recovery(self, task: Callable, *args, **kwargs) -> Any:
        """Await ``task(*args, **kwargs)``, retrying on failure.

        After each failure the error handler is consulted. The original
        exception is re-raised when the handler returns a falsy value or the
        retries are used up; otherwise the agent sleeps
        ``base_delay * 2 ** attempt`` seconds and tries again.

        Returns:
            The result of the first successful attempt.

        Raises:
            Exception: The task's own exception, once retrying stops.
            RuntimeError: If no attempt ran at all (negative ``max_retries``).
        """
        for attempt in range(self.max_retries + 1):
            self.retry_count = attempt
            try:
                result = await task(*args, **kwargs)
            except Exception as e:
                context = {
                    "agent_name": self.name,
                    "attempt": attempt + 1,
                    "max_retries": self.max_retries,
                    "task_name": task.__name__ if hasattr(task, '__name__') else str(task),
                }
                should_continue = await self.error_handler.handle_error(e, context)
                if not should_continue or attempt >= self.max_retries:
                    # Bare raise keeps the original traceback intact.
                    raise
                # Exponential backoff before the next attempt.
                await asyncio.sleep(self.base_delay * (2 ** attempt))
            else:
                # Success: reset the retry counter.
                self.retry_count = 0
                return result
        raise RuntimeError(f"任务执行失败,已重试 {self.max_retries} 次")
# Usage example
error_handler = ErrorHandler()


# Handler to be registered for connection errors
async def handle_connection_error(error: Exception, error_info: Dict) -> bool:
    """Report a connection error, pause for a second and allow a retry."""
    print(f"处理连接错误: {error}")
    # Placeholder for real reconnect logic.
    await asyncio.sleep(1)
    return True


error_handler.register_handler(ConnectionError, handle_connection_error)

# Create an agent with recovery capability
resilient_agent = ResilientAgent("test_agent", error_handler)


async def risky_task():
    """Task that fails with ConnectionError about 70% of the time."""
    import random

    if random.random() >= 0.7:
        return "任务成功完成"
    raise ConnectionError("网络连接失败")


# Run the task (from async code):
# result = await resilient_agent.execute_with_recovery(risky_task)
# print(f"任务结果: {result}")
3. 监控与可观测性
PYTHON
import time
from typing import Dict, List, Any
from dataclasses import dataclass
from collections import defaultdict, deque
@dataclass
class Metric:
    """A single recorded metric sample."""

    # Name of the metric the sample belongs to.
    name: str
    # Recorded value.
    value: float
    # Time of recording; MetricsCollector fills this with time.time().
    timestamp: float
    # Optional key/value labels; None when no tags were supplied.
    tags: Dict[str, str] = None
class MetricsCollector:
    """In-memory store for counters, gauges and per-metric sample history.

    Each metric name keeps at most ``max_history`` samples; older samples
    are dropped automatically by the bounded deque.
    """

    def __init__(self, max_history: int = 1000):
        self.gauges: Dict[str, float] = {}
        self.counters: Dict[str, float] = defaultdict(float)
        self.metrics: Dict[str, deque] = defaultdict(lambda: deque(maxlen=max_history))

    def _store(self, name: str, value: float, tags: Dict[str, str] = None):
        """Append a sample stamped with the current time to ``name``'s history."""
        sample = Metric(name=name, value=value, timestamp=time.time(), tags=tags)
        self.metrics[name].append(sample)

    def increment_counter(self, name: str, value: float = 1.0, tags: Dict[str, str] = None):
        """Add ``value`` to the counter and record its new running total."""
        self.counters[name] += value
        self._store(name, self.counters[name], tags)

    def set_gauge(self, name: str, value: float, tags: Dict[str, str] = None):
        """Set the gauge to ``value`` and record the sample."""
        self.gauges[name] = value
        self._store(name, value, tags)

    def record_histogram(self, name: str, value: float, tags: Dict[str, str] = None):
        """Record a raw observation for ``name``."""
        self._store(name, value, tags)

    def get_metric_summary(self, name: str, window_seconds: int = 300) -> Dict[str, Any]:
        """Summarise the samples of ``name`` from the last ``window_seconds``.

        Returns:
            An empty dict when the metric is unknown or has no sample inside
            the window; otherwise count, min, max, avg, latest value and the
            window size.
        """
        # Membership test first: indexing the defaultdict would create the key.
        if name not in self.metrics:
            return {}
        cutoff = time.time() - window_seconds
        values = [
            sample.value
            for sample in self.metrics[name]
            if sample.timestamp >= cutoff
        ]
        if not values:
            return {}
        summary = {
            "count": len(values),
            "min": min(values),
            "max": max(values),
            "avg": sum(values) / len(values),
            "latest": values[-1],
            "window_seconds": window_seconds,
        }
        return summary

    def get_all_metrics(self) -> Dict[str, Any]:
        """Return copies of all counters and gauges plus the known metric names."""
        snapshot = {}
        snapshot["counters"] = dict(self.counters)
        snapshot["gauges"] = dict(self.gauges)
        snapshot["metric_names"] = list(self.metrics.keys())
        return snapshot
class PerformanceMonitor:
    """Times operations and reports durations and counts to a metrics collector."""

    def __init__(self, metrics_collector: "MetricsCollector"):
        """Store the collector that receives all recorded metrics."""
        self.metrics = metrics_collector
        # operation id -> start time (time.time()) of operations in flight
        self.active_operations: Dict[str, float] = {}

    def start_operation(self, operation_name: str) -> str:
        """Begin timing an operation.

        Returns:
            An id of the form ``<operation_name>_<microsecond stamp>`` to be
            passed to ``end_operation``.
        """
        stamp = int(time.time() * 1000000)
        operation_id = f"{operation_name}_{stamp}"
        # Two operations started within the same clock tick would otherwise
        # share an id and overwrite each other's start time.
        while operation_id in self.active_operations:
            stamp += 1
            operation_id = f"{operation_name}_{stamp}"
        self.active_operations[operation_id] = time.time()
        return operation_id

    def end_operation(self, operation_id: str, operation_name: str = None):
        """Stop timing ``operation_id`` and record its metrics.

        Records a ``<name>_duration`` histogram value and increments the
        ``<name>_count`` counter. Unknown ids are ignored.

        Args:
            operation_id: Id returned by ``start_operation``.
            operation_name: Metric name prefix; derived from the id when
                omitted.
        """
        if operation_id not in self.active_operations:
            return
        start_time = self.active_operations.pop(operation_id)
        duration = time.time() - start_time
        if operation_name is None:
            # Strip only the trailing stamp: the name itself may contain
            # underscores (e.g. "llm_generation").
            operation_name = operation_id.rsplit("_", 1)[0]
        self.metrics.record_histogram(f"{operation_name}_duration", duration)
        self.metrics.increment_counter(f"{operation_name}_count")

    def monitor_function(self, func_name: str = None):
        """Return a decorator that times a sync or async function.

        Each call increments ``<name>_success`` or ``<name>_error`` and is
        timed via ``start_operation``/``end_operation``. Exceptions from the
        wrapped function propagate unchanged.

        Args:
            func_name: Metric name prefix; defaults to the decorated
                function's ``__name__``.
        """
        import functools
        import inspect

        def decorator(func):
            # Resolve the label per decorated function. Mutating the shared
            # closure variable would make a reused decorator label every
            # later function with the first function's name.
            label = func.__name__ if func_name is None else func_name

            if inspect.iscoroutinefunction(func):
                @functools.wraps(func)
                async def async_wrapper(*args, **kwargs):
                    operation_id = self.start_operation(label)
                    try:
                        result = await func(*args, **kwargs)
                    except Exception:
                        self.metrics.increment_counter(f"{label}_error")
                        raise
                    else:
                        self.metrics.increment_counter(f"{label}_success")
                        return result
                    finally:
                        self.end_operation(operation_id, label)

                return async_wrapper

            @functools.wraps(func)
            def sync_wrapper(*args, **kwargs):
                operation_id = self.start_operation(label)
                try:
                    result = func(*args, **kwargs)
                except Exception:
                    self.metrics.increment_counter(f"{label}_error")
                    raise
                else:
                    self.metrics.increment_counter(f"{label}_success")
                    return result
                finally:
                    self.end_operation(operation_id, label)

            return sync_wrapper

        return decorator
# Usage example
metrics_collector = MetricsCollector()
performance_monitor = PerformanceMonitor(metrics_collector)


# Monitor a function under the metric name prefix "llm_generation"
@performance_monitor.monitor_function("llm_generation")
async def generate_text(prompt: str) -> str:
    """Simulate LLM text generation with an occasional failure."""
    import random

    # Simulated processing time.
    await asyncio.sleep(0.5)
    # Simulated failure in roughly 10% of the calls.
    failed = random.random() < 0.1
    if failed:
        raise Exception("生成失败")
    return f"Generated: {prompt}"


# Record metrics by hand
metrics_collector.increment_counter("requests_total")
metrics_collector.set_gauge("active_sessions", 5)
metrics_collector.record_histogram("response_time", 0.25)

# Fetch a metric summary (empty until generate_text has been called)
summary = metrics_collector.get_metric_summary("llm_generation_duration")
print(f"LLM生成性能: {summary}")

all_metrics = metrics_collector.get_all_metrics()
print(f"所有指标: {all_metrics}")
关键经验总结
1. 架构设计原则
- 单一职责:每个组件只负责一个明确的功能
- 松耦合:组件间通过接口和事件通信,减少直接依赖
- 高内聚:相关功能集中在同一个模块中
- 可扩展性:支持插件化和动态加载
2. 安全性考虑
- 沙箱隔离:所有代码执行都在隔离环境中进行
- 权限控制:细粒度的权限管理
- 输入验证:严格验证所有外部输入
- 资源限制:防止资源耗尽攻击
3. 性能优化策略
- 异步处理:使用异步编程提高并发性能
- 缓存机制:缓存频繁访问的数据
- 批处理:合并相似请求减少开销
- 资源池化:复用昂贵的资源
4. 可维护性实践
- 清晰的代码结构:遵循一致的编码规范
- 完善的测试:单元测试、集成测试、端到端测试
- 详细的文档:API文档、架构文档、部署文档
- 监控和日志:全面的可观测性
小结
OpenHands项目为我们提供了构建大型LLM应用的宝贵经验:
- 模块化设计:通过插件系统实现高度可扩展的架构
- 错误恢复:完善的错误处理和自动恢复机制
- 性能监控:全面的指标收集和性能分析
- 安全隔离:多层次的安全防护措施
这些经验对于构建生产级的LLM应用具有重要的指导意义。
思考题
- 如何在保证安全性的同时提高系统性能?
- 插件系统如何处理版本兼容性问题?
- 如何设计一个支持热更新的架构?
- 监控系统如何避免对主业务的性能影响?
恭喜你完成了OpenHands架构分析!这些分析为我们构建大型LLM应用提供了宝贵的实战经验。