Skip to content

企业知识库搭建

项目概述

企业知识库是一个基于RAG(检索增强生成)技术的智能文档管理系统,能够将企业内部文档、知识资料转化为可交互的智能问答系统。本项目将构建一个完整的企业知识库平台,支持文档管理、智能检索、权限控制等功能。

项目目标

  • 实现文档的智能导入和处理
  • 构建高效的向量检索系统
  • 提供精准的知识问答服务
  • 支持多租户和权限管理
  • 提供完整的知识管理界面

应用场景

  • 企业内部知识管理
  • 产品文档智能问答
  • 技术支持知识库
  • 培训资料检索系统

技术架构

整体架构

┌─────────────────────────────────────────────┐
│            知识库管理界面                    │
│      (文档上传、检索、问答、管理)            │
└─────────────────┬───────────────────────────┘

┌─────────────────▼───────────────────────────┐
│              API服务层                       │
│  ┌──────────┐  ┌──────────┐  ┌──────────┐ │
│  │文档处理  │  │向量检索  │  │权限控制  │ │
│  └──────────┘  └──────────┘  └──────────┘ │
└─────────────────┬───────────────────────────┘

┌─────────────────▼───────────────────────────┐
│            数据存储层                        │
│  ┌──────────┐  ┌──────────┐  ┌──────────┐ │
│  │PostgreSQL│  │向量数据库│  │对象存储  │ │
│  └──────────┘  └──────────┘  └──────────┘ │
└─────────────────┬───────────────────────────┘

┌─────────────────▼───────────────────────────┐
│            AI服务层                          │
│   (Embedding模型 + 大语言模型)              │
└─────────────────────────────────────────────┘

技术栈选择

后端技术栈

  • Python 3.10+
  • FastAPI + Celery
  • LangChain / LlamaIndex
  • PostgreSQL + pgvector
  • MinIO / S3 (对象存储)
  • Redis (缓存)

向量数据库

  • Milvus (生产环境推荐)
  • Chroma (开发测试)
  • Pinecone (云端方案)
  • FAISS (本地部署)

前端技术栈

  • React 18+ / Next.js
  • TypeScript
  • Ant Design / Shadcn UI
  • React Query

AI服务

  • OpenAI Embeddings
  • 通义千问 Embedding
  • BGE / M3E (开源模型)

核心功能

1. 文档处理与分块

python
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.document_loaders import (
    PyPDFLoader,
    Docx2txtLoader,
    UnstructuredMarkdownLoader
)
from typing import List
import hashlib

class DocumentProcessor:
    """Load enterprise documents and split them into overlapping chunks.

    Supports PDF, Word (.docx), Markdown and plain-text files — the same
    set the upload UI advertises. Each chunk is stamped with a
    content-derived id so duplicate chunks can be detected downstream.
    """

    def __init__(
        self,
        chunk_size: int = 1000,
        chunk_overlap: int = 200
    ):
        """Configure the recursive splitter.

        Args:
            chunk_size: target chunk length in characters.
            chunk_overlap: characters shared between adjacent chunks so
                context is preserved across chunk boundaries.
        """
        # Chinese sentence-ending punctuation is included so splits land on
        # sentence boundaries for zh-language documents.
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            separators=["\n\n", "\n", "。", "!", "?", ";", " ", ""]
        )

    def load_document(self, file_path: str) -> List:
        """Load a file into LangChain Document objects.

        Raises:
            ValueError: if the file extension is not supported.
        """
        if file_path.endswith('.pdf'):
            loader = PyPDFLoader(file_path)
        elif file_path.endswith('.docx'):
            loader = Docx2txtLoader(file_path)
        elif file_path.endswith('.md'):
            loader = UnstructuredMarkdownLoader(file_path)
        elif file_path.endswith('.txt'):
            # The upload UI accepts .txt as well; import locally so the
            # module-level import list stays unchanged.
            from langchain.document_loaders import TextLoader
            loader = TextLoader(file_path, encoding='utf-8')
        else:
            raise ValueError(f"不支持的文件格式: {file_path}")

        return loader.load()

    def split_documents(self, documents: List) -> List:
        """Split loaded documents into chunks, tagging each with a chunk_id."""
        chunks = self.text_splitter.split_documents(documents)

        for chunk in chunks:
            chunk.metadata['chunk_id'] = self._generate_chunk_id(
                chunk.page_content
            )

        return chunks

    def _generate_chunk_id(self, content: str) -> str:
        """Deterministic content hash; md5 is for dedup, not security."""
        return hashlib.md5(content.encode()).hexdigest()

