diff --git a/querent/__init__.py b/querent/__init__.py index 104d579c..d7d8c22e 100644 --- a/querent/__init__.py +++ b/querent/__init__.py @@ -19,9 +19,6 @@ # System Callbacks from .callback import * -# System Insights -from .insights import * - # System Logging and Utilities from .core import * from .utils import * diff --git a/querent/collectors/aws/aws_collector.py b/querent/collectors/aws/aws_collector.py index 4df72e62..0bd6aed6 100644 --- a/querent/collectors/aws/aws_collector.py +++ b/querent/collectors/aws/aws_collector.py @@ -52,8 +52,8 @@ async def poll(self) -> AsyncGenerator[CollectedBytes, None]: for obj in response.get("Contents", []): file = self.download_object_as_byte_stream(obj["Key"]) async for chunk in self.read_chunks(file): - yield CollectedBytes(file=obj["Key"], data=chunk, error=None) - yield CollectedBytes(file=obj["Key"], data=None, error=None, eof=True) + yield CollectedBytes(file=obj["Key"], data=chunk, error=None, doc_source=f"s3://{self.bucket_name}/{self.region}") + yield CollectedBytes(file=obj["Key"], data=None, error=None, eof=True, doc_source=f"s3://{self.bucket_name}/{self.region}") except PermissionError as exc: self.logger.error(f"Getting Permission Error on file {file}, as {exc}") diff --git a/querent/collectors/azure/azure_collector.py b/querent/collectors/azure/azure_collector.py index 63c68aff..39c7d7f7 100644 --- a/querent/collectors/azure/azure_collector.py +++ b/querent/collectors/azure/azure_collector.py @@ -57,8 +57,8 @@ async def poll(self) -> AsyncGenerator[CollectedBytes, None]: self.container_client, blob.name ) async for chunk in self.read_chunks(file): - yield CollectedBytes(file=blob.name, data=chunk, error=None) - yield CollectedBytes(file=blob.name, data=None, error=None, eof=True) + yield CollectedBytes(file=blob.name, data=chunk, error=None, doc_source=f"azure://{self.container_name}/{self.account_url}") + yield CollectedBytes(file=blob.name, data=None, error=None, eof=True, doc_source=f"azure://{self.container_name}/{self.account_url}") except Exception as e: # Handle exceptions gracefully, e.g., log the error self.logger.error(f"Error polling Azure Blob Storage: {e}") diff --git a/querent/collectors/drive/google_drive_collector.py b/querent/collectors/drive/google_drive_collector.py index 09c1288c..8f4a69b0 100644 --- a/querent/collectors/drive/google_drive_collector.py +++ b/querent/collectors/drive/google_drive_collector.py @@ -88,8 +88,8 @@ async def poll(self) -> AsyncGenerator[CollectedBytes, None]: self.logger.info("No files found in Google Drive") for file in files: async for chunk in self.read_chunks(file["id"]): - yield CollectedBytes(data=chunk, file=file["name"]) - yield CollectedBytes(data=None, file=file["name"], eof=True) + yield CollectedBytes(data=chunk, file=file["name"], doc_source=f"drive://{self.folder_to_crawl}") + yield CollectedBytes(data=None, file=file["name"], eof=True, doc_source=f"drive://{self.folder_to_crawl}") except Exception as e: raise common_errors.PollingError( f"Failed to poll Google Drive: {str(e)}" diff --git a/querent/collectors/dropbox/dropbox_collector.py b/querent/collectors/dropbox/dropbox_collector.py index 11182027..3a432bed 100644 --- a/querent/collectors/dropbox/dropbox_collector.py +++ b/querent/collectors/dropbox/dropbox_collector.py @@ -69,8 +69,8 @@ async def poll(self) -> AsyncGenerator[CollectedBytes, None]: file_content_bytes = response.content async for chunk in self.stream_blob(file_content_bytes): - yield CollectedBytes(file=entry.name, data=chunk) - yield 
CollectedBytes(file=entry.name, data=None, eof=True) + yield CollectedBytes(file=entry.name, data=chunk, doc_source=f"dropbox://{self.folder_path}") + yield CollectedBytes(file=entry.name, data=None, eof=True, doc_source=f"dropbox://{self.folder_path}") except dropbox.exceptions.ApiError as e: self.logger.error(f"Error polling Dropbox: {e}") raise common_errors.PollingError( diff --git a/querent/collectors/email/email_collector.py b/querent/collectors/email/email_collector.py index a33c8c70..7260de04 100644 --- a/querent/collectors/email/email_collector.py +++ b/querent/collectors/email/email_collector.py @@ -57,11 +57,13 @@ async def poll(self) -> AsyncGenerator[CollectedBytes, None]: yield CollectedBytes( data=message, file=f"{self.config.imap_username}:{self.config.imap_folder}/{i}.email", + doc_source=f"email://{self.config.imap_server}/{self.config.imap_folder}" ) yield CollectedBytes( data=None, file=f"{self.config.imap_username}:{self.config.imap_folder}/{i}.email", eof=True, + doc_source=f"email://{self.config.imap_server}/{self.config.imap_folder}" ) except imaplib.IMAP4.error as e: self.logger.error(f"Error fetching emails from IMAP server: {e}") diff --git a/querent/collectors/fs/fs_collector.py b/querent/collectors/fs/fs_collector.py index 7af12c21..a13a2cc2 100644 --- a/querent/collectors/fs/fs_collector.py +++ b/querent/collectors/fs/fs_collector.py @@ -41,9 +41,9 @@ async def poll(self) -> AsyncGenerator[CollectedBytes, None]: async with aiofiles.open(file_path, "rb") as file: file_path_str = str(file_path) async for chunk in self.read_chunks(file): - yield CollectedBytes(file=file_path_str, data=chunk, error=None) + yield CollectedBytes(file=file_path_str, data=chunk, error=None, doc_source=f"file://{self.root_dir}") yield CollectedBytes( - file=file_path_str, data=None, error=None, eof=True + file=file_path_str, data=None, error=None, eof=True, doc_source=f"file://{self.root_dir}" ) except PermissionError as exc: raise common_errors.PermissionError( diff --git a/querent/collectors/gcs/gcs_collector.py b/querent/collectors/gcs/gcs_collector.py index d4adc288..80d57b6a 100644 --- a/querent/collectors/gcs/gcs_collector.py +++ b/querent/collectors/gcs/gcs_collector.py @@ -55,8 +55,8 @@ async def poll(self) -> AsyncGenerator[CollectedBytes, None]: blobs = list(bucket.list_blobs()) # Convert to a list for blob in blobs: async for chunk in self.stream_blob(blob): - yield CollectedBytes(file=blob.name, data=chunk, error=None) - yield CollectedBytes(file=blob.name, data=None, error=None, eof=True) + yield CollectedBytes(file=blob.name, data=chunk, error=None, doc_source=f"gcs://{self.bucket_name}") + yield CollectedBytes(file=blob.name, data=None, error=None, eof=True, doc_source=f"gcs://{self.bucket_name}") except Exception as e: # Handle exceptions gracefully, e.g., log the error self.logger.error(f"Error connecting to GCS: {e}") diff --git a/querent/collectors/github/github_collector.py b/querent/collectors/github/github_collector.py index 3fdfbef6..1a6db50f 100644 --- a/querent/collectors/github/github_collector.py +++ b/querent/collectors/github/github_collector.py @@ -48,9 +48,9 @@ async def fetch_files_in_folder(self, api_url): file_response.raise_for_status() file_contents = await file_response.read() # Assume process_data() is correctly implemented for your file type - yield CollectedBytes(file=item["name"], data=file_contents) + yield CollectedBytes(file=item["name"], data=file_contents, doc_source=f"github://{self.repository}") - yield CollectedBytes(file = item["name"], 
data = None, eof = True) + yield CollectedBytes(file = item["name"], data = None, eof = True, doc_source=f"github://{self.repository}") elif item["type"] == "dir": # Recursively fetch files in subfolders async for sub_item in self.fetch_files_in_folder(item["url"]): diff --git a/querent/collectors/jira/jira_collector.py b/querent/collectors/jira/jira_collector.py index 2e6e9428..9f0980a6 100644 --- a/querent/collectors/jira/jira_collector.py +++ b/querent/collectors/jira/jira_collector.py @@ -82,10 +82,10 @@ async def poll(self) -> AsyncGenerator[CollectedBytes, None]: for issue in issues: json_issue = json.dumps(issue.raw).encode("utf-8") yield CollectedBytes( - data=json_issue, file=f"jira_issue_{issue.key}.json.jira" + data=json_issue, file=f"jira_issue_{issue.key}.json.jira", doc_source=f"jira://{self.config.jira_server}/{self.config.jira_project}" ) yield CollectedBytes( - data=None, file=f"jira_issue_{issue.key}.json.jira", eof=True + data=None, file=f"jira_issue_{issue.key}.json.jira", eof=True, doc_source=f"jira://{self.config.jira_server}/{self.config.jira_project}" ) except common_errors.ConnectionError as e: diff --git a/querent/collectors/news/news_collector.py b/querent/collectors/news/news_collector.py index 6e0fed44..bcb20a5d 100644 --- a/querent/collectors/news/news_collector.py +++ b/querent/collectors/news/news_collector.py @@ -78,8 +78,8 @@ async def poll(self) -> AsyncGenerator[CollectedBytes, None]: publish_date = article.get('publishedAt').split('T')[0] title = article['title'] - yield CollectedBytes(file=f"{self.config.query}_{publish_date}.news", data=str(article_data).encode("utf-8")) - yield CollectedBytes(file=f"{self.config.query}_{publish_date}.news", data=None, error=None, eof=True) + yield CollectedBytes(file=f"{self.config.query}_{publish_date}.news", data=str(article_data).encode("utf-8"), doc_source=f"news://{self.config.query}") + yield CollectedBytes(file=f"{self.config.query}_{publish_date}.news", data=None, error=None, eof=True, doc_source=f"news://{self.config.query}") total_results = response.get("totalResults", 0) total_pages = ( @@ -93,7 +93,7 @@ async def poll(self) -> AsyncGenerator[CollectedBytes, None]: break except Exception as e: self.logger.error(f"Error fetching news articles: {e}") - yield CollectedBytes(file="Error", data=None, error=e) + yield CollectedBytes(file="Error", data=None, error=e, doc_source=f"news://{self.config.query}") break # After exhausting the current batch, reset for next polling cycle @@ -137,8 +137,8 @@ async def poll(self) -> AsyncGenerator[CollectedBytes, None]: publish_date = article.get('publishedAt').split('T')[0] title = article['title'] - yield CollectedBytes(file=f"{self.config.query}_{publish_date}.news", data=str(article_data).encode("utf-8")) - yield CollectedBytes(file=f"{self.config.query}_{publish_date}.news", data=None, error=None, eof=True) + yield CollectedBytes(file=f"{self.config.query}_{publish_date}.news", data=str(article_data).encode("utf-8"), doc_source=f"news://{self.config.query}") + yield CollectedBytes(file=f"{self.config.query}_{publish_date}.news", data=None, error=None, eof=True, doc_source=f"news://{self.config.query}") except Exception as e: self.logger.error(f"Error fetching news articles: {e}") diff --git a/querent/collectors/slack/slack_collector.py b/querent/collectors/slack/slack_collector.py index f757e1a6..523e7c54 100644 --- a/querent/collectors/slack/slack_collector.py +++ b/querent/collectors/slack/slack_collector.py @@ -63,6 +63,7 @@ async def poll(self) -> 
AsyncGenerator[CollectedBytes, None]: yield CollectedBytes( file=f"slack://{self.channel}.slack", data=bytes(message["text"] + "\n\n", "utf-8"), + doc_source = f"slack://{self.channel}" ) if not response["has_more"]: @@ -71,6 +72,7 @@ async def poll(self) -> AsyncGenerator[CollectedBytes, None]: data=None, error=None, eof=True, + doc_source = f"slack://{self.channel}" ) break else: diff --git a/querent/collectors/webscaper/web_scraper_collector.py b/querent/collectors/webscaper/web_scraper_collector.py index e53ab375..fa6f6341 100644 --- a/querent/collectors/webscaper/web_scraper_collector.py +++ b/querent/collectors/webscaper/web_scraper_collector.py @@ -29,7 +29,7 @@ async def poll(self): while urls_to_scrape: url = urls_to_scrape.pop() content = await self.scrape_website(url) - yield CollectedBytes(file=None, data=content.data, error=None) + yield CollectedBytes(file=None, data=content.data, error=None, doc_source=self.website_url) # Find and add links from this page to the list of URLs to scrape new_urls = self.extract_links(url) urls_to_scrape.extend(new_urls) diff --git a/querent/common/types/collected_bytes.py b/querent/common/types/collected_bytes.py index 6161026b..0f6f98ea 100644 --- a/querent/common/types/collected_bytes.py +++ b/querent/common/types/collected_bytes.py @@ -2,11 +2,12 @@ class CollectedBytes: - def __init__(self, file: str, data: bytes, error: str = None, eof: bool = False): + def __init__(self, file: str, data: bytes, error: str = None, eof: bool = False, doc_source = str): self.data = data self.error = error self.file = file self.eof = eof + self.doc_source = doc_source if self.file: file = str(file) self.extension = file.split(".")[-1] diff --git a/querent/common/types/file_buffer.py b/querent/common/types/file_buffer.py index 814d0ea7..f2987fe5 100644 --- a/querent/common/types/file_buffer.py +++ b/querent/common/types/file_buffer.py @@ -31,21 +31,19 @@ def __init__(self): def add_chunk(self, filename, chunk): try: - if chunk is None: - return self.end_file(filename) - if filename not in self.file_chunks: self.file_chunks[filename] = {} - - # Automatically assign a chunk ID + + if chunk is None: + return self.end_file(filename) + chunk_id = len(self.file_chunks[filename]) self.file_chunks[filename][chunk_id] = chunk return filename, None except Exception as e: - self.logger.error(f"Error adding a chunk: {e}") - raise Exception(f"An error occurred while adding a chunk: {e}") + return filename, None def end_file(self, filename): try: @@ -56,10 +54,9 @@ def end_file(self, filename): del self.file_chunks[filename] # Clear the file entry return filename, full_content else: - raise Exception(f"No chunks found for file: {filename}") + return filename, None except Exception as e: - self.logger.error(f"Error ending the file: {e}") - raise Exception(f"An error occurred while ending the file: {e}") + return filename, None def get_content(self, filename): try: diff --git a/querent/common/types/ingested_code.py b/querent/common/types/ingested_code.py index 02bcc2b1..64ef20be 100644 --- a/querent/common/types/ingested_code.py +++ b/querent/common/types/ingested_code.py @@ -1,10 +1,11 @@ class IngestedCode: """Class for ingested code type of data""" - def __init__(self, file: str, data: [str], error: str = None) -> None: + def __init__(self, file: str, data: [str], doc_source: str, error: str = None) -> None: self.data = data self.error = error self.file = file + self.doc_source = doc_source file = str(file) self.extension = file.rsplit(".", maxsplit=1)[-1] diff --git 
a/querent/common/types/ingested_images.py b/querent/common/types/ingested_images.py index ef371fd9..6004399b 100644 --- a/querent/common/types/ingested_images.py +++ b/querent/common/types/ingested_images.py @@ -12,6 +12,7 @@ def __init__( coordinates: list = [], ocr_text: list = [], error: str = None, + doc_source = str, ) -> None: self.file = file self.text = text @@ -21,6 +22,7 @@ def __init__( self.page_num = page_num self.coordinates = coordinates self.ocr_text = ocr_text + self.doc_source = doc_source file = str(file) self.extension = file.split(".")[-1] self.file_id = file.split("/")[-1].split(".")[0] diff --git a/querent/common/types/ingested_tokens.py b/querent/common/types/ingested_tokens.py index cada6c5c..c0f12c3a 100644 --- a/querent/common/types/ingested_tokens.py +++ b/querent/common/types/ingested_tokens.py @@ -1,13 +1,14 @@ -from typing import Union +from typing import Union, List class IngestedTokens: def __init__( - self, file: str, data: [str], error: str = None, is_token_stream=False + self, file: str, data: List[str], error: str = None, is_token_stream=False, doc_source=str ) -> None: self.data = data self.error = error self.is_token_stream = is_token_stream + self.doc_source = doc_source if file: self.file = file file = str(file) diff --git a/querent/common/types/querent_event.py b/querent/common/types/querent_event.py index e5f9a8c5..cccb39f2 100644 --- a/querent/common/types/querent_event.py +++ b/querent/common/types/querent_event.py @@ -9,9 +9,10 @@ class EventType: class EventState: - def __init__(self, event_type: EventType, timestamp: float, payload: Any, file: str): + def __init__(self, event_type: EventType, timestamp: float, payload: Any, file: str, doc_source: str): self.event_type = event_type self.timestamp = timestamp self.payload = payload self.file = file self.file = file + self.doc_source = doc_source diff --git a/querent/common/types/querent_queue.py b/querent/common/types/querent_queue.py index 9785c1ef..27c30069 100644 --- a/querent/common/types/querent_queue.py +++ b/querent/common/types/querent_queue.py @@ -114,6 +114,10 @@ def empty(self): """ return self.queue.empty() + + def qsize(self): + + return self.queue.qsize() def __aiter__(self): return self diff --git a/querent/core/base_engine.py b/querent/core/base_engine.py index c63de9f8..2d471aa5 100644 --- a/querent/core/base_engine.py +++ b/querent/core/base_engine.py @@ -180,10 +180,11 @@ async def _listen_for_state_changes(self): if new_state.payload == "Terminate": break new_state = { - "event_type": new_state.event_type, + "event_type": str(new_state.event_type), "timestamp": new_state.timestamp, "payload": new_state.payload, - "file": new_state.file + "file": new_state.file, + "doc_source": new_state.doc_source, } await self._notify_subscribers(new_state["event_type"], new_state) else: diff --git a/querent/core/transformers/bert_ner_opensourcellm.py b/querent/core/transformers/bert_ner_opensourcellm.py index 25b9d997..d035bdf9 100644 --- a/querent/core/transformers/bert_ner_opensourcellm.py +++ b/querent/core/transformers/bert_ner_opensourcellm.py @@ -145,6 +145,7 @@ async def process_tokens(self, data: IngestedTokens): doc_entity_pairs = [] number_sentences = 0 try: + doc_source = data.doc_source if not BERTLLM.validate_ingested_tokens(data): self.set_termination_event() return @@ -199,7 +200,6 @@ async def process_tokens(self, data: IngestedTokens): else: filtered_triples = pairs_with_predicates if not filtered_triples: - self.logger.debug("No entity pairs") return elif not 
self.skip_inferences: relationships = self.semantic_extractor.process_tokens(filtered_triples) @@ -212,11 +212,11 @@ async def process_tokens(self, data: IngestedTokens): if not self.termination_event.is_set(): graph_json = json.dumps(TripleToJsonConverter.convert_graphjson(triple)) if graph_json: - current_state = EventState(EventType.Graph,1.0, graph_json, file) + current_state = EventState(EventType.Graph,1.0, graph_json, file, doc_source=doc_source) await self.set_state(new_state=current_state) vector_json = json.dumps(TripleToJsonConverter.convert_vectorjson(triple)) if vector_json: - current_state = EventState(EventType.Vector,1.0, vector_json, file) + current_state = EventState(EventType.Vector,1.0, vector_json, file, doc_source=doc_source) await self.set_state(new_state=current_state) else: return @@ -224,5 +224,7 @@ async def process_tokens(self, data: IngestedTokens): return else: return filtered_triples, file + else: + return except Exception as e: self.logger.debug(f"Invalid {self.__class__.__name__} configuration. Unable to process tokens. {e}") diff --git a/querent/core/transformers/fixed_entities_set_opensourcellm.py b/querent/core/transformers/fixed_entities_set_opensourcellm.py index 43c5b7b4..e9a8ab76 100644 --- a/querent/core/transformers/fixed_entities_set_opensourcellm.py +++ b/querent/core/transformers/fixed_entities_set_opensourcellm.py @@ -127,6 +127,7 @@ async def process_tokens(self, data: IngestedTokens): doc_entity_pairs = [] number_sentences = 0 try: + doc_source = data.doc_source if not Fixed_Entities_LLM.validate_ingested_tokens(data): self.set_termination_event() return @@ -155,7 +156,6 @@ async def process_tokens(self, data: IngestedTokens): number_sentences = number_sentences + 1 else: return - if self.sample_entities: doc_entity_pairs = self.entity_context_extractor.process_entity_types(doc_entities=doc_entity_pairs) if doc_entity_pairs and any(doc_entity_pairs): @@ -176,11 +176,11 @@ async def process_tokens(self, data: IngestedTokens): if not self.termination_event.is_set(): graph_json = json.dumps(TripleToJsonConverter.convert_graphjson(triple)) if graph_json: - current_state = EventState(EventType.Graph,1.0, graph_json, file) + current_state = EventState(EventType.Graph,1.0, graph_json, file, doc_source=doc_source) await self.set_state(new_state=current_state) vector_json = json.dumps(TripleToJsonConverter.convert_vectorjson(triple)) if vector_json: - current_state = EventState(EventType.Vector,1.0, vector_json, file) + current_state = EventState(EventType.Vector,1.0, vector_json, file, doc_source=doc_source) await self.set_state(new_state=current_state) else: return diff --git a/querent/core/transformers/gpt_llm_bert_ner_or_fixed_entities_set_ner.py b/querent/core/transformers/gpt_llm_bert_ner_or_fixed_entities_set_ner.py index ca9885f5..6855b7a3 100644 --- a/querent/core/transformers/gpt_llm_bert_ner_or_fixed_entities_set_ner.py +++ b/querent/core/transformers/gpt_llm_bert_ner_or_fixed_entities_set_ner.py @@ -255,8 +255,10 @@ async def process_tokens(self, data: IngestedTokens): if not GPTLLM.validate_ingested_tokens(data): self.set_termination_event() return + + doc_source = data.doc_source relationships = [] - result = await self.llm_instance.process_tokens(data) + result = await self.llm_instance.process_tokens(data) if not result: return else: filtered_triples, file = result @@ -280,11 +282,11 @@ async def process_tokens(self, data: IngestedTokens): if not self.termination_event.is_set(): graph_json = 
json.dumps(TripleToJsonConverter.convert_graphjson(triple)) if graph_json: - current_state = EventState(EventType.Graph,1.0, graph_json, file) + current_state = EventState(EventType.Graph,1.0, graph_json, file, doc_source=doc_source) await self.set_state(new_state=current_state) vector_json = json.dumps(TripleToJsonConverter.convert_vectorjson(triple)) if vector_json: - current_state = EventState(EventType.Vector,1.0, vector_json, file) + current_state = EventState(EventType.Vector,1.0, vector_json, file, doc_source=doc_source) await self.set_state(new_state=current_state) else: return diff --git a/querent/core/transformers/gpt_llm_gpt_ner.py b/querent/core/transformers/gpt_llm_gpt_ner.py index 5e4e5667..28cf9e52 100644 --- a/querent/core/transformers/gpt_llm_gpt_ner.py +++ b/querent/core/transformers/gpt_llm_gpt_ner.py @@ -184,6 +184,7 @@ async def process_tokens(self, data: IngestedTokens): self.set_termination_event() return + doc_source = data.doc_source if data.data: clean_text = ' '.join(data.data) #clean_text = unidecode(single_string) @@ -246,14 +247,14 @@ async def process_tokens(self, data: IngestedTokens): if not self.termination_event.is_set(): graph_json = json.dumps(triple) if graph_json: - current_state = EventState(EventType.Graph,1.0, graph_json, file) + current_state = EventState(EventType.Graph,1.0, graph_json, file, doc_source=doc_source) await self.set_state(new_state=current_state) context_embeddings = self.create_emb.get_embeddings([triple['sentence']])[0] triple['context_embeddings'] = context_embeddings triple['context'] = triple['sentence'] vector_json = json.dumps(TripleToJsonConverter.convert_vectorjson((triple['subject'],json.dumps(triple), triple['object']))) if vector_json: - current_state = EventState(EventType.Vector,1.0, vector_json, file) + current_state = EventState(EventType.Vector,1.0, vector_json, file, doc_source=doc_source) await self.set_state(new_state=current_state) else: return diff --git a/querent/ingestors/audio/audio_ingestors.py b/querent/ingestors/audio/audio_ingestors.py index a12c6cf9..728eaa2f 100644 --- a/querent/ingestors/audio/audio_ingestors.py +++ b/querent/ingestors/audio/audio_ingestors.py @@ -81,25 +81,26 @@ async def ingest( async for text in self.extract_and_process_audio( CollectedBytes(file=current_file, data=collected_bytes) ): - yield IngestedTokens(file=current_file, data=[text], error=None) + yield IngestedTokens(file=current_file, data=[text], error=None, doc_source= chunk_bytes.doc_source) yield IngestedTokens( file=current_file, data=None, error=None, + doc_source= chunk_bytes.doc_source ) collected_bytes = b"" current_file = chunk_bytes.file collected_bytes += chunk_bytes.data except Exception as e: - yield IngestedTokens(file=current_file, data=None, error=f"Exception: {e}") + yield IngestedTokens(file=current_file, data=None, error=f"Exception: {e}", doc_source=chunk_bytes.doc_source) finally: # process the last file async for text in self.extract_and_process_audio( CollectedBytes(file=current_file, data=collected_bytes) ): - yield IngestedTokens(file=current_file, data=[text], error=None) + yield IngestedTokens(file=current_file, data=[text], error=None, doc_source=chunk_bytes.doc_source) - yield IngestedTokens(file=current_file, data=None, error=None) + yield IngestedTokens(file=current_file, data=None, error=None, doc_source=chunk_bytes.doc_source) async def extract_and_process_audio( self, collected_bytes: CollectedBytes diff --git a/querent/ingestors/code/code_ingestor.py 
b/querent/ingestors/code/code_ingestor.py index dd633639..bc3da581 100644 --- a/querent/ingestors/code/code_ingestor.py +++ b/querent/ingestors/code/code_ingestor.py @@ -77,11 +77,13 @@ async def ingest( file=current_file, data=[line], error=None, + doc_source=chunk_bytes.doc_source, ) yield IngestedTokens( file=current_file, data=None, error=None, + doc_source=chunk_bytes.doc_source ) collected_bytes = b"" current_file = chunk_bytes.file @@ -94,11 +96,12 @@ async def ingest( file=current_file, data=[line], error=None, + doc_source=chunk_bytes.doc_source ) - yield IngestedTokens(file=current_file, data=None, error=None) + yield IngestedTokens(file=current_file, data=None, error=None, doc_source=chunk_bytes.doc_source) except Exception as exc: yield IngestedTokens( - file=current_file, data=None, error=f"Exception: {exc}" + file=current_file, data=None, error=f"Exception: {exc}", doc_source=chunk_bytes.doc_source, ) raise Exception from exc diff --git a/querent/ingestors/csv/csv_ingestor.py b/querent/ingestors/csv/csv_ingestor.py index ee597558..527b18b5 100644 --- a/querent/ingestors/csv/csv_ingestor.py +++ b/querent/ingestors/csv/csv_ingestor.py @@ -49,25 +49,26 @@ async def ingest( async for row in self.extract_and_process_csv( CollectedBytes(file=current_file, data=collected_bytes) ): - yield IngestedTokens(file=current_file, data=[row], error=None) + yield IngestedTokens(file=current_file, data=[row], error=None, doc_source=chunk_bytes.doc_source) yield IngestedTokens( file=current_file, data=None, error=None, + doc_source=chunk_bytes.doc_source, ) collected_bytes = b"" current_file = chunk_bytes.file collected_bytes += chunk_bytes.data except Exception as e: - yield IngestedTokens(file=current_file, data=None, error=f"Exception: {e}") + yield IngestedTokens(file=current_file, data=None, error=f"Exception: {e}", doc_source=chunk_bytes.doc_source) finally: # process the last file async for row in self.extract_and_process_csv( CollectedBytes(file=current_file, data=collected_bytes) ): - yield IngestedTokens(file=current_file, data=[row], error=None) + yield IngestedTokens(file=current_file, data=[row], error=None,doc_source=chunk_bytes.doc_source) - yield IngestedTokens(file=current_file, data=None, error=None) + yield IngestedTokens(file=current_file, data=None, error=None, doc_source=chunk_bytes.doc_source) async def extract_and_process_csv( self, collected_bytes: CollectedBytes diff --git a/querent/ingestors/doc/doc_ingestor.py b/querent/ingestors/doc/doc_ingestor.py index b59797ba..1913fcf8 100644 --- a/querent/ingestors/doc/doc_ingestor.py +++ b/querent/ingestors/doc/doc_ingestor.py @@ -54,13 +54,15 @@ async def ingest( elif current_file != chunk_bytes.file: # we have a new file, process the old one async for ingested_data in self.extract_and_process_doc( - CollectedBytes(file=current_file, data=collected_bytes) + CollectedBytes(file=current_file, data=collected_bytes), doc_source=chunk_bytes.doc_source ): + ingested_data.doc_source = chunk_bytes.doc_source yield ingested_data yield IngestedTokens( file=current_file, data=None, error=None, + doc_source=chunk_bytes.doc_source ) collected_bytes = b"" current_file = chunk_bytes.file @@ -70,18 +72,18 @@ async def ingest( finally: # process the last file async for ingested_data in self.extract_and_process_doc( - CollectedBytes(file=current_file, data=collected_bytes) + CollectedBytes(file=current_file, data=collected_bytes), doc_source=chunk_bytes.doc_source ): yield ingested_data - yield IngestedTokens(file=current_file, data=None, 
error=None) + yield IngestedTokens(file=current_file, data=None, doc_source = chunk_bytes.doc_source, error=None) async def extract_and_process_doc( - self, collected_bytes: CollectedBytes + self, collected_bytes: CollectedBytes, doc_source: str ) -> AsyncGenerator[str, None]: - async for paragraph in self.extract_text_from_doc(collected_bytes): + async for paragraph in self.extract_text_from_doc(collected_bytes, doc_source): yield paragraph - async def extract_text_from_doc(self, collected_bytes: CollectedBytes): + async def extract_text_from_doc(self, collected_bytes: CollectedBytes, doc_source: str): # Determine file extension file_extension = collected_bytes.extension.lower() if file_extension == "docx": @@ -92,7 +94,7 @@ async def extract_text_from_doc(self, collected_bytes: CollectedBytes): text = await self.process_data(text) yield IngestedTokens( - file=collected_bytes.file, data=text, error=None + file=collected_bytes.file, data=text, error=None, doc_source=doc_source ) # i = 1 @@ -107,7 +109,7 @@ async def extract_text_from_doc(self, collected_bytes: CollectedBytes): elif file_extension == "doc": current_doc_text = await self.temp_extract_from(collected_bytes) yield IngestedTokens( - file=collected_bytes.file, data=current_doc_text, error=None + file=collected_bytes.file, data=current_doc_text, error=None, doc_source=doc_source ) else: raise common_errors.UnknownError( diff --git a/querent/ingestors/email/email_ingestor.py b/querent/ingestors/email/email_ingestor.py index aea3e643..1de1006f 100644 --- a/querent/ingestors/email/email_ingestor.py +++ b/querent/ingestors/email/email_ingestor.py @@ -58,17 +58,19 @@ async def ingest( file=current_file, data=[email], error=None, + doc_source=chunk_bytes.doc_source, ) yield IngestedTokens( file=current_file, data=None, error=None, + doc_source=chunk_bytes.doc_source ) collected_bytes = b"" current_file = chunk_bytes.file collected_bytes += chunk_bytes.data except Exception as e: - yield IngestedTokens(file=current_file, data=None, error=f"Exception: {e}") + yield IngestedTokens(file=current_file, data=None, error=f"Exception: {e}", doc_source=chunk_bytes.doc_source) finally: if current_file is not None: email = await self.extract_and_process_email( @@ -78,11 +80,13 @@ async def ingest( file=current_file, data=[email], error=None, + doc_source=chunk_bytes.doc_source ) yield IngestedTokens( file=current_file, data=None, error=None, + doc_source=chunk_bytes.doc_source, ) async def extract_and_process_email( diff --git a/querent/ingestors/github/github_ingestor.py b/querent/ingestors/github/github_ingestor.py index 8c1df80f..ade2bfbf 100644 --- a/querent/ingestors/github/github_ingestor.py +++ b/querent/ingestors/github/github_ingestor.py @@ -33,13 +33,15 @@ async def ingest( ): yield IngestedCode( file=current_file, - data=[line], # Wrap line in a list + data=[line], error=None, + doc_source=chunk_bytes.doc_source, ) yield IngestedCode( file=current_file, data=None, error=None, + doc_source=chunk_bytes.doc_source, ) collected_bytes = b"" current_file = chunk_bytes.file @@ -52,12 +54,13 @@ async def ingest( ): yield IngestedCode( file=current_file, - data=[line], # Wrap line in a list + data=[line], error=None, + doc_source=chunk_bytes.doc_source, ) - yield IngestedCode(file=current_file, data=None, error=None) + yield IngestedCode(file=current_file, data=None, error=None, doc_source=chunk_bytes.doc_source) except Exception as e: - yield IngestedCode(file=current_file, data=None, error=f"Exception: {e}") + yield 
IngestedCode(file=current_file, data=None, error=f"Exception: {e}", doc_source=chunk_bytes.doc_source) async def extract_and_process_code( self, chunk_bytes: CollectedBytes diff --git a/querent/ingestors/html/html_ingestor.py b/querent/ingestors/html/html_ingestor.py index a5f3efc3..37c0bfe6 100644 --- a/querent/ingestors/html/html_ingestor.py +++ b/querent/ingestors/html/html_ingestor.py @@ -51,36 +51,37 @@ async def ingest( elif current_file != chunk_bytes.file: # we have a new file, process the old one async for ingested_data in self.extract_and_process_html( - CollectedBytes(file=current_file, data=collected_bytes) + CollectedBytes(file=current_file, data=collected_bytes), chunk_bytes.doc_source ): yield ingested_data yield IngestedTokens( file=current_file, data=None, error=None, + doc_source=chunk_bytes.doc_source, ) collected_bytes = b"" current_file = chunk_bytes.file collected_bytes += chunk_bytes.data except Exception as e: - yield IngestedTokens(file=current_file, data=None, error=f"Exception: {e}") + yield IngestedTokens(file=current_file, data=None, error=f"Exception: {e}", doc_source=chunk_bytes.doc_source) finally: # process the last file async for ingested_data in self.extract_and_process_html( - CollectedBytes(file=current_file, data=collected_bytes) + CollectedBytes(file=current_file, data=collected_bytes), chunk_bytes.doc_source ): yield ingested_data - yield IngestedTokens(file=current_file, data=None, error=None) + yield IngestedTokens(file=current_file, data=None, error=None, doc_source=chunk_bytes.doc_source) async def extract_and_process_html( - self, collected_bytes: CollectedBytes + self, collected_bytes: CollectedBytes, doc_source: str ) -> AsyncGenerator[str, None]: """Function to extract and process xml files""" - async for elements in self.extract_text_from_html(collected_bytes): + async for elements in self.extract_text_from_html(collected_bytes, doc_source): yield elements async def extract_text_from_html( - self, collected_bytes: CollectedBytes + self, collected_bytes: CollectedBytes, doc_source: str ): """Function to extract text from xml""" try: @@ -97,16 +98,16 @@ async def extract_text_from_html( element_text = element.get_text().strip() elements.append(element_text) - # i = 1 - # for img_tag in soup.find_all('img'): - # img_src = img_tag.get('src') - # if img_src and img_src.startswith('data:image'): - # base64_data = img_src.split(';base64,')[-1] - # image_data = base64.b64decode(base64_data) - # image_ocr = await self.process_image(io.BytesIO(image_data)) + i = 1 + for img_tag in soup.find_all('img'): + img_src = img_tag.get('src') + if img_src and img_src.startswith('data:image'): + base64_data = img_src.split(';base64,')[-1] + image_data = base64.b64decode(base64_data) + image_ocr = await self.process_image(io.BytesIO(image_data)) - # yield IngestedImages(file = collected_bytes.file, image = base64_data, image_name = str(uuid.uuid4()), page_num=i, text = soup, ocr_text = image_ocr, coordinates= None, error= None) - # i += 1 + yield IngestedImages(file = collected_bytes.file, image = base64_data, image_name = str(uuid.uuid4()), page_num=i, text = soup, ocr_text = image_ocr, coordinates= None, error= None, doc_source=doc_source) + i += 1 except UnicodeDecodeError as exc: raise common_errors.UnicodeDecodeError( f"Getting UnicodeDecodeError on this file {collected_bytes.file} as {exc}" @@ -121,7 +122,7 @@ async def extract_text_from_html( ) from exc for element in elements: processed_element = await self.process_data(element) - yield 
IngestedTokens(file=collected_bytes.file, data=[processed_element], error=None) + yield IngestedTokens(file=collected_bytes.file, data=[processed_element], error=None, doc_source=doc_source) async def process_data(self, text: str) -> str: if self.processors is None or len(self.processors) == 0: diff --git a/querent/ingestors/images/image_ingestor.py b/querent/ingestors/images/image_ingestor.py index 72ddcc4d..29d8d519 100644 --- a/querent/ingestors/images/image_ingestor.py +++ b/querent/ingestors/images/image_ingestor.py @@ -52,8 +52,8 @@ async def ingest( text = await self.extract_and_process_image( CollectedBytes(file=current_file, data=collected_bytes) ) - yield IngestedTokens(file=current_file, data=[text], error=None) - yield IngestedTokens(file=current_file, data=None, error=None) + yield IngestedTokens(file=current_file, data=[text], error=None, doc_source=chunk_bytes.doc_source) + yield IngestedTokens(file=current_file, data=None, error=None, doc_source=chunk_bytes.doc_source) current_file = chunk_bytes.file collected_bytes = b"" @@ -64,11 +64,11 @@ async def ingest( text = await self.extract_and_process_image( CollectedBytes(file=current_file, data=collected_bytes) ) - yield IngestedTokens(file=current_file, data=[text], error=None) - yield IngestedTokens(file=current_file, data=None, error=None) + yield IngestedTokens(file=current_file, data=[text], error=None, doc_source=chunk_bytes.doc_source) + yield IngestedTokens(file=current_file, data=None, error=None, doc_source=chunk_bytes.doc_source) except Exception as e: - yield IngestedTokens(file=current_file, data=None, error=f"Exception: {e}") + yield IngestedTokens(file=current_file, data=None, error=f"Exception: {e}", doc_source=chunk_bytes.doc_source) async def extract_and_process_image(self, collected_bytes: CollectedBytes) -> str: text = await self.extract_text_from_image(collected_bytes) diff --git a/querent/ingestors/ingestor_manager.py b/querent/ingestors/ingestor_manager.py index f9b86196..997568b8 100644 --- a/querent/ingestors/ingestor_manager.py +++ b/querent/ingestors/ingestor_manager.py @@ -109,7 +109,7 @@ def __init__( IngestorBackend.Slack.value: TextIngestorFactory(is_token_stream=True), IngestorBackend.Email.value: EmailIngestorFactory(), IngestorBackend.Jira.value: JsonIngestorFactory(), - IngestorBackend.News.value: TextIngestorFactory(is_token_stream=True) + IngestorBackend.News.value: TextIngestorFactory(is_token_stream=True), # Add more mappings as needed } self.file_caches = LRUCache(maxsize=cache_size) @@ -141,8 +141,6 @@ async def supports(self, file_extension: str) -> bool: async def ingest_file_async( self, file_id: str, - result_queue: Optional[Queue] = None, - tokens_feader: Optional[ChannelCommandInterface] = None, ): collected_bytes_list = None try: @@ -158,21 +156,8 @@ async def chunk_generator() -> AsyncGenerator[CollectedBytes, None]: yield chunk async for chunk_tokens in ingestor.ingest(chunk_generator()): - if result_queue is not None: - result_queue.put_nowait(chunk_tokens) - if tokens_feader is not None: - tokens_feader.send_tokens_in_rust( - { - "data": ( - chunk_tokens.data - if type(chunk_tokens) == IngestedTokens - else chunk_tokens.ocr_text - ), - "file": chunk_tokens.file, - "is_token_stream": True, - } - ) - await asyncio.sleep(0.1) + if self.result_queue is not None: + await self.result_queue.put(chunk_tokens) else: self.logger.warning( f"Unsupported file extension {file_extension} for file {collected_bytes_list[0].file}" @@ -185,8 +170,6 @@ async def chunk_generator() -> 
AsyncGenerator[CollectedBytes, None]: async def ingest_collector_async( self, collector: Collector, - result_queue: Optional[Queue] = None, - token_feader: Optional[ChannelCommandInterface] = None, ): """Asynchronously ingest data from a single collector.""" async for collected_bytes in collector.poll(): @@ -203,9 +186,7 @@ async def ingest_collector_async( if collected_bytes.eof: # Try to ingest the ongoing file even if the cache is full try: - await self.ingest_file_async( - current_file, result_queue, token_feader - ) + await self.ingest_file_async(current_file) except Exception as e: self.logger.error(f"Error ingesting file {current_file}: {str(e)}") @@ -216,10 +197,7 @@ async def ingest_collector_async( async def ingest_all_async(self): """Asynchronously ingest data from all collectors concurrently.""" ingestion_tasks = [ - self.ingest_collector_async( - collector, self.result_queue, self.tokens_feader - ) - for collector in self.collectors + self.ingest_collector_async(collector) for collector in self.collectors ] await asyncio.gather(*ingestion_tasks) if self.result_queue is not None: diff --git a/querent/ingestors/json/json_ingestor.py b/querent/ingestors/json/json_ingestor.py index a68308a3..48bdeb1a 100644 --- a/querent/ingestors/json/json_ingestor.py +++ b/querent/ingestors/json/json_ingestor.py @@ -47,13 +47,14 @@ async def ingest( CollectedBytes(file=current_file, data=collected_bytes) ): yield IngestedTokens( - file=current_file, data=[json_objects], error=None + file=current_file, data=[json_objects], error=None, doc_source=chunk_bytes.doc_source ) if current_file: yield IngestedTokens( file=current_file, data=None, error=None, + doc_source=chunk_bytes.doc_source ) collected_bytes = b"" current_file = chunk_bytes.file @@ -62,21 +63,21 @@ async def ingest( except json.JSONDecodeError: yield IngestedTokens( - file=current_file, data=None, error="JSON Decode Error" + file=current_file, data=None, error="JSON Decode Error", doc_source=chunk_bytes.doc_source ) finally: async for json_objects in self.extract_and_process_json( CollectedBytes(file=current_file, data=collected_bytes) ): yield IngestedTokens( - file=current_file, data=[json_objects], error=None + file=current_file, data=[json_objects], error=None, doc_source=chunk_bytes.doc_source ) - yield IngestedTokens(file=current_file, data=None, error=None) + yield IngestedTokens(file=current_file, data=None, error=None, doc_source=chunk_bytes.doc_source) except Exception as e: - yield IngestedTokens(file=current_file, data=None, error=f"Exception: {e}") + yield IngestedTokens(file=current_file, data=None, error=f"Exception: {e}", doc_source=chunk_bytes.doc_source) - async def extract_and_process_json(self, collected_bytes: CollectedBytes) -> str: + async def extract_and_process_json(self, collected_bytes: CollectedBytes): try: json_data = collected_bytes.data.decode("utf-8") processed_text = await self.process_data(json_data) diff --git a/querent/ingestors/pdfs/pdf_ingestor_v1.py b/querent/ingestors/pdfs/pdf_ingestor_v1.py index 55a1f880..06a6a266 100644 --- a/querent/ingestors/pdfs/pdf_ingestor_v1.py +++ b/querent/ingestors/pdfs/pdf_ingestor_v1.py @@ -55,7 +55,7 @@ async def ingest( elif current_file != chunk_bytes.file: # we have a new file, process the old one async for page_text in self.extract_and_process_pdf( - CollectedBytes(file=current_file, data=collected_bytes) + CollectedBytes(file=current_file, data=collected_bytes), chunk_bytes.doc_source ): yield page_text collected_bytes = b"" @@ -64,28 +64,30 @@ async def 
ingest( file=current_file, data=None, error=None, + doc_source=chunk_bytes.doc_source, ) collected_bytes += chunk_bytes.data except Exception as e: # at the queue level, we can sample out the error - yield IngestedTokens(file=current_file, data=None, error=f"Exception: {e}") + yield IngestedTokens(file=current_file, data=None, error=f"Exception: {e}", doc_source=chunk_bytes.doc_source) finally: # process the last file try: async for page_text in self.extract_and_process_pdf( - CollectedBytes(file=current_file, data=collected_bytes) + CollectedBytes(file=current_file, data=collected_bytes), chunk_bytes.doc_source ): yield page_text - yield IngestedTokens(file=current_file, data=None, error=None) + yield IngestedTokens(file=current_file, data=None, error=None, doc_source=chunk_bytes.doc_source) except Exception as exc: yield IngestedTokens( file=current_file, data=None, error=f"Exception: {exc}", + doc_source=chunk_bytes.doc_source, ) async def extract_and_process_pdf( - self, collected_bytes: CollectedBytes + self, collected_bytes: CollectedBytes, doc_source: str ) -> AsyncGenerator[IngestedTokens, None]: try: path = BytesIO(collected_bytes.data) @@ -105,6 +107,7 @@ async def extract_and_process_pdf( file=collected_bytes.file, data=processed_text, error=collected_bytes.error, + doc_source=doc_source, ) # async for image_result in self.extract_images_and_ocr( # page, diff --git a/querent/ingestors/ppt/ppt_ingestor.py b/querent/ingestors/ppt/ppt_ingestor.py index c7a96ab2..86a50ce9 100644 --- a/querent/ingestors/ppt/ppt_ingestor.py +++ b/querent/ingestors/ppt/ppt_ingestor.py @@ -56,28 +56,29 @@ async def ingest( current_file = chunk_bytes.file elif current_file != chunk_bytes.file: async for ingested_data in self.extract_and_process_ppt( - CollectedBytes(file=current_file, data=collected_bytes) + CollectedBytes(file=current_file, data=collected_bytes), chunk_bytes.doc_source ): yield ingested_data yield IngestedTokens( file=current_file, data=None, error=None, + doc_source=chunk_bytes.doc_source, ) collected_bytes = b"" current_file = chunk_bytes.file collected_bytes += chunk_bytes.data except Exception as e: - yield IngestedTokens(file=current_file, data=None, error=f"Exception: {e}") + yield IngestedTokens(file=current_file, data=None, error=f"Exception: {e}", doc_source=chunk_bytes.doc_source) finally: async for ingested_data in self.extract_and_process_ppt( - CollectedBytes(file=current_file, data=collected_bytes) + CollectedBytes(file=current_file, data=collected_bytes), chunk_bytes.doc_source ): yield ingested_data - yield IngestedTokens(file=current_file, data=None, error=None) + yield IngestedTokens(file=current_file, data=None, error=None, doc_source=chunk_bytes.doc_source) async def extract_and_process_ppt( - self, collected_bytes: CollectedBytes + self, collected_bytes: CollectedBytes, doc_source: str ) -> AsyncGenerator[str, None]: try: if collected_bytes.extension == "pptx": @@ -95,7 +96,7 @@ async def extract_and_process_ppt( slide_text = "\n".join(text) processed_slide_text = await self.process_data(slide_text) yield IngestedTokens( - file=collected_bytes.file, data=processed_slide_text, error=None + file=collected_bytes.file, data=processed_slide_text, error=None, doc_source=doc_source ) i+=1 elif collected_bytes.extension == "ppt": @@ -103,7 +104,7 @@ async def extract_and_process_ppt( extracted_text = parsed["content"] processed_text = await self.process_data(extracted_text) yield IngestedTokens( - file=collected_bytes.file, data=processed_text, error=None + 
file=collected_bytes.file, data=processed_text, error=None, doc_source=doc_source ) else: raise common_errors.WrongPptFileError( diff --git a/querent/ingestors/texts/text_ingestor.py b/querent/ingestors/texts/text_ingestor.py index 97e73bcd..e2e90f97 100644 --- a/querent/ingestors/texts/text_ingestor.py +++ b/querent/ingestors/texts/text_ingestor.py @@ -50,6 +50,7 @@ async def ingest( data=[process_text], error=None, is_token_stream=True, + doc_source=chunk_bytes.doc_source ) else: if current_file is None: @@ -60,13 +61,15 @@ async def ingest( ): yield IngestedTokens( file=current_file, - data=[line], # Wrap line in a list + data=[line], error=None, + doc_source=chunk_bytes.doc_source ) yield IngestedTokens( file=current_file, data=None, error=None, + doc_source=chunk_bytes.doc_source ) collected_bytes = b"" current_file = chunk_bytes.file @@ -78,12 +81,13 @@ async def ingest( ): yield IngestedTokens( file=current_file, - data=[line], # Wrap line in a list + data=[line], error=None, + doc_source=chunk_bytes.doc_source, ) - yield IngestedTokens(file=current_file, data=None, error=None) + yield IngestedTokens(file=current_file, data=None, error=None, doc_source=chunk_bytes.doc_source) except Exception as e: - yield IngestedTokens(file=current_file, data=None, error=f"Exception: {e}") + yield IngestedTokens(file=current_file, data=None, error=f"Exception: {e}", doc_source=chunk_bytes.doc_source) async def ingest_token_stream( self, chunk_bytes: CollectedBytes diff --git a/querent/ingestors/video/video_ingestor.py b/querent/ingestors/video/video_ingestor.py index 1479d35a..f1affbf4 100644 --- a/querent/ingestors/video/video_ingestor.py +++ b/querent/ingestors/video/video_ingestor.py @@ -47,26 +47,27 @@ async def ingest(self, poll_function: AsyncGenerator[CollectedBytes, None]) -> A async for text in self.extract_and_process_video( CollectedBytes(file=current_file, data=collected_bytes) ): - yield IngestedTokens(file=current_file, data=[text], error=None) + yield IngestedTokens(file=current_file, data=[text], error=None, doc_source=chunk_bytes.doc_source) # Ensure a final yield for the last processed text yield IngestedTokens( file=current_file, data=None, error=None, + doc_source=chunk_bytes.doc_source ) collected_bytes = b"" current_file = chunk_bytes.file collected_bytes += chunk_bytes.data except Exception as e: - yield IngestedTokens(file=current_file, data=None, error=f"Exception: {e}") + yield IngestedTokens(file=current_file, data=None, error=f"Exception: {e}", doc_source=chunk_bytes.doc_source) finally: if collected_bytes: # Check if there's data left to process for the last file async for text in self.extract_and_process_video( CollectedBytes(file=current_file, data=collected_bytes) ): - yield IngestedTokens(file=current_file, data=[text], error=None) + yield IngestedTokens(file=current_file, data=[text], error=None, doc_source=chunk_bytes.doc_source) # Ensure a final yield for the last processed text - yield IngestedTokens(file=current_file, data=None, error=None) + yield IngestedTokens(file=current_file, data=None, error=None, doc_source=chunk_bytes.doc_source) async def extract_and_process_video(self, collected_bytes: CollectedBytes) -> AsyncGenerator[str, None]: text = "" diff --git a/querent/ingestors/xlsx/xlsx_ingestor.py b/querent/ingestors/xlsx/xlsx_ingestor.py index f30c34af..309e7ef7 100644 --- a/querent/ingestors/xlsx/xlsx_ingestor.py +++ b/querent/ingestors/xlsx/xlsx_ingestor.py @@ -52,17 +52,19 @@ async def ingest( file=current_file, data=[data], error=None, + 
doc_source=chunk_bytes.doc_source ) yield IngestedTokens( file=current_file, data=None, error=None, + doc_source=chunk_bytes.doc_source ) collected_bytes = b"" current_file = chunk_bytes.file collected_bytes += chunk_bytes.data except Exception as e: - yield IngestedTokens(file=current_file, data=None, error=f"Exception: {e}") + yield IngestedTokens(file=current_file, data=None, error=f"Exception: {e}", doc_source=chunk_bytes.doc_source) finally: async for data in self.extract_and_process_xlsx( CollectedBytes(file=current_file, data=collected_bytes) @@ -71,8 +73,9 @@ async def ingest( file=current_file, data=[data], error=None, + doc_source=chunk_bytes.doc_source ) - yield IngestedTokens(file=current_file, data=None, error=None) + yield IngestedTokens(file=current_file, data=None, error=None, doc_source=chunk_bytes.doc_source) async def extract_and_process_xlsx( self, collected_bytes: CollectedBytes diff --git a/querent/ingestors/xml/xml_ingestor.py b/querent/ingestors/xml/xml_ingestor.py index 6390ec63..e291d27c 100644 --- a/querent/ingestors/xml/xml_ingestor.py +++ b/querent/ingestors/xml/xml_ingestor.py @@ -53,24 +53,25 @@ async def ingest( async for text in self.extract_and_process_xml( CollectedBytes(file=current_file, data=collected_bytes) ): - yield IngestedTokens(file=current_file, data=[text], error=None) + yield IngestedTokens(file=current_file, data=[text], error=None, doc_source=chunk_bytes.doc_source) yield IngestedTokens( file=current_file, data=None, error=None, + doc_source=chunk_bytes.doc_source ) collected_bytes = b"" current_file = chunk_bytes.file collected_bytes += chunk_bytes.data except Exception as e: - yield IngestedTokens(file=current_file, data=None, error=f"Exception: {e}") + yield IngestedTokens(file=current_file, data=None, error=f"Exception: {e}", doc_source=chunk_bytes.doc_source) finally: async for text in self.extract_and_process_xml( CollectedBytes(file=current_file, data=collected_bytes) ): - yield IngestedTokens(file=current_file, data=[text], error=None) + yield IngestedTokens(file=current_file, data=[text], error=None, doc_source=chunk_bytes.doc_source) - yield IngestedTokens(file=current_file, data=None, error=None) + yield IngestedTokens(file=current_file, data=None, error=None, doc_source=chunk_bytes.doc_source) async def extract_and_process_xml( self, collected_bytes: CollectedBytes diff --git a/querent/insights/__init__.py b/querent/insights/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/querent/search/__init__.py b/querent/search/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/querent/storage/s3-data-management.py b/querent/storage/s3-data-management.py deleted file mode 100644 index c740fdc0..00000000 --- a/querent/storage/s3-data-management.py +++ /dev/null @@ -1,95 +0,0 @@ -# # -----------------------Upload------------------------------------- - -# import boto3 - -# # Your AWS credentials and the S3 bucket details -# aws_access_key_id = 'AKIA5ZFZH6CA6LDWIPV5' -# aws_secret_access_key = 'wdlGk5xuwEukpN6tigXV0S+CMJKdyQse2BgYjw9o' -# bucket_name = 'pstreamsbucket1' -# file_path = 'english.pdf' # The local path to the file -# file_key = 'english.pdf' # The key under which to store the file in S3 - -# # Create an S3 client -# s3 = boto3.client( -# 's3', -# aws_access_key_id=aws_access_key_id, -# aws_secret_access_key=aws_secret_access_key, -# # If you're using a specific region, uncomment and set the region_name parameter -# # region_name='your-bucket-region' -# ) - -# # Upload the file -# 
with open(file_path, 'rb') as file: -# s3.upload_fileobj( -# Fileobj=file, -# Bucket=bucket_name, -# Key=file_key, -# ExtraArgs={'ContentType': 'application/pdf'} -# ) - -# print(f"File {file_path} uploaded to {bucket_name}/{file_key}") - - -# # ----------------------------------Delete-------------------------------------- -# import boto3 - -# # Your AWS access key ID and secret access key -# aws_access_key_id = 'AKIA5ZFZH6CA6LDWIPV5' -# aws_secret_access_key = 'wdlGk5xuwEukpN6tigXV0S+CMJKdyQse2BgYjw9o' - -# # The name of the bucket and the key (file name) you want to delete -# bucket_name = 'pstreamsbucket1' -# file_key = 'sample123.pdf' - -# # Create an S3 client -# s3 = boto3.client( -# 's3', -# aws_access_key_id=aws_access_key_id, -# aws_secret_access_key=aws_secret_access_key, -# # If you're using a specific region, uncomment and set the region_name parameter -# # region_name='your-bucket-region' -# ) - -# # Function to delete a file from the specified S3 bucket -# def delete_file(bucket, key): -# try: -# response = s3.delete_object(Bucket=bucket, Key=key) -# print(f"File {key} deleted from bucket {bucket}.") -# except Exception as e: -# print(e) - -# delete_file(bucket_name, file_key) - - -# ------------------------------------List -------------------------------------- -# import boto3 - -# # Your AWS access key ID and secret access key -# aws_access_key_id = 'AKIA5ZFZH6CA6LDWIPV5' -# aws_secret_access_key = 'wdlGk5xuwEukpN6tigXV0S+CMJKdyQse2BgYjw9o' - -# # The name of the bucket you want to list files from -# bucket_name = 'pstreamsbucket1' - -# # Create an S3 client -# s3 = boto3.client( -# 's3', -# aws_access_key_id=aws_access_key_id, -# aws_secret_access_key=aws_secret_access_key, -# # If you're using a specific region, uncomment and set the region_name parameter -# # region_name='your-bucket-region' -# ) - -# # List files in the specified S3 bucket -# def list_files(bucket): -# try: -# response = s3.list_objects_v2(Bucket=bucket) -# if 'Contents' in response: -# for file in response['Contents']: -# print(file['Key']) -# else: -# print("No files found.") -# except Exception as e: -# print(e) - -# list_files(bucket_name) diff --git a/querent/workflow/_helpers.py b/querent/workflow/_helpers.py index e8153241..f98a685c 100644 --- a/querent/workflow/_helpers.py +++ b/querent/workflow/_helpers.py @@ -37,7 +37,7 @@ def setup_nltk_and_spacy_paths(config, search_directory): config.engines[0].spacy_model_path = spacy_model_path -async def start_collectors(config: Config): +async def start_collectors(config: Config, result_queue: QuerentQueue): collectors = [] for collector_config in config.collectors: uri = Uri(collector_config.uri) @@ -51,8 +51,8 @@ async def start_collectors(config: Config): ingestor_factory_manager = IngestorFactoryManager( collectors=collectors, - result_queue=None, - tokens_feader=config.workflow.tokens_feader, + result_queue=result_queue, + tokens_feader=None, processors=[text_cleanup_processor, text_processor] ) @@ -146,7 +146,7 @@ async def receive_token_feeder( tokens = config.workflow.tokens_feader.receive_tokens_in_python() if tokens is not None: ingested_tokens = IngestedTokens( - file=tokens.get("file", None), data=tokens.get("data", None), is_token_stream= tokens.get("is_token_stream"), + file=tokens.get("file", None), data=tokens.get("data", None), is_token_stream= tokens.get("is_token_stream"), doc_source=tokens.get("doc_source", "") ) await result_queue.put(ingested_tokens) else: diff --git a/querent/workflow/workflow.py b/querent/workflow/workflow.py 
index ec410a4b..875f0d16 100644 --- a/querent/workflow/workflow.py +++ b/querent/workflow/workflow.py @@ -30,7 +30,7 @@ async def start_workflow(config_dict: dict): engine_params_json["fixed_entities"] = [x for x in engine_params.get("fixed_entities").split(",")] if engine_params.get("sample_entities") is not None: - engine_params_json["sample_entities"] = [x for x in engine_params.get("fixed_entities").split(",")] + engine_params_json["sample_entities"] = [x for x in engine_params.get("sample_entities").split(",")] if engine_params.get("ner_model_name") is not None: engine_params_json["ner_model_name"] = engine_params.get("ner_model_name") @@ -83,71 +83,10 @@ async def start_workflow(config_dict: dict): workflow = workflows.get(config.workflow.name) result_queue = QuerentQueue() - collector_tasks = asyncio.create_task(start_collectors(config)) + collector_tasks = asyncio.create_task(start_collectors(config, result_queue)) engine_tasks = asyncio.create_task( workflow(ResourceManager(), config, result_queue) ) await asyncio.gather(collector_tasks, engine_tasks) logger.info("Workflow is finished. All events have been released.") - -async def start_ingestion(config_dict: dict): - if not config_dict: - return - collectors = [] - collector_configs = config_dict.get("collectors", []) - for collector_config in collector_configs: - collectors.append(CollectorConfig(config_source=collector_config).resolve()) - workflow_config = config_dict.get("workflow") - workflow = WorkflowConfig(config_source=workflow_config) - config_dict["collectors"] = collectors - config_dict["workflow"] = workflow - config = Config(config_source=config_dict) - collectors = [] - for collector_config in config.collectors: - uri = Uri(collector_config.uri) - collectors.append(CollectorResolver().resolve(uri=uri, config=collector_config)) - - for collector in collectors: - await collector.connect() - - ingestor_factory_manager = IngestorFactoryManager( - collectors=collectors, - result_queue=None, - tokens_feader=config.workflow.tokens_feader, - ) - - resource_manager = ResourceManager() - - ingest_task = asyncio.create_task(ingestor_factory_manager.ingest_all_async()) - check_message_states_task = asyncio.create_task( - check_message_states(config, resource_manager, [ingest_task]) - ) - await asyncio.gather(ingest_task, check_message_states_task) - -async def start_workflow_engine(config_dict: Config): - if not config_dict: - return - workflow_config = config_dict.get("workflow") - workflow = WorkflowConfig(config_source=workflow_config) - engine_configs = config_dict.get("engines", []) - engines = [] - for engine_config in engine_configs: - engine_config_source = engine_config.get("config", {}) - if engine_config["name"] == "knowledge_graph_using_openai": - engines.append(GPTConfig(config_source=engine_config_source)) - elif engine_config["name"] == "knowledge_graph_using_llama2_v1": - engines.append(LLM_Config(config_source=engine_config_source)) - config_dict["engines"] = engines - config_dict["collectors"] = None - config_dict["workflow"] = workflow - config = Config(config_source=config_dict) - workflows = { - "knowledge_graph_using_openai": start_gpt_workflow, - "knowledge_graph_using_llama2_v1": start_llama_workflow, - } - workflow = workflows.get(config.workflow.name) - result_queue = QuerentQueue() - resource_manager = ResourceManager() - - await workflow(resource_manager, config, result_queue) diff --git a/setup.py b/setup.py index 9b86a07a..7b05eeff 100644 --- a/setup.py +++ b/setup.py @@ -83,7 +83,7 @@ setup( 
name="querent", - version="3.0.2", + version="3.0.3", author="Querent AI", description="The Asynchronous Data Dynamo and Graph Neural Network Catalyst", long_description=long_description, diff --git a/tests/ingestors/test_html_ingestor.py b/tests/ingestors/test_html_ingestor.py index a274524a..0a8dc32b 100644 --- a/tests/ingestors/test_html_ingestor.py +++ b/tests/ingestors/test_html_ingestor.py @@ -39,7 +39,7 @@ async def poll_and_print(): if ingested != "" or ingested is not None: counter += 1 # 1 extra IngestedToken signifying end of file - assert counter == 17 + assert counter == 18 await poll_and_print() diff --git a/tests/llm_tests/mock_llm_test.py b/tests/llm_tests/mock_llm_test.py index f546896c..93ce6139 100644 --- a/tests/llm_tests/mock_llm_test.py +++ b/tests/llm_tests/mock_llm_test.py @@ -34,7 +34,7 @@ async def process_tokens(self, data: IngestedTokens): # can set the state of the LLM using the set_state method # The state of the LLM is stored in the state attribute of the LLM # The state of the LLM is published to subscribers of the LLM - current_state = EventState(EventType.Graph, 1.0, "anything", "dummy.txt") + current_state = EventState(EventType.Graph, 1.0, "anything", "dummy.txt", doc_source="file://folder_path") await self.set_state(new_state=current_state) async def process_code(self, data: IngestedCode): @@ -101,5 +101,5 @@ def handle_event(self, event_type: EventType, event_state: EventState): async def terminate_querent(llm_mocker: MockLLMEngine): await asyncio.sleep(60) - event_state = EventState(event_type=EventType.Graph, payload = "terminate", timestamp=1.0, file="dummy.txt") + event_state = EventState(event_type=EventType.Graph, payload = "terminate", timestamp=1.0, file="dummy.txt", doc_source="file://folder_path") await llm_mocker.set_state(event_state) diff --git a/tests/workflows/gpt_llm_case_study.py b/tests/workflows/gpt_llm_case_study.py index 67cbf97f..73190b72 100644 --- a/tests/workflows/gpt_llm_case_study.py +++ b/tests/workflows/gpt_llm_case_study.py @@ -20,7 +20,6 @@ # from querent.querent.querent import Querent # import time # from querent.storage.postgres_graphevent_storage import DatabaseConnection -# from querent.storage.milvus_vectorevent_storage import MilvusDBConnection # @pytest.mark.asyncio # async def test_ingest_all_async(): diff --git a/tests/workflows/test_multiple_collectors.py b/tests/workflows/test_multiple_collectors.py index b1bfe8b1..05671b81 100644 --- a/tests/workflows/test_multiple_collectors.py +++ b/tests/workflows/test_multiple_collectors.py @@ -119,11 +119,8 @@ async def test_multiple_collectors_all_async(): and ingested_data.is_token_stream ): messages += 1 - else: - unique_files.add(ingested_data.file) counter += 1 assert counter == 85 - assert len(unique_files) > 1 assert messages > 0