企业知识库搭建
项目概述
企业知识库是一个基于RAG(检索增强生成)技术的智能文档管理系统,能够将企业内部文档、知识资料转化为可交互的智能问答系统。本项目将构建一个完整的企业知识库平台,支持文档管理、智能检索、权限控制等功能。
项目目标
- 实现文档的智能导入和处理
- 构建高效的向量检索系统
- 提供精准的知识问答服务
- 支持多租户和权限管理
- 提供完整的知识管理界面
应用场景
- 企业内部知识管理
- 产品文档智能问答
- 技术支持知识库
- 培训资料检索系统
技术架构
整体架构
┌─────────────────────────────────────────────┐
│ 知识库管理界面 │
│ (文档上传、检索、问答、管理) │
└─────────────────┬───────────────────────────┘
│
┌─────────────────▼───────────────────────────┐
│ API服务层 │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │文档处理 │ │向量检索 │ │权限控制 │ │
│ └──────────┘ └──────────┘ └──────────┘ │
└─────────────────┬───────────────────────────┘
│
┌─────────────────▼───────────────────────────┐
│ 数据存储层 │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │PostgreSQL│ │向量数据库│ │对象存储 │ │
│ └──────────┘ └──────────┘ └──────────┘ │
└─────────────────┬───────────────────────────┘
│
┌─────────────────▼───────────────────────────┐
│ AI服务层 │
│ (Embedding模型 + 大语言模型) │
└─────────────────────────────────────────────┘
技术栈选择
后端技术栈
- Python 3.10+
- FastAPI + Celery
- LangChain / LlamaIndex
- PostgreSQL + pgvector
- MinIO / S3 (对象存储)
- Redis (缓存)
向量数据库
- Milvus (生产环境推荐)
- Chroma (开发测试)
- Pinecone (云端方案)
- FAISS (本地部署)
前端技术栈
- React 18+ / Next.js
- TypeScript
- Ant Design / Shadcn UI
- React Query
AI服务
- OpenAI Embeddings
- 通义千问 Embedding
- BGE / M3E (开源模型)
核心功能
1. 文档处理与分块
python
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.document_loaders import (
PyPDFLoader,
Docx2txtLoader,
UnstructuredMarkdownLoader
)
from typing import List
import hashlib
class DocumentProcessor:
def __init__(
self,
chunk_size=1000,
chunk_overlap=200
):
self.text_splitter = RecursiveCharacterTextSplitter(
chunk_size=chunk_size,
chunk_overlap=chunk_overlap,
separators=["\n\n", "\n", "。", "!", "?", ";", " ", ""]
)
def load_document(self, file_path: str) -> List:
if file_path.endswith('.pdf'):
loader = PyPDFLoader(file_path)
elif file_path.endswith('.docx'):
loader = Docx2txtLoader(file_path)
elif file_path.endswith('.md'):
loader = UnstructuredMarkdownLoader(file_path)
else:
raise ValueError(f"不支持的文件格式: {file_path}")
return loader.load()
def split_documents(self, documents: List) -> List:
chunks = self.text_splitter.split_documents(documents)
for chunk in chunks:
chunk.metadata['chunk_id'] = self._generate_chunk_id(
chunk.page_content
)
return chunks
def _generate_chunk_id(self, content: str) -> str:
return hashlib.md5(content.encode()).hexdigest()2. 向量存储与检索
python
from langchain.vectorstores import Milvus
from langchain.embeddings import OpenAIEmbeddings
from typing import List, Optional
class VectorStore:
def __init__(
self,
collection_name: str,
embedding_model: str = "text-embedding-3-small"
):
self.embeddings = OpenAIEmbeddings(model=embedding_model)
self.collection_name = collection_name
self.vectorstore = None
def create_collection(self):
from pymilvus import connections, utility
connections.connect("default", host="localhost", port="19530")
if utility.has_collection(self.collection_name):
utility.drop_collection(self.collection_name)
self.vectorstore = Milvus.from_documents(
documents=[],
embedding=self.embeddings,
collection_name=self.collection_name,
connection_args={"host": "localhost", "port": "19530"}
)
def add_documents(
self,
documents: List,
metadatas: Optional[List[dict]] = None
):
if not self.vectorstore:
self.vectorstore = Milvus.from_documents(
documents=documents,
embedding=self.embeddings,
collection_name=self.collection_name,
connection_args={"host": "localhost", "port": "19530"}
)
else:
self.vectorstore.add_documents(documents)
def similarity_search(
self,
query: str,
k: int = 5,
filter: Optional[dict] = None
) -> List:
results = self.vectorstore.similarity_search(
query=query,
k=k,
filter=filter
)
return results
def hybrid_search(
self,
query: str,
k: int = 5
) -> List:
results = self.vectorstore.similarity_search(
query=query,
k=k * 2
)
reranked = self._rerank(query, results)
return reranked[:k]
def _rerank(self, query: str, results: List) -> List:
from sentence_transformers import CrossEncoder
model = CrossEncoder('cross-encoder/ms-marco-MiniLM-L-6-v2')
pairs = [(query, doc.page_content) for doc in results]
scores = model.predict(pairs)
scored_results = list(zip(results, scores))
scored_results.sort(key=lambda x: x[1], reverse=True)
return [doc for doc, score in scored_results]3. RAG问答系统
python
from langchain.chains import RetrievalQA
from langchain.prompts import PromptTemplate
from langchain.chat_models import ChatOpenAI
class KnowledgeQA:
def __init__(self, vectorstore, llm_model="gpt-4"):
self.llm = ChatOpenAI(model=llm_model, temperature=0)
self.vectorstore = vectorstore
prompt_template = """你是一个专业的知识库助手。请基于以下上下文回答用户问题。
上下文:
{context}
问题:{question}
要求:
1. 仅基于上下文信息回答,不要编造内容
2. 如果上下文中没有相关信息,请明确告知
3. 回答要准确、简洁、专业
4. 引用相关的文档来源
答案:"""
self.prompt = PromptTemplate(
template=prompt_template,
input_variables=["context", "question"]
)
self.qa_chain = RetrievalQA.from_chain_type(
llm=self.llm,
chain_type="stuff",
retriever=self.vectorstore.as_retriever(
search_kwargs={"k": 5}
),
return_source_documents=True,
chain_type_kwargs={"prompt": self.prompt}
)
def answer(self, question: str) -> dict:
result = self.qa_chain({"query": question})
sources = []
for doc in result.get("source_documents", []):
sources.append({
"content": doc.page_content[:200],
"metadata": doc.metadata
})
return {
"answer": result["result"],
"sources": sources,
"confidence": self._calculate_confidence(result)
}
def _calculate_confidence(self, result: dict) -> float:
source_count = len(result.get("source_documents", []))
if source_count >= 3:
return 0.9
elif source_count >= 1:
return 0.7
else:
return 0.34. 权限管理系统
python
from sqlalchemy import Column, Integer, String, Boolean, DateTime
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import Session
from datetime import datetime
import hashlib
# Shared declarative base for all ORM models defined below.
Base = declarative_base()

