-
Notifications
You must be signed in to change notification settings - Fork 21
/
Copy pathread_to_vectorstore.py
229 lines (202 loc) · 8.64 KB
/
read_to_vectorstore.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
import argparse
import textract
import pandas as pd
import os
from langchain_community.embeddings import HuggingFaceInstructEmbeddings
from langchain_community.vectorstores import FAISS
from dotenv import load_dotenv
load_dotenv()
# Runtime configuration read from the environment. Each setting falls back to
# a built-in default when its variable is not set.
model_name = os.getenv("EMBEDDINGS_MODEL", "sentence-transformers/all-MiniLM-L6-v2")
chunk_separator = os.getenv("CHUNK_SEPARATOR", "\n\n")
def read_filenames_from_directory(material_directory: str):
    """Recursively list the non-hidden files below a directory.

    Args:
        material_directory: Root directory to walk.

    Returns:
        A list of paths, each built by joining a file name onto the directory
        it was found in. Files whose name starts with a dot are left out.
    """
    return [
        os.path.join(parent, entry)
        for parent, _subdirs, entries in os.walk(material_directory)
        for entry in entries
        if not entry.startswith('.')  # exclude dot-files
    ]
def create_material_headings_from_filenames(filenames, material_directory):
    """Turn file paths into human-readable headings.

    For each path the material directory prefix and the file extension are
    removed, underscores become spaces, the result is title-cased and
    directory separators become ": ".

    Args:
        filenames: Paths of the material files.
        material_directory: Directory the paths were collected from, with or
            without a trailing slash.

    Returns:
        A list of headings, one per entry of ``filenames`` and in the same
        order, e.g. "course_material/week_1/intro_slides.pdf" becomes
        "Week 1: Intro Slides".
    """
    def pretty_heading(filename):
        # Drop the directory prefix when it is there.
        if filename.startswith(material_directory):
            relative = filename[len(material_directory):]
        else:
            relative = filename
        # Normalise platform separators, and strip the leading slash that is
        # left over when material_directory was given without a trailing one.
        relative = relative.replace(os.sep, '/').lstrip('/')
        # Only the final extension is a suffix; splitting on the first dot
        # would cut names such as "lecture_1.2_notes.pdf" down to "lecture_1".
        stem, _extension = os.path.splitext(relative)
        return stem.replace('_', ' ').title().replace('/', ': ')

    return [pretty_heading(filename) for filename in filenames]
def convert_files_totext(filenames):
    """Extract the text content of every file.

    Markdown files (``.md``, any letter case) are read directly as UTF-8.
    All other files are handed to textract; supported formats are listed at
    https://textract.readthedocs.io/en/stable/.

    Args:
        filenames: Paths of the files to convert.

    Returns:
        A list of strings with exactly one entry per file, in the same order
        as ``filenames``. A file that textract fails to convert contributes
        an empty string, so the result stays aligned with the file list (and
        with any headings derived from it).
    """
    texts = []
    for filename in filenames:
        # Extract the file type from the extension.
        extension = os.path.splitext(filename)[1].lower()
        print("Converting to text: " + filename)
        if extension == ".md":
            with open(filename, encoding="utf-8") as f:
                text = f.read()
        else:
            try:
                text = textract.process(filename).decode("utf-8")
            except Exception as e:
                print(f"An error occurred when processing the file {filename}: {e}. Unsupported file type?")
                # Keep a placeholder instead of skipping the file: dropping
                # the entry would shift every later text onto the wrong file.
                text = ""
        texts.append(text)
    return texts
