RAG(Retrieval-Augmented Generation,检索增强生成)是当前大语言模型应用中最热门的架构之一。它的核心思想很简单:在生成回答之前,先从知识库中检索相关信息,然后将检索到的内容作为上下文提供给大模型,从而生成更准确、更有依据的回答。

传统的纯LLM方案存在几个致命问题:训练数据有截止日期,无法获知最新信息;容易产生"幻觉",编造看似合理的内容;无法访问企业私有或特定领域的知识;回答缺乏可追溯的依据。
RAG通过引入外部知识检索环节,有效解决了这些问题。它让AI不再仅仅依赖参数化的记忆,而是能够实时访问和引用外部知识源。
一个完整的RAG系统通常包含以下核心组件:
用户提问 → 查询预处理 → 向量检索 → 上下文组装 → LLM生成 → 回答返回
↑ (向量检索所查询的向量数据库,由下面的离线索引流程构建)
文档加载 → 文本分块 → 向量化 → 存入向量数据库
让我们从零开始,逐步构建这个系统。
首先创建项目并安装必要的依赖:
# Create an isolated project directory and virtual environment.
mkdir rag-system && cd rag-system
python -m venv venv
source venv/bin/activate
# langchain-community is listed explicitly because the snippets in this guide
# import loaders, vector stores and retrievers from `langchain_community`.
pip install langchain langchain-community langchain-openai chromadb sentence-transformers
# rank_bm25 is the backend required by BM25Retriever (used for hybrid retrieval).
pip install pypdf unstructured tiktoken rank_bm25
创建一个基础的配置文件:
# config.py
# Central configuration for the RAG system: LLM, embedding model, vector
# store location, and retrieval/chunking parameters.
import os

# --- LLM (OpenAI here; another provider can be swapped in) ---
OPENAI_MODEL = "gpt-4o"
# Read from the environment; falls back to a placeholder string when unset.
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY", "your-api-key-here")

# --- Embedding model (runs locally, no API needed) ---
EMBEDDING_MODEL = "all-MiniLM-L6-v2"
EMBEDDING_DIMENSION = 384

# --- Vector database ---
CHROMA_PERSIST_DIR = "./chroma_db"

# --- Retrieval and chunking ---
TOP_K_RESULTS = 5
CHUNK_SIZE = 500
CHUNK_OVERLAP = 50
文档分块是RAG系统中极其关键的一步。分块质量直接影响检索的准确性和最终回答的质量。
# document_loader.py
from langchain_community.document_loaders import PyPDFLoader, TextLoader
from langchain_community.document_loaders import UnstructuredMarkdownLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from typing import List
from langchain_core.documents import Document
class DocumentProcessor:
    """Load documents from disk and split them into chunks.

    Supported inputs are PDF, Markdown and plain-text files, selected by
    file suffix (matched case-insensitively).
    """

    # Suffixes that load_document can dispatch on.
    _SUPPORTED_SUFFIXES = (".pdf", ".md", ".txt")

    def __init__(self, chunk_size: int = 500, chunk_overlap: int = 50):
        """Configure the recursive character splitter.

        Args:
            chunk_size: Target chunk length, measured with ``len``.
            chunk_overlap: Overlap between consecutive chunks.
        """
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            length_function=len,
            # Includes the Chinese full stop so Chinese text can be split
            # at sentence boundaries as well.
            separators=["\n\n", "\n", "。", "!", "?", ".", " ", ""],
        )

    @classmethod
    def _file_kind(cls, file_path: str) -> str:
        """Return the supported suffix that ``file_path`` ends with.

        The comparison ignores case, so ``REPORT.PDF`` is treated like
        ``report.pdf``.

        Raises:
            ValueError: If the path has none of the supported suffixes.
        """
        lowered = file_path.lower()
        for suffix in cls._SUPPORTED_SUFFIXES:
            if lowered.endswith(suffix):
                return suffix
        raise ValueError(f"不支持的文件格式: {file_path}")

    def load_document(self, file_path: str) -> List[Document]:
        """Load ``file_path`` with the loader matching its suffix.

        Raises:
            ValueError: If the file type is not supported.
        """
        kind = self._file_kind(file_path)
        if kind == ".pdf":
            loader = PyPDFLoader(file_path)
        elif kind == ".md":
            loader = UnstructuredMarkdownLoader(file_path)
        else:  # ".txt"
            loader = TextLoader(file_path, encoding="utf-8")
        return loader.load()

    def process(self, file_path: str) -> List[Document]:
        """Load a document, split it, and tag every chunk with metadata.

        Each chunk receives ``chunk_index`` (its position among all chunks
        produced for this file) and ``source_file`` (the given path).

        Returns:
            The list of chunks.
        """
        documents = self.load_document(file_path)
        chunks = self.text_splitter.split_documents(documents)
        for i, chunk in enumerate(chunks):
            chunk.metadata["chunk_index"] = i
            chunk.metadata["source_file"] = file_path
        print(f"文档 '{file_path}' 已处理: {len(documents)} 页 → {len(chunks)} 个分块")
        return chunks
不同的分块策略适用于不同场景:
| 策略 | 适用场景 | 优点 | 缺点 |
|---|---|---|---|
| 固定长度分块 | 通用场景 | 实现简单 | 可能截断语义 |
| 递归字符分块 | 结构化文档 | 保留语义边界 | 需要调参 |
| 语义分块 | 长文档 | 语义完整性高 | 计算成本高 |
| 句子级分块 | 精确检索 | 粒度细 | 上下文可能不足 |
推荐使用 RecursiveCharacterTextSplitter,它在大多数场景下表现良好,且支持自定义分隔符来适配中文文档。
我们使用ChromaDB作为向量数据库,它轻量、易用,非常适合入门和中小规模应用。
# vector_store.py
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_community.vectorstores import Chroma
from langchain_core.documents import Document
from typing import List, Optional
from config import EMBEDDING_MODEL, CHROMA_PERSIST_DIR
class VectorStoreManager:
    """Owns the embedding model and the Chroma vector store built on it."""

    def __init__(self, persist_directory: str = CHROMA_PERSIST_DIR):
        """Create the embedding function; no store is opened yet.

        Args:
            persist_directory: Directory handed to Chroma for persistence.
        """
        # Embeddings are computed on CPU and normalized.
        self.embeddings = HuggingFaceEmbeddings(
            model_name=EMBEDDING_MODEL,
            model_kwargs={"device": "cpu"},
            encode_kwargs={"normalize_embeddings": True},
        )
        self.persist_directory = persist_directory
        # Set by create_vectorstore() or load_vectorstore().
        self.vectorstore: Optional[Chroma] = None

    def _require_store(self) -> Chroma:
        """Return the active store, or raise ValueError if none is set."""
        store = self.vectorstore
        if store is None:
            raise ValueError("请先加载或创建向量数据库")
        return store

    def create_vectorstore(
        self, documents: List[Document], collection_name: str = "default"
    ) -> Chroma:
        """Build a new store from ``documents`` and persist it."""
        store = Chroma.from_documents(
            documents=documents,
            embedding=self.embeddings,
            persist_directory=self.persist_directory,
            collection_name=collection_name,
        )
        self.vectorstore = store
        # NOTE(review): explicit persist() may be deprecated in newer Chroma
        # integrations - confirm against the installed version.
        store.persist()
        print(f"向量数据库已创建,共 {len(documents)} 条记录")
        return store

    def load_vectorstore(self, collection_name: str = "default") -> Chroma:
        """Open the collection stored under ``persist_directory``."""
        store = Chroma(
            persist_directory=self.persist_directory,
            embedding_function=self.embeddings,
            collection_name=collection_name,
        )
        self.vectorstore = store
        return store

    def add_documents(self, documents: List[Document]) -> List[str]:
        """Add documents to the active store.

        Raises:
            ValueError: If no store has been created or loaded.
        """
        return self._require_store().add_documents(documents)

    def similarity_search(self, query: str, top_k: int = 5) -> List[Document]:
        """Return the ``top_k`` documents most similar to ``query``.

        Raises:
            ValueError: If no store has been created or loaded.
        """
        return self._require_store().similarity_search(query, k=top_k)

    def similarity_search_with_score(
        self, query: str, top_k: int = 5
    ) -> List[tuple]:
        """Like similarity_search, but returns the store's scored results.

        Raises:
            ValueError: If no store has been created or loaded.
        """
        return self._require_store().similarity_search_with_score(query, k=top_k)
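选择合适的Embedding模型至关重要:本文配置的 all-MiniLM-L6-v2(384维)体积小、可在本地CPU上运行,但主要面向英文语料;如果知识库以中文为主,建议换用支持中文的多语言模型(例如 BAAI/bge 系列),并同步修改 EMBEDDING_DIMENSION。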
基础的相似度检索往往不够,我们需要更智能的检索策略:
# retriever.py
from langchain_community.vectorstores import Chroma
from langchain.retrievers import ContextualCompressionRetriever
from langchain.retrievers.document_compressors import CrossEncoderReranker
from langchain_community.cross_encoders import HuggingFaceCrossEncoder
from langchain.retrievers import EnsembleRetriever
from langchain_community.retrievers import BM25Retriever
from typing import List
class AdvancedRetriever:
    """Advanced retrieval: hybrid (vector + BM25) search and reranking."""

    def __init__(self, vectorstore: Chroma, documents: List):
        """Store the vector store and the documents used to build BM25.

        Args:
            vectorstore: Vector store queried for semantic matches.
            documents: Documents indexed by the BM25 keyword retriever.
        """
        self.vectorstore = vectorstore
        self.documents = documents

    def _mmr_vector_retriever(self, top_k: int):
        """Return an MMR retriever over this instance's vector store."""
        # Must go through self.vectorstore: there is no module-level
        # ``vectorstore`` name, so a bare reference raises NameError.
        return self.vectorstore.as_retriever(
            search_type="mmr",  # MMR increases result diversity
            search_kwargs={"k": top_k, "fetch_k": top_k * 3},
        )

    def create_hybrid_retriever(self, top_k: int = 10) -> EnsembleRetriever:
        """Create a hybrid retriever (vector search + BM25 keyword search).

        Args:
            top_k: Number of results requested from each retriever.

        Returns:
            An ensemble weighting vector results 0.6 and BM25 results 0.4.
        """
        vector_retriever = self._mmr_vector_retriever(top_k)
        bm25_retriever = BM25Retriever.from_documents(self.documents, k=top_k)
        return EnsembleRetriever(
            retrievers=[vector_retriever, bm25_retriever],
            weights=[0.6, 0.4],  # vector search is weighted higher
        )

    def create_reranking_retriever(self, top_k: int = 5):
        """Create a retriever that reranks candidates with a cross-encoder.

        The base retriever fetches ``top_k * 3`` candidates; the reranker
        keeps the best ``top_k`` of them.
        """
        cross_encoder = HuggingFaceCrossEncoder(
            model_name="BAAI/bge-reranker-v2-m3"
        )
        compressor = CrossEncoderReranker(model=cross_encoder, top_n=top_k)
        base_retriever = self.vectorstore.as_retriever(
            search_kwargs={"k": top_k * 3}
        )
        return ContextualCompressionRetriever(
            base_compressor=compressor, base_retriever=base_retriever
        )
混合检索结合了语义检索(向量)和关键词检索(BM25),能同时捕获语义相似性和精确匹配。重排序则用更精确的Cross-Encoder模型对初步检索结果进行精排,显著提升最终结果的质量。
# rag_engine.py
from langchain_openai import ChatOpenAI
from langchain.chains import create_retrieval_chain
from langchain.chains.combine_documents import create_stuff_documents_chain
from langchain_core.prompts import ChatPromptTemplate
from langchain_community.vectorstores import Chroma
from typing import List
from config import OPENAI_API_KEY, OPENAI_MODEL, TOP_K_RESULTS
class RAGEngine:
    """RAG engine: retrieve context from a vector store, then generate."""

    # Prompt template; {context} and {input} are filled in by the chain.
    SYSTEM_PROMPT = """你是一个专业的知识助手。请根据以下检索到的上下文信息回答用户问题。
规则:
1. 仅基于提供的上下文信息回答,不要编造信息
2. 如果上下文中没有足够信息,请明确说明
3. 回答时引用信息来源
4. 使用专业但易懂的语言
上下文信息:
{context}
用户问题:{input}
"""

    def __init__(self, vectorstore: Chroma, top_k: int = TOP_K_RESULTS):
        """Set up the chat model and build the retrieval chain.

        Args:
            vectorstore: Store used for retrieval.
            top_k: Number of documents retrieved per question.
        """
        # Low temperature keeps answers stable.
        self.llm = ChatOpenAI(
            api_key=OPENAI_API_KEY,
            model=OPENAI_MODEL,
            temperature=0.1,
        )
        self.vectorstore = vectorstore
        self.top_k = top_k
        self.chain = self._build_chain()

    def _build_chain(self):
        """Assemble the MMR retriever and the stuff-documents QA chain."""
        mmr_settings = {
            "k": self.top_k,
            "fetch_k": self.top_k * 3,
            "lambda_mult": 0.7,  # MMR diversity parameter
        }
        retriever = self.vectorstore.as_retriever(
            search_type="mmr", search_kwargs=mmr_settings
        )
        prompt = ChatPromptTemplate.from_template(self.SYSTEM_PROMPT)
        answer_chain = create_stuff_documents_chain(self.llm, prompt)
        return create_retrieval_chain(retriever, answer_chain)

    def query(self, question: str) -> dict:
        """Run one RAG query.

        Returns:
            A dict with ``answer``, ``source_documents`` (the retrieved
            documents) and ``sources`` (a per-document summary: first 200
            characters of content, source file, chunk index, and whatever
            ``score`` the document metadata carries, else ``None``).
        """
        outcome = self.chain.invoke({"input": question})
        retrieved = outcome["context"]

        summaries = []
        for doc in retrieved:
            meta = doc.metadata
            summaries.append(
                {
                    "content": doc.page_content[:200],
                    "source": meta.get("source_file", "unknown"),
                    "chunk_index": meta.get("chunk_index", -1),
                    "score": meta.get("score", None),
                }
            )

        return {
            "answer": outcome["answer"],
            "source_documents": retrieved,
            "sources": summaries,
        }

    def chat(self, question: str) -> str:
        """Return the answer followed by a numbered list of its sources."""
        outcome = self.query(question)
        reference_lines = [
            f"{rank}. {src['source']} (分块 #{src['chunk_index']})\n"
            for rank, src in enumerate(outcome["sources"], 1)
        ]
        return outcome["answer"] + "\n\n---\n**参考来源:**\n" + "".join(reference_lines)
将所有组件整合到一起:
# main.py
from document_loader import DocumentProcessor
from vector_store import VectorStoreManager
from rag_engine import RAGEngine
import os
def build_knowledge_base(directory: str):
    """Build a knowledge base from the supported files in ``directory``.

    Only direct entries of the directory are considered (no recursion);
    files are selected by lower-cased extension (.pdf, .md, .txt).

    Returns:
        The created vector store, or ``None`` when no chunks were produced.
    """
    splitter = DocumentProcessor(chunk_size=500, chunk_overlap=50)
    store_manager = VectorStoreManager()
    accepted = [".pdf", ".md", ".txt"]

    collected = []
    for entry in os.listdir(directory):
        _, suffix = os.path.splitext(entry)
        if suffix.lower() not in accepted:
            continue
        collected.extend(splitter.process(os.path.join(directory, entry)))

    if collected:
        return store_manager.create_vectorstore(collected)

    print("未找到可处理的文档")
    return None
def main():
    """Command-line entry point.

    Modes:
        --build DIR   build the knowledge base from DIR
        --query TEXT  answer a single question
        (neither)     interactive loop; type 'quit' to leave
    """
    import argparse

    parser = argparse.ArgumentParser(description="RAG系统")
    parser.add_argument("--build", type=str, help="构建知识库的目录路径")
    parser.add_argument("--query", type=str, help="查询问题")
    args = parser.parse_args()

    if args.build:
        # build_knowledge_base returns None (after printing the reason) when
        # nothing could be indexed; only report success for a real build.
        if build_knowledge_base(args.build) is not None:
            print("知识库构建完成!")
        return

    # Both query modes work against the previously persisted store.
    engine = RAGEngine(VectorStoreManager().load_vectorstore())

    if args.query:
        print(engine.chat(args.query))
        return

    # Interactive mode
    print("RAG系统已就绪,输入问题开始查询(输入 'quit' 退出)")
    while True:
        try:
            question = input("\n问题: ").strip()
        except (EOFError, KeyboardInterrupt):
            # End of input or Ctrl-C: leave the loop instead of crashing.
            print()
            break
        if question.lower() == "quit":
            break
        if question:
            print(engine.chat(question))


if __name__ == "__main__":
    main()
分块大小是影响RAG效果最关键的参数之一。建议根据你的文档类型进行实验:
# Recommended chunking parameters per document type. Keys are the Chinese
# labels: technical docs, legal contracts, news articles, academic papers.
CHUNK_CONFIGS = {
    label: {"chunk_size": size, "chunk_overlap": overlap}
    for label, size, overlap in (
        ("技术文档", 800, 100),
        ("法律合同", 1200, 200),
        ("新闻文章", 500, 50),
        ("学术论文", 1000, 150),
    )
}
对用户查询进行预处理可以显著提升检索质量:
from langchain_core.prompts import ChatPromptTemplate
# Query rewriting: rephrase the user's question into a form better suited
# to retrieval. The template takes a single {question} variable.
query_rewrite_prompt = ChatPromptTemplate.from_template(
"""请将以下用户问题改写为一个更适合在知识库中检索的查询语句。
要求:提取关键概念,去除无关信息,保持语义完整。
原始问题:{question}
改写后的查询:"""
)
# HyDE (Hypothetical Document Embeddings): first generate a hypothetical
# answer, then use it for retrieval. The template takes a single {question}
# variable.
hyde_prompt = ChatPromptTemplate.from_template(
"""请针对以下问题写一段简短的假设性回答(约100字)。
即使你不确定答案,也请根据常识给出一个合理的回答。
问题:{question}
假设性回答:"""
)
对于频繁查询的内容,添加缓存可以大幅降低成本和延迟:
import hashlib
import json
import time
from functools import lru_cache
class RAGCache:
    """In-memory cache for RAG query results with a TTL and a size cap.

    Entries are stored as ``{"result": ..., "expires_at": <epoch seconds>}``
    under the MD5 hex digest of the query text.
    """

    def __init__(self, max_size: int = 1000):
        """Create an empty cache.

        Args:
            max_size: Maximum number of entries kept. A value of zero or
                less disables storing entirely.
        """
        self.max_size = max_size
        # Plain dicts keep insertion order, so the first key is the oldest.
        self.cache = {}

    def _get_cache_key(self, query: str) -> str:
        """Return the cache key for ``query``.

        MD5 is used only as a compact key here, not for security.
        """
        return hashlib.md5(query.encode()).hexdigest()

    def get(self, query: str):
        """Return the live cache entry for ``query``, or ``None``.

        An expired entry is removed and reported as a miss.
        """
        key = self._get_cache_key(query)
        entry = self.cache.get(key)
        if entry is None:
            return None
        if entry["expires_at"] <= time.time():
            del self.cache[key]
            return None
        return entry

    def set(self, query: str, result: dict, ttl: int = 3600):
        """Store ``result`` for ``query`` for ``ttl`` seconds.

        When the cache is full, the oldest entries are evicted first.
        """
        if self.max_size <= 0:
            return
        key = self._get_cache_key(query)
        # Drop any previous entry so a refreshed key counts as newest.
        self.cache.pop(key, None)
        while len(self.cache) >= self.max_size:
            del self.cache[next(iter(self.cache))]
        self.cache[key] = {"result": result, "expires_at": time.time() + ttl}

    def get_or_query(self, query: str, query_func):
        """Return the cached result, or compute it and cache it.

        Args:
            query: The query text.
            query_func: Callable invoked as ``query_func(query)`` on a miss.
        """
        cached = self.get(query)
        if cached is not None:
            return cached["result"]
        result = query_func(query)
        self.set(query, result)
        return result
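本文从零开始构建了一个完整的RAG系统,涵盖了从文档处理到最终生成的全部流程。核心要点回顾:文档分块的质量决定检索粒度;向量数据库负责存储与相似度检索;混合检索与重排序用于提升检索结果质量;RAG引擎把检索到的上下文交给LLM生成带来源的回答;分块参数调优、查询改写(含HyDE)和缓存则是主要的优化手段。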
RAG系统的优化是一个持续迭代的过程,建议在实际应用中不断监控检索质量和生成效果,针对性地调整各个组件的参数。随着RAG技术的快速发展,未来还可以探索多模态RAG、Agentic RAG等更高级的架构模式。