Skip to content

向量数据库选型与使用

概述

向量数据库是RAG系统的核心存储组件,负责存储文档向量并支持高效的相似度检索。选择合适的向量数据库需要考虑性能、扩展性、成本和易用性等多个维度。

向量数据库对比

主流向量数据库

| 数据库 | 类型 | 特点 | 适用场景 |
| --- | --- | --- | --- |
| Pinecone | 云服务 | 托管服务,易用 | 生产环境快速部署 |
| Milvus | 开源 | 高性能,可扩展 | 大规模企业应用 |
| Chroma | 开源 | 轻量级,易集成 | 开发测试、小规模应用 |
| FAISS | 开源库 | Meta出品,高性能 | 本地部署、研究 |
| Weaviate | 开源 | 语义搜索强 | 复杂查询场景 |
| Qdrant | 开源 | Rust实现,高效 | 高性能需求 |
| pgvector | PostgreSQL扩展 | 关系数据库集成 | 已有PG基础设施 |

详细对比

| 维度 | Pinecone | Milvus | Chroma | FAISS |
| --- | --- | --- | --- | --- |
| 部署方式 | 云托管 | 自托管/云 | 本地 | 本地库 |
| 学习曲线 | 低 | 高 | 低 | 中 |
| 性能 | 高 | 高 | 中 | 高 |
| 扩展性 | 自动 | 手动 | 有限 | 手动 |
| 成本 | 按量付费 | 免费/云付费 | 免费 | 免费 |
| 生产就绪 | 是 | 是 | 测试阶段 | 需封装 |

Pinecone使用

初始化与连接

python
import pinecone

# NOTE(review): this is the legacy pinecone-client v2 API; v3+ replaced
# pinecone.init() with the Pinecone(...) client class — confirm which
# client version the project pins.
pinecone.init(
    api_key="your-api-key",
    environment="us-east-1"
)

# Create the index only on first run. dimension=1536 matches OpenAI's
# text-embedding-3-small model used later in this document.
index_name = "my-index"
if index_name not in pinecone.list_indexes():
    pinecone.create_index(
        name=index_name,
        dimension=1536,
        metric="cosine",
        pod_type="p1.x1"
    )

# Handle used for upserts and queries below.
index = pinecone.Index(index_name)

插入向量

python
from openai import OpenAI

client = OpenAI()


def get_embedding(text):
    """Embed *text* with OpenAI's text-embedding-3-small and return the vector."""
    result = client.embeddings.create(model="text-embedding-3-small", input=text)
    # The API returns one embedding per input; we send a single string.
    return result.data[0].embedding

# Embed every document and upsert it with its text and source as metadata,
# using a positional id ("doc_0", "doc_1", ...) as the vector id.
vectors = [
    {
        "id": f"doc_{i}",
        "values": get_embedding(doc.page_content),
        "metadata": {
            "text": doc.page_content,
            "source": doc.metadata.get("source", ""),
        },
    }
    for i, doc in enumerate(documents)
]

index.upsert(vectors=vectors)

查询向量

python
# Embed the query with the same model used for the documents.
query_embedding = get_embedding("查询文本")

# Fetch the 5 nearest vectors; include_metadata returns the stored text/source.
results = index.query(
    vector=query_embedding,
    top_k=5,
    include_metadata=True
)

for match in results.matches:
    print(f"相似度: {match.score}")
    print(f"内容: {match.metadata['text'][:100]}")

Milvus使用

安装与启动

bash
# Start a single-node ("standalone") Milvus in Docker.
# 19530 is the client (gRPC) port used by the Python client below;
# 9091 is Milvus's HTTP service port — see the Milvus docs for details.
docker run -d --name milvus-standalone \
  -p 19530:19530 \
  -p 9091:9091 \
  milvusdb/milvus:latest standalone

Python客户端

python
from pymilvus import connections, Collection, FieldSchema, CollectionSchema, DataType

# Connect to the standalone Milvus instance started above.
connections.connect("default", host="localhost", port="19530")

# Schema: auto-generated int64 primary key, the embedding vector, and the raw text.
fields = [
    FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
    # dim=1536 matches OpenAI's text-embedding-3-small used in this document.
    FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=1536),
    FieldSchema(name="text", dtype=DataType.VARCHAR, max_length=2000)
]

schema = CollectionSchema(fields, "文档向量集合")
collection = Collection("documents", schema)

# IVF_FLAT: inverted-file index; nlist is the number of clusters the vectors
# are partitioned into at build time.
index_params = {
    "metric_type": "COSINE",
    "index_type": "IVF_FLAT",
    "params": {"nlist": 128}
}
collection.create_index("embedding", index_params)

插入与查询

python
# Column-ordered insert: one list per non-auto field, in schema order
# (embedding first, then text); the id field is auto-generated.
entities = [
    [get_embedding(doc.page_content) for doc in documents],
    [doc.page_content for doc in documents]
]

collection.insert(entities)
# Load the collection into memory — required before searching.
collection.load()