2. 向量存储与检索

python
from langchain.vectorstores import Milvus
from langchain.embeddings import OpenAIEmbeddings
from typing import List, Optional

class VectorStore:
    """Milvus-backed vector store with similarity and reranked hybrid search.

    The underlying collection is created lazily: either explicitly via
    create_collection() or on the first add_documents() call. Search
    methods fail fast with RuntimeError if the collection is not ready.
    """

    # Cross-encoder shared across instances; loading the model is expensive,
    # so it is created once on first use instead of on every rerank call.
    _reranker = None

    def __init__(
        self,
        collection_name: str,
        embedding_model: str = "text-embedding-3-small"
    ):
        """Prepare embeddings; no Milvus connection is made yet."""
        self.embeddings = OpenAIEmbeddings(model=embedding_model)
        self.collection_name = collection_name
        self.vectorstore = None

    def create_collection(self):
        """(Re)create the Milvus collection, dropping any existing one.

        Destructive: any data already in the collection is discarded.
        """
        from pymilvus import connections, utility

        connections.connect("default", host="localhost", port="19530")

        if utility.has_collection(self.collection_name):
            utility.drop_collection(self.collection_name)

        self.vectorstore = Milvus.from_documents(
            documents=[],
            embedding=self.embeddings,
            collection_name=self.collection_name,
            connection_args={"host": "localhost", "port": "19530"}
        )

    def add_documents(
        self,
        documents: List,
        metadatas: Optional[List[dict]] = None
    ):
        """Insert documents, creating the collection on first use.

        Args:
            documents: LangChain Document objects to index.
            metadatas: unused; kept for interface compatibility.
        """
        if not self.vectorstore:
            self.vectorstore = Milvus.from_documents(
                documents=documents,
                embedding=self.embeddings,
                collection_name=self.collection_name,
                connection_args={"host": "localhost", "port": "19530"}
            )
        else:
            self.vectorstore.add_documents(documents)

    def _require_store(self):
        """Raise a clear error instead of AttributeError on a None store."""
        if self.vectorstore is None:
            raise RuntimeError(
                "Vector collection not initialized; call create_collection() "
                "or add_documents() first."
            )

    def similarity_search(
        self,
        query: str,
        k: int = 5,
        filter: Optional[dict] = None  # noqa: A002 - name kept for callers
    ) -> List:
        """Return the top-k documents most similar to *query*."""
        self._require_store()
        results = self.vectorstore.similarity_search(
            query=query,
            k=k,
            filter=filter
        )
        return results

    def hybrid_search(
        self,
        query: str,
        k: int = 5
    ) -> List:
        """Over-fetch 2k candidates by vector similarity, then rerank to k."""
        self._require_store()
        results = self.vectorstore.similarity_search(
            query=query,
            k=k * 2
        )

        reranked = self._rerank(query, results)
        return reranked[:k]

    def _rerank(self, query: str, results: List) -> List:
        """Order candidates by cross-encoder relevance score, best first."""
        if VectorStore._reranker is None:
            from sentence_transformers import CrossEncoder
            VectorStore._reranker = CrossEncoder(
                'cross-encoder/ms-marco-MiniLM-L-6-v2'
            )

        pairs = [(query, doc.page_content) for doc in results]
        scores = VectorStore._reranker.predict(pairs)

        scored_results = list(zip(results, scores))
        scored_results.sort(key=lambda x: x[1], reverse=True)

        return [doc for doc, score in scored_results]

3. RAG问答系统

python
from langchain.chains import RetrievalQA
from langchain.prompts import PromptTemplate
from langchain.chat_models import ChatOpenAI

