Model Evaluation and Performance Optimization

Performance Optimization Overview

Performance optimization for LLM applications spans several dimensions: inference speed, memory usage, cost control, and quality assurance. An effective optimization strategy noticeably improves the user experience while lowering operating costs.
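
Before optimizing, it helps to measure where time and money actually go. Below is a minimal, illustrative instrumentation sketch, not part of the original tutorial: the async `llm_client.generate` interface, the per-token price, and the characters-per-token heuristic are all assumptions you should replace with your own stack's values.

PYTHON
import time
from typing import Any, Dict, List

class RequestMetrics:
    """Sketch: record latency, token usage, and estimated cost per LLM call."""

    def __init__(self, cost_per_1k_tokens: float = 0.002):  # illustrative price
        self.cost_per_1k_tokens = cost_per_1k_tokens
        self.records: List[Dict[str, Any]] = []

    async def timed_generate(self, llm_client, prompt: str, **kwargs) -> str:
        start = time.perf_counter()
        # Assumed interface: an async client with generate(prompt, **kwargs) -> str
        response = await llm_client.generate(prompt, **kwargs)
        latency = time.perf_counter() - start

        # Rough token estimate (~4 characters per token for English text)
        tokens = (len(prompt) + len(response)) // 4
        self.records.append({
            "latency_s": latency,
            "tokens": tokens,
            "est_cost": tokens / 1000 * self.cost_per_1k_tokens,
        })
        return response

    def summary(self) -> Dict[str, float]:
        """Aggregate the recorded requests into a small report."""
        if not self.records:
            return {"requests": 0}
        n = len(self.records)
        return {
            "requests": n,
            "avg_latency_s": sum(r["latency_s"] for r in self.records) / n,
            "total_tokens": sum(r["tokens"] for r in self.records),
            "total_est_cost": sum(r["est_cost"] for r in self.records),
        }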

Inference Performance Optimization

1. Batch Processing Optimization

PYTHON
import asyncio
import time
from typing import List, Dict, Any
from dataclasses import dataclass
from collections import deque

@dataclass
class BatchRequest:
    """A single request waiting to be batched."""
    id: str
    prompt: str
    params: Dict[str, Any]
    future: asyncio.Future
    timestamp: float

class BatchProcessor:
    """Groups individual requests into batches before calling the LLM."""
    
    def __init__(self, llm_client, batch_size: int = 8, max_wait_time: float = 0.1):
        self.llm_client = llm_client
        self.batch_size = batch_size
        self.max_wait_time = max_wait_time
        
        self.request_queue = deque()
        self.processing = False
        self.stats = {
            "total_requests": 0,
            "total_batches": 0,
            "avg_batch_size": 0.0,
            "avg_processing_time": 0.0
        }
    
    async def process_request(self, prompt: str, params: Dict[str, Any] = None) -> str:
        """Submit a single request and wait for its batched result."""
        request_id = f"req_{int(time.time() * 1000000)}"
        future = asyncio.get_running_loop().create_future()
        
        request = BatchRequest(
            id=request_id,
            prompt=prompt,
            params=params or {},
            future=future,
            timestamp=time.time()
        )
        
        self.request_queue.append(request)
        self.stats["total_requests"] += 1
        
        # Start the batch loop if it is not already running
        if not self.processing:
            asyncio.create_task(self._process_batches())
        
        return await future
    
    async def _process_batches(self):
        """Drain the queue, one batch at a time."""
        self.processing = True
        
        while self.request_queue:
            batch_start_time = time.time()
            
            # Collect a batch
            batch = []
            batch_deadline = time.time() + self.max_wait_time
            
            while (len(batch) < self.batch_size and 
                   self.request_queue and 
                   time.time() < batch_deadline):
                
                batch.append(self.request_queue.popleft())
                
                # If the queue is empty, wait briefly for new requests to arrive
                if not self.request_queue and len(batch) < self.batch_size:
                    await asyncio.sleep(0.01)
            
            if batch:
                await self._process_batch(batch)
                
                # Update statistics
                processing_time = time.time() - batch_start_time
                self._update_stats(len(batch), processing_time)
        
        self.processing = False
    
    async def _process_batch(self, batch: List[BatchRequest]):
        """Process one batch of requests."""
        try:
            # Prepare the batched input
            prompts = [req.prompt for req in batch]
            
            # Call the LLM once for the whole batch
            responses = await self._batch_generate(prompts)
            
            # Distribute results back to the waiting callers
            for request, response in zip(batch, responses):
                if not request.future.done():
                    request.future.set_result(response)
                    
        except Exception as e:
            # Propagate the error to every caller in the batch
            for request in batch:
                if not request.future.done():
                    request.future.set_exception(e)
    
    async def _batch_generate(self, prompts: List[str]) -> List[str]:
        """Batched generation (mock implementation)."""
        # In production this should call an LLM API that supports true batching;
        # here it is simplified to concurrent single-prompt calls.
        tasks = []
        for prompt in prompts:
            task = asyncio.create_task(self._single_generate(prompt))
            tasks.append(task)
        
        return await asyncio.gather(*tasks)
    
    async def _single_generate(self, prompt: str) -> str:
        """Single generation (mock)."""
        # Simulate an LLM call
        await asyncio.sleep(0.1)
        return f"Response to: {prompt[:50]}..."
    
    def _update_stats(self, batch_size: int, processing_time: float):
        """Update running statistics."""
        self.stats["total_batches"] += 1
        
        # Update the average batch size
        total_requests = self.stats["total_requests"]
        total_batches = self.stats["total_batches"]
        self.stats["avg_batch_size"] = total_requests / total_batches
        
        # Update the average processing time
        current_avg = self.stats["avg_processing_time"]
        self.stats["avg_processing_time"] = (
            (current_avg * (total_batches - 1) + processing_time) / total_batches
        )
    
    def get_stats(self) -> Dict[str, Any]:
        """Return a copy of the statistics."""
        return self.stats.copy()

# Usage example (llm_client is assumed to be an existing LLM client instance)
batch_processor = BatchProcessor(llm_client, batch_size=4, max_wait_time=0.05)

async def test_batch_processing():
    """Test batch processing."""
    # Send several requests concurrently
    tasks = []
    for i in range(10):
        task = batch_processor.process_request(f"Request {i}: explain machine learning")
        tasks.append(task)
    
    # Wait for all requests to finish
    responses = await asyncio.gather(*tasks)
    
    print(f"Processed {len(responses)} requests")
    print(f"Batch statistics: {batch_processor.get_stats()}")

# asyncio.run(test_batch_processing())
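
One caveat with the on-demand task spawning above: a request that arrives just as `_process_batches` drains the last item can sit in the queue until the next request restarts the loop. A common alternative is a single long-lived worker reading from an `asyncio.Queue`. The sketch below is illustrative and not part of the original example; the async `llm_client.generate(prompt) -> str` interface is an assumption.

PYTHON
import asyncio
from typing import Tuple

class QueueBatcher:
    """Sketch: one long-lived worker drains an asyncio.Queue,
    avoiding the start/stop race of the on-demand approach."""

    def __init__(self, llm_client, batch_size: int = 8, max_wait: float = 0.05):
        self.llm_client = llm_client
        self.batch_size = batch_size
        self.max_wait = max_wait
        self.queue: "asyncio.Queue[Tuple[str, asyncio.Future]]" = asyncio.Queue()
        self._worker = None

    def start(self):
        # Launch the worker once, e.g. at application startup (inside a running loop)
        self._worker = asyncio.create_task(self._run())

    async def submit(self, prompt: str) -> str:
        fut = asyncio.get_running_loop().create_future()
        await self.queue.put((prompt, fut))
        return await fut

    async def _run(self):
        while True:
            # Block until at least one request is available
            batch = [await self.queue.get()]
            deadline = asyncio.get_running_loop().time() + self.max_wait
            while len(batch) < self.batch_size:
                timeout = deadline - asyncio.get_running_loop().time()
                if timeout <= 0:
                    break
                try:
                    batch.append(await asyncio.wait_for(self.queue.get(), timeout))
                except asyncio.TimeoutError:
                    break
            try:
                # Assumed interface: async llm_client.generate(prompt) -> str
                results = await asyncio.gather(
                    *(self.llm_client.generate(p) for p, _ in batch)
                )
                for (_, fut), result in zip(batch, results):
                    fut.set_result(result)
            except Exception as e:
                for _, fut in batch:
                    if not fut.done():
                        fut.set_exception(e)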

2. Caching System

PYTHON
import hashlib
import json
import pickle
import time
from typing import Any, Dict, Optional

import redis

class LLMCache:
    """Cache for LLM responses."""
    
    def __init__(self, cache_type: str = "memory", redis_url: str = None):
        self.cache_type = cache_type
        
        if cache_type == "memory":
            self.cache = {}
            self.max_size = 10000
        elif cache_type == "redis":
            self.redis_client = redis.from_url(redis_url or "redis://localhost:6379")
        else:
            raise ValueError(f"Unsupported cache type: {cache_type}")
        
        self.stats = {
            "hits": 0,
            "misses": 0,
            "total_requests": 0
        }
    
    def _generate_key(self, prompt: str, params: Dict[str, Any]) -> str:
        """Build a cache key from the prompt and parameters."""
        # Combine the prompt and parameters into one structure
        cache_input = {
            "prompt": prompt,
            "params": sorted(params.items()) if params else []
        }
        
        cache_str = json.dumps(cache_input, sort_keys=True, ensure_ascii=False)
        
        # Hash it to get a fixed-length key
        return hashlib.md5(cache_str.encode()).hexdigest()
    
    def get(self, prompt: str, params: Dict[str, Any] = None) -> Optional[str]:
        """Look up a cached response."""
        self.stats["total_requests"] += 1
        
        key = self._generate_key(prompt, params or {})
        
        try:
            if self.cache_type == "memory":
                result = self.cache.get(key)
            else:  # redis
                cached_data = self.redis_client.get(key)
                result = pickle.loads(cached_data) if cached_data else None
            
            if result is not None:
                self.stats["hits"] += 1
                return result
            else:
                self.stats["misses"] += 1
                return None
                
        except Exception as e:
            print(f"Cache get error: {e}")
            self.stats["misses"] += 1
            return None
    
    def set(self, prompt: str, response: str, params: Dict[str, Any] = None, ttl: int = 3600):
        """Store a response in the cache."""
        key = self._generate_key(prompt, params or {})
        
        try:
            if self.cache_type == "memory":
                # Enforce the in-memory size limit
                if len(self.cache) >= self.max_size:
                    # Evict the oldest inserted entry (FIFO; see the LRU sketch below)
                    oldest_key = next(iter(self.cache))
                    del self.cache[oldest_key]
                
                self.cache[key] = response
                
            else:  # redis
                cached_data = pickle.dumps(response)
                self.redis_client.setex(key, ttl, cached_data)
                
        except Exception as e:
            print(f"Cache set error: {e}")
    
    def clear(self):
        """Clear the cache."""
        if self.cache_type == "memory":
            self.cache.clear()
        else:  # redis
            self.redis_client.flushdb()
        
        print("Cache cleared")
    
    def get_hit_rate(self) -> float:
        """Return the cache hit rate."""
        total = self.stats["total_requests"]
        if total == 0:
            return 0.0
        return self.stats["hits"] / total
    
    def get_stats(self) -> Dict[str, Any]:
        """Return cache statistics."""
        stats = self.stats.copy()
        stats["hit_rate"] = self.get_hit_rate()
        
        if self.cache_type == "memory":
            stats["cache_size"] = len(self.cache)
        else:
            try:
                stats["cache_size"] = self.redis_client.dbsize()
            except Exception:
                stats["cache_size"] = -1
        
        return stats

class CachedLLMClient:
    """LLM client wrapper with caching."""
    
    def __init__(self, llm_client, cache: LLMCache):
        self.llm_client = llm_client
        self.cache = cache
    
    async def generate(self, prompt: str, **kwargs) -> str:
        """Generate text, consulting the cache first."""
        # Check the cache
        cached_response = self.cache.get(prompt, kwargs)
        if cached_response is not None:
            return cached_response
        
        # Call the LLM
        response = await self.llm_client.generate(prompt, **kwargs)
        
        # Cache the result
        self.cache.set(prompt, response, kwargs)
        
        return response
    
    def get_cache_stats(self) -> Dict[str, Any]:
        """Return cache statistics."""
        return self.cache.get_stats()

# Usage example (llm_client is assumed to be an existing LLM client instance)
cache = LLMCache(cache_type="memory")
cached_client = CachedLLMClient(llm_client, cache)

async def test_caching():
    """Test caching."""
    prompt = "Explain what artificial intelligence is"
    
    # First call (cache miss)
    start_time = time.time()
    response1 = await cached_client.generate(prompt)
    time1 = time.time() - start_time
    
    # Second call (cache hit)
    start_time = time.time()
    response2 = await cached_client.generate(prompt)
    time2 = time.time() - start_time
    
    print(f"First call time: {time1:.3f}s")
    print(f"Second call time: {time2:.3f}s")
    print(f"Responses identical: {response1 == response2}")
    print(f"Cache statistics: {cached_client.get_cache_stats()}")

# asyncio.run(test_caching())
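
The in-memory branch above evicts entries in insertion order (FIFO), which is the simplest policy but not true LRU. If least-recently-used eviction matters for your hit rate, `collections.OrderedDict` gets you there in a few lines. The sketch below is illustrative only and the class name is my own; it could back the `cache_type == "memory"` branch of `LLMCache`.

PYTHON
from collections import OrderedDict
from typing import Optional

class LRUMemoryCache:
    """Sketch of an LRU-evicting in-memory cache."""

    def __init__(self, max_size: int = 10000):
        self.max_size = max_size
        self._store: "OrderedDict[str, str]" = OrderedDict()

    def get(self, key: str) -> Optional[str]:
        if key not in self._store:
            return None
        # Mark as most recently used
        self._store.move_to_end(key)
        return self._store[key]

    def set(self, key: str, value: str):
        if key in self._store:
            self._store.move_to_end(key)
        self._store[key] = value
        if len(self._store) > self.max_size:
            # Evict the least recently used entry
            self._store.popitem(last=False)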

3. Model Quantization

PYTHON
import time
from typing import Dict, List

import torch
import torch.quantization as quantization
from transformers import AutoTokenizer, AutoModelForCausalLM

class ModelQuantizer:
    """Model quantization helper."""
    
    def __init__(self, model_name: str):
        self.model_name = model_name
        self.tokenizer = None
        self.model = None
        self.quantized_model = None
    
    def load_model(self):
        """Load the original model."""
        print(f"Loading model: {self.model_name}")
        
        self.tokenizer = AutoTokenizer.from_pretrained(self.model_name)
        self.model = AutoModelForCausalLM.from_pretrained(
            self.model_name,
            torch_dtype=torch.float32  # quantization expects float32 weights
        )
        
        print(f"Original model size: {self._get_model_size(self.model):.2f} MB")
    
    def quantize_dynamic(self):
        """Dynamic quantization."""
        print("Running dynamic quantization...")
        
        self.quantized_model = torch.quantization.quantize_dynamic(
            self.model,
            {torch.nn.Linear},  # quantize linear layers
            dtype=torch.qint8
        )
        
        quantized_size = self._get_model_size(self.quantized_model)
        original_size = self._get_model_size(self.model)
        compression_ratio = original_size / quantized_size
        
        print(f"Quantized model size: {quantized_size:.2f} MB")
        print(f"Compression ratio: {compression_ratio:.2f}x")
    
    def quantize_static(self, calibration_data: List[str]):
        """Static quantization."""
        print("Running static quantization...")
        
        # Prepare the quantization configuration
        self.model.eval()
        self.model.qconfig = quantization.get_default_qconfig('fbgemm')
        
        # Insert observers
        prepared_model = quantization.prepare(self.model)
        
        # Calibration
        print("Calibrating the model...")
        with torch.no_grad():
            for text in calibration_data[:100]:  # calibrate on a subset of the data
                inputs = self.tokenizer(text, return_tensors="pt", truncation=True, max_length=512)
                prepared_model(**inputs)
        
        # Convert to a quantized model
        self.quantized_model = quantization.convert(prepared_model)
        
        quantized_size = self._get_model_size(self.quantized_model)
        original_size = self._get_model_size(self.model)
        compression_ratio = original_size / quantized_size
        
        print(f"Statically quantized model size: {quantized_size:.2f} MB")
        print(f"Compression ratio: {compression_ratio:.2f}x")
    
    def _get_model_size(self, model) -> float:
        """Return the model size in MB."""
        param_size = 0
        buffer_size = 0
        
        for param in model.parameters():
            param_size += param.nelement() * param.element_size()
        
        for buffer in model.buffers():
            buffer_size += buffer.nelement() * buffer.element_size()
        
        return (param_size + buffer_size) / (1024 * 1024)
    
    def benchmark_inference(self, test_prompts: List[str], num_runs: int = 10) -> Dict[str, float]:
        """Benchmark inference performance."""
        results = {}
        
        # Benchmark the original model
        if self.model is not None:
            original_time = self._benchmark_model(self.model, test_prompts, num_runs)
            results["original_model"] = original_time
        
        # Benchmark the quantized model
        if self.quantized_model is not None:
            quantized_time = self._benchmark_model(self.quantized_model, test_prompts, num_runs)
            results["quantized_model"] = quantized_time
            
            if "original_model" in results:
                speedup = results["original_model"] / results["quantized_model"]
                results["speedup"] = speedup
        
        return results
    
    def _benchmark_model(self, model, test_prompts: List[str], num_runs: int) -> float:
        """Benchmark a single model."""
        model.eval()
        total_time = 0.0
        
        with torch.no_grad():
            for _ in range(num_runs):
                start_time = time.time()
                
                for prompt in test_prompts:
                    inputs = self.tokenizer(prompt, return_tensors="pt", truncation=True, max_length=512)
                    outputs = model.generate(**inputs, max_length=100, do_sample=False)
                
                total_time += time.time() - start_time
        
        return total_time / num_runs
    
    def save_quantized_model(self, save_path: str):
        """Save the quantized model."""
        if self.quantized_model is None:
            raise ValueError("No quantized model to save")
        
        torch.save(self.quantized_model.state_dict(), save_path)
        print(f"Quantized model saved to: {save_path}")

# Usage example (requires a suitable model and environment)
"""
quantizer = ModelQuantizer("gpt2")
quantizer.load_model()

# Dynamic quantization
quantizer.quantize_dynamic()

# Benchmark
test_prompts = ["Hello world", "What is AI?", "Explain machine learning"]
benchmark_results = quantizer.benchmark_inference(test_prompts)
print(f"Benchmark results: {benchmark_results}")
"""

Cost Optimization Strategies

1. Intelligent Model Selection

PYTHON
from typing import Any, Dict

class ModelSelector:
    """Intelligent model selector."""
    
    def __init__(self):
        self.models = {
            "gpt-3.5-turbo": {
                "cost_per_1k_tokens": 0.002,
                "max_tokens": 4096,
                "capabilities": ["general", "coding", "analysis"],
                "speed": "fast"
            },
            "gpt-4": {
                "cost_per_1k_tokens": 0.03,
                "max_tokens": 8192,
                "capabilities": ["general", "coding", "analysis", "complex_reasoning"],
                "speed": "medium"
            },
            "claude-3-haiku": {
                "cost_per_1k_tokens": 0.00025,
                "max_tokens": 200000,
                "capabilities": ["general", "analysis"],
                "speed": "very_fast"
            }
        }
        
        self.usage_stats = {}
    
    def select_model(self, task_type: str, complexity: str, budget_limit: float = None) -> str:
        """Pick the most suitable model for a task."""
        candidates = []
        
        for model_name, model_info in self.models.items():
            # Check capability match
            if task_type in model_info["capabilities"]:
                # Check the budget limit
                if budget_limit is None or model_info["cost_per_1k_tokens"] <= budget_limit:
                    score = self._calculate_model_score(model_info, complexity)
                    candidates.append((model_name, score))
        
        if not candidates:
            # No model matched; fall back to the cheapest one
            cheapest = min(self.models.items(), key=lambda x: x[1]["cost_per_1k_tokens"])
            return cheapest[0]
        
        # Return the highest-scoring model
        best_model = max(candidates, key=lambda x: x[1])
        return best_model[0]
    
    def _calculate_model_score(self, model_info: Dict, complexity: str) -> float:
        """Compute a score for a model."""
        base_score = 1.0
        
        # Adjust the score by task complexity
        if complexity == "simple":
            # Simple tasks favor cheap, fast models
            base_score += (1.0 / model_info["cost_per_1k_tokens"]) * 0.1
            if model_info["speed"] == "very_fast":
                base_score += 0.3
            elif model_info["speed"] == "fast":
                base_score += 0.2
        elif complexity == "complex":
            # Complex tasks favor more capable models
            base_score += len(model_info["capabilities"]) * 0.2
            if "complex_reasoning" in model_info["capabilities"]:
                base_score += 0.5
        
        return base_score
    
    def estimate_cost(self, model_name: str, input_tokens: int, output_tokens: int) -> float:
        """Estimate cost (simplified: one rate for both input and output tokens)."""
        if model_name not in self.models:
            return 0.0
        
        total_tokens = input_tokens + output_tokens
        cost_per_token = self.models[model_name]["cost_per_1k_tokens"] / 1000
        
        return total_tokens * cost_per_token
    
    def track_usage(self, model_name: str, tokens_used: int, cost: float):
        """Record usage for a model."""
        if model_name not in self.usage_stats:
            self.usage_stats[model_name] = {
                "total_tokens": 0,
                "total_cost": 0.0,
                "request_count": 0
            }
        
        stats = self.usage_stats[model_name]
        stats["total_tokens"] += tokens_used
        stats["total_cost"] += cost
        stats["request_count"] += 1
    
    def get_usage_report(self) -> Dict[str, Any]:
        """Build a usage report."""
        total_cost = sum(stats["total_cost"] for stats in self.usage_stats.values())
        total_tokens = sum(stats["total_tokens"] for stats in self.usage_stats.values())
        
        report = {
            "total_cost": total_cost,
            "total_tokens": total_tokens,
            "models": self.usage_stats.copy()
        }
        
        # Compute each model's share of cost and tokens
        for model_name, stats in report["models"].items():
            if total_cost > 0:
                stats["cost_percentage"] = (stats["total_cost"] / total_cost) * 100
            if total_tokens > 0:
                stats["token_percentage"] = (stats["total_tokens"] / total_tokens) * 100
        
        return report

# Usage example
selector = ModelSelector()

# Select models
simple_task_model = selector.select_model("general", "simple", budget_limit=0.01)
complex_task_model = selector.select_model("complex_reasoning", "complex")

print(f"Recommended model for a simple task: {simple_task_model}")
print(f"Recommended model for a complex task: {complex_task_model}")

# Estimate cost
cost = selector.estimate_cost("gpt-3.5-turbo", input_tokens=100, output_tokens=50)
print(f"Estimated cost: ${cost:.6f}")

# Track usage
selector.track_usage("gpt-3.5-turbo", tokens_used=150, cost=cost)

# Usage report
report = selector.get_usage_report()
print(f"Usage report: {report}")

Summary

In this chapter we covered:

  1. Inference optimization: batching, caching, and model quantization
  2. Memory optimization: model compression and dynamic loading strategies
  3. Cost optimization: intelligent model selection and usage tracking
  4. Performance monitoring: comprehensive metrics and optimization recommendations
  5. Practical tips: optimization best practices for production environments

Performance optimization is key to deploying LLM applications successfully; the goal is to strike the right balance between quality, speed, and cost.

Questions to Consider

  1. How would you design an adaptive model-selection strategy?
  2. When is model quantization worth using?
  3. How do you balance cache hit rate against memory usage?
  4. How would you design a comprehensive performance monitoring system?

Congratulations on completing the advanced LLM techniques stage! Next, we will move on to the remaining stages of the tutorial.