# nprobe: how many IVF clusters are scanned per query
# (higher = better recall, slower search).
search_params = {"metric_type": "COSINE", "params": {"nprobe": 10}}

results = collection.search(
    data=[query_embedding],
    anns_field="embedding",
    param=search_params,
    limit=5,
    output_fields=["text"]
)

# search() returns one hit list per query vector; a single query was issued.
for hits in results:
    for hit in hits:
        print(f"相似度: {hit.distance}")
        print(f"内容: {hit.entity.get('text')[:100]}")

Chroma使用

快速开始

python
import chromadb

# The old chromadb.Client(Settings(chroma_db_impl="duckdb+parquet", ...))
# configuration was removed in Chroma 0.4; PersistentClient is the current
# way to get an on-disk store.
client = chromadb.PersistentClient(path="./chroma_db")

# get_or_create avoids an error when the collection already exists on disk
# (create_collection raises on a second run against the same directory).
collection = client.get_or_create_collection("documents")

添加文档

python
# NOTE(review): in recent LangChain releases these imports moved to
# langchain_community / langchain_chroma — confirm against the installed version.
from langchain.embeddings import OpenAIEmbeddings
from langchain.vectorstores import Chroma

embeddings = OpenAIEmbeddings()

# Embed the chunks and write them into an on-disk Chroma collection.
vectorstore = Chroma.from_documents(
    documents=chunks,
    embedding=embeddings,
    persist_directory="./chroma_db"
)

# Flush to disk. NOTE(review): newer Chroma persists automatically and has
# removed persist() — verify before upgrading.
vectorstore.persist()

检索文档

python
# Plain similarity search: returns the k most similar Document objects.
results = vectorstore.similarity_search(
    query="查询内容",
    k=5
)

for doc in results:
    print(doc.page_content)
    print(doc.metadata)

# Same search, but each result is paired with a score.
# NOTE(review): for Chroma this score is a distance (lower = more similar),
# not a similarity — confirm before ranking on it.
results_with_scores = vectorstore.similarity_search_with_score(
    query="查询内容",
    k=5
)

for doc, score in results_with_scores:
    print(f"相似度: {score}")
    print(doc.page_content)

FAISS使用

基础用法

python
import faiss
import numpy as np

dimension = 1536  # embedding size of text-embedding-3-small
# IndexFlatIP performs exact (brute-force) inner-product search; with
# L2-normalized vectors, inner product equals cosine similarity.
index = faiss.IndexFlatIP(dimension)

# FAISS requires float32 input.
embeddings = np.array([get_embedding(doc.page_content) for doc in documents]).astype('float32')
faiss.normalize_L2(embeddings)  # in-place row-wise normalization

index.add(embeddings)

# The query must be a 2-D float32 array and normalized the same way.
query_embedding = np.array([get_embedding("查询内容")]).astype('float32')
faiss.normalize_L2(query_embedding)

k = 5
# search() returns (scores, ids), each of shape (n_queries, k).
distances, indices = index.search(query_embedding, k)

for i, idx in enumerate(indices[0]):
    print(f"相似度: {distances[0][i]}")
    print(documents[idx].page_content[:100])

使用LangChain封装

python
from langchain.vectorstores import FAISS

vectorstore = FAISS.from_documents(
    documents=chunks,
    embedding=embeddings
)

# Save the index to a local directory for later reuse.
vectorstore.save_local("./faiss_index")

# NOTE(review): load_local deserializes pickled data; recent LangChain versions
# require allow_dangerous_deserialization=True here — only load indexes you
# created yourself.
loaded_vectorstore = FAISS.load_local(
    "./faiss_index",
    embeddings
)

results = loaded_vectorstore.similarity_search("查询内容", k=5)

FAISS索引类型

python
# IVF_FLAT: cluster vectors into nlist cells, then search only the nearest cells.
nlist = 100
quantizer = faiss.IndexFlatIP(dimension)
index = faiss.IndexIVFFlat(quantizer, dimension, nlist)

# IVF indexes must be trained on representative data before vectors are added.
index.train(embeddings)
index.add(embeddings)

# IVF_PQ additionally compresses each vector with product quantization
# (8 sub-vectors, nbits bits per code) to reduce memory at some accuracy cost.
# NOTE(review): this rebinds `index`, discarding the IVF_FLAT index built above,
# and reuses the same quantizer object for both indexes.
nbits = 8
index = faiss.IndexIVFPQ(quantizer, dimension, nlist, 8, nbits)
index.train(embeddings)
index.add(embeddings)

向量检索算法

常见算法

| 算法 | 原理 | 优点 | 缺点 |
| --- | --- | --- | --- |
| 暴力搜索 | 计算所有距离 | 精确结果 | 速度慢 |
| IVF | 聚类后搜索 | 速度快 | 需要训练 |
| HNSW | 图结构导航 | 高召回率 | 内存占用大 |
| PQ | 乘积量化 | 内存效率高 | 精度损失 |