class KnowledgeQA:
    """Retrieval-augmented QA over a vector store, with source attribution."""

    def __init__(self, vectorstore, llm_model="gpt-4"):
        """Wire retriever, prompt and LLM together into a RetrievalQA chain."""
        self.vectorstore = vectorstore
        # temperature=0 keeps answers deterministic and grounded in context.
        self.llm = ChatOpenAI(model=llm_model, temperature=0)

        template = """你是一个专业的知识库助手。请基于以下上下文回答用户问题。

上下文:
{context}

问题:{question}

要求:
1. 仅基于上下文信息回答,不要编造内容
2. 如果上下文中没有相关信息,请明确告知
3. 回答要准确、简洁、专业
4. 引用相关的文档来源

答案:"""

        self.prompt = PromptTemplate(
            template=template,
            input_variables=["context", "question"]
        )

        self.qa_chain = RetrievalQA.from_chain_type(
            llm=self.llm,
            chain_type="stuff",
            retriever=self.vectorstore.as_retriever(
                search_kwargs={"k": 5}
            ),
            return_source_documents=True,
            chain_type_kwargs={"prompt": self.prompt}
        )

    def answer(self, question: str) -> dict:
        """Run the chain; return answer text, source snippets and confidence."""
        result = self.qa_chain({"query": question})

        # First 200 chars of each retrieved chunk, for display as citations.
        sources = [
            {
                "content": doc.page_content[:200],
                "metadata": doc.metadata,
            }
            for doc in result.get("source_documents", [])
        ]

        return {
            "answer": result["result"],
            "sources": sources,
            "confidence": self._calculate_confidence(result),
        }

    def _calculate_confidence(self, result: dict) -> float:
        """Heuristic confidence derived purely from retrieved-source count."""
        hits = len(result.get("source_documents", []))
        return 0.9 if hits >= 3 else 0.7 if hits >= 1 else 0.3

4. 权限管理系统

python
from sqlalchemy import Column, Integer, String, Boolean, DateTime
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import Session
from datetime import datetime
import hashlib

# Shared declarative base class for the ORM models defined below.
Base = declarative_base()

class User(Base):
    """Account record; role and tenant_id drive all permission checks."""

    __tablename__ = "users"
    
    id = Column(Integer, primary_key=True, index=True)
    # Login name; uniqueness enforced at the database level.
    username = Column(String, unique=True, index=True)
    # Only the hash is stored — plaintext passwords never persist.
    password_hash = Column(String)
    # "admin" gets unrestricted access in PermissionManager.
    role = Column(String)
    # Tenant scoping: non-admin users only see their tenant's documents.
    tenant_id = Column(Integer, index=True)
    created_at = Column(DateTime, default=datetime.utcnow)

class Document(Base):
    """Uploaded document record with tenant and visibility attributes."""

    __tablename__ = "documents"
    
    id = Column(Integer, primary_key=True, index=True)
    title = Column(String)
    # Storage location of the original file (object store / disk path).
    file_path = Column(String)
    # Owning tenant; pairs with User.tenant_id for access checks.
    tenant_id = Column(Integer, index=True)
    # User id of the uploader.
    created_by = Column(Integer)
    # Public documents are readable/searchable across tenants.
    is_public = Column(Boolean, default=False)
    created_at = Column(DateTime, default=datetime.utcnow)

class PermissionManager:
    """Tenant- and role-based access control for documents.

    Rules, in order: admins may do anything; public documents are
    readable/searchable by anyone; otherwise access requires a matching
    tenant. Unknown users or documents are denied (fail closed).
    """

    def __init__(self, db: Session):
        self.db = db

    def check_permission(
        self,
        user_id: int,
        document_id: int,
        action: str
    ) -> bool:
        """Return True if *user_id* may perform *action* on *document_id*."""
        user = self.db.query(User).filter(User.id == user_id).first()
        document = self.db.query(Document).filter(
            Document.id == document_id
        ).first()

        # Fail closed when either side of the check does not exist.
        if not user or not document:
            return False

        if user.role == "admin":
            return True

        # Public documents are world-readable, but not world-writable.
        if document.is_public and action in ["read", "search"]:
            return True

        if user.tenant_id == document.tenant_id:
            return True

        return False

    def get_user_documents(self, user_id: int) -> list[int]:
        """Return the ids of all documents visible to *user_id*.

        Admins see everything; other users see their tenant's documents
        plus public ones. An unknown user sees nothing (previously this
        crashed with AttributeError on ``user.role``).
        """
        # NOTE: the original annotated this as typing.List, which is never
        # imported in this module — builtin list[int] (Python 3.10+) avoids
        # the NameError without a new import.
        user = self.db.query(User).filter(User.id == user_id).first()
        if user is None:
            return []

        query = self.db.query(Document)

        if user.role != "admin":
            query = query.filter(
                (Document.tenant_id == user.tenant_id) |
                (Document.is_public == True)  # noqa: E712 — SQLAlchemy needs ==
            )

        return [doc.id for doc in query.all()]