def create_chunck_dataframe(material_headings, texts):
    """Build a DataFrame holding each file's text split into chunks.

    Args:
        material_headings: One heading per file.
        texts: One text per file, in the same order as ``material_headings``.

    Returns:
        A DataFrame with one row per file and the columns 'Heading', 'Text',
        'Text_Splitted' (list of chunks) and 'Text_Splitted_w_Headings' (the
        same chunks, each prefixed with a "Source: <heading>" line).
    """
    from langchain.text_splitter import CharacterTextSplitter

    # Create data frame
    df = pd.DataFrame({'Heading': material_headings, 'Text': texts})

    # Create chunks
    text_splitter = CharacterTextSplitter(
        separator=chunk_separator,  # Set in .env, if not set, default is "\n\n"
        chunk_size=500,
        chunk_overlap=100,
        length_function=len,
        is_separator_regex=False,
    )
    # Plain list comprehensions are used instead of DataFrame.apply so that an
    # empty input simply produces empty columns.
    df['Text_Splitted'] = [text_splitter.split_text(text) for text in df['Text']]

    # Prepend the heading to every chunk
    df['Text_Splitted_w_Headings'] = [
        ["Source: " + heading + '\n' + chunk for chunk in chunks]
        for heading, chunks in zip(df['Heading'], df['Text_Splitted'])
    ]

    if df.empty:
        # There is no "first file" to report on.
        print("No files were given, so no chunks were created.")
    else:
        print("Number of chunks in the first file: " + str(len(df['Text_Splitted'].iloc[0])))
    return df
def create_vector_store(df,
                        store_type="faiss",
                        metadatas=False,
                        vector_store_endpoint=None,
                        vector_store_api_key=None,
                        vector_store_collection_name=None):
    """Embed all chunks of a chunk DataFrame and put them in a vector store.

    Args:
        df: DataFrame with a 'Text_Splitted_w_Headings' column holding a list
            of chunk strings per row.
        store_type: "faiss" or "qdrant".
        metadatas: When True, every chunk gets a metadata dict built from the
            row's 'Heading' and 'Modified' values (whichever of these columns
            exist in ``df``).
        vector_store_endpoint: URL of the Qdrant server (qdrant only).
        vector_store_api_key: API key for the Qdrant server (qdrant only).
        vector_store_collection_name: Qdrant collection name (qdrant only).

    Returns:
        The created vector store, or None when ``store_type`` is not
        supported.
    """
    # Reject unknown store types up front, before the embedding model is
    # loaded for nothing.
    if store_type not in ("faiss", "qdrant"):
        print("Unsupported vector store detected. Returning None.")
        return None

    # Only use metadata columns that are actually present; a frame without a
    # 'Modified' column would otherwise raise a KeyError.
    metadata_columns = [column for column in ('Heading', 'Modified') if column in df.columns]

    master_chunk = []
    master_metadata = []
    for _, row in df.iterrows():
        chunks = row['Text_Splitted_w_Headings']
        master_chunk.extend(chunks)
        if metadatas:
            # One separate dict per chunk, so entries never share state.
            master_metadata.extend(row[metadata_columns].to_dict() for _ in chunks)
    chunk_metadatas = master_metadata if metadatas else None

    # Create vector store
    embeddings = HuggingFaceInstructEmbeddings(model_name=model_name)
    if store_type == "faiss":
        return FAISS.from_texts(texts=master_chunk, embedding=embeddings, metadatas=chunk_metadatas)

    # Imported lazily so the qdrant package is only needed when it is used.
    from langchain_qdrant import QdrantVectorStore
    return QdrantVectorStore.from_texts(
        texts=master_chunk,
        embedding=embeddings,
        metadatas=chunk_metadatas,
        url=vector_store_endpoint,
        prefer_grpc=True,
        api_key=vector_store_api_key,
        collection_name=vector_store_collection_name,
        # NOTE(review): presumably replaces an existing collection of the
        # same name - confirm against the langchain_qdrant documentation.
        force_recreate=True,
    )
def _ask_for_save_folder(default_folder):
    """Prompt until the user settles on a folder for the vector store.

    An empty answer selects ``default_folder``. An existing folder is accepted
    as is. For a missing folder the user is asked whether to create it ("y"
    or an empty answer means yes); on any other answer the prompt is repeated.

    Returns:
        The chosen folder, which exists when this function returns.
    """
    while True:
        folder = input(f"What folder would you like to save the database in? Default: {default_folder} \n")
        if folder == "":
            folder = default_folder
        if os.path.exists(folder):
            return folder
        print(f"Folder {folder} does not exist. Do you want to create it?")
        create_folder = input("Y/n: ")
        if create_folder.lower() in ("y", ""):
            # makedirs also creates missing parent folders.
            os.makedirs(folder)
            return folder


