Model Evaluation and Performance Optimization
Performance Optimization Overview
Performance optimization for LLM applications spans several dimensions: inference speed, memory usage, cost control, and quality assurance. An effective optimization strategy can significantly improve the user experience while lowering operating costs.
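Before applying any of the techniques below, it helps to have a baseline to compare against. The following is a minimal, hypothetical sketch (the names RequestMetrics and MetricsLog are not from any library) for recording per-request latency and token usage, which are the quantities the rest of this chapter tries to improve.
PYTHON
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class RequestMetrics:
    """Latency and token usage for one LLM request."""
    latency_s: float
    input_tokens: int
    output_tokens: int


@dataclass
class MetricsLog:
    """Collects per-request metrics so optimizations can be compared before and after."""
    records: List[RequestMetrics] = field(default_factory=list)

    def add(self, metrics: RequestMetrics):
        self.records.append(metrics)

    def summary(self) -> Dict[str, float]:
        if not self.records:
            return {}
        latencies = sorted(r.latency_s for r in self.records)
        total_tokens = sum(r.input_tokens + r.output_tokens for r in self.records)
        return {
            "requests": len(self.records),
            "avg_latency_s": sum(latencies) / len(latencies),
            "p95_latency_s": latencies[int(0.95 * (len(latencies) - 1))],
            "total_tokens": total_tokens,
        }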
Inference Performance Optimization
1. Batch Processing Optimization
PYTHON
import asyncio
import time
from collections import deque
from dataclasses import dataclass
from typing import Any, Dict, List, Optional


@dataclass
class BatchRequest:
    """A single request waiting to be batched."""
    id: str
    prompt: str
    params: Dict[str, Any]
    future: asyncio.Future
    timestamp: float


class BatchProcessor:
    """Groups incoming requests into batches before calling the LLM."""

    def __init__(self, llm_client, batch_size: int = 8, max_wait_time: float = 0.1):
        self.llm_client = llm_client
        self.batch_size = batch_size
        self.max_wait_time = max_wait_time
        self.request_queue = deque()
        self.processing = False
        self.stats = {
            "total_requests": 0,
            "total_batches": 0,
            "avg_batch_size": 0.0,
            "avg_processing_time": 0.0
        }

    async def process_request(self, prompt: str, params: Optional[Dict[str, Any]] = None) -> str:
        """Enqueue a single request and wait for its batched result."""
        request_id = f"req_{int(time.time() * 1000000)}"
        future = asyncio.get_running_loop().create_future()

        request = BatchRequest(
            id=request_id,
            prompt=prompt,
            params=params or {},
            future=future,
            timestamp=time.time()
        )
        self.request_queue.append(request)
        self.stats["total_requests"] += 1

        # Start the batch loop if it is not already running. The flag is set here,
        # before the first await, so concurrent callers do not spawn duplicate loops.
        if not self.processing:
            self.processing = True
            asyncio.create_task(self._process_batches())

        return await future

    async def _process_batches(self):
        """Drain the queue batch by batch."""
        while self.request_queue:
            batch_start_time = time.time()

            # Collect a batch, waiting up to max_wait_time for it to fill.
            batch = []
            batch_deadline = time.time() + self.max_wait_time
            while len(batch) < self.batch_size and time.time() < batch_deadline:
                if self.request_queue:
                    batch.append(self.request_queue.popleft())
                else:
                    # Queue is empty: wait briefly for new requests to arrive.
                    await asyncio.sleep(0.01)

            if batch:
                await self._process_batch(batch)

            # Update statistics.
            processing_time = time.time() - batch_start_time
            self._update_stats(len(batch), processing_time)

        self.processing = False

    async def _process_batch(self, batch: List[BatchRequest]):
        """Process one batch."""
        try:
            # Prepare the batched input.
            prompts = [req.prompt for req in batch]

            # Call the LLM on the whole batch.
            responses = await self._batch_generate(prompts)

            # Hand each result back to its caller.
            for request, response in zip(batch, responses):
                if not request.future.done():
                    request.future.set_result(response)
        except Exception as e:
            # Propagate the error to every caller in the batch.
            for request in batch:
                if not request.future.done():
                    request.future.set_exception(e)

    async def _batch_generate(self, prompts: List[str]) -> List[str]:
        """Batched generation (mock implementation)."""
        # A real implementation would call an LLM API that supports batching;
        # here it is simplified to concurrent single calls.
        tasks = [asyncio.create_task(self._single_generate(prompt)) for prompt in prompts]
        return await asyncio.gather(*tasks)

    async def _single_generate(self, prompt: str) -> str:
        """Single generation (mock)."""
        # Simulate an LLM call.
        await asyncio.sleep(0.1)
        return f"Response to: {prompt[:50]}..."

    def _update_stats(self, batch_size: int, processing_time: float):
        """Update running statistics."""
        self.stats["total_batches"] += 1

        # Update the average batch size.
        total_requests = self.stats["total_requests"]
        total_batches = self.stats["total_batches"]
        self.stats["avg_batch_size"] = total_requests / total_batches

        # Update the average processing time.
        current_avg = self.stats["avg_processing_time"]
        self.stats["avg_processing_time"] = (
            (current_avg * (total_batches - 1) + processing_time) / total_batches
        )

    def get_stats(self) -> Dict[str, Any]:
        """Return a copy of the statistics."""
        return self.stats.copy()


# Usage example (the mock _batch_generate does not use a real client, so None is fine here)
batch_processor = BatchProcessor(llm_client=None, batch_size=4, max_wait_time=0.05)


async def test_batch_processing():
    """Exercise the batch processor."""
    # Send multiple requests concurrently.
    tasks = []
    for i in range(10):
        task = batch_processor.process_request(f"Request {i}: explain machine learning")
        tasks.append(task)

    # Wait for all requests to finish.
    responses = await asyncio.gather(*tasks)

    print(f"Handled {len(responses)} requests")
    print(f"Batching stats: {batch_processor.get_stats()}")

# asyncio.run(test_batch_processing())
2. Caching System
PYTHON
import hashlib
import json
import pickle
import time
from typing import Any, Dict, Optional

import redis


class LLMCache:
    """Cache for LLM responses."""

    def __init__(self, cache_type: str = "memory", redis_url: str = None):
        self.cache_type = cache_type

        if cache_type == "memory":
            self.cache = {}
            self.max_size = 10000
        elif cache_type == "redis":
            self.redis_client = redis.from_url(redis_url or "redis://localhost:6379")
        else:
            raise ValueError(f"Unsupported cache type: {cache_type}")

        self.stats = {
            "hits": 0,
            "misses": 0,
            "total_requests": 0
        }

    def _generate_key(self, prompt: str, params: Dict[str, Any]) -> str:
        """Build a cache key."""
        # Combine the prompt and parameters into a canonical string.
        cache_input = {
            "prompt": prompt,
            "params": sorted(params.items()) if params else []
        }
        cache_str = json.dumps(cache_input, sort_keys=True, ensure_ascii=False)

        # Hash it.
        return hashlib.md5(cache_str.encode()).hexdigest()

    def get(self, prompt: str, params: Dict[str, Any] = None) -> Optional[str]:
        """Look up a cached response."""
        self.stats["total_requests"] += 1
        key = self._generate_key(prompt, params or {})

        try:
            if self.cache_type == "memory":
                result = self.cache.get(key)
            else:  # redis
                cached_data = self.redis_client.get(key)
                result = pickle.loads(cached_data) if cached_data else None

            if result is not None:
                self.stats["hits"] += 1
                return result
            else:
                self.stats["misses"] += 1
                return None
        except Exception as e:
            print(f"Cache get error: {e}")
            self.stats["misses"] += 1
            return None

    def set(self, prompt: str, response: str, params: Dict[str, Any] = None, ttl: int = 3600):
        """Store a response in the cache."""
        key = self._generate_key(prompt, params or {})

        try:
            if self.cache_type == "memory":
                # Enforce the in-memory size limit.
                if len(self.cache) >= self.max_size:
                    # Evict the oldest inserted entry (simplified LRU).
                    oldest_key = next(iter(self.cache))
                    del self.cache[oldest_key]
                self.cache[key] = response
            else:  # redis
                cached_data = pickle.dumps(response)
                self.redis_client.setex(key, ttl, cached_data)
        except Exception as e:
            print(f"Cache set error: {e}")

    def clear(self):
        """Empty the cache."""
        if self.cache_type == "memory":
            self.cache.clear()
        else:  # redis
            self.redis_client.flushdb()
        print("Cache cleared")

    def get_hit_rate(self) -> float:
        """Return the cache hit rate."""
        total = self.stats["total_requests"]
        if total == 0:
            return 0.0
        return self.stats["hits"] / total

    def get_stats(self) -> Dict[str, Any]:
        """Return cache statistics."""
        stats = self.stats.copy()
        stats["hit_rate"] = self.get_hit_rate()

        if self.cache_type == "memory":
            stats["cache_size"] = len(self.cache)
        else:
            try:
                stats["cache_size"] = self.redis_client.dbsize()
            except Exception:
                stats["cache_size"] = -1

        return stats


class CachedLLMClient:
    """LLM client with a read-through cache."""

    def __init__(self, llm_client, cache: LLMCache):
        self.llm_client = llm_client
        self.cache = cache

    async def generate(self, prompt: str, **kwargs) -> str:
        """Generate text, serving from the cache when possible."""
        # Check the cache first.
        cached_response = self.cache.get(prompt, kwargs)
        if cached_response is not None:
            return cached_response

        # Cache miss: call the LLM.
        response = await self.llm_client.generate(prompt, **kwargs)

        # Store the result.
        self.cache.set(prompt, response, kwargs)

        return response

    def get_cache_stats(self) -> Dict[str, Any]:
        """Return cache statistics."""
        return self.cache.get_stats()


# Usage example (llm_client is assumed to expose an async generate(prompt, **kwargs) method)
cache = LLMCache(cache_type="memory")
cached_client = CachedLLMClient(llm_client, cache)


async def test_caching():
    """Exercise the cache."""
    prompt = "Explain what artificial intelligence is"

    # First call (cache miss).
    start_time = time.time()
    response1 = await cached_client.generate(prompt)
    time1 = time.time() - start_time

    # Second call (cache hit).
    start_time = time.time()
    response2 = await cached_client.generate(prompt)
    time2 = time.time() - start_time

    print(f"First call took: {time1:.3f}s")
    print(f"Second call took: {time2:.3f}s")
    print(f"Responses identical: {response1 == response2}")
    print(f"Cache stats: {cached_client.get_cache_stats()}")

# asyncio.run(test_caching())
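The in-memory eviction above drops the oldest inserted entry, which the code itself labels a simplified LRU. When some prompts are much hotter than others, evicting by recency of use rather than insertion order usually yields a better hit rate for the same memory budget. A minimal sketch using collections.OrderedDict (the class name LRUCache is hypothetical, not from a library); it could replace the plain dict inside LLMCache's memory backend:
PYTHON
from collections import OrderedDict
from typing import Optional


class LRUCache:
    """Least-recently-used cache for LLM responses (in-memory only)."""

    def __init__(self, max_size: int = 10000):
        self.max_size = max_size
        self._data: "OrderedDict[str, str]" = OrderedDict()

    def get(self, key: str) -> Optional[str]:
        if key not in self._data:
            return None
        # Mark the entry as most recently used.
        self._data.move_to_end(key)
        return self._data[key]

    def set(self, key: str, value: str):
        if key in self._data:
            self._data.move_to_end(key)
        self._data[key] = value
        # Evict the least recently used entry when over capacity.
        if len(self._data) > self.max_size:
            self._data.popitem(last=False)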
3. Model Quantization
PYTHON
import time
from typing import Dict, List

import torch
import torch.quantization as quantization
from transformers import AutoTokenizer, AutoModelForCausalLM


class ModelQuantizer:
    """Quantizes a Hugging Face causal language model and benchmarks it."""

    def __init__(self, model_name: str):
        self.model_name = model_name
        self.tokenizer = None
        self.model = None
        self.quantized_model = None

    def load_model(self):
        """Load the original full-precision model."""
        print(f"Loading model: {self.model_name}")
        self.tokenizer = AutoTokenizer.from_pretrained(self.model_name)
        self.model = AutoModelForCausalLM.from_pretrained(
            self.model_name,
            torch_dtype=torch.float32  # quantization expects float32 weights
        )
        print(f"Original model size: {self._get_model_size(self.model):.2f} MB")

    def quantize_dynamic(self):
        """Dynamic quantization."""
        print("Running dynamic quantization...")
        self.quantized_model = torch.quantization.quantize_dynamic(
            self.model,
            {torch.nn.Linear},  # quantize the linear layers
            dtype=torch.qint8
        )

        quantized_size = self._get_model_size(self.quantized_model)
        original_size = self._get_model_size(self.model)
        compression_ratio = original_size / quantized_size

        print(f"Quantized model size: {quantized_size:.2f} MB")
        print(f"Compression ratio: {compression_ratio:.2f}x")

    def quantize_static(self, calibration_data: List[str]):
        """Static quantization."""
        print("Running static quantization...")

        # Set up the quantization configuration.
        self.model.eval()
        self.model.qconfig = quantization.get_default_qconfig('fbgemm')

        # Insert observers.
        prepared_model = quantization.prepare(self.model)

        # Calibrate.
        print("Calibrating model...")
        with torch.no_grad():
            for text in calibration_data[:100]:  # use a subset of the data for calibration
                inputs = self.tokenizer(text, return_tensors="pt", truncation=True, max_length=512)
                prepared_model(**inputs)

        # Convert to the quantized model.
        self.quantized_model = quantization.convert(prepared_model)

        quantized_size = self._get_model_size(self.quantized_model)
        original_size = self._get_model_size(self.model)
        compression_ratio = original_size / quantized_size

        print(f"Statically quantized model size: {quantized_size:.2f} MB")
        print(f"Compression ratio: {compression_ratio:.2f}x")

    def _get_model_size(self, model) -> float:
        """Return the model size in MB (parameters plus buffers)."""
        param_size = 0
        buffer_size = 0

        for param in model.parameters():
            param_size += param.nelement() * param.element_size()

        for buffer in model.buffers():
            buffer_size += buffer.nelement() * buffer.element_size()

        return (param_size + buffer_size) / (1024 * 1024)

    def benchmark_inference(self, test_prompts: List[str], num_runs: int = 10) -> Dict[str, float]:
        """Benchmark inference latency for the original and quantized models."""
        results = {}

        # Benchmark the original model.
        if self.model is not None:
            original_time = self._benchmark_model(self.model, test_prompts, num_runs)
            results["original_model"] = original_time

        # Benchmark the quantized model.
        if self.quantized_model is not None:
            quantized_time = self._benchmark_model(self.quantized_model, test_prompts, num_runs)
            results["quantized_model"] = quantized_time

            if "original_model" in results:
                speedup = results["original_model"] / results["quantized_model"]
                results["speedup"] = speedup

        return results

    def _benchmark_model(self, model, test_prompts: List[str], num_runs: int) -> float:
        """Benchmark one model and return the average time per run."""
        model.eval()
        total_time = 0.0

        with torch.no_grad():
            for _ in range(num_runs):
                start_time = time.time()
                for prompt in test_prompts:
                    inputs = self.tokenizer(prompt, return_tensors="pt", truncation=True, max_length=512)
                    outputs = model.generate(**inputs, max_length=100, do_sample=False)
                total_time += time.time() - start_time

        return total_time / num_runs

    def save_quantized_model(self, save_path: str):
        """Save the quantized model's state dict."""
        if self.quantized_model is None:
            raise ValueError("There is no quantized model to save")

        torch.save(self.quantized_model.state_dict(), save_path)
        print(f"Quantized model saved to: {save_path}")


# Usage example (requires a suitable model and environment)
"""
quantizer = ModelQuantizer("gpt2")
quantizer.load_model()

# Dynamic quantization
quantizer.quantize_dynamic()

# Benchmarking
test_prompts = ["Hello world", "What is AI?", "Explain machine learning"]
benchmark_results = quantizer.benchmark_inference(test_prompts)
print(f"Benchmark results: {benchmark_results}")
"""
Cost Optimization Strategies
1. Smart Model Selection
PYTHON
from typing import Any, Dict


class ModelSelector:
    """Cost-aware model selector."""

    def __init__(self):
        # Illustrative prices in USD per 1K tokens; real pricing changes over time
        # and usually differs for input vs. output tokens.
        self.models = {
            "gpt-3.5-turbo": {
                "cost_per_1k_tokens": 0.002,
                "max_tokens": 4096,
                "capabilities": ["general", "coding", "analysis"],
                "speed": "fast"
            },
            "gpt-4": {
                "cost_per_1k_tokens": 0.03,
                "max_tokens": 8192,
                "capabilities": ["general", "coding", "analysis", "complex_reasoning"],
                "speed": "medium"
            },
            "claude-3-haiku": {
                "cost_per_1k_tokens": 0.00025,
                "max_tokens": 200000,
                "capabilities": ["general", "analysis"],
                "speed": "very_fast"
            }
        }
        self.usage_stats = {}

    def select_model(self, task_type: str, complexity: str, budget_limit: float = None) -> str:
        """Pick the most suitable model for a task."""
        candidates = []

        for model_name, model_info in self.models.items():
            # Check the capability match.
            if task_type in model_info["capabilities"]:
                # Check the budget limit.
                if budget_limit is None or model_info["cost_per_1k_tokens"] <= budget_limit:
                    score = self._calculate_model_score(model_info, complexity)
                    candidates.append((model_name, score))

        if not candidates:
            # No model matched: fall back to the cheapest one.
            cheapest = min(self.models.items(), key=lambda x: x[1]["cost_per_1k_tokens"])
            return cheapest[0]

        # Pick the highest-scoring model.
        best_model = max(candidates, key=lambda x: x[1])
        return best_model[0]

    def _calculate_model_score(self, model_info: Dict, complexity: str) -> float:
        """Score a model for the given task complexity."""
        base_score = 1.0

        # Adjust the score by complexity.
        if complexity == "simple":
            # Simple tasks favor cheap, fast models.
            base_score += (1.0 / model_info["cost_per_1k_tokens"]) * 0.1
            if model_info["speed"] == "very_fast":
                base_score += 0.3
            elif model_info["speed"] == "fast":
                base_score += 0.2
        elif complexity == "complex":
            # Complex tasks favor more capable models.
            base_score += len(model_info["capabilities"]) * 0.2
            if "complex_reasoning" in model_info["capabilities"]:
                base_score += 0.5

        return base_score

    def estimate_cost(self, model_name: str, input_tokens: int, output_tokens: int) -> float:
        """Estimate the cost of a request."""
        if model_name not in self.models:
            return 0.0

        total_tokens = input_tokens + output_tokens
        cost_per_token = self.models[model_name]["cost_per_1k_tokens"] / 1000
        return total_tokens * cost_per_token

    def track_usage(self, model_name: str, tokens_used: int, cost: float):
        """Record usage for a model."""
        if model_name not in self.usage_stats:
            self.usage_stats[model_name] = {
                "total_tokens": 0,
                "total_cost": 0.0,
                "request_count": 0
            }

        stats = self.usage_stats[model_name]
        stats["total_tokens"] += tokens_used
        stats["total_cost"] += cost
        stats["request_count"] += 1

    def get_usage_report(self) -> Dict[str, Any]:
        """Build a usage report."""
        total_cost = sum(stats["total_cost"] for stats in self.usage_stats.values())
        total_tokens = sum(stats["total_tokens"] for stats in self.usage_stats.values())

        report = {
            "total_cost": total_cost,
            "total_tokens": total_tokens,
            "models": self.usage_stats.copy()
        }

        # Compute each model's share.
        for model_name, stats in report["models"].items():
            if total_cost > 0:
                stats["cost_percentage"] = (stats["total_cost"] / total_cost) * 100
            if total_tokens > 0:
                stats["token_percentage"] = (stats["total_tokens"] / total_tokens) * 100

        return report


# Usage example
selector = ModelSelector()

# Pick models
simple_task_model = selector.select_model("general", "simple", budget_limit=0.01)
complex_task_model = selector.select_model("complex_reasoning", "complex")

print(f"Recommended model for a simple task: {simple_task_model}")
print(f"Recommended model for a complex task: {complex_task_model}")

# Estimate cost
cost = selector.estimate_cost("gpt-3.5-turbo", input_tokens=100, output_tokens=50)
print(f"Estimated cost: ${cost:.6f}")

# Track usage
selector.track_usage("gpt-3.5-turbo", tokens_used=150, cost=cost)

# Usage report
report = selector.get_usage_report()
print(f"Usage report: {report}")
Summary
In this chapter, we covered:
- Inference optimization: batching, caching, model quantization, and related techniques
- Memory optimization: model compression and dynamic loading strategies
- Cost optimization: smart model selection and usage tracking
- Performance monitoring: comprehensive metrics and optimization recommendations
- Practical tips: optimization best practices for production environments
Performance optimization is key to deploying LLM applications successfully: it requires finding the right balance between quality, speed, and cost.
Questions to Consider
- How would you design an adaptive model selection strategy?
- Under what circumstances should model quantization be used?
- How do you balance cache hit rate against memory usage?
- How would you design a comprehensive performance monitoring system?
Congratulations on completing the advanced LLM techniques stage! Next, we will move on to the remaining stages of the tutorial.