class User(Base):
    """User account; ``tenant_id`` scopes the user to a single tenant."""
    __tablename__ = "users"
    id = Column(Integer, primary_key=True, index=True)
    username = Column(String, unique=True, index=True)  # login name, unique system-wide
    password_hash = Column(String)  # stored hash only — never the plaintext password
    role = Column(String)  # "admin" bypasses all checks (see PermissionManager)
    tenant_id = Column(Integer, index=True)
    created_at = Column(DateTime, default=datetime.utcnow)
class Document(Base):
    """Uploaded document metadata; the file itself lives at ``file_path``."""
    __tablename__ = "documents"
    id = Column(Integer, primary_key=True, index=True)
    title = Column(String)
    file_path = Column(String)  # storage location of the original upload
    tenant_id = Column(Integer, index=True)  # owning tenant; drives access checks
    created_by = Column(Integer)  # presumably the uploader's User.id — confirm against caller
    is_public = Column(Boolean, default=False)  # public docs are readable/searchable across tenants
    created_at = Column(DateTime, default=datetime.utcnow)
class PermissionManager:
    """Tenant-aware access control for documents.

    Rules, evaluated in order:
      * admins may perform any action;
      * public documents are readable/searchable by anyone;
      * users may act on documents belonging to their own tenant.
    """

    def __init__(self, db: Session):
        self.db = db

    def check_permission(
        self,
        user_id: int,
        document_id: int,
        action: str
    ) -> bool:
        """Return True if *user_id* may perform *action* on *document_id*."""
        user = self.db.query(User).filter(User.id == user_id).first()
        document = self.db.query(Document).filter(
            Document.id == document_id
        ).first()
        # Unknown user or document -> deny by default.
        if not user or not document:
            return False
        if user.role == "admin":
            return True
        if document.is_public and action in ["read", "search"]:
            return True
        # NOTE(review): tenant membership grants ALL actions (including
        # write/delete) on any document of the tenant — confirm intended.
        if user.tenant_id == document.tenant_id:
            return True
        return False

    def get_user_documents(self, user_id: int) -> list[int]:
        """Return the ids of documents visible to *user_id*.

        An unknown user sees nothing (previously this raised
        AttributeError when the user lookup returned None).
        """
        user = self.db.query(User).filter(User.id == user_id).first()
        if user is None:
            return []
        query = self.db.query(Document)
        if user.role != "admin":
            query = query.filter(
                (Document.tenant_id == user.tenant_id) |
                (Document.is_public == True)
            )
        return [doc.id for doc in query.all()]
