
Vector Databases and Retrieval Optimization

Overview of Vector Databases

A vector database is a database system designed specifically for storing and retrieving high-dimensional vector data. In LLM applications, the vector database is a core component of a RAG system: it stores document embeddings efficiently and supports fast similarity search over them.
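
Before looking at a full implementation, the core operation — scoring a query vector against stored document vectors by cosine similarity and keeping the top-k — fits in a few lines of NumPy. The snippet below is a minimal sketch with toy numbers, just to make the idea concrete:

PYTHON
import numpy as np
 
# Three toy "document" vectors and one query vector (dimension 4 for readability)
docs = np.array([[0.1, 0.9, 0.0, 0.2],
                 [0.8, 0.1, 0.3, 0.0],
                 [0.2, 0.8, 0.1, 0.1]])
query = np.array([0.1, 0.9, 0.1, 0.1])
 
# Cosine similarity = dot product of L2-normalized vectors
docs_n = docs / np.linalg.norm(docs, axis=1, keepdims=True)
query_n = query / np.linalg.norm(query)
scores = docs_n @ query_n
 
# Keep the two most similar documents
top_k = np.argsort(-scores)[:2]
for idx in top_k:
    print(f"doc {idx}: cosine similarity {scores[idx]:.3f}")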

Core Features of Vector Databases

1. High-Dimensional Vector Storage

PYTHON
import numpy as np
import sqlite3
import pickle
import json
from typing import List, Dict, Any, Optional, Tuple
from dataclasses import dataclass
import time
 
@dataclass
class VectorRecord:
    """A single stored vector together with its metadata."""
    id: str
    vector: np.ndarray
    metadata: Dict[str, Any]
    timestamp: float
 
class SimpleVectorDB:
    """A minimal vector database backed by SQLite."""
    
    def __init__(self, db_path: str = "vector_db.sqlite", dimension: int = 384):
        self.db_path = db_path
        self.dimension = dimension
        self.vectors = {}  # in-memory vector cache
        self._init_database()
    
    def _init_database(self):
        """Create the SQLite schema if it does not exist."""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        # Create the vectors table
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS vectors (
                id TEXT PRIMARY KEY,
                vector BLOB,
                metadata TEXT,
                timestamp REAL,
                dimension INTEGER
            )
        """)
        
        # Create an index on the timestamp column
        cursor.execute("""
            CREATE INDEX IF NOT EXISTS idx_timestamp ON vectors(timestamp)
        """)
        
        conn.commit()
        conn.close()
        
        # Load existing vectors into memory
        self._load_vectors()
    
    def _load_vectors(self):
        """Load all persisted vectors into the in-memory cache."""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        cursor.execute("SELECT id, vector, metadata, timestamp FROM vectors")
        rows = cursor.fetchall()
        
        for row in rows:
            vector_id, vector_blob, metadata_json, timestamp = row
            vector = pickle.loads(vector_blob)
            metadata = json.loads(metadata_json)
            
            self.vectors[vector_id] = VectorRecord(
                id=vector_id,
                vector=vector,
                metadata=metadata,
                timestamp=timestamp
            )
        
        conn.close()
        print(f"Loaded {len(self.vectors)} vectors into memory")
    
    def insert(self, vector_id: str, vector: np.ndarray, metadata: Dict[str, Any] = None):
        """Insert a vector."""
        if vector.shape[0] != self.dimension:
            raise ValueError(f"Vector dimension must be {self.dimension}")
        
        # Normalize the vector
        normalized_vector = self._normalize_vector(vector)
        
        record = VectorRecord(
            id=vector_id,
            vector=normalized_vector,
            metadata=metadata or {},
            timestamp=time.time()
        )
        
        # Store in memory
        self.vectors[vector_id] = record
        
        # Persist to the database
        self._persist_vector(record)
    
    def _normalize_vector(self, vector: np.ndarray) -> np.ndarray:
        """L2-normalize a vector."""
        norm = np.linalg.norm(vector)
        if norm == 0:
            return vector
        return vector / norm
    
    def _persist_vector(self, record: VectorRecord):
        """Persist a vector record to the database."""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        vector_blob = pickle.dumps(record.vector)
        metadata_json = json.dumps(record.metadata, ensure_ascii=False)
        
        cursor.execute("""
            INSERT OR REPLACE INTO vectors (id, vector, metadata, timestamp, dimension)
            VALUES (?, ?, ?, ?, ?)
        """, (record.id, vector_blob, metadata_json, record.timestamp, self.dimension))
        
        conn.commit()
        conn.close()
    
    def search(self, query_vector: np.ndarray, top_k: int = 10, 
              filter_metadata: Dict[str, Any] = None) -> List[Tuple[str, float, Dict[str, Any]]]:
        """Search for similar vectors."""
        if query_vector.shape[0] != self.dimension:
            raise ValueError(f"Query vector dimension must be {self.dimension}")
        
        # Normalize the query vector
        normalized_query = self._normalize_vector(query_vector)
        
        # Compute similarities
        similarities = []
        
        for vector_id, record in self.vectors.items():
            # Apply metadata filtering
            if filter_metadata and not self._match_filter(record.metadata, filter_metadata):
                continue
            
            # Cosine similarity (both vectors are already normalized)
            similarity = np.dot(normalized_query, record.vector)
            similarities.append((vector_id, similarity, record.metadata))
        
        # Sort and return the top-k results
        similarities.sort(key=lambda x: x[1], reverse=True)
        return similarities[:top_k]
    
    def _match_filter(self, metadata: Dict[str, Any], filter_metadata: Dict[str, Any]) -> bool:
        """Check whether metadata matches the filter conditions."""
        for key, value in filter_metadata.items():
            if key not in metadata or metadata[key] != value:
                return False
        return True
    
    def delete(self, vector_id: str) -> bool:
        """Delete a vector."""
        if vector_id not in self.vectors:
            return False
        
        # Remove from memory
        del self.vectors[vector_id]
        
        # Remove from the database
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        cursor.execute("DELETE FROM vectors WHERE id = ?", (vector_id,))
        conn.commit()
        conn.close()
        
        return True
    
    def get_statistics(self) -> Dict[str, Any]:
        """Return database statistics."""
        return {
            "total_vectors": len(self.vectors),
            "dimension": self.dimension,
            "memory_usage_mb": sum(record.vector.nbytes for record in self.vectors.values()) / (1024 * 1024)
        }
 
# Usage example
vector_db = SimpleVectorDB(dimension=384)
 
# Insert vectors
sample_vectors = np.random.random((5, 384))
for i, vector in enumerate(sample_vectors):
    vector_db.insert(
        vector_id=f"doc_{i}",
        vector=vector,
        metadata={"source": f"document_{i}.txt", "category": "tech"}
    )
 
# Search
query_vector = np.random.random(384)
results = vector_db.search(query_vector, top_k=3)
 
print("Search results:")
for vector_id, similarity, metadata in results:
    print(f"ID: {vector_id}, similarity: {similarity:.3f}, metadata: {metadata}")
 
# Database statistics
stats = vector_db.get_statistics()
print(f"Database statistics: {stats}")

2. Efficient Indexing Algorithms

PYTHON
import faiss
import numpy as np
from typing import List, Tuple, Dict, Any
 
class FAISSVectorDB:
    """A vector database built on FAISS."""
    
    def __init__(self, dimension: int, index_type: str = "IVF"):
        self.dimension = dimension
        self.index_type = index_type
        self.index = None
        self.id_map = {}  # maps external IDs to internal index positions
        self.metadata_map = {}  # stores metadata per external ID
        self.next_id = 0
        
        self._create_index()
    
    def _create_index(self):
        """Create the FAISS index."""
        if self.index_type == "Flat":
            # Brute-force search: exact but slow
            self.index = faiss.IndexFlatIP(self.dimension)  # inner-product index
            
        elif self.index_type == "IVF":
            # Inverted file index: balances accuracy and speed
            nlist = 100  # number of cluster centroids
            quantizer = faiss.IndexFlatIP(self.dimension)
            self.index = faiss.IndexIVFFlat(quantizer, self.dimension, nlist)
            
        elif self.index_type == "HNSW":
            # Hierarchical Navigable Small World graph: fast approximate search
            M = 16  # number of connections per node
            self.index = faiss.IndexHNSWFlat(self.dimension, M)
            
        elif self.index_type == "PQ":
            # Product quantization: memory efficient
            m = 8  # number of sub-vectors
            nbits = 8  # bits per sub-vector
            self.index = faiss.IndexPQ(self.dimension, m, nbits)
            
        else:
            raise ValueError(f"Unsupported index type: {self.index_type}")
        
        print(f"Created {self.index_type} index with dimension {self.dimension}")
    
    def train_index(self, training_vectors: np.ndarray):
        """Train the index (required by some index types)."""
        if hasattr(self.index, 'is_trained') and not self.index.is_trained:
            print("Training index...")
            self.index.train(training_vectors.astype(np.float32))
            print("Index training complete")
    
    def add_vectors(self, vectors: np.ndarray, ids: List[str], 
                   metadata_list: List[Dict[str, Any]] = None):
        """Add a batch of vectors."""
        if vectors.shape[1] != self.dimension:
            raise ValueError(f"Vector dimension must be {self.dimension}")
        
        # Normalize the vectors
        normalized_vectors = self._normalize_vectors(vectors)
        
        # Train the index if necessary
        if hasattr(self.index, 'is_trained') and not self.index.is_trained:
            self.train_index(normalized_vectors)
        
        # Add vectors to the index
        start_id = self.next_id
        self.index.add(normalized_vectors.astype(np.float32))
        
        # Update the ID and metadata mappings
        for i, vector_id in enumerate(ids):
            internal_id = start_id + i
            self.id_map[vector_id] = internal_id
            
            if metadata_list:
                self.metadata_map[vector_id] = metadata_list[i]
        
        self.next_id += len(ids)
        print(f"Added {len(ids)} vectors")
    
    def _normalize_vectors(self, vectors: np.ndarray) -> np.ndarray:
        """L2-normalize vectors row-wise."""
        norms = np.linalg.norm(vectors, axis=1, keepdims=True)
        norms[norms == 0] = 1  # avoid division by zero
        return vectors / norms
    
    def search(self, query_vector: np.ndarray, top_k: int = 10) -> List[Tuple[str, float]]:
        """Search for similar vectors."""
        if query_vector.shape[0] != self.dimension:
            raise ValueError(f"Query vector dimension must be {self.dimension}")
        
        # Normalize the query vector
        normalized_query = self._normalize_vectors(query_vector.reshape(1, -1))
        
        # Run the search
        scores, indices = self.index.search(normalized_query.astype(np.float32), top_k)
        
        # Convert internal indices back to external IDs
        results = []
        for score, idx in zip(scores[0], indices[0]):
            if idx == -1:  # FAISS returns -1 for empty result slots
                continue
            
            # Reverse-lookup the external ID (linear scan; fine for small collections)
            vector_id = None
            for vid, internal_id in self.id_map.items():
                if internal_id == idx:
                    vector_id = vid
                    break
            
            if vector_id:
                results.append((vector_id, float(score)))
        
        return results
    
    def search_with_metadata(self, query_vector: np.ndarray, top_k: int = 10) -> List[Tuple[str, float, Dict[str, Any]]]:
        """Search and return metadata along with scores."""
        basic_results = self.search(query_vector, top_k)
        
        results_with_metadata = []
        for vector_id, score in basic_results:
            metadata = self.metadata_map.get(vector_id, {})
            results_with_metadata.append((vector_id, score, metadata))
        
        return results_with_metadata
    
    def get_index_info(self) -> Dict[str, Any]:
        """Return information about the index."""
        info = {
            "index_type": self.index_type,
            "dimension": self.dimension,
            "total_vectors": self.index.ntotal,
            "is_trained": getattr(self.index, 'is_trained', True)
        }
        
        # Index-type-specific details
        if self.index_type == "IVF":
            info["nlist"] = self.index.nlist
            info["nprobe"] = getattr(self.index, 'nprobe', 1)
        elif self.index_type == "HNSW":
            info["M"] = self.index.hnsw.M
            info["efConstruction"] = self.index.hnsw.efConstruction
            info["efSearch"] = self.index.hnsw.efSearch
        
        return info
    
    def save_index(self, filepath: str):
        """Save the index to a file."""
        faiss.write_index(self.index, filepath)
        
        # Save the ID/metadata mappings alongside the index
        import pickle
        mapping_file = filepath + ".mapping"
        with open(mapping_file, 'wb') as f:
            pickle.dump({
                'id_map': self.id_map,
                'metadata_map': self.metadata_map,
                'next_id': self.next_id
            }, f)
        
        print(f"Index saved to {filepath}")
    
    def load_index(self, filepath: str):
        """Load the index from a file."""
        self.index = faiss.read_index(filepath)
        
        # Load the ID/metadata mappings
        import pickle
        mapping_file = filepath + ".mapping"
        try:
            with open(mapping_file, 'rb') as f:
                mapping_data = pickle.load(f)
                self.id_map = mapping_data['id_map']
                self.metadata_map = mapping_data['metadata_map']
                self.next_id = mapping_data['next_id']
        except FileNotFoundError:
            print("Warning: mapping file not found, ID mappings will be empty")
        
        print(f"Index loaded from {filepath}")
 
# Usage example
# Create indexes of different types
faiss_db_flat = FAISSVectorDB(dimension=384, index_type="Flat")
faiss_db_ivf = FAISSVectorDB(dimension=384, index_type="IVF")
faiss_db_hnsw = FAISSVectorDB(dimension=384, index_type="HNSW")
 
# Generate test data
test_vectors = np.random.random((1000, 384)).astype(np.float32)
test_ids = [f"doc_{i}" for i in range(1000)]
test_metadata = [{"category": f"cat_{i%5}", "source": f"src_{i%10}"} for i in range(1000)]
 
# Add vectors
faiss_db_ivf.add_vectors(test_vectors, test_ids, test_metadata)
 
# Run a test search
query = np.random.random(384)
results = faiss_db_ivf.search_with_metadata(query, top_k=5)
 
print("FAISS search results:")
for vector_id, score, metadata in results:
    print(f"ID: {vector_id}, score: {score:.3f}, metadata: {metadata}")
 
# Index information
info = faiss_db_ivf.get_index_info()
print(f"Index info: {info}")

3. Retrieval Optimization Strategies

PYTHON
class OptimizedRetriever:
    """A retriever with several optimization strategies."""
    
    def __init__(self, vector_db, embedder):
        self.vector_db = vector_db
        self.embedder = embedder
        self.query_cache = {}  # query result cache
        self.performance_stats = {
            "total_queries": 0,
            "cache_hits": 0,
            "avg_search_time": 0.0
        }
    
    def search_with_reranking(self, query: str, top_k: int = 10, 
                            rerank_top_k: int = 50) -> List[Tuple[str, float, Dict[str, Any]]]:
        """Search with a reranking stage."""
        import time
        start_time = time.time()
        
        # 1. Initial retrieval (fetch more candidates than needed)
        query_vector = self.embedder.embed_text(query)
        initial_results = self.vector_db.search_with_metadata(query_vector, rerank_top_k)
        
        if not initial_results:
            return []
        
        # 2. Rerank using a more precise similarity computation
        reranked_results = []
        
        for vector_id, score, metadata in initial_results:
            # Fetch the original document content
            content = metadata.get("content", "")
            
            # Recompute similarity against the full content if it is available
            if content:
                content_vector = self.embedder.embed_text(content)
                precise_score = self._compute_precise_similarity(query_vector, content_vector)
                reranked_results.append((vector_id, precise_score, metadata))
            else:
                reranked_results.append((vector_id, score, metadata))
        
        # 3. Sort by the new scores
        reranked_results.sort(key=lambda x: x[1], reverse=True)
        
        # 4. Update performance statistics
        search_time = time.time() - start_time
        self._update_performance_stats(search_time)
        
        return reranked_results[:top_k]
    
    def search_with_query_expansion(self, query: str, top_k: int = 10) -> List[Tuple[str, float, Dict[str, Any]]]:
        """Search with query expansion."""
        # 1. Generate expanded queries
        expanded_queries = self._expand_query(query)
        
        # 2. Run a search for each expanded query
        all_results = {}
        
        for expanded_query in expanded_queries:
            query_vector = self.embedder.embed_text(expanded_query)
            results = self.vector_db.search_with_metadata(query_vector, top_k * 2)
            
            # Merge results, keeping the highest score per document
            for vector_id, score, metadata in results:
                if vector_id in all_results:
                    if score > all_results[vector_id][0]:
                        all_results[vector_id] = (score, metadata)
                else:
                    all_results[vector_id] = (score, metadata)
        
        # 3. Sort and return
        final_results = [
            (vector_id, score, metadata) 
            for vector_id, (score, metadata) in all_results.items()
        ]
        final_results.sort(key=lambda x: x[1], reverse=True)
        
        return final_results[:top_k]
    
    def search_with_cache(self, query: str, top_k: int = 10) -> List[Tuple[str, float, Dict[str, Any]]]:
        """Search with result caching."""
        # Build the cache key
        cache_key = f"{query}_{top_k}"
        
        # Check the cache
        if cache_key in self.query_cache:
            self.performance_stats["cache_hits"] += 1
            return self.query_cache[cache_key]
        
        # Run the search
        query_vector = self.embedder.embed_text(query)
        results = self.vector_db.search_with_metadata(query_vector, top_k)
        
        # Cache the results
        self.query_cache[cache_key] = results
        
        # Bound the cache size
        if len(self.query_cache) > 1000:
            # Evict the oldest cache entry (dicts preserve insertion order)
            oldest_key = next(iter(self.query_cache))
            del self.query_cache[oldest_key]
        
        return results
    
    def hybrid_search(self, query: str, top_k: int = 10, 
                     vector_weight: float = 0.7) -> List[Tuple[str, float, Dict[str, Any]]]:
        """Hybrid search (vector + keyword)."""
        # 1. Vector search
        query_vector = self.embedder.embed_text(query)
        vector_results = self.vector_db.search_with_metadata(query_vector, top_k * 2)
        
        # 2. Keyword search (simplified)
        keyword_results = self._keyword_search(query, top_k * 2)
        
        # 3. Merge and rescore
        combined_scores = {}
        
        # Vector search results
        for vector_id, vector_score, metadata in vector_results:
            combined_scores[vector_id] = {
                "vector_score": vector_score,
                "keyword_score": 0.0,
                "metadata": metadata
            }
        
        # Keyword search results
        for vector_id, keyword_score, metadata in keyword_results:
            if vector_id in combined_scores:
                combined_scores[vector_id]["keyword_score"] = keyword_score
            else:
                combined_scores[vector_id] = {
                    "vector_score": 0.0,
                    "keyword_score": keyword_score,
                    "metadata": metadata
                }
        
        # 4. Compute the final weighted score
        final_results = []
        for vector_id, scores in combined_scores.items():
            final_score = (vector_weight * scores["vector_score"] + 
                          (1 - vector_weight) * scores["keyword_score"])
            final_results.append((vector_id, final_score, scores["metadata"]))
        
        # 5. Sort and return
        final_results.sort(key=lambda x: x[1], reverse=True)
        return final_results[:top_k]
    
    def _compute_precise_similarity(self, vec1: np.ndarray, vec2: np.ndarray) -> float:
        """Compute an exact cosine similarity."""
        dot_product = np.dot(vec1, vec2)
        norm1 = np.linalg.norm(vec1)
        norm2 = np.linalg.norm(vec2)
        
        if norm1 == 0 or norm2 == 0:
            return 0.0
        
        return dot_product / (norm1 * norm2)
    
    def _expand_query(self, query: str) -> List[str]:
        """Expand the query."""
        # Simplified query expansion
        expanded = [query]
        
        # Add synonyms (using a simple lookup table here)
        synonyms = {
            "machine learning": ["ML", "artificial intelligence", "AI"],
            "deep learning": ["neural networks", "DL"],
            "natural language processing": ["NLP", "text processing"]
        }
        
        for term, syns in synonyms.items():
            if term in query:
                for syn in syns:
                    expanded.append(query.replace(term, syn))
        
        return expanded
    
    def _keyword_search(self, query: str, top_k: int) -> List[Tuple[str, float, Dict[str, Any]]]:
        """Keyword search (simplified)."""
        # A real keyword search (e.g. BM25) would go here;
        # simplified to return no results.
        return []
    
    def _update_performance_stats(self, search_time: float):
        """Update performance statistics."""
        self.performance_stats["total_queries"] += 1
        
        # Update the running average search time
        total_time = (self.performance_stats["avg_search_time"] * 
                     (self.performance_stats["total_queries"] - 1) + search_time)
        self.performance_stats["avg_search_time"] = total_time / self.performance_stats["total_queries"]
    
    def get_performance_stats(self) -> Dict[str, Any]:
        """Return performance statistics."""
        stats = self.performance_stats.copy()
        if stats["total_queries"] > 0:
            stats["cache_hit_rate"] = stats["cache_hits"] / stats["total_queries"]
        else:
            stats["cache_hit_rate"] = 0.0
        
        return stats
 
# Usage example
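# NOTE: the tutorial does not define `embedder`; OptimizedRetriever only assumes an
# object exposing embed_text(text) -> np.ndarray of the index dimension. The class
# below is a hypothetical stand-in so the example runs end to end (deterministic
# pseudo-random vectors, not a real embedding model):
class DummyEmbedder:
    def __init__(self, dimension: int = 384):
        self.dimension = dimension

    def embed_text(self, text: str) -> np.ndarray:
        # Seed from the text so identical strings map to identical vectors
        rng = np.random.default_rng(abs(hash(text)) % (2 ** 32))
        return rng.random(self.dimension)

embedder = DummyEmbedder(dimension=384)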
optimized_retriever = OptimizedRetriever(faiss_db_ivf, embedder)
 
# Try the different search strategies
query = "applications of deep learning"
 
# Search with reranking
rerank_results = optimized_retriever.search_with_reranking(query, top_k=5)
print("Reranked search results:")
for vector_id, score, metadata in rerank_results[:3]:
    print(f"ID: {vector_id}, score: {score:.3f}")
 
# Search with query expansion
expansion_results = optimized_retriever.search_with_query_expansion(query, top_k=5)
print("\nQuery-expansion search results:")
for vector_id, score, metadata in expansion_results[:3]:
    print(f"ID: {vector_id}, score: {score:.3f}")
 
# Cached search
cache_results = optimized_retriever.search_with_cache(query, top_k=5)
print("\nCached search results:")
for vector_id, score, metadata in cache_results[:3]:
    print(f"ID: {vector_id}, score: {score:.3f}")
 
# Performance statistics
perf_stats = optimized_retriever.get_performance_stats()
print(f"\nPerformance statistics: {perf_stats}")

Summary

In this chapter, we covered:

  1. Vector database fundamentals: storing and managing high-dimensional vectors
  2. Indexing algorithms: the different FAISS index types and their characteristics
  3. Retrieval optimization: reranking, query expansion, caching, and related strategies
  4. Hybrid search: combining vector search with keyword search
  5. Performance monitoring: collecting and using search performance statistics

Vector databases are core infrastructure for RAG systems; choosing the right index algorithm and optimization strategy is critical to overall system performance.

Review Questions

  1. How do you choose a vector index type suited to a particular application scenario?
  2. When should you use hybrid search rather than pure vector search?
  3. How do you balance search accuracy against search speed?
  4. How would you design a vector database that supports real-time updates?

In the next chapter, we will look at LLM fine-tuning and adaptation techniques.