在不断发展的自然语言处理和信息检索领域中,检索增强一代 (RAG) 已成为一种改变游戏规则的范式。
尖端 RAG 系统的核心是一个强大的组合:pgvector(PostgreSQL 的向量相似性搜索扩展)和 Django(高级 Python Web 框架)。当与复杂的 ReRanking 技术相结合时,这些技术构成了最先进的信息检索系统的支柱。
本指南将探讨在 RAG 系统中实现和优化 ReRanking 的主题,特别关注利用 pgvector 和 Django。我们将探讨如何利用这些工具来创建可扩展、高效且高度准确的检索系统,从而突破自然语言处理的界限。
了解 RAG 生态系统
在我们探索使用 pgvector 和 Django 进行 ReRanking 之前,让我们花点时间了解 RAG 生态系统以及为什么这些特定技术如此重要。
RAG:现代 NLP 的基础
Retrieval Augmented Generation 将大型语言模型的强大功能与外部知识库相结合。这种方法允许更准确、最新和可控的文本生成。典型的 RAG 过程包括:
- 在向量空间中嵌入文档
- 根据查询检索相关文档
- 将检索到的文档和查询提供给语言模型进行生成
pgvector:PostgreSQL 的向量强国
pgvector 是 PostgreSQL 的一个扩展,它增加了对向量相似性搜索的支持。它允许高效存储和检索高维载体,使其成为 RAG 系统的理想选择。主要功能包括:
- 支持 L2 距离、内积和余弦相似性
- 用于快速相似性搜索的索引
- 与 PostgreSQL 的丰富功能集集成
Django:首选的 Web 框架
Django 是一个高级 Python Web 框架,鼓励快速开发和简洁、实用的设计。它的 ORM(对象关系映射)系统使使用数据库变得容易,包括带有 pgvector 的 PostgreSQL。Django 对 RAG 系统的主要优势包括:
- 无缝数据库集成
- 强大的安全功能
- 可扩展性和性能优化
Django 是构建 AI 应用程序的绝佳框架。如果您有兴趣了解更多信息,请查看此文章。
ReRanking 革命
虽然使用 pgvector 和 Django 的基本 RAG 设置可以产生令人印象深刻的结果,但 ReRanking 的引入将性能提升到了新的高度。ReRanking 是一种复杂的技术,用于在用于生成检索到的文档之前提高检索到的文档的相关性和质量。检索过程中的这一额外步骤可以显著提高 RAG 系统的整体性能,从而获得更准确和上下文合适的响应。
了解 ReRankers
ReRanker 的核心是一种模型或算法,旨在根据检索到的文档与给定查询的相关性对检索到的文档列表进行优化和重新排序。与通常依赖于有效但有些粗糙的相似性度量的初始检索步骤不同,ReRankers 可以采用更复杂和细致的方法来评估文档相关性。
ReRankers 的主要特征包括:
- 情境理解:ReRankers 通常对查询和文档之间的语义关系有更深入的理解。它们可以捕获更简单的向量相似性搜索可能会遗漏的细微差别。
- 多方面评分:虽然初始检索可能依赖于单个相似度分数,但 ReRankers 在评估相关性时可以考虑多个因素。这可能包括语义相似性、事实正确性、文档新鲜度等。
- 查询-文档交互:许多高级 ReRanker,尤其是那些基于 transformer 架构的 ReRanker,可以直接对查询和文档之间的交互进行建模,而不是将它们视为独立的实体。
- 适应性: ReRankers 通常可以针对特定领域或任务进行微调或调整,从而在各种上下文中进行更专业和准确的排名。
ReRanker 的类型
有几种类型的 ReRankers,每种都有自己的优势和用例:
- 跨编码器模型:这些通常基于 transformer 架构,并一起处理查询和文档,从而允许丰富的交互建模。它们非常准确,但计算成本可能很高。
- 双编码器型号:这些模型分别对查询和文档进行编码,从而以牺牲一定的准确性为代价实现更快的推理时间。它们通常用于两阶段排名系统。
- 学习排名 (LTR) 模型:这些模型使用机器学习技术来组合多个相关性信号并学习最佳排名函数。
- 基于规则的 ReRankers:虽然在现代系统中不太常见,但这些系统使用预定义的规则或启发式方法来对文档重新排序。它们在可以直接编码专业知识的特定领域中非常有用。
ReRanking 过程
RAG 系统中的典型 ReRanking 过程遵循以下步骤:
- 初始检索:使用 pgvector,根据与查询的向量相似性检索一组可能相关的文档。
- 候选人选择:从此初始集中,选择顶级候选项的子集进行 ReRanking。此步骤对于平衡精度和计算效率至关重要。
- 特征提取:对于每个查询-文档对,将提取相关特征。这些可以是密集向量、稀疏特征或两者的组合。
- 得分:ReRanker 为每个查询-文档对分配一个相关性分数。这可能涉及通过神经网络传递货币对、应用学习的排名函数或使用其他评分机制。
- 重组:根据新的相关性分数,文档将重新排序。
- 最终选择:选择前 N 个重新排名的文档以传递给语言模型进行生成。
在 RAG 系统中进行 ReRanking 的好处
将 ReRanking 整合到 RAG 系统中有几个主要好处:
- 提高相关性:通过应用更复杂的相关性评估,ReRanking 有助于确保在生成时使用最相关的文档,从而获得更准确和切题的响应。
- 降低噪音:ReRanking 可以帮助筛选掉可能在初始搜索中检索到的不相关或低质量的文档,从而减少语言模型输入中的干扰。
- 边缘情况的处理:ReRankers 可以设计为处理更简单的检索方法可能遗漏的特定边缘情况或细微差别。
- 对不同查询类型的适应性:不同类型的查询可能需要不同的排名策略。ReRankers 可以设计为根据查询特征调整其行为。
- 集成多个信号:ReRankers 可以合并文本相似性之外的各种信号,例如文档新鲜度、用户首选项或外部知识库。
- 提高效率:虽然 ReRanking 本身增加了一个计算步骤,但它通过提供更高质量的输入来允许更有效地使用语言模型。
实施 ReRanking 的挑战
尽管有好处,但在 RAG 系统中实施 ReRanking 也带来了挑战:
- 计算开销:ReRanking,尤其是对于复杂模型,可能会大大增加检索过程的计算成本。
- 延迟问题:在实时应用程序中,需要仔细管理 ReRanking 所需的额外时间,以保持可接受的响应时间。
- 训练数据要求:许多有效的 ReRanker 需要大量标记的训练数据,创建这些数据可能既昂贵又耗时。
- 平衡精度和效率:在 ReRanking 的准确性和计算效率之间通常需要权衡。找到正确的平衡对于实际应用至关重要。
- 集成复杂性:将 ReRanking 整合到现有的 RAG 管道中会增加系统架构和工作流程的复杂性。
使用 pgvector 和 Django 重新排名
在使用 pgvector 和 Django 构建的系统中实现 ReRanking 时,我们可以利用这两种技术的优势:
- pgvector 进行初始检索:使用 pgvector 的高效向量相似性搜索快速检索初始候选文档集。
- Django 数据管理:利用 Django 的 ORM 来高效管理文档数据和元数据。
- 基于 Python 的 ReRankers:使用 PyTorch 或 TensorFlow 等 Python 库实现 ReRanking 模型,这些库与 Django 集成良好。
- 异步处理:使用 Django 的异步功能或 Celery 来处理计算密集型的 ReRanking 任务,而不会阻塞主应用程序线程。
- 缓存策略:使用 Django 的缓存框架实现 ReRanking 结果的缓存,以提高重复查询的性能。
通过结合 pgvector 的向量搜索功能、Django 的 Web 开发优势和复杂的 ReRanking 技术,我们可以创建不仅快速、可扩展,而且高度准确和上下文感知的 RAG 系统。这种组合突破了信息检索和自然语言处理的界限,为构建智能、响应迅速和以用户为中心的应用程序开辟了新的可能性。
设置环境
要使用 pgvector 和 Django 实现 Reranking,我们首先需要设置我们的环境。以下是分步指南:
- 安装 PostgreSQL 和 pgvector
首先,确保您已安装 PostgreSQL。然后,安装 pgvector:
sudo apt-get install postgresql-server-dev-all
git clone https://github.com/pgvector/pgvector.git
cd pgvector
make
sudo make install
- 创建新的 Django 项目和应用程序
django-admin startproject rag_project
cd rag_project
python manage.py startapp retrieval
- 安装必要的 Python 软件包
pip install django psycopg2-binary numpy scikit-learn sentence-transformers
- 配置 Django 设置
在 中,添加以下数据库配置:settings.py
DATABASES = {
'default': {
'ENGINE': 'django.db.backends.postgresql',
'NAME': 'rag_db',
'USER': 'your_username',
'PASSWORD': 'your_password',
'HOST': 'localhost',
'PORT': '5432',
}
}
- 创建数据库并启用 pgvector
CREATE DATABASE rag_db;
\c rag_db
CREATE EXTENSION vector;
实施核心 RAG 系统
现在我们的环境已经设置好了,让我们使用 pgvector 和 Django 实现核心 RAG 系统。
- 定义文档模型
在:retrieval/models.py
from django.db import models
from django.contrib.postgres.fields import ArrayField
class Document(models.Model):
content = models.TextField()
embedding = ArrayField(models.FloatField(), size=768) # Adjust size based on your embedding model
class Meta:
indexes = [
models.Index(fields=['embedding'], name='embedding_idx', opclasses=['vector_cosine_ops'])
]
- 创建和应用迁移
python manage.py makemigrations
python manage.py migrate
- 实施文档嵌入
创建新文件 :retrieval/embeddings.py
from sentence_transformers import SentenceTransformer
import numpy as np
model = SentenceTransformer('all-MiniLM-L6-v2')
def embed_text(text):
return model.encode(text).tolist()
- 实施文档插入
在:retrieval/views.py
from django.http import JsonResponse
from .models import Document
from .embeddings import embed_text
def insert_document(request):
content = request.POST.get('content')
embedding = embed_text(content)
doc = Document.objects.create(content=content, embedding=embedding)
return JsonResponse({'id': doc.id, 'content': doc.content})
- 实施相似性搜索
搭:retrieval/views.py
from django.db.models.expressions import RawSQL
def similarity_search(request):
query = request.GET.get('query')
query_embedding = embed_text(query)
similar_docs = Document.objects.annotate(
similarity=RawSQL(
"embedding <=> %s",
(query_embedding,)
)
).order_by('similarity')[:10]
results = [{'id': doc.id, 'content': doc.content, 'similarity': doc.similarity} for doc in similar_docs]
return JsonResponse({'results': results})
这个基本实现允许我们将文档插入支持 pgvector 的 PostgreSQL 数据库并执行相似性搜索。然而,要真正利用 ReRanking 的力量,我们需要更进一步。
使用 pgvector 和 Django 实现 ReRanking
现在我们已经有了基本的 RAG 系统,让我们实施 ReRanking 来提高检索到的文档的质量。
- 创建 ReRanking 模型
我们将使用一个简单的跨编码器模型进行 ReRanking。将以下内容添加到 :retrieval/embeddings.py
from sentence_transformers import CrossEncoder
reranker = CrossEncoder('cross-encoder/ms-marco-MiniLM-L-6-v2')
def rerank_documents(query, documents):
pairs = [[query, doc['content']] for doc in documents]
scores = reranker.predict(pairs)
reranked = sorted(zip(documents, scores), key=lambda x: x[1], reverse=True)
return [doc for doc, score in reranked]
- 修改相似性搜索视图
from .embeddings import embed_text, rerank_documents
def similarity_search(request):
query = request.GET.get('query')
query_embedding = embed_text(query)
similar_docs = Document.objects.annotate(
similarity=RawSQL(
"embedding <=> %s",
(query_embedding,)
)
).order_by('similarity')[:100] # Retrieve more documents for reranking
doc_list = [{'id': doc.id, 'content': doc.content, 'similarity': doc.similarity} for doc in similar_docs]
reranked_docs = rerank_documents(query, doc_list)
return JsonResponse({'results': reranked_docs[:10]}) # Return top 10 after reranking
此实现现在使用 pgvector 执行初始相似性搜索,检索更大的潜在相关文档集,然后应用 ReRanking 来优化结果。
优化 ReRanking 性能
虽然上述实现是功能性的,但我们可以进行一些优化来提高性能和可扩展性:
- 批处理
在处理大量文档时,我们可以使用批处理来提高效率:
from django.db import connection
def batch_similarity_search(query, batch_size=1000):
query_embedding = embed_text(query)
with connection.cursor() as cursor:
cursor.execute("""
SELECT id, content, embedding <=> %s AS similarity
FROM retrieval_document
ORDER BY similarity
LIMIT 100
""", [query_embedding])
results = []
while True:
batch = cursor.fetchmany(batch_size)
if not batch:
break
results.extend(batch)
return [{'id': r[0], 'content': r[1], 'similarity': r[2]} for r in results]
- 缓存
实现缓存以存储频繁查询的 ReRanking 结果:
from django.core.cache import cache
def cached_rerank(query, documents, cache_timeout=3600):
cache_key = f"rerank_{hash(query)}_{hash(tuple(d['id'] for d in documents))}"
cached_result = cache.get(cache_key)
if cached_result is not None:
return cached_result
reranked = rerank_documents(query, documents)
cache.set(cache_key, reranked, cache_timeout)
return reranked
- 异步处理
对于长时间运行的 ReRanking 任务,请考虑使用异步处理:
from asgiref.sync import async_to_sync
from channels.layers import get_channel_layer
@async_to_sync
async def async_rerank(query, documents):
reranked = rerank_documents(query, documents)
channel_layer = get_channel_layer()
await channel_layer.group_send(
"search_results",
{
"type": "search.results",
"results": reranked,
},
)
- 分布式 ReRanking
对于非常大的数据集,请考虑实现分布式 ReRanking:
from celery import group
from .tasks import rerank_subset
def distributed_rerank(query, documents, num_workers=4):
chunk_size = len(documents) // num_workers
chunks = [documents[i:i + chunk_size] for i in range(0, len(documents), chunk_size)]
job = group(rerank_subset.s(query, chunk) for chunk in chunks)
result = job.apply_async()
reranked_chunks = result.get()
return sorted(sum(reranked_chunks, []), key=lambda x: x['score'], reverse=True)
高级 ReRanking 技术
现在我们已经为使用 pgvector 和 Django 进行 ReRanking 打下了坚实的基础,让我们探索一些高级技术来进一步增强我们的系统。
- 混合重新排名
组合多个 ReRanking 模型以提高性能:
from sentence_transformers import CrossEncoder
reranker1 = CrossEncoder('cross-encoder/ms-marco-MiniLM-L-6-v2')
reranker2 = CrossEncoder('cross-encoder/mmarco-mMiniLMv2-L12-H384-v1')
def hybrid_rerank(query, documents):
pairs = [[query, doc['content']] for doc in documents]
scores1 = reranker1.predict(pairs)
scores2 = reranker2.predict(pairs)
combined_scores = [0.7 * s1 + 0.3 * s2 for s1, s2 in zip(scores1, scores2)]
reranked = sorted(zip(documents, combined_scores), key=lambda x: x[1], reverse=True)
return [doc for doc, score in reranked]
- 上下文感知 ReRanking
将用户上下文或会话信息合并到 ReRanking 过程中:
def context_aware_rerank(query, documents, user_context):
user_interests = user_context.get('interests', [])
def score_with_context(doc):
base_score = reranker.predict([[query, doc['content']]])[0]
context_score = sum(interest in doc['content'].lower() for interest in user_interests)
return base_score + 0.1 * context_score
return sorted(documents, key=score_with_context, reverse=True)
- 多元化意识 ReRanking
实施具有多样性意识的 ReRanking 策略,以确保结果各不相同:
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
import numpy as np
def diversity_aware_rerank(query, documents, lambda_param=0.5):
tfidf = TfidfVectorizer().fit_transform([doc['content'] for doc in documents])
diversity_matrix = 1 - cosine_similarity(tfidf)
reranked = []
remaining = list(range(len(documents)))
while remaining:
scores = []
for i in remaining:
relevance = reranker.predict([[query, documents[i]['content']]])[0]
if reranked:
diversity = np.mean([diversity_matrix[i][j] for j in reranked])
else:
diversity = 1
score = lambda_param * relevance + (1 - lambda_param) * diversity
scores.append(score)
best_idx = remaining[np.argmax(scores)]
reranked.append(best_idx)
remaining.remove(best_idx)
return [documents[i] for i in reranked]
- 自适应重新排名
实施根据查询特征进行调整的自适应 ReRanking 策略:
def adaptive_rerank(query, documents):
query_length = len(query.split())
if query_length <= 3:
return rerank_documents(query, documents) # Use basic reranking for short queries
elif query_length <= 6:
return hybrid_rerank(query, documents) # Use hybrid reranking for medium queries
else:
return diversity_aware_rerank(query, documents) # Use diversity-aware reranking for long queries
将 ReRanking 与 Django REST 框架集成
为了使我们的 ReRanking 系统更易于访问并更容易与前端应用程序集成,让我们使用 Django REST 框架创建一个 REST API。
- 安装 Django REST 框架
pip install djangorestframework
- 在 settings.py 中将 REST 框架添加到 INSTALLED_APPS
INSTALLED_APPS = [
# ...
'rest_framework',
# ...
]
- 创建序列化程序
在:retrieval/serializers.py
from rest_framework import serializers
from .models import Document
class DocumentSerializer(serializers.ModelSerializer):
class Meta:
model = Document
fields = ['id', 'content']
class SearchResultSerializer(serializers.Serializer):
id = serializers.IntegerField()
content = serializers.CharField()
similarity = serializers.FloatField()
- 创建 API 视图
在:retrieval/views.py
from django.db.models.expressions import RawSQL
from rest_framework.response import Response
from rest_framework.views import APIView
from .embeddings import embed_text, rerank_documents
from .serializers import DocumentSerializer, SearchResultSerializer
class DocumentView(APIView):
def post(self, request):
serializer = DocumentSerializer(data=request.data)
if serializer.is_valid():
content = serializer.validated_data['content']
embedding = embed_text(content)
doc = Document.objects.create(content=content, embedding=embedding)
return Response(DocumentSerializer(doc).data, status=201)
return Response(serializer.errors, status=400)
class SearchView(APIView):
def get(self, request):
query = request.query_params.get('query')
if not query:
return Response({'error': 'Query parameter is required'}, status=400)
query_embedding = embed_text(query)
similar_docs = Document.objects.annotate(
similarity=RawSQL(
"embedding <=> %s",
(query_embedding,)
)
).order_by('similarity')[:100]
doc_list = [{'id': doc.id, 'content': doc.content, 'similarity': doc.similarity} for doc in similar_docs]
reranked_docs = rerank_documents(query, doc_list)
serializer = SearchResultSerializer(reranked_docs[:10], many=True)
return Response(serializer.data)
5. 配置 URL
在:rag_project/urls.py
from django.urls import path, include
from retrieval.views import DocumentView, SearchView
urlpatterns = [
path('api/documents/', DocumentView.as_view(), name='document'),
path('api/search/', SearchView.as_view(), name='search'),
]
现在,我们的 ReRanking 系统有一个 RESTful API,可以轻松与各种前端应用程序或其他服务集成。
使用 pgvector 和 Django 扩展 ReRanking
随着 RAG 系统的增长,您需要考虑扩展策略以保持性能。以下是一些使用 pgvector 和 Django 扩展 ReRanking 的高级技术:
- 数据库分区
对于大型数据集,请考虑对启用 pgvector 的表进行分区:
CREATE TABLE documents_partition (
id BIGINT NOT NULL,
content TEXT NOT NULL,
embedding VECTOR(768) NOT NULL
) PARTITION BY RANGE (id);
CREATE TABLE documents_p1 PARTITION OF documents_partition
FOR VALUES FROM (1) TO (1000000);
CREATE TABLE documents_p2 PARTITION OF documents_partition
FOR VALUES FROM (1000000) TO (2000000);
-- Create more partitions as needed
更新你的 Django 模型以使用分区表:
class Document(models.Model):
content = models.TextField()
embedding = ArrayField(models.FloatField(), size=768)
class Meta:
managed = False
db_table = 'documents_partition'
- 实现分布式 pgvector 设置
对于更大的数据集,请考虑使用 PostgreSQL 的内置复制功能实现分布式 pgvector 设置:
DATABASES = {
'default': {
'ENGINE': 'django.db.backends.postgresql',
'NAME': 'rag_db',
'USER': 'your_username',
'PASSWORD': 'your_password',
'HOST': 'primary.example.com',
'PORT': '5432',
},
'replica1': {
'ENGINE': 'django.db.backends.postgresql',
'NAME': 'rag_db',
'USER': 'your_username',
'PASSWORD': 'your_password',
'HOST': 'replica1.example.com',
'PORT': '5432',
},
'replica2': {
'ENGINE': 'django.db.backends.postgresql',
'NAME': 'rag_db',
'USER': 'your_username',
'PASSWORD': 'your_password',
'HOST': 'replica2.example.com',
'PORT': '5432',
},
}
实现自定义数据库路由器以分发读取操作:
class ReplicaRouter:
def db_for_read(self, model, **hints):
import random
return random.choice(['replica1', 'replica2'])
def db_for_write(self, model, **hints):
return 'default'
def allow_relation(self, obj1, obj2, **hints):
return True
def allow_migrate(self, db, app_label, model_name=None, **hints):
return db == 'default'
将路由器添加到你的 Django 设置中:
DATABASE_ROUTERS = ['path.to.ReplicaRouter']
- 实施缓存层
实现多个缓存层以减少数据库负载:
CACHES = {
'default': {
'BACKEND': 'django.core.cache.backends.memcached.MemcachedCache',
'LOCATION': '127.0.0.1:11211',
},
'redis': {
'BACKEND': 'django_redis.cache.RedisCache',
'LOCATION': 'redis://127.0.0.1:6379/1',
'OPTIONS': {
'CLIENT_CLASS': 'django_redis.client.DefaultClient',
}
}
}
实施分层缓存策略:
from django.core.cache import caches
def get_cached_search_results(query):
# Try to get from Memcached first
result = caches['default'].get(query)
if result is not None:
return result
# If not in Memcached, try Redis
result = caches['redis'].get(query)
if result is not None:
# Store in Memcached for faster future access
caches['default'].set(query, result, timeout=300)
return result
# If not in Redis, perform the search
result = perform_search_and_rerank(query)
# Store in both Redis and Memcached
caches['redis'].set(query, result, timeout=3600)
caches['default'].set(query, result, timeout=300)
return result
- 使用 Celery 进行异步处理
使用 Celery 为 ReRanking 任务实现异步处理:
from celery import shared_task
from .embeddings import rerank_documents
@shared_task
def async_rerank(query, documents):
return rerank_documents(query, documents)
# In your view
from django.http import JsonResponse
from .tasks import async_rerank
def search_view(request):
query = request.GET.get('query')
initial_results = perform_initial_search(query)
task = async_rerank.delay(query, initial_results)
return JsonResponse({
'task_id': task.id,
'initial_results': initial_results[:10]
})
def get_reranked_results(request):
task_id = request.GET.get('task_id')
task = AsyncResult(task_id)
if task.ready():
return JsonResponse({'results': task.get()})
else:
return JsonResponse({'status': 'pending'})
- 实现反馈循环
实施反馈循环以持续改进您的 ReRanking 模型:
from django.db import models
class UserFeedback(models.Model):
query = models.TextField()
document = models.ForeignKey(Document, on_delete=models.CASCADE)
relevance_score = models.FloatField()
timestamp = models.DateTimeField(auto_now_add=True)
def collect_feedback(request):
query = request.POST.get('query')
doc_id = request.POST.get('document_id')
relevance_score = float(request.POST.get('relevance_score'))
UserFeedback.objects.create(
query=query,
document_id=doc_id,
relevance_score=relevance_score
)
return JsonResponse({'status': 'success'})
# Periodically retrain your ReRanking model using collected feedback
@shared_task
def retrain_reranker():
feedback_data = UserFeedback.objects.all().values('query', 'document__content', 'relevance_score')
# Use feedback_data to fine-tune your ReRanking model
# This could involve updating the weights of your cross-encoder model
pass
使用 pgvector 的高级 ReRanking 技术
让我们探索一些利用 pgvector 独特功能的高级 ReRanking 技术:
- 混合向量搜索
组合多个嵌入模型以获得更强大的搜索结果:
from django.db.models import F
def hybrid_vector_search(query, model1, model2, weight1=0.7, weight2=0.3):
embedding1 = model1.encode(query)
embedding2 = model2.encode(query)
results = Document.objects.annotate(
score1=RawSQL("embedding <=> %s", (embedding1,)),
score2=RawSQL("embedding2 <=> %s", (embedding2,)),
combined_score=F('score1') * weight1 + F('score2') * weight2
).order_by('combined_score')[:100]
return results
- 使用 pgvector 的上下文重新排名
通过考虑用户最近的搜索历史来实现上下文 ReRanking:
def contextual_rerank(query, user_history):
query_embedding = embed_text(query)
history_embedding = np.mean([embed_text(q) for q in user_history], axis=0)
results = Document.objects.annotate(
query_similarity=RawSQL("embedding <=> %s", (query_embedding,)),
history_similarity=RawSQL("embedding <=> %s", (history_embedding,)),
combined_score=F('query_similarity') * 0.8 + F('history_similarity') * 0.2
).order_by('combined_score')[:100]
return results
- 使用 pgvector 进行语义聚类
实施语义聚类以对相似文档进行分组并使搜索结果多样化:
from sklearn.cluster import KMeans
def semantic_cluster_rerank(query, n_clusters=5):
query_embedding = embed_text(query)
initial_results = Document.objects.annotate(
similarity=RawSQL("embedding <=> %s", (query_embedding,))
).order_by('similarity')[:1000]
embeddings = np.array([doc.embedding for doc in initial_results])
kmeans = KMeans(n_clusters=n_clusters)
clusters = kmeans.fit_predict(embeddings)
reranked_results = []
for cluster in range(n_clusters):
cluster_docs = [doc for doc, c in zip(initial_results, clusters) if c == cluster]
reranked_results.extend(cluster_docs[:20 // n_clusters])
return reranked_results
- 时间感知型 ReRanking
实施时间感知型 ReRanking 以平衡相关性和新近度:
from django.db.models import ExpressionWrapper, F, fields
from django.utils import timezone
def time_aware_rerank(query, time_weight=0.2):
query_embedding = embed_text(query)
now = timezone.now()
results = Document.objects.annotate(
similarity=RawSQL("embedding <=> %s", (query_embedding,)),
age=ExpressionWrapper(now - F('created_at'), output_field=fields.DurationField()),
time_score=ExpressionWrapper(F('age').total_seconds() / (24 * 60 * 60), output_field=fields.FloatField()),
combined_score=F('similarity') * (1 - time_weight) + F('time_score') * time_weight
).order_by('combined_score')[:100]
return results
- 使用用户嵌入进行个性化 ReRanking
通过维护用户嵌入来实现个性化的 ReRanking:
class UserProfile(models.Model):
user = models.OneToOneField(User, on_delete=models.CASCADE)
embedding = ArrayField(models.FloatField(), size=768)
def update_user_embedding(user, query):
query_embedding = embed_text(query)
user_profile, created = UserProfile.objects.get_or_create(user=user)
if created:
user_profile.embedding = query_embedding
else:
user_profile.embedding = [
0.9 * ue + 0.1 * qe
for ue, qe in zip(user_profile.embedding, query_embedding)
]
user_profile.save()
def personalized_rerank(query, user):
query_embedding = embed_text(query)
user_embedding = UserProfile.objects.get(user=user).embedding
results = Document.objects.annotate(
query_similarity=RawSQL("embedding <=> %s", (query_embedding,)),
user_similarity=RawSQL("embedding <=> %s", (user_embedding,)),
combined_score=F('query_similarity') * 0.7 + F('user_similarity') * 0.3
).order_by('combined_score')[:100]
return results
优化 pgvector 性能
为了确保在大规模使用 pgvector 时获得最佳性能,请考虑以下优化:
- 索引策略
尝试不同的索引策略,以在查询速度和索引构建时间之间找到最佳平衡:
-- Create an HNSW index
CREATE INDEX ON documents USING hnsw (embedding vector_cosine_ops);
-- Create an IVFFlat index
CREATE INDEX ON documents USING ivfflat (embedding vector_cosine_ops) WITH (lists = 100);
- 近似最近邻搜索
使用近似最近邻搜索可以缩短查询时间,但会降低一些准确性:
from django.db.models import Func
class ApproximateDistance(Func):
function = 'vector_cosine_ops'
template = "%(function)s(%(expressions)s) <?> %%s"
def approximate_search(query):
query_embedding = embed_text(query)
results = Document.objects.annotate(
distance=ApproximateDistance('embedding', query_embedding)
).order_by('distance')[:100]
return results
- 批处理
实现批处理以插入和更新大量文档:
from django.db import connection
def batch_insert_documents(documents, batch_size=1000):
with connection.cursor() as cursor:
values = []
for doc in documents:
embedding = embed_text(doc['content'])
values.append((doc['content'], embedding))
if len(values) >= batch_size:
cursor.executemany(
"INSERT INTO retrieval_document (content, embedding) VALUES (%s, %s)",
values
)
values = []
if values:
cursor.executemany(
"INSERT INTO retrieval_document (content, embedding) VALUES (%s, %s)",
values
)
- 异步更新
实施异步更新以使向量索引保持最新,而不会影响查询性能:
from celery import shared_task
@shared_task
def update_document_embedding(doc_id):
doc = Document.objects.get(id=doc_id)
new_embedding = embed_text(doc.content)
doc.embedding = new_embedding
doc.save()
def update_document(request):
doc_id = request.POST.get('doc_id')
new_content = request.POST.get('content')
doc = Document.objects.get(id=doc_id)
doc.content = new_content
doc.save()
update_document_embedding.delay(doc_id)
return JsonResponse({'status': 'success'})
结论
使用 pgvector 和 Django 进行 ReRanking 提供了一种强大而灵活的方法来构建高级 RAG 系统。通过利用 pgvector 的向量相似性和 Django 提供的健壮的 Web 开发框架,我们可以创建可扩展、高效且高度准确的信息检索系统。
在本指南中,我们探讨了实施和优化 ReRanking 的各种技术,从基本设置到混合 ReRanking、情境感知和个性化等高级策略。我们还探讨了扩展注意事项、性能优化以及与 Django REST Framework 和 Celery 等其他技术的集成。