实现步骤
步骤1:环境准备
bash
# Create the project directory
mkdir enterprise-knowledge-base
cd enterprise-knowledge-base

# Backend environment (Python virtualenv + dependencies)
python -m venv venv
source venv/bin/activate
pip install fastapi uvicorn langchain openai
pip install pymilvus psycopg2-binary sqlalchemy redis
pip install python-multipart pypdf docx2txt

# Frontend environment (Next.js + TypeScript)
npx create-next-app@latest frontend --typescript
cd frontend
npm install @tanstack/react-query axios antd
步骤2:数据库初始化
sql
-- PostgreSQL with the pgvector extension
CREATE DATABASE knowledge_base;
\c knowledge_base;
CREATE EXTENSION IF NOT EXISTS vector;

-- Document chunks with their embeddings.
-- VECTOR(1536) matches the dimension of the OpenAI
-- text-embedding-3-small model used by VectorStore.
CREATE TABLE documents (
    id SERIAL PRIMARY KEY,
    title VARCHAR(500),
    content TEXT,
    metadata JSONB,
    embedding VECTOR(1536),
    tenant_id INTEGER,
    created_at TIMESTAMP DEFAULT NOW()
);
-- Approximate-nearest-neighbour index for cosine-distance search.
CREATE INDEX ON documents USING ivfflat (embedding vector_cosine_ops);

CREATE TABLE users (
    id SERIAL PRIMARY KEY,
    username VARCHAR(100) UNIQUE,
    password_hash VARCHAR(255),
    role VARCHAR(50),
    tenant_id INTEGER
);
步骤3:后端API实现
python
from fastapi import FastAPI, UploadFile, File, Depends, HTTPException
from fastapi.security import HTTPBearer
from typing import List
import os
# FastAPI application instance for the knowledge-base service.
app = FastAPI(title="企业知识库API")
# Bearer-token auth scheme for protected endpoints.
security = HTTPBearer()
@app.post("/documents/upload")
async def upload_document(
file: UploadFile = File(...),
tenant_id: int = Depends(get_current_tenant)
):
file_path = f"uploads/{tenant_id}/{file.filename}"
os.makedirs(os.path.dirname(file_path), exist_ok=True)
with open(file_path, "wb") as buffer:
content = await file.read()
buffer.write(content)
processor = DocumentProcessor()
documents = processor.load_document(file_path)
chunks = processor.split_documents(documents)
for chunk in chunks:
chunk.metadata['tenant_id'] = tenant_id
chunk.metadata['source'] = file.filename
vector_store = VectorStore(f"tenant_{tenant_id}")
vector_store.add_documents(chunks)
return {
"message": "文档上传成功",
"chunks_count": len(chunks)
}
@app.post("/qa/ask")
async def ask_question(
question: str,
tenant_id: int = Depends(get_current_tenant),
top_k: int = 5
):
vector_store = VectorStore(f"tenant_{tenant_id}")
qa_system = KnowledgeQA(vector_store.vectorstore)
result = qa_system.answer(question)
return result
@app.get("/documents/search")
async def search_documents(
query: str,
tenant_id: int = Depends(get_current_tenant),
top_k: int = 10
):
vector_store = VectorStore(f"tenant_{tenant_id}")
results = vector_store.similarity_search(
query=query,
k=top_k,
filter={"tenant_id": tenant_id}
)
return {
"results": [
{
"content": doc.page_content,
"metadata": doc.metadata,
"score": doc.metadata.get('score', 0)
}
for doc in results
]
}步骤4:前端界面实现
typescript
import React, { useState } from 'react';
import { Upload, message, Input, Button, Card, List } from 'antd';
import { useQuery, useMutation } from '@tanstack/react-query';
import axios from 'axios';
// Knowledge-base page: document upload plus Q&A over the backend API.
const KnowledgeBase: React.FC = () => {
  const [question, setQuestion] = useState('');
  const [answer, setAnswer] = useState<any>(null);

  // Upload a single file as multipart/form-data.
  const uploadMutation = useMutation({
    mutationFn: (file: File) => {
      const formData = new FormData();
      formData.append('file', file);
      return axios.post('/api/documents/upload', formData);
    },
    onSuccess: () => {
      message.success('文档上传成功');
    },
  });

  // Send the question to the RAG endpoint and keep the full response.
  const askMutation = useMutation({
    mutationFn: (q: string) =>
      axios.post('/api/qa/ask', { question: q }),
    onSuccess: (data) => {
      setAnswer(data.data);
    },
  });

  const handleAsk = () => {
    if (question.trim()) {
      askMutation.mutate(question);
    }
  };

  return (
    <div className="p-6 max-w-6xl mx-auto">
      <h1 className="text-2xl font-bold mb-6">企业知识库</h1>
      <Card title="文档上传" className="mb-6">
        <Upload
          beforeUpload={(file) => {
            uploadMutation.mutate(file);
            // Returning false disables antd's built-in upload so the
            // mutation above is the only network request made.
            return false;
          }}
          accept=".pdf,.docx,.md,.txt"
        >
          <Button>上传文档</Button>
        </Upload>
      </Card>
      <Card title="智能问答" className="mb-6">
        <div className="flex gap-2 mb-4">
          <Input
            value={question}
            onChange={(e) => setQuestion(e.target.value)}
            placeholder="输入您的问题..."
            size="large"
          />
          <Button
            type="primary"
            size="large"
            onClick={handleAsk}
            loading={askMutation.isPending}
          >
            提问
          </Button>
        </div>
        {answer && (
          <div className="mt-4">
            <h3 className="font-semibold mb-2">答案:</h3>
            <p className="mb-4">{answer.answer}</p>
            <h3 className="font-semibold mb-2">参考来源:</h3>
            <List
              dataSource={answer.sources}
              renderItem={(source: any) => (
                <List.Item>
                  <div>
                    <p className="text-sm text-gray-600">
                      {source.content}...
                    </p>
                    <p className="text-xs text-gray-400">
                      来源: {source.metadata.source}
                    </p>
                  </div>
                </List.Item>
              )}
            />
          </div>
        )}
      </Card>
    </div>
  );
};