实现步骤

步骤1:环境准备

bash
# Create the project directory
mkdir enterprise-knowledge-base
cd enterprise-knowledge-base

# Backend environment: FastAPI app plus the LangChain RAG stack and
# storage/queue clients (Milvus, PostgreSQL, Redis)
python -m venv venv
source venv/bin/activate
pip install fastapi uvicorn langchain openai
pip install pymilvus psycopg2-binary sqlalchemy redis
pip install python-multipart pypdf docx2txt

# Frontend environment: Next.js + TypeScript scaffold with antd UI
npx create-next-app@latest frontend --typescript
cd frontend
npm install @tanstack/react-query axios antd

步骤2:数据库初始化

sql
-- PostgreSQL with pgvector
CREATE DATABASE knowledge_base;

\c knowledge_base;

-- Enable the pgvector extension (VECTOR column type + ANN indexes).
CREATE EXTENSION IF NOT EXISTS vector;

-- Document chunks with their embedding vectors.
-- VECTOR(1536) matches the OpenAI text-embedding dimension used elsewhere
-- in this project; adjust if the embedding model changes.
CREATE TABLE documents (
    id SERIAL PRIMARY KEY,
    title VARCHAR(500),
    content TEXT,
    metadata JSONB,
    embedding VECTOR(1536),
    tenant_id INTEGER,
    created_at TIMESTAMP DEFAULT NOW()
);

-- NOTE(review): ivfflat derives its cluster centroids from existing rows,
-- so building it on an empty table produces a poor index; consider creating
-- it (with an explicit "lists" parameter) after the initial data load.
CREATE INDEX ON documents USING ivfflat (embedding vector_cosine_ops);

CREATE TABLE users (
    id SERIAL PRIMARY KEY,
    username VARCHAR(100) UNIQUE,
    password_hash VARCHAR(255),
    role VARCHAR(50),
    tenant_id INTEGER
);

步骤3:后端API实现

python
from fastapi import FastAPI, UploadFile, File, Depends, HTTPException
from fastapi.security import HTTPBearer
from typing import List
import os

# FastAPI application instance; the title appears in the OpenAPI docs UI.
app = FastAPI(title="企业知识库API")
# HTTP Bearer auth scheme. NOTE(review): get_current_tenant is used by the
# endpoints below but is not defined in this snippet — confirm it exists.
security = HTTPBearer()

@app.post("/documents/upload")
async def upload_document(
    file: UploadFile = File(...),
    tenant_id: int = Depends(get_current_tenant)
):
    """Persist an uploaded file, chunk it, and index it for the tenant.

    Returns a success message plus the number of chunks indexed.

    Raises:
        HTTPException: 400 if the upload carries no usable filename.
    """
    # Security: keep only the base name — the client controls file.filename,
    # and a crafted name like "../../etc/passwd" must not be able to escape
    # the tenant's upload directory (path traversal).
    safe_name = os.path.basename(file.filename or "")
    if not safe_name:
        raise HTTPException(status_code=400, detail="无效的文件名")

    file_path = f"uploads/{tenant_id}/{safe_name}"
    os.makedirs(os.path.dirname(file_path), exist_ok=True)

    with open(file_path, "wb") as buffer:
        content = await file.read()
        buffer.write(content)

    processor = DocumentProcessor()
    documents = processor.load_document(file_path)
    chunks = processor.split_documents(documents)

    # Stamp tenant and source so retrieval can be filtered per tenant.
    for chunk in chunks:
        chunk.metadata['tenant_id'] = tenant_id
        chunk.metadata['source'] = safe_name

    vector_store = VectorStore(f"tenant_{tenant_id}")
    vector_store.add_documents(chunks)

    return {
        "message": "文档上传成功",
        "chunks_count": len(chunks)
    }