def main():
    """Build a FAISS vector store from the course materials and save it.

    Reads the settings from the module-level ``args`` namespace (``load_dir``,
    ``save_dir``, ``use_defaults``), which is set up when the file is run as a
    script. Unless ``use_defaults`` is set, the user is asked interactively
    for the materials directory, the heading of each file, the test queries
    and the folder to save in.

    Returns:
        The chunk DataFrame the vector store was built from.

    Raises:
        SystemExit: With status 1 when the test query or saving fails.
    """
    # Scan files from the course_material directory (or alternative)
    material_directory = args.load_dir + "/"
    folder = args.save_dir + "/"
    use_defaults = args.use_defaults
    print("Running script with the following arguments:")
    print(f"Directory for the course materials: {material_directory}")
    print(f"Directory to save the vector store: {folder}")
    print(f"Run the script with sensible defaults: {use_defaults}")
    print("---")

    if not use_defaults:
        dir_input = input(f"Default materials directory: {material_directory}\nPress enter to keep the same, or write the desired name to change: ")
        if dir_input != '':
            # Keep exactly one trailing slash so that stripping this prefix
            # from the file paths leaves clean relative names for headings.
            material_directory = dir_input.rstrip("/") + "/"

    print("The following files will be processed:\n")
    filenames = read_filenames_from_directory(material_directory=material_directory)
    print(filenames)
    material_headings = create_material_headings_from_filenames(filenames, material_directory)

    # Loop through suggested material headings and ask the user if they want to change it
    if not use_defaults:
        for i, heading in enumerate(material_headings):
            user_input = input(f"Suggested heading: {heading}\nPress enter to keep the same, or write the desired name to change:")
            if user_input != '':
                material_headings[i] = user_input
    print(material_headings)

    texts = convert_files_totext(filenames)
    df = create_chunck_dataframe(material_headings, texts)
    vector_store = create_vector_store(df, store_type="faiss")

    # Try querying the vector store
    print("Test querying the vector store.")
    try:
        query = "Default query." if use_defaults else input("Search the database: ")
        docs = vector_store.similarity_search(query)
        print("Results of test query: ")
        print(docs)
    except Exception as e:
        print(f"An error occurred when performing a query: {e}")
        raise SystemExit(1)

    # Ask for the folder to save the database in
    if not use_defaults:
        folder = _ask_for_save_folder(args.save_dir)
    print(f"Saving database in {folder}")

    # Save the vector store
    try:
        vector_store.save_local(folder)
        print("Save succcess!")
    except Exception as e:
        print(f"An error occurred: {e}")
        raise SystemExit(1)

    # Load the vector store for testing
    print("Test querying the vector store (loaded from disk).")
    try:
        # SECURITY: allow_dangerous_deserialization is acceptable here only
        # because the folder was written by this very run; do not point this
        # call at a store obtained from an untrusted source.
        loaded_vector_store = FAISS.load_local(
            folder,
            HuggingFaceInstructEmbeddings(model_name=model_name),
            allow_dangerous_deserialization=True,
        )
        # Try querying the vector store
        query = "Default query." if use_defaults else input("Search the database: ")
        docs = loaded_vector_store.similarity_search(query)
        print(docs)
        print("Success!")
    except Exception as e:
        # Best effort: the store is already saved, so only report the problem.
        print(f"An error occurred while loading vector store from disk: {e}")
    return df
if __name__ == "__main__":
    # Command-line entry point. `args` stays a module-level name on purpose:
    # main() reads its settings from it.
    parser = argparse.ArgumentParser(
        description="Read course materials and save to vector store."
    )
    parser.add_argument(
        '-d', '--load_dir',
        default="course_material",
        help="Directory for the course materials. ",
    )
    parser.add_argument(
        '-s', '--save_dir',
        default="course_material_vdb",
        help="Directory to save the vector store.",
    )
    parser.add_argument(
        '-u', '--use_defaults',
        action='store_true',
        help="Run the script with sensible defaults.",
    )
    args = parser.parse_args()
    df = main()