vector-databases
Beginner
Updated 2025-06-14
Vector Databases and Retrieval Optimization
Overview of Vector Databases
A vector database is a database system purpose-built for storing and retrieving high-dimensional vector data. In LLM applications it is a core component of RAG systems: it stores document embeddings efficiently and supports fast similarity search over them.
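As a minimal illustration of the core operation before the full implementations below, the sketch that follows scores a query embedding against a few stored embeddings with cosine similarity. The vectors here are random placeholders standing in for real embedding-model outputs.
PYTHON
import numpy as np

# Toy "stored" embeddings and a query embedding (random placeholders
# standing in for real embedding-model outputs)
doc_vectors = np.random.random((4, 384))
query_vector = np.random.random(384)

# Cosine similarity = dot product of L2-normalized vectors
doc_norm = doc_vectors / np.linalg.norm(doc_vectors, axis=1, keepdims=True)
query_norm = query_vector / np.linalg.norm(query_vector)
scores = doc_norm @ query_norm

# The nearest neighbours are simply the highest-scoring rows
top_k = np.argsort(-scores)[:2]
print("Top matches:", top_k, scores[top_k])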
Core Capabilities of Vector Databases
1. High-Dimensional Vector Storage
PYTHON
import json
import pickle
import sqlite3
import time
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Tuple

import numpy as np


@dataclass
class VectorRecord:
    """A single stored vector with its metadata."""
    id: str
    vector: np.ndarray
    metadata: Dict[str, Any]
    timestamp: float


class SimpleVectorDB:
    """A minimal vector database backed by SQLite."""

    def __init__(self, db_path: str = "vector_db.sqlite", dimension: int = 384):
        self.db_path = db_path
        self.dimension = dimension
        self.vectors = {}  # in-memory vector cache
        self._init_database()

    def _init_database(self):
        """Create the storage table and index, then warm the in-memory cache."""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        # Create the vectors table
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS vectors (
                id TEXT PRIMARY KEY,
                vector BLOB,
                metadata TEXT,
                timestamp REAL,
                dimension INTEGER
            )
        """)
        # Create an index on the timestamp column
        cursor.execute("""
            CREATE INDEX IF NOT EXISTS idx_timestamp ON vectors(timestamp)
        """)
        conn.commit()
        conn.close()
        # Load any existing vectors into memory
        self._load_vectors()

    def _load_vectors(self):
        """Load all persisted vectors into the in-memory cache."""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        cursor.execute("SELECT id, vector, metadata, timestamp FROM vectors")
        rows = cursor.fetchall()
        for row in rows:
            vector_id, vector_blob, metadata_json, timestamp = row
            vector = pickle.loads(vector_blob)
            metadata = json.loads(metadata_json)
            self.vectors[vector_id] = VectorRecord(
                id=vector_id,
                vector=vector,
                metadata=metadata,
                timestamp=timestamp
            )
        conn.close()
        print(f"Loaded {len(self.vectors)} vectors into memory")

    def insert(self, vector_id: str, vector: np.ndarray, metadata: Dict[str, Any] = None):
        """Insert (or overwrite) a vector."""
        if vector.shape[0] != self.dimension:
            raise ValueError(f"Vector dimension must be {self.dimension}")
        # Normalize so that dot product equals cosine similarity
        normalized_vector = self._normalize_vector(vector)
        record = VectorRecord(
            id=vector_id,
            vector=normalized_vector,
            metadata=metadata or {},
            timestamp=time.time()
        )
        # Store in memory
        self.vectors[vector_id] = record
        # Persist to the database
        self._persist_vector(record)

    def _normalize_vector(self, vector: np.ndarray) -> np.ndarray:
        """L2-normalize a vector, leaving zero vectors unchanged."""
        norm = np.linalg.norm(vector)
        if norm == 0:
            return vector
        return vector / norm

    def _persist_vector(self, record: VectorRecord):
        """Write a vector record to SQLite."""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        vector_blob = pickle.dumps(record.vector)
        metadata_json = json.dumps(record.metadata, ensure_ascii=False)
        cursor.execute("""
            INSERT OR REPLACE INTO vectors (id, vector, metadata, timestamp, dimension)
            VALUES (?, ?, ?, ?, ?)
        """, (record.id, vector_blob, metadata_json, record.timestamp, self.dimension))
        conn.commit()
        conn.close()

    def search(self, query_vector: np.ndarray, top_k: int = 10,
               filter_metadata: Dict[str, Any] = None) -> List[Tuple[str, float, Dict[str, Any]]]:
        """Brute-force similarity search over the in-memory cache."""
        if query_vector.shape[0] != self.dimension:
            raise ValueError(f"Query vector dimension must be {self.dimension}")
        # Normalize the query vector
        normalized_query = self._normalize_vector(query_vector)
        # Compute similarities
        similarities = []
        for vector_id, record in self.vectors.items():
            # Apply the metadata filter, if any
            if filter_metadata and not self._match_filter(record.metadata, filter_metadata):
                continue
            # Cosine similarity (both vectors are already normalized)
            similarity = np.dot(normalized_query, record.vector)
            similarities.append((vector_id, similarity, record.metadata))
        # Sort and return the top-k results
        similarities.sort(key=lambda x: x[1], reverse=True)
        return similarities[:top_k]

    def _match_filter(self, metadata: Dict[str, Any], filter_metadata: Dict[str, Any]) -> bool:
        """Check whether a record's metadata satisfies all filter conditions."""
        for key, value in filter_metadata.items():
            if key not in metadata or metadata[key] != value:
                return False
        return True

    def delete(self, vector_id: str) -> bool:
        """Delete a vector; returns False if the id is unknown."""
        if vector_id not in self.vectors:
            return False
        # Remove from memory
        del self.vectors[vector_id]
        # Remove from the database
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        cursor.execute("DELETE FROM vectors WHERE id = ?", (vector_id,))
        conn.commit()
        conn.close()
        return True

    def get_statistics(self) -> Dict[str, Any]:
        """Return basic statistics about the database."""
        return {
            "total_vectors": len(self.vectors),
            "dimension": self.dimension,
            "memory_usage_mb": sum(record.vector.nbytes for record in self.vectors.values()) / (1024 * 1024)
        }


# Usage example
vector_db = SimpleVectorDB(dimension=384)

# Insert vectors
sample_vectors = np.random.random((5, 384))
for i, vector in enumerate(sample_vectors):
    vector_db.insert(
        vector_id=f"doc_{i}",
        vector=vector,
        metadata={"source": f"document_{i}.txt", "category": "tech"}
    )

# Search
query_vector = np.random.random(384)
results = vector_db.search(query_vector, top_k=3)
print("Search results:")
for vector_id, similarity, metadata in results:
    print(f"ID: {vector_id}, similarity: {similarity:.3f}, metadata: {metadata}")

# Statistics
stats = vector_db.get_statistics()
print(f"Database statistics: {stats}")
2. Efficient Indexing Algorithms
PYTHON
import faiss
import numpy as np
from typing import Any, Dict, List, Tuple


class FAISSVectorDB:
    """A vector database built on FAISS indexes."""

    def __init__(self, dimension: int, index_type: str = "IVF"):
        self.dimension = dimension
        self.index_type = index_type
        self.index = None
        self.id_map = {}        # maps external ids to internal index positions
        self.metadata_map = {}  # stores per-vector metadata
        self.next_id = 0
        self._create_index()

    def _create_index(self):
        """Create the FAISS index for the configured index type."""
        if self.index_type == "Flat":
            # Brute-force search: exact but slow
            self.index = faiss.IndexFlatIP(self.dimension)  # inner-product index
        elif self.index_type == "IVF":
            # Inverted-file index: balances accuracy and speed
            nlist = 100  # number of cluster centroids
            quantizer = faiss.IndexFlatIP(self.dimension)
            self.index = faiss.IndexIVFFlat(quantizer, self.dimension, nlist)
        elif self.index_type == "HNSW":
            # Hierarchical navigable small-world graph: fast approximate search
            M = 16  # number of connections per node
            self.index = faiss.IndexHNSWFlat(self.dimension, M)
        elif self.index_type == "PQ":
            # Product quantization: memory-efficient
            m = 8      # number of sub-vectors
            nbits = 8  # bits per sub-vector code
            self.index = faiss.IndexPQ(self.dimension, m, nbits)
        else:
            raise ValueError(f"Unsupported index type: {self.index_type}")
        print(f"Created {self.index_type} index with dimension {self.dimension}")

    def train_index(self, training_vectors: np.ndarray):
        """Train the index (required by some index types, e.g. IVF and PQ)."""
        if hasattr(self.index, 'is_trained') and not self.index.is_trained:
            print("Training index...")
            self.index.train(training_vectors.astype(np.float32))
            print("Index training finished")

    def add_vectors(self, vectors: np.ndarray, ids: List[str],
                    metadata_list: List[Dict[str, Any]] = None):
        """Add a batch of vectors."""
        if vectors.shape[1] != self.dimension:
            raise ValueError(f"Vector dimension must be {self.dimension}")
        # Normalize so that inner product equals cosine similarity
        normalized_vectors = self._normalize_vectors(vectors)
        # Train the index first if necessary
        if hasattr(self.index, 'is_trained') and not self.index.is_trained:
            self.train_index(normalized_vectors)
        # Add the vectors to the index
        start_id = self.next_id
        self.index.add(normalized_vectors.astype(np.float32))
        # Update the id and metadata mappings
        for i, vector_id in enumerate(ids):
            internal_id = start_id + i
            self.id_map[vector_id] = internal_id
            if metadata_list:
                self.metadata_map[vector_id] = metadata_list[i]
        self.next_id += len(ids)
        print(f"Added {len(ids)} vectors")

    def _normalize_vectors(self, vectors: np.ndarray) -> np.ndarray:
        """L2-normalize a batch of vectors."""
        norms = np.linalg.norm(vectors, axis=1, keepdims=True)
        norms[norms == 0] = 1  # avoid division by zero
        return vectors / norms

    def search(self, query_vector: np.ndarray, top_k: int = 10) -> List[Tuple[str, float]]:
        """Search for the most similar vectors."""
        if query_vector.shape[0] != self.dimension:
            raise ValueError(f"Query vector dimension must be {self.dimension}")
        # Normalize the query vector
        normalized_query = self._normalize_vectors(query_vector.reshape(1, -1))
        # Run the search
        scores, indices = self.index.search(normalized_query.astype(np.float32), top_k)
        # Convert internal indices back to external ids
        results = []
        for score, idx in zip(scores[0], indices[0]):
            if idx == -1:  # FAISS returns -1 for missing results
                continue
            # Reverse-lookup the external id (linear scan; fine for small maps)
            vector_id = None
            for vid, internal_id in self.id_map.items():
                if internal_id == idx:
                    vector_id = vid
                    break
            if vector_id:
                results.append((vector_id, float(score)))
        return results

    def search_with_metadata(self, query_vector: np.ndarray, top_k: int = 10) -> List[Tuple[str, float, Dict[str, Any]]]:
        """Search and attach metadata to each result."""
        basic_results = self.search(query_vector, top_k)
        results_with_metadata = []
        for vector_id, score in basic_results:
            metadata = self.metadata_map.get(vector_id, {})
            results_with_metadata.append((vector_id, score, metadata))
        return results_with_metadata

    def get_index_info(self) -> Dict[str, Any]:
        """Return information about the index."""
        info = {
            "index_type": self.index_type,
            "dimension": self.dimension,
            "total_vectors": self.index.ntotal,
            "is_trained": getattr(self.index, 'is_trained', True)
        }
        # Add index-type-specific details
        if self.index_type == "IVF":
            info["nlist"] = self.index.nlist
            info["nprobe"] = getattr(self.index, 'nprobe', 1)
        elif self.index_type == "HNSW":
            # M is not exposed on the hnsw struct in all FAISS versions
            info["M"] = getattr(self.index.hnsw, 'M', None)
            info["efConstruction"] = self.index.hnsw.efConstruction
            info["efSearch"] = self.index.hnsw.efSearch
        return info

    def save_index(self, filepath: str):
        """Save the index and id/metadata mappings to disk."""
        faiss.write_index(self.index, filepath)
        # Save the mapping information alongside the index
        import pickle
        mapping_file = filepath + ".mapping"
        with open(mapping_file, 'wb') as f:
            pickle.dump({
                'id_map': self.id_map,
                'metadata_map': self.metadata_map,
                'next_id': self.next_id
            }, f)
        print(f"Index saved to {filepath}")

    def load_index(self, filepath: str):
        """Load the index and mappings from disk."""
        self.index = faiss.read_index(filepath)
        # Load the mapping information
        import pickle
        mapping_file = filepath + ".mapping"
        try:
            with open(mapping_file, 'rb') as f:
                mapping_data = pickle.load(f)
                self.id_map = mapping_data['id_map']
                self.metadata_map = mapping_data['metadata_map']
                self.next_id = mapping_data['next_id']
        except FileNotFoundError:
            print("Warning: mapping file not found; id mappings will be empty")
        print(f"Index loaded from {filepath}")


# Usage example
# Create indexes of different types
faiss_db_flat = FAISSVectorDB(dimension=384, index_type="Flat")
faiss_db_ivf = FAISSVectorDB(dimension=384, index_type="IVF")
faiss_db_hnsw = FAISSVectorDB(dimension=384, index_type="HNSW")

# Generate test data
test_vectors = np.random.random((1000, 384)).astype(np.float32)
test_ids = [f"doc_{i}" for i in range(1000)]
test_metadata = [{"category": f"cat_{i%5}", "source": f"src_{i%10}"} for i in range(1000)]

# Add the vectors
faiss_db_ivf.add_vectors(test_vectors, test_ids, test_metadata)

# Search
query = np.random.random(384)
results = faiss_db_ivf.search_with_metadata(query, top_k=5)
print("FAISS search results:")
for vector_id, score, metadata in results:
    print(f"ID: {vector_id}, score: {score:.3f}, metadata: {metadata}")

# Index information
info = faiss_db_ivf.get_index_info()
print(f"Index info: {info}")
3. Retrieval Optimization Strategies
PYTHON
import time
from typing import Any, Dict, List, Tuple

import numpy as np


class OptimizedRetriever:
    """A retriever that layers optimizations on top of a vector database."""

    def __init__(self, vector_db, embedder):
        # `embedder` is assumed to expose embed_text(text) -> np.ndarray
        self.vector_db = vector_db
        self.embedder = embedder
        self.query_cache = {}  # query result cache
        self.performance_stats = {
            "total_queries": 0,
            "cache_hits": 0,
            "avg_search_time": 0.0
        }

    def search_with_reranking(self, query: str, top_k: int = 10,
                              rerank_top_k: int = 50) -> List[Tuple[str, float, Dict[str, Any]]]:
        """Search with a reranking pass."""
        start_time = time.time()
        # 1. Initial retrieval (fetch more candidates than needed)
        query_vector = self.embedder.embed_text(query)
        initial_results = self.vector_db.search_with_metadata(query_vector, rerank_top_k)
        if not initial_results:
            return []
        # 2. Rerank using a more precise similarity computation
        reranked_results = []
        for vector_id, score, metadata in initial_results:
            # Use the original document content, if stored in the metadata
            content = metadata.get("content", "")
            if content:
                content_vector = self.embedder.embed_text(content)
                precise_score = self._compute_precise_similarity(query_vector, content_vector)
                reranked_results.append((vector_id, precise_score, metadata))
            else:
                reranked_results.append((vector_id, score, metadata))
        # 3. Sort by the new scores
        reranked_results.sort(key=lambda x: x[1], reverse=True)
        # 4. Update performance statistics
        search_time = time.time() - start_time
        self._update_performance_stats(search_time)
        return reranked_results[:top_k]

    def search_with_query_expansion(self, query: str, top_k: int = 10) -> List[Tuple[str, float, Dict[str, Any]]]:
        """Search with query expansion."""
        # 1. Generate expanded queries
        expanded_queries = self._expand_query(query)
        # 2. Search with each expanded query
        all_results = {}
        for expanded_query in expanded_queries:
            query_vector = self.embedder.embed_text(expanded_query)
            results = self.vector_db.search_with_metadata(query_vector, top_k * 2)
            # Merge results, keeping the highest score seen per document
            for vector_id, score, metadata in results:
                if vector_id in all_results and score <= all_results[vector_id][0]:
                    continue
                all_results[vector_id] = (score, metadata)
        # 3. Sort and return
        final_results = [
            (vector_id, score, metadata)
            for vector_id, (score, metadata) in all_results.items()
        ]
        final_results.sort(key=lambda x: x[1], reverse=True)
        return final_results[:top_k]

    def search_with_cache(self, query: str, top_k: int = 10) -> List[Tuple[str, float, Dict[str, Any]]]:
        """Search with result caching."""
        # Build the cache key
        cache_key = f"{query}_{top_k}"
        # Check the cache
        if cache_key in self.query_cache:
            self.performance_stats["cache_hits"] += 1
            return self.query_cache[cache_key]
        # Run the search
        query_vector = self.embedder.embed_text(query)
        results = self.vector_db.search_with_metadata(query_vector, top_k)
        # Cache the result
        self.query_cache[cache_key] = results
        # Bound the cache size
        if len(self.query_cache) > 1000:
            # Evict the oldest entry (dicts preserve insertion order)
            oldest_key = next(iter(self.query_cache))
            del self.query_cache[oldest_key]
        return results

    def hybrid_search(self, query: str, top_k: int = 10,
                      vector_weight: float = 0.7) -> List[Tuple[str, float, Dict[str, Any]]]:
        """Hybrid search combining vector and keyword scores."""
        # 1. Vector search
        query_vector = self.embedder.embed_text(query)
        vector_results = self.vector_db.search_with_metadata(query_vector, top_k * 2)
        # 2. Keyword search (simplified placeholder)
        keyword_results = self._keyword_search(query, top_k * 2)
        # 3. Merge and rescore
        combined_scores = {}
        # Vector search results
        for vector_id, vector_score, metadata in vector_results:
            combined_scores[vector_id] = {
                "vector_score": vector_score,
                "keyword_score": 0.0,
                "metadata": metadata
            }
        # Keyword search results
        for vector_id, keyword_score, metadata in keyword_results:
            if vector_id in combined_scores:
                combined_scores[vector_id]["keyword_score"] = keyword_score
            else:
                combined_scores[vector_id] = {
                    "vector_score": 0.0,
                    "keyword_score": keyword_score,
                    "metadata": metadata
                }
        # 4. Compute the final weighted scores
        final_results = []
        for vector_id, scores in combined_scores.items():
            final_score = (vector_weight * scores["vector_score"] +
                           (1 - vector_weight) * scores["keyword_score"])
            final_results.append((vector_id, final_score, scores["metadata"]))
        # 5. Sort and return
        final_results.sort(key=lambda x: x[1], reverse=True)
        return final_results[:top_k]

    def _compute_precise_similarity(self, vec1: np.ndarray, vec2: np.ndarray) -> float:
        """Compute cosine similarity between two vectors."""
        dot_product = np.dot(vec1, vec2)
        norm1 = np.linalg.norm(vec1)
        norm2 = np.linalg.norm(vec2)
        if norm1 == 0 or norm2 == 0:
            return 0.0
        return dot_product / (norm1 * norm2)

    def _expand_query(self, query: str) -> List[str]:
        """Expand a query (simplified rule-based version)."""
        expanded = [query]
        # Add synonym variants via a small hand-written dictionary
        synonyms = {
            "machine learning": ["ML", "artificial intelligence", "AI"],
            "deep learning": ["neural networks", "DL"],
            "natural language processing": ["NLP", "text processing"]
        }
        for term, syns in synonyms.items():
            if term in query:
                for syn in syns:
                    expanded.append(query.replace(term, syn))
        return expanded

    def _keyword_search(self, query: str, top_k: int) -> List[Tuple[str, float, Dict[str, Any]]]:
        """Keyword search (simplified placeholder)."""
        # A real implementation would use BM25 or a full-text index;
        # here we simply return no keyword matches.
        return []

    def _update_performance_stats(self, search_time: float):
        """Update running performance statistics."""
        self.performance_stats["total_queries"] += 1
        # Update the running average search time
        total_time = (self.performance_stats["avg_search_time"] *
                      (self.performance_stats["total_queries"] - 1) + search_time)
        self.performance_stats["avg_search_time"] = total_time / self.performance_stats["total_queries"]

    def get_performance_stats(self) -> Dict[str, Any]:
        """Return the performance statistics."""
        stats = self.performance_stats.copy()
        if stats["total_queries"] > 0:
            stats["cache_hit_rate"] = stats["cache_hits"] / stats["total_queries"]
        else:
            stats["cache_hit_rate"] = 0.0
        return stats


# Usage example (assumes an `embedder` object from an earlier chapter
# exposing embed_text(text) -> np.ndarray)
optimized_retriever = OptimizedRetriever(faiss_db_ivf, embedder)

# Try the different search strategies
query = "applications of deep learning"

# Reranked search
rerank_results = optimized_retriever.search_with_reranking(query, top_k=5)
print("Reranked search results:")
for vector_id, score, metadata in rerank_results[:3]:
    print(f"ID: {vector_id}, score: {score:.3f}")

# Query-expansion search
expansion_results = optimized_retriever.search_with_query_expansion(query, top_k=5)
print("\nQuery-expansion search results:")
for vector_id, score, metadata in expansion_results[:3]:
    print(f"ID: {vector_id}, score: {score:.3f}")

# Cached search
cache_results = optimized_retriever.search_with_cache(query, top_k=5)
print("\nCached search results:")
for vector_id, score, metadata in cache_results[:3]:
    print(f"ID: {vector_id}, score: {score:.3f}")

# Performance statistics
perf_stats = optimized_retriever.get_performance_stats()
print(f"\nPerformance statistics: {perf_stats}")
Summary
In this chapter we covered:
- Vector database fundamentals: storing and managing high-dimensional vectors
- Indexing algorithms: the main FAISS index types and their characteristics
- Retrieval optimization: reranking, query expansion, caching, and related strategies
- Hybrid search: combining vector search with keyword search
- Performance monitoring: measuring and tuning search performance
Vector databases are core infrastructure for RAG systems, and choosing the right index type and optimization strategy is critical to overall system performance.
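A practical way to ground that choice is to benchmark an approximate index against an exact Flat baseline on a sample of your own vectors. Below is a minimal sketch reusing the `FAISSVectorDB` class from this chapter; the data is random and the sizes are illustrative only, so the absolute numbers merely indicate the shape of the recall/latency trade-off.
PYTHON
import time
import numpy as np

dim, n = 384, 5000
base = np.random.random((n, dim)).astype(np.float32)
ids = [f"doc_{i}" for i in range(n)]
queries = np.random.random((20, dim)).astype(np.float32)

exact = FAISSVectorDB(dimension=dim, index_type="Flat")
approx = FAISSVectorDB(dimension=dim, index_type="IVF")
exact.add_vectors(base, ids)
approx.add_vectors(base, ids)

# Ground truth from the exact index; internal ids line up because both
# indexes received the same vectors in the same order
_, truth = exact.index.search(queries, 10)

for name, db in [("Flat", exact), ("IVF", approx)]:
    start = time.time()
    _, found = db.index.search(queries, 10)
    latency = (time.time() - start) / len(queries)
    recall = np.mean([len(set(t) & set(f)) / 10 for t, f in zip(truth, found)])
    print(f"{name}: recall@10={recall:.2f}, avg latency={latency * 1000:.2f} ms")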
Questions for Reflection
- How would you choose a vector index type for a specific application scenario?
- When should hybrid search be used instead of pure vector search?
- How do you balance search accuracy against search speed?
- How would you design a vector database that supports real-time updates?
In the next chapter we will look at LLM fine-tuning and adaptation techniques.