@app.post("/qa/ask")
async def ask_question(
    question: str,
    tenant_id: int = Depends(get_current_tenant),
    top_k: int = 5
):
    """Answer *question* against the current tenant's knowledge collection.

    NOTE(review): VectorStore() here neither connects nor loads an existing
    collection, so vector_store.vectorstore is None until add_documents()
    has run in this process — confirm collection initialization on startup.
    NOTE(review): top_k is accepted but unused; the QA chain retrieves with
    its own fixed k=5.
    """
    vector_store = VectorStore(f"tenant_{tenant_id}")
    qa_system = KnowledgeQA(vector_store.vectorstore)
    
    result = qa_system.answer(question)
    
    return result

@app.get("/documents/search")
async def search_documents(
    query: str,
    tenant_id: int = Depends(get_current_tenant),
    top_k: int = 10
):
    """Semantic search within the current tenant's collection.

    NOTE(review): the VectorStore is freshly constructed here, so its
    underlying collection must already exist in this process. Also,
    similarity_search is not shown attaching a 'score' to metadata, so the
    score below likely always falls back to 0 — verify against the
    retrieval layer.
    """
    vector_store = VectorStore(f"tenant_{tenant_id}")
    
    results = vector_store.similarity_search(
        query=query,
        k=top_k,
        filter={"tenant_id": tenant_id}
    )
    
    return {
        "results": [
            {
                "content": doc.page_content,
                "metadata": doc.metadata,
                # 0 is the fallback when the store attached no score.
                "score": doc.metadata.get('score', 0)
            }
            for doc in results
        ]
    }

步骤4:前端界面实现

typescript
import React, { useState } from 'react';
import { Upload, message, Input, Button, Card, List } from 'antd';
import { useQuery, useMutation } from '@tanstack/react-query';
import axios from 'axios';

// Knowledge-base page: document upload plus question answering.
// Talks to the FastAPI backend via /api/documents/upload and /api/qa/ask.
const KnowledgeBase: React.FC = () => {
  // Current question text and the last answer payload returned by the API
  // (expected shape: { answer, sources, confidence }).
  const [question, setQuestion] = useState('');
  const [answer, setAnswer] = useState<any>(null);

  // Uploads a single file as multipart form data.
  const uploadMutation = useMutation({
    mutationFn: (file: File) => {
      const formData = new FormData();
      formData.append('file', file);
      return axios.post('/api/documents/upload', formData);
    },
    onSuccess: () => {
      message.success('文档上传成功');
    },
  });

  // Submits the question and stores the backend's answer payload.
  const askMutation = useMutation({
    mutationFn: (q: string) =>
      axios.post('/api/qa/ask', { question: q }),
    onSuccess: (data) => {
      setAnswer(data.data);
    },
  });

  // Ignore empty or whitespace-only questions.
  const handleAsk = () => {
    if (question.trim()) {
      askMutation.mutate(question);
    }
  };

  return (
    <div className="p-6 max-w-6xl mx-auto">
      <h1 className="text-2xl font-bold mb-6">企业知识库</h1>
      
      <Card title="文档上传" className="mb-6">
        <Upload
          beforeUpload={(file) => {
            uploadMutation.mutate(file);
            return false;
          }}
          accept=".pdf,.docx,.md,.txt"
        >
          <Button>上传文档</Button>
        </Upload>
      </Card>

      <Card title="智能问答" className="mb-6">
        <div className="flex gap-2 mb-4">
          <Input
            value={question}
            onChange={(e) => setQuestion(e.target.value)}
            placeholder="输入您的问题..."
            size="large"
          />
          <Button
            type="primary"
            size="large"
            onClick={handleAsk}
            loading={askMutation.isPending}
          >
            提问
          </Button>
        </div>

        {answer && (
          <div className="mt-4">
            <h3 className="font-semibold mb-2">答案:</h3>
            <p className="mb-4">{answer.answer}</p>
            
            <h3 className="font-semibold mb-2">参考来源:</h3>
            <List
              dataSource={answer.sources}
              renderItem={(source: any) => (
                <List.Item>
                  <div>
                    <p className="text-sm text-gray-600">
                      {source.content}...
                    </p>
                    <p className="text-xs text-gray-400">
                      来源: {source.metadata.source}
                    </p>
                  </div>
                </List.Item>
              )}
            />
          </div>
        )}
      </Card>
    </div>
  );
};