HNSW示例

python
from langchain.vectorstores import Chroma

vectorstore = Chroma.from_documents(
    documents=chunks,
    embedding=embeddings,
    collection_metadata={"hnsw:space": "cosine", "hnsw:construction_ef": 200}
)

性能优化

批量插入

python
# Insert documents in fixed-size batches rather than one oversized call.
batch_size = 100
for start in range(0, len(documents), batch_size):
    vectorstore.add_documents(documents[start:start + batch_size])

索引优化

python
# For large collections, swap IVF_FLAT for IVF_PQ: more clusters (nlist) plus
# product quantization (m sub-vectors, nbits bits each) to reduce memory.
if collection.num_entities > 100000:
    collection.release()  # release from memory before rebuilding the index
    collection.create_index(
        field_name="embedding",
        index_params={
            "metric_type": "COSINE",
            "index_type": "IVF_PQ",
            "params": {"nlist": 1024, "m": 8, "nbits": 8}
        }
    )
    collection.load()  # reload so subsequent searches use the new index

缓存策略

python
from functools import lru_cache

# Memoize embeddings so repeated texts do not trigger duplicate API calls;
# bounded at 1000 entries to keep memory in check.
@lru_cache(maxsize=1000)
def get_embedding_cached(text):
    return get_embedding(text)

class CachedVectorStore:
    """Wrap a vector store and memoize ``similarity_search`` results.

    The original implementation keyed the cache on ``hash(query)`` alone, which
    (a) risks returning the wrong results on a hash collision and (b) ignores
    ``k``, so a repeat query with a different ``k`` got a stale answer. The key
    is now the ``(query, k)`` pair.
    """

    def __init__(self, vectorstore, max_entries=None):
        """
        Args:
            vectorstore: object exposing ``similarity_search(query, k=...)``.
            max_entries: optional cache bound; ``None`` (default, matching the
                previous behavior) means unbounded. When set, the oldest entry
                is evicted first (FIFO; dicts preserve insertion order).
        """
        self.vectorstore = vectorstore
        self.query_cache = {}
        self.max_entries = max_entries

    def similarity_search(self, query, k=5):
        # Key on both query and k so different result sizes don't collide.
        cache_key = (query, k)
        if cache_key in self.query_cache:
            return self.query_cache[cache_key]

        results = self.vectorstore.similarity_search(query, k=k)
        self.query_cache[cache_key] = results
        if self.max_entries is not None and len(self.query_cache) > self.max_entries:
            # Evict the oldest entry to keep the cache bounded.
            self.query_cache.pop(next(iter(self.query_cache)))
        return results

数据管理

删除文档

python
# Milvus: delete by boolean expression (all vectors whose source matches).
collection.delete(
    expr=f'source == "{file_path}"'
)

# Pinecone: delete by explicit vector ids.
index.delete(ids=["doc_1", "doc_2"])

更新文档

python
def update_document(doc_id, new_content):
    """Re-embed *new_content* and overwrite the vector stored under *doc_id*.

    upsert replaces the existing record (vector and metadata) in place.
    """
    record = {
        "id": doc_id,
        "values": get_embedding(new_content),
        "metadata": {"text": new_content},
    }
    index.upsert([record])

备份与恢复

python
import os
import shutil
from datetime import datetime


def backup_vectorstore(source_dir, backup_dir):
    """Copy *source_dir* into a timestamped subdirectory of *backup_dir*.

    Returns the path of the new backup directory.

    Notes:
        - shutil.copytree creates the destination (including missing parents)
          and raises FileExistsError if it already exists — two calls within
          the same second would collide on the timestamp.
    """
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    # os.path.join keeps the path portable (the original hard-coded "/").
    backup_path = os.path.join(backup_dir, f"backup_{timestamp}")
    shutil.copytree(source_dir, backup_path)
    return backup_path

# Example: snapshot the Chroma directory into ./backups.
backup_path = backup_vectorstore("./chroma_db", "./backups")

选型建议

开发测试阶段

推荐 **Chroma** 或 **FAISS**:

  • 安装简单,无需额外服务
  • 本地运行,快速迭代
  • 足够满足功能验证需求

生产环境小规模

推荐 **Pinecone** 或 **Qdrant**:

  • 托管服务,运维成本低
  • 稳定可靠,易于扩展
  • 文档完善,社区活跃

生产环境大规模

推荐 **Milvus** 或 **Weaviate**:

  • 高性能,支持分布式
  • 功能丰富,生态完善
  • 可自主控制数据安全

已有PostgreSQL基础设施

推荐 pgvector

  • 无需新增组件
  • 事务支持
  • 与关系数据无缝集成

小结

向量数据库选型需要综合考虑性能、成本、易用性和扩展性。开发阶段优先选择轻量级方案,生产环境根据规模选择托管服务或自托管方案。掌握主流向量数据库的使用方法,能够根据实际需求做出合理选择。

下一章将深入讲解检索优化技巧,提升RAG系统的检索质量和效果。