-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathingest.py
112 lines (93 loc) · 3.74 KB
/
ingest.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
from box import Box
import yaml
from pprint import pprint
from anytree import Node, PreOrderIter, RenderTree
from anytree.exporter import JsonExporter
from transformers import AutoTokenizer
from langchain_text_splitters import TokenTextSplitter
import chromadb
import chromadb.config
import rag_tools
import asyncio
async def main():
    """Ingest PDFs: convert each to a section tree, then chunk and embed it.

    Pipeline (driven by ``config.yaml`` plus a per-PDF ``spec.yaml``):
      1. Convert every PDF listed in the spec into a header tree via
         ``rag_tools.pdf_to_tree``, caching each tree as JSON on disk.
      2. Token-chunk each tree node and write its embeddings into one
         ChromaDB collection per document (cosine HNSW space).

    Side effects: reads config/spec YAML, writes tree JSON files, and
    persists embeddings under ``config.path.embeddings``.
    """
    config = Box.from_yaml(
        filename = "config.yaml",
        Loader = yaml.FullLoader
    )
    spec = Box.from_yaml(
        filename = config.path.pdfs + "spec.yaml",
        Loader = yaml.FullLoader
    )

    ##
    # Convert PDFs to trees
    ##
    pdfs = rag_tools.get_documents(config, spec)
    trees = await rag_tools.get_trees(config, spec)
    tree_exporter = JsonExporter(
        indent = None,
        sort_keys = False,
        ensure_ascii = False,
        check_circular = False
    )
    for pdf in pdfs:
        if pdf['id'] in trees:
            continue  # tree already produced by a previous run
        pdf_name = config.path.pdfs + pdf['file_name']
        tree_name = config.path.trees + pdf['id'] + ".json"
        sp = spec[pdf['file_name']]
        trees[pdf['id']] = rag_tools.pdf_to_tree(
            file_name = pdf_name,
            title = sp.title,
            # page_from/page_to are inclusive in the spec, hence the +1.
            page_numbers = range(sp.page_from, sp.page_to+1) if sp.page_from is not None else None,
            detect_columns = sp.detect_columns,
            color_headers = sp.color_headers,
            header = sp.header,
            footer = sp.footer
        )
        print(RenderTree(trees[pdf['id']]).by_attr('header'))
        # FIX: context manager closes the file even if write() raises
        # (was a bare open()/close() pair that leaked on error).
        with open(tree_name, "w") as fh:
            tree_exporter.write(trees[pdf['id']], fh)

    ##
    # Chunk and embed
    ##
    tokenizer = AutoTokenizer.from_pretrained(config.embeddings.tokenizer_model)
    text_splitter = TokenTextSplitter.from_huggingface_tokenizer(
        tokenizer = tokenizer,
        chunk_size = config.chunker.size,
        chunk_overlap = config.chunker.overlap,
    )
    strings_embedder = rag_tools.Embedder(config)
    chromadb_client = chromadb.PersistentClient(
        path = config.path.embeddings,
        settings = chromadb.config.Settings(anonymized_telemetry = False)
    )
    # FIX: list_collections() used to be called once per PDF inside the
    # loop (O(n^2) client round trips); fetch existing names once.
    existing_names = {c.name for c in chromadb_client.list_collections()}
    collections = {}
    for pdf in pdfs:
        if pdf['id'] in existing_names:
            # Already embedded in a previous run — reuse and skip embedding.
            collections[pdf['id']] = chromadb_client.get_collection(name=pdf['id'])
            continue
        collections[pdf['id']] = chromadb_client.create_collection(
            name = pdf['id'],
            metadata = {"hnsw:space": "cosine"}
        )

        # FIX: bind the loop's pdf id as a default argument so the closure
        # does not late-bind `pdf` (safe today only because every task is
        # awaited before the next loop iteration).
        async def embed_node(node: Node, pdf_id: str = pdf['id']) -> None:
            """Chunk one tree node and add its embeddings to the collection."""
            # Skip structural (non-leaf) nodes that carry no text of their own.
            if not node.is_leaf and (node.text == "" or node.text.isspace()):
                return
            # Build a breadcrumb header "ancestor > ... > node header".
            header = node.header
            parent = node.parent
            # NOTE(review): assumes the root sentinel has level < 0 so the
            # walk terminates — confirm against rag_tools' tree builder.
            while parent.level >= 0:
                header = parent.header + " > " + header
                parent = parent.parent
            text = header + "\n" + node.text
            path = '/'.join([n.name for n in node.path])
            splits = text_splitter.split_text(text)
            print(f"Embedding: {header} ({len(splits)} splits)...")
            embeddings = await strings_embedder.embed_strings(
                [config.embeddings.document_prefix + s for s in splits]
            )
            collections[pdf_id].add(
                embeddings = embeddings,
                # NOTE: text.index(s) finds the FIRST occurrence, so two
                # identical splits report the same position.
                metadatas = [{'pos' : text.index(s), 'len' : len(s)} for s in splits],
                ids = [path + "#" + str(i) for i in range(len(splits))]
            )

        # Embed every node of this document concurrently.
        for embed in asyncio.as_completed(
            [embed_node(node) for node in PreOrderIter(trees[pdf['id']])]
        ):
            await embed
    await strings_embedder.close()
# FIX: guard the script entry point so importing this module does not
# immediately kick off the (expensive, side-effecting) ingestion run.
if __name__ == "__main__":
    asyncio.run(main())