向量数据库选型与使用
概述
向量数据库是RAG系统的核心存储组件,负责存储文档向量并支持高效的相似度检索。选择合适的向量数据库需要考虑性能、扩展性、成本和易用性等多个维度。
向量数据库对比
主流向量数据库
| 数据库 | 类型 | 特点 | 适用场景 |
|---|---|---|---|
| Pinecone | 云服务 | 托管服务,易用 | 生产环境快速部署 |
| Milvus | 开源 | 高性能,可扩展 | 大规模企业应用 |
| Chroma | 开源 | 轻量级,易集成 | 开发测试、小规模应用 |
| FAISS | 开源库 | Meta出品,高性能 | 本地部署、研究 |
| Weaviate | 开源 | 语义搜索强 | 复杂查询场景 |
| Qdrant | 开源 | Rust实现,高效 | 高性能需求 |
| pgvector | PostgreSQL扩展 | 关系数据库集成 | 已有PG基础设施 |
详细对比
| 维度 | Pinecone | Milvus | Chroma | FAISS |
|---|---|---|---|---|
| 部署方式 | 云托管 | 自托管/云 | 本地 | 本地库 |
| 学习曲线 | 低 | 中 | 低 | 高 |
| 性能 | 高 | 高 | 中 | 高 |
| 扩展性 | 自动 | 手动 | 有限 | 手动 |
| 成本 | 按量付费 | 免费/云付费 | 免费 | 免费 |
| 生产就绪 | 是 | 是 | 测试阶段 | 需封装 |
Pinecone使用
初始化与连接
python
import pinecone
pinecone.init(
api_key="your-api-key",
environment="us-east-1"
)
index_name = "my-index"
if index_name not in pinecone.list_indexes():
pinecone.create_index(
name=index_name,
dimension=1536,
metric="cosine",
pod_type="p1.x1"
)
index = pinecone.Index(index_name)插入向量
python
from openai import OpenAI
client = OpenAI()
def get_embedding(text):
response = client.embeddings.create(
model="text-embedding-3-small",
input=text
)
return response.data[0].embedding
vectors = []
for i, doc in enumerate(documents):
embedding = get_embedding(doc.page_content)
vectors.append({
"id": f"doc_{i}",
"values": embedding,
"metadata": {
"text": doc.page_content,
"source": doc.metadata.get("source", "")
}
})
index.upsert(vectors=vectors)查询向量
python
query_embedding = get_embedding("查询文本")
results = index.query(
vector=query_embedding,
top_k=5,
include_metadata=True
)
for match in results.matches:
print(f"相似度: {match.score}")
print(f"内容: {match.metadata['text'][:100]}")Milvus使用
安装与启动
bash
docker run -d --name milvus-standalone \
-p 19530:19530 \
-p 9091:9091 \
milvusdb/milvus:latest standalonePython客户端
python
from pymilvus import connections, Collection, FieldSchema, CollectionSchema, DataType
connections.connect("default", host="localhost", port="19530")
fields = [
FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=1536),
FieldSchema(name="text", dtype=DataType.VARCHAR, max_length=2000)
]
schema = CollectionSchema(fields, "文档向量集合")
collection = Collection("documents", schema)
index_params = {
"metric_type": "COSINE",
"index_type": "IVF_FLAT",
"params": {"nlist": 128}
}
collection.create_index("embedding", index_params)插入与查询
python
entities = [
[get_embedding(doc.page_content) for doc in documents],
[doc.page_content for doc in documents]
]
collection.insert(entities)
collection.load()
search_params = {"metric_type": "COSINE", "params": {"nprobe": 10}}
results = collection.search(
data=[query_embedding],
anns_field="embedding",
param=search_params,
limit=5,
output_fields=["text"]
)
for hits in results:
for hit in hits:
print(f"相似度: {hit.distance}")
print(f"内容: {hit.entity.get('text')[:100]}")Chroma使用
快速开始
python
import chromadb
from chromadb.config import Settings
client = chromadb.Client(Settings(
chroma_db_impl="duckdb+parquet",
persist_directory="./chroma_db"
))
collection = client.create_collection("documents")添加文档
python
from langchain.embeddings import OpenAIEmbeddings
from langchain.vectorstores import Chroma
embeddings = OpenAIEmbeddings()
vectorstore = Chroma.from_documents(
documents=chunks,
embedding=embeddings,
persist_directory="./chroma_db"
)
vectorstore.persist()检索文档
python
results = vectorstore.similarity_search(
query="查询内容",
k=5
)
for doc in results:
print(doc.page_content)
print(doc.metadata)
results_with_scores = vectorstore.similarity_search_with_score(
query="查询内容",
k=5
)
for doc, score in results_with_scores:
print(f"相似度: {score}")
print(doc.page_content)FAISS使用
基础用法
python
import faiss
import numpy as np
dimension = 1536
index = faiss.IndexFlatIP(dimension)
embeddings = np.array([get_embedding(doc.page_content) for doc in documents]).astype('float32')
faiss.normalize_L2(embeddings)
index.add(embeddings)
query_embedding = np.array([get_embedding("查询内容")]).astype('float32')
faiss.normalize_L2(query_embedding)
k = 5
distances, indices = index.search(query_embedding, k)
for i, idx in enumerate(indices[0]):
print(f"相似度: {distances[0][i]}")
print(documents[idx].page_content[:100])使用LangChain封装
python
from langchain.vectorstores import FAISS
vectorstore = FAISS.from_documents(
documents=chunks,
embedding=embeddings
)
vectorstore.save_local("./faiss_index")
loaded_vectorstore = FAISS.load_local(
"./faiss_index",
embeddings
)
results = loaded_vectorstore.similarity_search("查询内容", k=5)FAISS索引类型
python
nlist = 100
quantizer = faiss.IndexFlatIP(dimension)
index = faiss.IndexIVFFlat(quantizer, dimension, nlist)
index.train(embeddings)
index.add(embeddings)
nbits = 8
index = faiss.IndexIVFPQ(quantizer, dimension, nlist, 8, nbits)
index.train(embeddings)
index.add(embeddings)向量检索算法
常见算法
| 算法 | 原理 | 优点 | 缺点 |
|---|---|---|---|
| 暴力搜索 | 计算所有距离 | 精确结果 | 速度慢 |
| IVF | 聚类后搜索 | 速度快 | 需要训练 |
| HNSW | 图结构导航 | 高召回率 | 内存占用大 |
| PQ | 乘积量化 | 内存效率高 | 精度损失 |
HNSW示例
python
from langchain.vectorstores import Chroma
vectorstore = Chroma.from_documents(
documents=chunks,
embedding=embeddings,
collection_metadata={"hnsw:space": "cosine", "hnsw:construction_ef": 200}
)性能优化
批量插入
python
batch_size = 100
for i in range(0, len(documents), batch_size):
batch = documents[i:i+batch_size]
vectorstore.add_documents(batch)索引优化
python
if collection.num_entities > 100000:
collection.release()
collection.create_index(
field_name="embedding",
index_params={
"metric_type": "COSINE",
"index_type": "IVF_PQ",
"params": {"nlist": 1024, "m": 8, "nbits": 8}
}
)
collection.load()缓存策略
python
from functools import lru_cache
@lru_cache(maxsize=1000)
def get_embedding_cached(text):
return get_embedding(text)
class CachedVectorStore:
def __init__(self, vectorstore):
self.vectorstore = vectorstore
self.query_cache = {}
def similarity_search(self, query, k=5):
cache_key = hash(query)
if cache_key in self.query_cache:
return self.query_cache[cache_key]
results = self.vectorstore.similarity_search(query, k=k)
self.query_cache[cache_key] = results
return results数据管理
删除文档
python
collection.delete(
expr=f'source == "{file_path}"'
)
index.delete(ids=["doc_1", "doc_2"])更新文档
python
def update_document(doc_id, new_content):
new_embedding = get_embedding(new_content)
index.upsert([{
"id": doc_id,
"values": new_embedding,
"metadata": {"text": new_content}
}])备份与恢复
python
import shutil
from datetime import datetime
def backup_vectorstore(source_dir, backup_dir):
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
backup_path = f"{backup_dir}/backup_{timestamp}"
shutil.copytree(source_dir, backup_path)
return backup_path
backup_path = backup_vectorstore("./chroma_db", "./backups")选型建议
开发测试阶段
推荐 Chroma 或 FAISS:
- 安装简单,无需额外服务
- 本地运行,快速迭代
- 足够满足功能验证需求
生产环境小规模
推荐 Pinecone 或 Qdrant:
- 托管服务,运维成本低
- 稳定可靠,易于扩展
- 文档完善,社区活跃
生产环境大规模
推荐 Milvus 或 Weaviate:
- 高性能,支持分布式
- 功能丰富,生态完善
- 可自主控制数据安全
已有PostgreSQL基础设施
推荐 pgvector:
- 无需新增组件
- 事务支持
- 与关系数据无缝集成
小结
向量数据库选型需要综合考虑性能、成本、易用性和扩展性。开发阶段优先选择轻量级方案,生产环境根据规模选择托管服务或自托管方案。掌握主流向量数据库的使用方法,能够根据实际需求做出合理选择。
下一章将深入讲解检索优化技巧,提升RAG系统的检索质量和效果。