Commit
Cache added to DocumentLoader. poetry.lock
enoch3712 committed Apr 30, 2024
1 parent 6836b02 commit 17b3685
Showing 12 changed files with 2,353 additions and 97 deletions.
2 changes: 2 additions & 0 deletions .ruff.toml
@@ -39,6 +39,8 @@ select = [
"E722",
# unused arguments
"ARG",
# redefined variables
"ARG005",
]
ignore = [
# mutable defaults
19 changes: 19 additions & 0 deletions extract_thinker/document_loader/cached_document_loader.py
@@ -0,0 +1,19 @@


from io import BytesIO
from typing import Any, Union

from cachetools import TTLCache
from extract_thinker.document_loader.document_loader import DocumentLoader


class CachedDocumentLoader(DocumentLoader):
    def __init__(self, content: Any = None, cache_ttl: int = 300):
        super().__init__(content)
        self.cache = TTLCache(maxsize=100, ttl=cache_ttl)

    def cached_load_content_from_file(self, file_path: str) -> Union[str, object]:
        return self.load_content_from_file(file_path)

    def cached_load_content_from_stream(self, stream: BytesIO) -> Union[str, object]:
        return self.load_content_from_stream(stream)
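Note that the cached_* wrappers above delegate straight to the uncached loaders; the actual memoization is applied in subclasses via cachetools decorators (see document_loader_tesseract.py below). A minimal sketch of how the TTLCache used here behaves, assuming only that cachetools is installed:

from cachetools import TTLCache
import time

cache = TTLCache(maxsize=100, ttl=2)  # entries expire 2 seconds after insertion
cache["report.pdf"] = "extracted text"
assert cache["report.pdf"] == "extracted text"

time.sleep(3)                       # wait past the TTL
assert "report.pdf" not in cache    # the entry has expired; maxsize bounds memory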
4 changes: 3 additions & 1 deletion extract_thinker/document_loader/document_loader.py
@@ -4,12 +4,14 @@
import pypdfium2 as pdfium
import concurrent.futures
from typing import Any, Dict, List, Union
+from cachetools import TTLCache


class DocumentLoader(ABC):
-    def __init__(self, content: Any = None):
+    def __init__(self, content: Any = None, cache_ttl: int = 300):
        self.content = content
        self.file_path = None
+        self.cache = TTLCache(maxsize=100, ttl=cache_ttl)

    @abstractmethod
    def load_content_from_file(self, file_path: str) -> Union[str, object]:
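Since DocumentLoader is an ABC, every concrete loader now carries a per-instance TTL cache. A hypothetical minimal subclass, sketched only to illustrate the contract and assuming load_content_from_file and load_content_from_stream are the abstract methods (PlainTextLoader is not part of this commit):

from io import BytesIO
from typing import Union

class PlainTextLoader(DocumentLoader):  # hypothetical example, not in the repository
    def load_content_from_file(self, file_path: str) -> Union[str, object]:
        with open(file_path, encoding="utf-8") as f:
            return f.read()

    def load_content_from_stream(self, stream: BytesIO) -> Union[str, object]:
        return stream.read().decode("utf-8")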
15 changes: 10 additions & 5 deletions extract_thinker/document_loader/document_loader_tesseract.py
@@ -1,19 +1,22 @@
from io import BytesIO
+from operator import attrgetter
import os
from typing import Union
from PIL import Image
import pytesseract

-from extract_thinker.document_loader.document_loader import DocumentLoader

+from extract_thinker.document_loader.cached_document_loader import CachedDocumentLoader
from ..utils import get_image_type

+from cachetools import cachedmethod
+from cachetools.keys import hashkey

SUPPORTED_IMAGE_FORMATS = ["jpeg", "png", "bmp", "tiff"]


-class DocumentLoaderTesseract(DocumentLoader):
-    def __init__(self, tesseract_cmd, isContainer=False, content=None):
-        self.content = content
+class DocumentLoaderTesseract(CachedDocumentLoader):
+    def __init__(self, tesseract_cmd, isContainer=False, content=None, cache_ttl=300):
+        super().__init__(content, cache_ttl)
        self.tesseract_cmd = tesseract_cmd
        if isContainer:
            # docker path to tesseract
@@ -22,6 +25,7 @@ def __init__(self, tesseract_cmd, isContainer=False, content=None):
        if not os.path.isfile(self.tesseract_cmd):
            raise Exception(f"Tesseract not found at {self.tesseract_cmd}")

+    @cachedmethod(cache=attrgetter('cache'), key=lambda self, file_path: hashkey(file_path))
    def load_content_from_file(self, file_path: str) -> Union[str, object]:
        try:
            file_type = get_image_type(file_path)
@@ -35,6 +39,7 @@ def load_content_from_file(self, file_path: str) -> Union[str, object]:
        except Exception as e:
            raise Exception(f"Error processing file: {e}") from e

+    @cachedmethod(cache=attrgetter('cache'), key=lambda self, stream: hashkey(id(stream)))
    def load_content_from_stream(self, stream: Union[BytesIO, str]) -> Union[str, object]:
        try:
            file_type = get_image_type(stream)
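The decorators above keep the cache on the instance (attrgetter('cache')), so separate loaders never share results, and the key lambdas drop self so hits depend only on the file path (or stream identity). A self-contained sketch of the same pattern, assuming a cachetools 5.x-style cachedmethod that passes self to the key function, as the commit's lambdas do:

from operator import attrgetter
from cachetools import TTLCache, cachedmethod
from cachetools.keys import hashkey

class ExpensiveLoader:
    def __init__(self):
        self.cache = TTLCache(maxsize=100, ttl=300)
        self.calls = 0  # counts real (uncached) invocations

    @cachedmethod(cache=attrgetter('cache'), key=lambda self, path: hashkey(path))
    def load(self, path):
        self.calls += 1
        return f"content of {path}"

loader = ExpensiveLoader()
loader.load("a.png")
loader.load("a.png")  # second call is served from the TTL cache
assert loader.calls == 1

One caveat worth flagging: keying the stream variant on id(stream) means a re-created stream with identical bytes misses the cache, and a recycled id could in principle collide.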
116 changes: 25 additions & 91 deletions extract_thinker/extractor.py
@@ -3,13 +3,9 @@
from extract_thinker.models import (
    Classification,
    ClassificationResponse,
-    DocGroups2,
-    DocGroup,
-    DocGroups,
)
from extract_thinker.splitter import Splitter
from extract_thinker.llm import LLM
-import asyncio
import os

from extract_thinker.document_loader.loader_interceptor import LoaderInterceptor
@@ -24,7 +20,6 @@ def __init__(
        self.llm: Optional[LLM] = llm
        self.splitter: Optional[Splitter] = None
        self.file: Optional[str] = None
-        doc_groups: Optional[DocGroups] = None
        self.document_loaders_by_file_type = {}
        self.loader_interceptors: List[LoaderInterceptor] = []
        self.llm_interceptors: List[LlmInterceptor] = []
@@ -56,7 +51,9 @@ def load_document_loader(self, document_loader: DocumentLoader) -> None:
    def load_llm(self, model: str) -> None:
        self.llm = LLM(model)

-    def extract(self, source: Union[str, IO], response_model: str, vision: bool = False) -> str:
+    def extract(
+        self, source: Union[str, IO], response_model: str, vision: bool = False
+    ) -> str:
        if isinstance(source, str):  # if it's a file path
            return self.extract_from_file(source, response_model, vision)
        elif isinstance(source, IO):  # if it's a stream
@@ -83,15 +80,15 @@ def extract_from_stream(
        if self.document_loader is None:
            raise ValueError("Document loader is not set")

-        content = self.document_loader.load_content_from_stream(stream)
+        content = self.document_loader.load(stream)
        return self._extract(content, stream, response_model, vision, is_stream=True)

    def classify_from_path(self, path: str, classifications: List[Classification]):
-        content = self.document_loader.getContent(path)
+        content = self.document_loader.load_content_from_file(path)
        return self._classify(content, classifications)

    def classify_from_stream(self, stream: IO, classifications: List[Classification]):
-        content = self.document_loader.getContentFromStream(stream)
+        content = self.document_loader.load_content_from_stream(stream)
        self._classify(content, classifications)

    def _classify(self, content: str, classifications: List[Classification]):
@@ -104,7 +101,7 @@ def _classify(self, content: str, classifications: List[Classification]):
        ]

        input_data = (
-            f"##Content\n{content[0]}\n##Classifications\n"
+            f"##Content\n{content}\n##Classifications\n"
            + "\n".join([f"{c.name}: {c.description}" for c in classifications])
            + "\n\n##JSON Output\n"
        )
@@ -115,11 +112,27 @@

        return response

+    def classify(self, input: Union[str, IO], classifications: List[Classification]):
+        if isinstance(input, str):
+            # Check if the input is a valid file path
+            if os.path.isfile(input):
+                _, ext = os.path.splitext(input)
+                if ext.lower() == ".pdf":
+                    return self.classify_from_path(input, classifications)
+                else:
+                    raise ValueError(f"Unsupported file type: {ext}")
+            else:
+                raise ValueError(f"No such file: {input}")
+        elif hasattr(input, 'read'):
+            # Check if the input is a stream (like a file object)
+            return self.classify_from_stream(input, classifications)
+        else:
+            raise ValueError("Input must be a file path or a stream.")

    def _extract(
        self, content, file_or_stream, response_model, vision=False, is_stream=False
    ):

-        #call all the llm interceptors before calling the llm
+        # call all the llm interceptors before calling the llm
        for interceptor in self.llm_interceptors:
            interceptor.intercept(self.llm)

@@ -156,88 +169,9 @@ def _extract(
        response = self.llm.request(messages, response_model)
        return response

-    def split(self, classifications: List[Classification]):
-        splitter = self.splitter
-
-        # Check if the file is a PDF
-        _, ext = os.path.splitext(self.file)
-        if ext.lower() != ".pdf":
-            raise ValueError("Invalid file type. Only PDFs are accepted.")
-
-        images = self.document_loader.convert_pdf_to_images(self.file)
-
-        groups = splitter.split_document_into_groups([self.file])
-
-        loop = asyncio.get_event_loop()
-        processedGroups = loop.run_until_complete(
-            splitter.process_split_groups(groups, classifications)
-        )
-
-        doc_groups = self.aggregate_split_documents_2(processedGroups)
-
-        self.doc_groups = doc_groups
-
-        return self
-
-    def aggregate_split_documents_2(doc_groups_tasks: List[DocGroups2]) -> DocGroups:
-        doc_groups = DocGroups()
-        current_group = DocGroup()
-        page_number = 1
-
-        # do the first group outside of the loop
-        doc_group = doc_groups_tasks[0]
-
-        if doc_group.belongs_to_same_document:
-            current_group.pages = [1, 2]
-            current_group.classification = doc_group.classification_page1
-            current_group.certainties = [
-                doc_group.certainty,
-                doc_groups_tasks[1].certainty,
-            ]
-        else:
-            current_group.pages = [1]
-            current_group.classification = doc_group.classification_page1
-            current_group.certainties = [doc_group.certainty]
-
-            doc_groups.doc_groups.append(current_group)
-
-            current_group = DocGroup()
-            current_group.pages = [2]
-            current_group.classification = doc_group.classification_page2
-            current_group.certainties = [doc_groups_tasks[1].certainty]
-
-        page_number += 1
-
-        for index in range(1, len(doc_groups_tasks)):
-            doc_group_2 = doc_groups_tasks[index]
-
-            if doc_group_2.belongs_to_same_document:
-                current_group.pages.append(page_number + 1)
-                current_group.certainties.append(doc_group_2.certainty)
-            else:
-                doc_groups.doc_groups.append(current_group)
-
-                current_group = DocGroup()
-                current_group.classification = doc_group_2.classification_page2
-                current_group.pages = [page_number + 1]
-                current_group.certainties = [doc_group_2.certainty]
-
-            page_number += 1
-
-        doc_groups.doc_groups.append(current_group)  # the last group
-
-        return doc_groups
-
-    def where(self, condition):
-        return self
-
-    def loadfile(self, file):
-        self.file = file
-        return self
-
-    def loadstream(self, stream):
-        return self
-
-    def loadSplitter(self, splitter):
-        self.splitter = splitter
-        return self
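With the splitting code moved out to process.py, classify() becomes the extractor's entry point, dispatching by input type instead of making callers pick the path or stream variant. A usage sketch under light assumptions: the tesseract path and model name are illustrative, the constructor arguments are taken to be optional, and Classification is built with the name/description fields the prompt uses.

extractor = Extractor()
extractor.load_document_loader(
    DocumentLoaderTesseract(tesseract_cmd="/usr/bin/tesseract")  # illustrative path
)
extractor.load_llm("gpt-3.5-turbo")  # illustrative model name

classifications = [
    Classification(name="Invoice", description="A billing document"),
    Classification(name="Contract", description="A legal agreement"),
]

# classify() dispatches on the input type: a .pdf path string goes through
# classify_from_path, anything with a read() method through classify_from_stream.
result = extractor.classify("invoice.pdf", classifications)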
1 change: 1 addition & 0 deletions extract_thinker/featurelog.txt
@@ -0,0 +1 @@
Add semantic search for the different languages and files
98 changes: 98 additions & 0 deletions extract_thinker/process.py
@@ -0,0 +1,98 @@
import asyncio
from typing import List, Optional
from extract_thinker.extractor import Extractor
from extract_thinker.models import Classification
from extract_thinker.models import (
    DocGroups2,
    DocGroup,
    DocGroups,
)
import os


class Process:
    def __init__(self):
        self.extractors: List[Extractor] = []
        self.doc_groups: Optional[DocGroups] = None

    def add_extractor(self, extractor: Extractor):
        self.extractors.append(extractor)

    def loadSplitter(self, splitter):
        self.splitter = splitter
        return self

    def split(self, classifications: List[Classification]):
        splitter = self.splitter

        # Check if the file is a PDF
        _, ext = os.path.splitext(self.file)
        if ext.lower() != ".pdf":
            raise ValueError("Invalid file type. Only PDFs are accepted.")

        images = self.document_loader.convert_pdf_to_images(self.file)

        groups = splitter.split_document_into_groups([self.file])

        loop = asyncio.get_event_loop()
        processedGroups = loop.run_until_complete(
            splitter.process_split_groups(groups, classifications)
        )

        doc_groups = self.aggregate_split_documents_2(processedGroups)

        self.doc_groups = doc_groups

        return self

    def aggregate_split_documents_2(self, doc_groups_tasks: List[DocGroups2]) -> DocGroups:
        doc_groups = DocGroups()
        current_group = DocGroup()
        page_number = 1

        # do the first group outside of the loop
        doc_group = doc_groups_tasks[0]

        if doc_group.belongs_to_same_document:
            current_group.pages = [1, 2]
            current_group.classification = doc_group.classification_page1
            current_group.certainties = [
                doc_group.certainty,
                doc_groups_tasks[1].certainty,
            ]
        else:
            current_group.pages = [1]
            current_group.classification = doc_group.classification_page1
            current_group.certainties = [doc_group.certainty]

            doc_groups.doc_groups.append(current_group)

            current_group = DocGroup()
            current_group.pages = [2]
            current_group.classification = doc_group.classification_page2
            current_group.certainties = [doc_groups_tasks[1].certainty]

        page_number += 1

        for index in range(1, len(doc_groups_tasks)):
            doc_group_2 = doc_groups_tasks[index]

            if doc_group_2.belongs_to_same_document:
                current_group.pages.append(page_number + 1)
                current_group.certainties.append(doc_group_2.certainty)
            else:
                doc_groups.doc_groups.append(current_group)

                current_group = DocGroup()
                current_group.classification = doc_group_2.classification_page2
                current_group.pages = [page_number + 1]
                current_group.certainties = [doc_group_2.certainty]

            page_number += 1

        doc_groups.doc_groups.append(current_group)  # the last group

        return doc_groups

    def where(self, condition):
        pass
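The grouping logic is easiest to follow on a concrete input. With three pairwise page comparisons over four pages, where only the first and last pairs belong together, two groups should come out. This worked sketch takes the field names from the code above, but constructing DocGroups2 by keyword is an assumption about the model class:

process = Process()

# One DocGroups2 entry per consecutive page pair: (1,2), (2,3), (3,4).
comparisons = [
    DocGroups2(belongs_to_same_document=True,
               classification_page1="Invoice", classification_page2="Invoice",
               certainty=0.9),
    DocGroups2(belongs_to_same_document=False,
               classification_page1="Invoice", classification_page2="Contract",
               certainty=0.8),
    DocGroups2(belongs_to_same_document=True,
               classification_page1="Contract", classification_page2="Contract",
               certainty=0.95),
]

groups = process.aggregate_split_documents_2(comparisons)
# Expected: one group with pages [1, 2] classified "Invoice",
# and one with pages [3, 4] classified "Contract".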