-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdeepseek_rag.py
201 lines (185 loc) · 6.68 KB
/
deepseek_rag.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
import logging
import sys
import time

import numpy as np
from langchain.schema import HumanMessage
from langchain_ollama import ChatOllama
from opensearchpy import OpenSearch, RequestsHttpConnection
from sentence_transformers import SentenceTransformer
# Configure module-level logging.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
##############################
# Step 1: Initialize the OpenSearch client
##############################
def get_opensearch_client():
    """Create and return an OpenSearch client connected to localhost:9200.

    Exits the process with status 1 if the client cannot be constructed,
    since the rest of the script cannot run without a search backend.
    """
    try:
        client = OpenSearch(
            hosts=[{"host": "localhost", "port": 9200}],
            http_compress=True,
            http_auth=("admin", "test"),  # must match the password the container was started with
            use_ssl=False,
            verify_certs=False,
            ssl_assert_hostname=False,
            ssl_show_warn=False,
            connection_class=RequestsHttpConnection
        )
        logger.info("✅ Connected to OpenSearch")
        return client
    except Exception as e:
        logger.error(f"Failed to connect to OpenSearch: {e}")
        # sys.exit, not the interactive-only `exit` builtin (which is only
        # guaranteed to exist in a REPL via the `site` module).
        sys.exit(1)
client = get_opensearch_client()
##############################
# Step 2: Initialize the embedding model
##############################
# Use a SentenceTransformer model to generate text embeddings.
embedding_model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")
embedding_dim = embedding_model.get_sentence_embedding_dimension()
logger.info(f"✅ Embedding dimension: {embedding_dim}")
##############################
# Step 3: Create the OpenSearch index (with k-NN search enabled)
##############################
index_name = "documents"
index_body = {
    "settings": {
        "index": {
            "knn": True,  # enable k-NN vector search on this index
            "number_of_shards": 1,
            "number_of_replicas": 1
        }
    },
    "mappings": {
        "properties": {
            "text": {"type": "text"},
            "embedding": {
                "type": "knn_vector",
                "dimension": embedding_dim,
                "method": {
                    "name": "hnsw",
                    "engine": "faiss",
                    # Inner product on unit-normalized vectors is equivalent
                    # to cosine similarity (vectors are normalized via
                    # normalize_vector before indexing/querying).
                    "space_type": "innerproduct"
                }
            }
        }
    }
}
# Recreate the index from scratch so the mapping always matches embedding_dim.
if client.indices.exists(index=index_name):
    client.indices.delete(index=index_name)
client.indices.create(index=index_name, body=index_body)
logger.info(f"✅ Created index: {index_name} with knn enabled and dimension {embedding_dim}")
##############################
# Step 4: Helper functions
##############################
def normalize_vector(vector):
    """Scale *vector* to unit length so inner-product search acts like cosine similarity.

    A zero vector is returned unchanged to avoid division by zero.
    """
    magnitude = np.linalg.norm(vector)
    if magnitude > 0:
        return vector / magnitude
    return vector
def index_document(text, doc_id=None, metadata=None):
    """Embed *text* and store it (with optional metadata) in the OpenSearch index.

    Indexing failures are logged rather than raised, so one bad document
    does not abort a bulk-loading loop.
    """
    vector = normalize_vector(embedding_model.encode(text)).tolist()
    body = {
        "text": text,
        "embedding": vector,
        "metadata": metadata or {}
    }
    try:
        client.index(index=index_name, id=doc_id, body=body)
    except Exception as e:
        logger.error(f"Failed to index document {text[:50]}: {e}")
    else:
        logger.info(f"Indexed document: {text[:50]}...")
def query_documents(question, k=3):
    """Embed *question* and run a k-NN search against the OpenSearch index.

    Returns the top-*k* hit dicts, or an empty list if the search fails.
    """
    query_vector = normalize_vector(embedding_model.encode(question)).tolist()
    body = {
        "size": k,
        "query": {
            "knn": {
                "embedding": {"vector": query_vector, "k": k}
            }
        }
    }
    try:
        return client.search(index=index_name, body=body)["hits"]["hits"]
    except Exception as e:
        logger.error(f"Query failed: {e}")
        return []
##############################
# Step 5: Generate answers with the DeepSeek model served by Ollama
##############################
# NOTE: Ollama must be installed and running locally, with the corresponding
# DeepSeek model pulled (e.g. `ollama pull deepseek-r1:70b`).
deepseek_model = ChatOllama(model="deepseek-r1:70b")
def generate_response(question):
    """Answer *question* using retrieved OpenSearch context and the DeepSeek model.

    Returns the model's answer string, or a fallback message when no
    relevant documents are found.
    """
    results = query_documents(question)
    if not results:
        return "No relevant documents found."
    # Concatenate the retrieved document texts into a single context string.
    context = " ".join(hit["_source"]["text"] for hit in results)
    prompt = f"Context: {context}\nQuestion: {question}\nAnswer:"
    # Debug aid: log the prompt instead of printing it, so output respects
    # the logging configuration.
    logger.debug(prompt)
    # Invoke the Ollama model with a HumanMessage-based message list.
    messages = [HumanMessage(content=prompt)]
    response = deepseek_model.invoke(messages)
    # invoke usually returns a single message object; handle a list of
    # messages as well, and prefer the message's .content over its repr.
    if isinstance(response, list) and len(response) > 0:
        return response[0].content
    content = getattr(response, "content", None)
    return content if content is not None else str(response)
##############################
# Step 6: Example: index documents and answer queries
##############################
if __name__ == "__main__":
    # Sample (text, doc_id) pairs to index.
    documents = [
        ("OpenSearch is a powerful search and analytics engine.", "1"),
        ("DeepSeek is an advanced language model for natural language generation.", "2"),
        ("Vector databases optimize the storage and retrieval of high-dimensional data.", "3"),
        ("K-NN search in OpenSearch enables fast similarity search.", "4"),
        ("Machine learning models leverage embeddings to represent text semantically.", "5")
    ]
    # Index the documents.
    for text, doc_id in documents:
        index_document(text, doc_id=doc_id)
    logger.info("✅ All documents indexed!")
    # Refresh the index so the just-indexed documents are searchable
    # immediately (deterministic, unlike sleeping for an arbitrary interval).
    client.indices.refresh(index=index_name)
    # Example queries.
    queries = [
        "What is OpenSearch?",
        "Explain vector databases.",
        "Tell me about deep language models."
    ]
    for query in queries:
        answer = generate_response(query)
        logger.info(f"\nQuestion: {query}\nAnswer: {answer}\n")