export default KnowledgeBase;

步骤5:Celery异步任务

python
from celery import Celery
from app.services.document_processor import DocumentProcessor
from app.services.vector_store import VectorStore

# Celery application: Redis DB 0 serves as the broker (task queue) and
# DB 1 as the result backend, keeping messages and results separated.
celery_app = Celery(
    'knowledge_base',
    broker='redis://localhost:6379/0',
    backend='redis://localhost:6379/1'
)

@celery_app.task
def process_document_task(file_path: str, tenant_id: int):
    """Load, chunk, tag and index a single document for one tenant.

    Runs on a Celery worker so large files don't block the API process.
    Returns a small status dict with the number of chunks indexed.
    """
    splitter = DocumentProcessor()
    chunks = splitter.split_documents(splitter.load_document(file_path))

    # Tag every chunk so retrieval can be scoped per tenant.
    for piece in chunks:
        piece.metadata['tenant_id'] = tenant_id

    VectorStore(f"tenant_{tenant_id}").add_documents(chunks)

    return {"status": "completed", "chunks_count": len(chunks)}

@celery_app.task
def reindex_tenant_documents(tenant_id: int):
    """Rebuild the vector index for one tenant.

    TODO: not implemented yet — presumably intended to reload the tenant's
    source documents and repopulate its collection; confirm intended scope.
    """
    pass

步骤6:监控与日志

python
import logging
from prometheus_client import Counter, Histogram
import time

# Root logging configuration for the service.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)

logger = logging.getLogger(__name__)

# Prometheus counter: request volume, labeled by method and endpoint.
REQUEST_COUNT = Counter(
    'kb_requests_total',
    'Total requests',
    ['method', 'endpoint']
)

# Prometheus histogram: request latency distribution per endpoint.
REQUEST_LATENCY = Histogram(
    'kb_request_latency_seconds',
    'Request latency',
    ['endpoint']
)

def monitor_request(endpoint: str, method: str = 'POST'):
    """Decorator for async endpoints: counts requests and records latency.

    Args:
        endpoint: label applied to both Prometheus metrics.
        method: HTTP method label; defaults to 'POST' (the previously
            hard-coded value) so existing call sites behave identically.
    """
    from functools import wraps

    def decorator(func):
        # wraps preserves the endpoint's __name__/__doc__/signature, which
        # frameworks like FastAPI rely on for routing and docs generation.
        @wraps(func)
        async def wrapper(*args, **kwargs):
            start_time = time.time()

            REQUEST_COUNT.labels(
                method=method,
                endpoint=endpoint
            ).inc()

            try:
                return await func(*args, **kwargs)
            finally:
                # Latency is recorded on success and failure alike.
                latency = time.time() - start_time
                REQUEST_LATENCY.labels(endpoint=endpoint).observe(latency)
                logger.info(f"{endpoint} - {latency:.2f}s")

        return wrapper
    return decorator

小结

本章介绍了企业知识库的完整搭建流程,包括:

核心要点

  • 文档处理与分块是知识库的基础
  • 向量数据库实现高效语义检索
  • RAG技术结合检索与生成能力
  • 权限管理保证数据安全

技术亮点

  • 多格式文档支持(PDF、Word、Markdown)
  • 混合检索提升召回质量
  • 多租户架构支持企业级应用
  • 异步任务处理大批量文档

优化方向

  • 增加文档版本管理
  • 实现知识图谱构建
  • 添加文档自动分类
  • 支持多模态内容(图片、表格)

通过本项目的学习,你已经掌握了构建企业知识库的核心技术,可以为企业打造智能化的知识管理平台。