export default KnowledgeBase;
步骤5:Celery异步任务
python
from celery import Celery
from app.services.document_processor import DocumentProcessor
from app.services.vector_store import VectorStore
# Celery application: Redis DB 0 carries the task queue (broker),
# Redis DB 1 stores task results (backend).
celery_app = Celery(
    'knowledge_base',
    broker='redis://localhost:6379/0',
    backend='redis://localhost:6379/1'
)
@celery_app.task
def process_document_task(file_path: str, tenant_id: int):
    """Load, chunk and index one document in the background.

    Returns a small status dict so callers polling the task result can
    see how many chunks were produced.
    """
    doc_processor = DocumentProcessor()
    pages = doc_processor.load_document(file_path)
    pieces = doc_processor.split_documents(pages)
    # Stamp every chunk with its owning tenant before indexing.
    for piece in pieces:
        piece.metadata['tenant_id'] = tenant_id
    store = VectorStore(f"tenant_{tenant_id}")
    store.add_documents(pieces)
    return {"status": "completed", "chunks_count": len(pieces)}
@celery_app.task
def reindex_tenant_documents(tenant_id: int):
    """Rebuild the vector index for all of one tenant's documents.

    TODO: not implemented yet — intentionally a no-op placeholder.
    """
    pass
步骤6:监控与日志
python
import logging
from prometheus_client import Counter, Histogram
import time
# Root logging configuration for the service.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
# Prometheus metrics exported for scraping: per-endpoint request counts
# and latency histogram (observed by the monitor_request decorator).
REQUEST_COUNT = Counter(
    'kb_requests_total',
    'Total requests',
    ['method', 'endpoint']
)
REQUEST_LATENCY = Histogram(
    'kb_request_latency_seconds',
    'Request latency',
    ['endpoint']
)
def monitor_request(endpoint: str):
    """Decorator for async handlers: counts calls and records latency.

    Increments REQUEST_COUNT and observes REQUEST_LATENCY for *endpoint*,
    logging the elapsed time even when the handler raises.
    """
    from functools import wraps

    def decorator(func):
        # wraps() preserves the handler's name/signature; without it the
        # wrapped function's metadata is lost (and FastAPI introspects
        # handler signatures to build routes).
        @wraps(func)
        async def wrapper(*args, **kwargs):
            start_time = time.time()
            # NOTE(review): method is hardcoded to 'POST' even for GET
            # endpoints — confirm, or derive it from the request.
            REQUEST_COUNT.labels(
                method='POST',
                endpoint=endpoint
            ).inc()
            try:
                return await func(*args, **kwargs)
            finally:
                latency = time.time() - start_time
                REQUEST_LATENCY.labels(endpoint=endpoint).observe(latency)
                logger.info(f"{endpoint} - {latency:.2f}s")
        return wrapper
    return decorator
小结
本章介绍了企业知识库的完整搭建流程,包括:
核心要点
- 文档处理与分块是知识库的基础
- 向量数据库实现高效语义检索
- RAG技术结合检索与生成能力
- 权限管理保证数据安全
技术亮点
- 多格式文档支持(PDF、Word、Markdown)
- 混合检索提升召回质量
- 多租户架构支持企业级应用
- 异步任务处理大批量文档
优化方向
- 增加文档版本管理
- 实现知识图谱构建
- 添加文档自动分类
- 支持多模态内容(图片、表格)
通过本项目的学习,你已经掌握了构建企业知识库的核心技术,可以为企业打造智能化的知识管理平台。