
Added doc source #292

Merged
merged 28 commits into main from Added-doc_source on Apr 18, 2024

Commits (28)
c782d8e
Added doc_source in collectors and ingestors
Ansh5461 Apr 13, 2024
21e1d4d
Added doc_source in engines
Ansh5461 Apr 13, 2024
ad1381b
Updated test case assertions
Ansh5461 Apr 13, 2024
69059c0
Merge branch 'main' into Added-doc_source
Ansh5461 Apr 13, 2024
cae44ba
Updations in workflow and new version release
Ansh5461 Apr 15, 2024
176461d
Merge branch 'Added-doc_source' of github.com-ansh5461:Querent-ai/que…
Ansh5461 Apr 15, 2024
23844e9
Updated doc_source
Ansh5461 Apr 15, 2024
9b0fcdf
Changed version
Ansh5461 Apr 15, 2024
66193cc
Stringified event_type
Ansh5461 Apr 15, 2024
34fb8fd
trying queue implementation
Ansh5461 Apr 17, 2024
4d5011f
added prints
ngupta10 Apr 17, 2024
97f6acb
workflow use queue now
Ansh5461 Apr 17, 2024
88645d4
workflow use queue now
Ansh5461 Apr 17, 2024
9da2a19
workflow use queue now
Ansh5461 Apr 17, 2024
bdcbc75
Fixes
Ansh5461 Apr 17, 2024
2fabbe5
Added enum in eventtype
Ansh5461 Apr 17, 2024
7b7e77c
Removed enum
Ansh5461 Apr 18, 2024
63dbb47
added prints
ngupta10 Apr 18, 2024
e494511
changes in file buffer
ngupta10 Apr 18, 2024
cbec784
corrected doc ingestor
ngupta10 Apr 18, 2024
ee1df40
removed prints and minor fix
ngupta10 Apr 18, 2024
a3d233e
Final fixes
Ansh5461 Apr 18, 2024
cfd8bbe
Fixed assertion
Ansh5461 Apr 18, 2024
48d9abe
Fixed storage files
Ansh5461 Apr 18, 2024
f7b8f74
Final fixes
Ansh5461 Apr 18, 2024
0930126
cleanup
saraswatpuneet Apr 18, 2024
813b05e
cleanup
saraswatpuneet Apr 18, 2024
a013b3d
cleanups
saraswatpuneet Apr 18, 2024
Files changed (diff shown for 4 of the 28 commits)
4 changes: 2 additions & 2 deletions querent/collectors/aws/aws_collector.py
@@ -52,8 +52,8 @@ async def poll(self) -> AsyncGenerator[CollectedBytes, None]:
for obj in response.get("Contents", []):
file = self.download_object_as_byte_stream(obj["Key"])
async for chunk in self.read_chunks(file):
- yield CollectedBytes(file=obj["Key"], data=chunk, error=None)
- yield CollectedBytes(file=obj["Key"], data=None, error=None, eof=True)
+ yield CollectedBytes(file=obj["Key"], data=chunk, error=None, doc_source=f"aws://{self.bucket_name}")
+ yield CollectedBytes(file=obj["Key"], data=None, error=None, eof=True, doc_source=f"aws://{self.bucket_name}")

except PermissionError as exc:
self.logger.error(f"Getting Permission Error on file {file}, as {exc}")
4 changes: 2 additions & 2 deletions querent/collectors/azure/azure_collector.py
@@ -57,8 +57,8 @@ async def poll(self) -> AsyncGenerator[CollectedBytes, None]:
self.container_client, blob.name
)
async for chunk in self.read_chunks(file):
- yield CollectedBytes(file=blob.name, data=chunk, error=None)
- yield CollectedBytes(file=blob.name, data=None, error=None, eof=True)
+ yield CollectedBytes(file=blob.name, data=chunk, error=None, doc_source=f"azure://{self.container_name}")
+ yield CollectedBytes(file=blob.name, data=None, error=None, eof=True, doc_source=f"azure://{self.container_name}")
except Exception as e:
# Handle exceptions gracefully, e.g., log the error
self.logger.error(f"Error polling Azure Blob Storage: {e}")
4 changes: 2 additions & 2 deletions querent/collectors/drive/google_drive_collector.py
@@ -88,8 +88,8 @@ async def poll(self) -> AsyncGenerator[CollectedBytes, None]:
self.logger.info("No files found in Google Drive")
for file in files:
async for chunk in self.read_chunks(file["id"]):
- yield CollectedBytes(data=chunk, file=file["name"])
- yield CollectedBytes(data=None, file=file["name"], eof=True)
+ yield CollectedBytes(data=chunk, file=file["name"], doc_source=f"drive://{self.folder_to_crawl}")
+ yield CollectedBytes(data=None, file=file["name"], eof=True, doc_source=f"drive://{self.folder_to_crawl}")
except Exception as e:
raise common_errors.PollingError(
f"Failed to poll Google Drive: {str(e)}"
4 changes: 2 additions & 2 deletions querent/collectors/dropbox/dropbox_collector.py
@@ -69,8 +69,8 @@ async def poll(self) -> AsyncGenerator[CollectedBytes, None]:

file_content_bytes = response.content
async for chunk in self.stream_blob(file_content_bytes):
- yield CollectedBytes(file=entry.name, data=chunk)
- yield CollectedBytes(file=entry.name, data=None, eof=True)
+ yield CollectedBytes(file=entry.name, data=chunk, doc_source=f"dropbox://{self.folder_path}")
+ yield CollectedBytes(file=entry.name, data=None, eof=True, doc_source=f"dropbox://{self.folder_path}")
except dropbox.exceptions.ApiError as e:
self.logger.error(f"Error polling Dropbox: {e}")
raise common_errors.PollingError(
1 change: 1 addition & 0 deletions querent/collectors/email/email_collector.py
@@ -57,6 +57,7 @@ async def poll(self) -> AsyncGenerator[CollectedBytes, None]:
yield CollectedBytes(
data=message,
file=f"{self.config.imap_username}:{self.config.imap_folder}/{i}.email",
doc_source="email://"
)
yield CollectedBytes(
data=None,
4 changes: 2 additions & 2 deletions querent/collectors/fs/fs_collector.py
@@ -41,9 +41,9 @@ async def poll(self) -> AsyncGenerator[CollectedBytes, None]:
async with aiofiles.open(file_path, "rb") as file:
file_path_str = str(file_path)
async for chunk in self.read_chunks(file):
- yield CollectedBytes(file=file_path_str, data=chunk, error=None)
+ yield CollectedBytes(file=file_path_str, data=chunk, error=None, doc_source=f"file://{self.root_dir}")
yield CollectedBytes(
- file=file_path_str, data=None, error=None, eof=True
+ file=file_path_str, data=None, error=None, eof=True, doc_source=f"file://{self.root_dir}"
)
except PermissionError as exc:
raise common_errors.PermissionError(
4 changes: 2 additions & 2 deletions querent/collectors/gcs/gcs_collector.py
@@ -55,8 +55,8 @@ async def poll(self) -> AsyncGenerator[CollectedBytes, None]:
blobs = list(bucket.list_blobs()) # Convert to a list
for blob in blobs:
async for chunk in self.stream_blob(blob):
- yield CollectedBytes(file=blob.name, data=chunk, error=None)
- yield CollectedBytes(file=blob.name, data=None, error=None, eof=True)
+ yield CollectedBytes(file=blob.name, data=chunk, error=None, doc_source=f"gcs://{self.bucket_name}")
+ yield CollectedBytes(file=blob.name, data=None, error=None, eof=True, doc_source=f"gcs://{self.bucket_name}")
except Exception as e:
# Handle exceptions gracefully, e.g., log the error
self.logger.error(f"Error connecting to GCS: {e}")
4 changes: 2 additions & 2 deletions querent/collectors/github/github_collector.py
@@ -48,9 +48,9 @@ async def fetch_files_in_folder(self, api_url):
file_response.raise_for_status()
file_contents = await file_response.read()
# Assume process_data() is correctly implemented for your file type
- yield CollectedBytes(file=item["name"], data=file_contents)
+ yield CollectedBytes(file=item["name"], data=file_contents, doc_source=f"github://{self.repository}")

- yield CollectedBytes(file = item["name"], data = None, eof = True)
+ yield CollectedBytes(file = item["name"], data = None, eof = True, doc_source=f"github://{self.repository}")
elif item["type"] == "dir":
# Recursively fetch files in subfolders
async for sub_item in self.fetch_files_in_folder(item["url"]):
4 changes: 2 additions & 2 deletions querent/collectors/jira/jira_collector.py
@@ -82,10 +82,10 @@ async def poll(self) -> AsyncGenerator[CollectedBytes, None]:
for issue in issues:
json_issue = json.dumps(issue.raw).encode("utf-8")
yield CollectedBytes(
- data=json_issue, file=f"jira_issue_{issue.key}.json.jira"
+ data=json_issue, file=f"jira_issue_{issue.key}.json.jira", doc_source="jira://"
)
yield CollectedBytes(
data=None, file=f"jira_issue_{issue.key}.json.jira", eof=True
data=None, file=f"jira_issue_{issue.key}.json.jira", eof=True, doc_source="jira://"
)

except common_errors.ConnectionError as e:
4 changes: 2 additions & 2 deletions querent/collectors/news/news_collector.py
@@ -78,8 +78,8 @@ async def poll(self) -> AsyncGenerator[CollectedBytes, None]:
publish_date = article.get('publishedAt').split('T')[0]

title = article['title']
yield CollectedBytes(file=f"{self.config.query}_{publish_date}.news", data=str(article_data).encode("utf-8"))
yield CollectedBytes(file=f"{self.config.query}_{publish_date}.news", data=None, error=None, eof=True)
yield CollectedBytes(file=f"{self.config.query}_{publish_date}.news", data=str(article_data).encode("utf-8"), doc_source="news://")
yield CollectedBytes(file=f"{self.config.query}_{publish_date}.news", data=None, error=None, eof=True, doc_source="news://")

total_results = response.get("totalResults", 0)
total_pages = (
2 changes: 2 additions & 0 deletions querent/collectors/slack/slack_collector.py
@@ -63,6 +63,7 @@ async def poll(self) -> AsyncGenerator[CollectedBytes, None]:
yield CollectedBytes(
file=f"slack://{self.channel}.slack",
data=bytes(message["text"] + "\n\n", "utf-8"),
doc_source = f"slack://{self.channel}"
)

if not response["has_more"]:
@@ -71,6 +72,7 @@ async def poll(self) -> AsyncGenerator[CollectedBytes, None]:
data=None,
error=None,
eof=True,
doc_source = f"slack://{self.channel}"
)
break
else:
2 changes: 1 addition & 1 deletion querent/collectors/webscaper/web_scraper_collector.py
@@ -29,7 +29,7 @@ async def poll(self):
while urls_to_scrape:
url = urls_to_scrape.pop()
content = await self.scrape_website(url)
- yield CollectedBytes(file=None, data=content.data, error=None)
+ yield CollectedBytes(file=None, data=content.data, error=None, doc_source=self.website_url)
# Find and add links from this page to the list of URLs to scrape
new_urls = self.extract_links(url)
urls_to_scrape.extend(new_urls)
3 changes: 2 additions & 1 deletion querent/common/types/collected_bytes.py
@@ -2,11 +2,12 @@


class CollectedBytes:
- def __init__(self, file: str, data: bytes, error: str = None, eof: bool = False):
+ def __init__(self, file: str, data: bytes, error: str = None, eof: bool = False, doc_source = str):
self.data = data
self.error = error
self.file = file
self.eof = eof
+ self.doc_source = doc_source
if self.file:
file = str(file)
self.extension = file.split(".")[-1]
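One thing to note about the signature above: the new parameter is written `doc_source = str`, so its default value is the built-in str type object rather than a string. A string default was presumably intended; a sketch of that reading (an assumption, not what this diff merges):

class CollectedBytes:
    def __init__(self, file: str, data: bytes, error: str = None,
                 eof: bool = False, doc_source: str = ""):
        self.data = data
        self.error = error
        self.file = file
        self.eof = eof
        self.doc_source = doc_source  # e.g. "aws://my-bucket" or "file:///tmp/data"

In practice every collector in this PR passes doc_source explicitly, so the odd default is unlikely to surface, but a typed string default would keep type checkers happy.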
3 changes: 2 additions & 1 deletion querent/common/types/ingested_code.py
@@ -1,10 +1,11 @@
class IngestedCode:
"""Class for ingested code type of data"""

- def __init__(self, file: str, data: [str], error: str = None) -> None:
+ def __init__(self, file: str, data: [str], doc_source: str, error: str = None) -> None:
self.data = data
self.error = error
self.file = file
+ self.doc_source = doc_source
file = str(file)
self.extension = file.rsplit(".", maxsplit=1)[-1]

2 changes: 2 additions & 0 deletions querent/common/types/ingested_images.py
@@ -12,6 +12,7 @@ def __init__(
coordinates: list = [],
ocr_text: list = [],
error: str = None,
+ doc_source = str,
) -> None:
self.file = file
self.text = text
@@ -21,6 +22,7 @@ def __init__(
self.page_num = page_num
self.coordinates = coordinates
self.ocr_text = ocr_text
+ self.doc_source = doc_source
file = str(file)
self.extension = file.split(".")[-1]
self.file_id = file.split("/")[-1].split(".")[0]
5 changes: 3 additions & 2 deletions querent/common/types/ingested_tokens.py
@@ -1,13 +1,14 @@
- from typing import Union
+ from typing import Union, List


class IngestedTokens:
def __init__(
- self, file: str, data: [str], error: str = None, is_token_stream=False
+ self, file: str, data: List[str], error: str = None, is_token_stream=False, doc_source=str
) -> None:
self.data = data
self.error = error
self.is_token_stream = is_token_stream
+ self.doc_source = doc_source
if file:
self.file = file
file = str(file)
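The same default-as-type pattern appears here (`doc_source=str`) and as `doc_source = str` in IngestedImages above. Since List is now imported, a hedged guess at the intended signature (again an assumption, not the merged code):

from typing import List

class IngestedTokens:
    def __init__(self, file: str, data: List[str], error: str = None,
                 is_token_stream: bool = False, doc_source: str = "") -> None:
        self.data = data
        self.error = error
        self.is_token_stream = is_token_stream
        self.doc_source = doc_source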
3 changes: 2 additions & 1 deletion querent/common/types/querent_event.py
@@ -9,9 +9,10 @@ class EventType:


class EventState:
- def __init__(self, event_type: EventType, timestamp: float, payload: Any, file: str):
+ def __init__(self, event_type: EventType, timestamp: float, payload: Any, file: str, doc_source: str):
self.event_type = event_type
self.timestamp = timestamp
self.payload = payload
self.file = file
+ self.doc_source = doc_source
3 changes: 2 additions & 1 deletion querent/core/base_engine.py
@@ -183,7 +183,8 @@ async def _listen_for_state_changes(self):
"event_type": new_state.event_type,
"timestamp": new_state.timestamp,
"payload": new_state.payload,
"file": new_state.file
"file": new_state.file,
"doc_source": new_state.doc_source,
}
await self._notify_subscribers(new_state["event_type"], new_state)
else:
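With EventState and the subscriber payload both extended, the provenance travels all the way to subscribers. A small sketch of what that looks like end to end (values are illustrative; the import path follows the file locations shown in this diff):

import time
from querent.common.types.querent_event import EventState, EventType

state = EventState(
    event_type=EventType.Graph,
    timestamp=time.time(),
    payload='{"subject": "...", "predicate": "...", "object": "..."}',
    file="report.pdf",
    doc_source="file:///data/reports",
)

# The dict handed to subscribers by _listen_for_state_changes now includes the source:
event = {
    "event_type": state.event_type,
    "timestamp": state.timestamp,
    "payload": state.payload,
    "file": state.file,
    "doc_source": state.doc_source,
}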
5 changes: 3 additions & 2 deletions querent/core/transformers/bert_ner_opensourcellm.py
@@ -145,6 +145,7 @@ async def process_tokens(self, data: IngestedTokens):
doc_entity_pairs = []
number_sentences = 0
try:
+ doc_source = data.doc_source
if not BERTLLM.validate_ingested_tokens(data):
self.set_termination_event()
return
@@ -212,11 +213,11 @@ async def process_tokens(self, data: IngestedTokens):
if not self.termination_event.is_set():
graph_json = json.dumps(TripleToJsonConverter.convert_graphjson(triple))
if graph_json:
- current_state = EventState(EventType.Graph,1.0, graph_json, file)
+ current_state = EventState(EventType.Graph,1.0, graph_json, file, doc_source=doc_source)
await self.set_state(new_state=current_state)
vector_json = json.dumps(TripleToJsonConverter.convert_vectorjson(triple))
if vector_json:
- current_state = EventState(EventType.Vector,1.0, vector_json, file)
+ current_state = EventState(EventType.Vector,1.0, vector_json, file, doc_source=doc_source)
await self.set_state(new_state=current_state)
else:
return
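This hunk and the two engine hunks that follow apply the same two-step pattern: capture the source once at the top of process_tokens, then pass it into every EventState the engine emits. A condensed sketch of that pattern, with the surrounding engine machinery stubbed out as an assumed helper:

from querent.common.types.querent_event import EventState, EventType

class EngineDocSourceSketch:
    """Illustrative only: the doc_source plumbing shared by the engines in this PR."""

    async def process_tokens(self, data):
        # data is an IngestedTokens instance; doc_source was stamped by the collector.
        doc_source = data.doc_source
        for file, graph_json, vector_json in self._extract_triples(data):  # assumed helper
            await self.set_state(new_state=EventState(
                EventType.Graph, 1.0, graph_json, file, doc_source=doc_source))
            await self.set_state(new_state=EventState(
                EventType.Vector, 1.0, vector_json, file, doc_source=doc_source))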
5 changes: 3 additions & 2 deletions querent/core/transformers/fixed_entities_set_opensourcellm.py
@@ -127,6 +127,7 @@ async def process_tokens(self, data: IngestedTokens):
doc_entity_pairs = []
number_sentences = 0
try:
+ doc_source = data.doc_source
if not Fixed_Entities_LLM.validate_ingested_tokens(data):
self.set_termination_event()
return
@@ -176,11 +177,11 @@ async def process_tokens(self, data: IngestedTokens):
if not self.termination_event.is_set():
graph_json = json.dumps(TripleToJsonConverter.convert_graphjson(triple))
if graph_json:
- current_state = EventState(EventType.Graph,1.0, graph_json, file)
+ current_state = EventState(EventType.Graph,1.0, graph_json, file, doc_source=doc_source)
await self.set_state(new_state=current_state)
vector_json = json.dumps(TripleToJsonConverter.convert_vectorjson(triple))
if vector_json:
- current_state = EventState(EventType.Vector,1.0, vector_json, file)
+ current_state = EventState(EventType.Vector,1.0, vector_json, file, doc_source=doc_source)
await self.set_state(new_state=current_state)
else:
return
@@ -255,6 +255,8 @@ async def process_tokens(self, data: IngestedTokens):
if not GPTLLM.validate_ingested_tokens(data):
self.set_termination_event()
return

+ doc_source = data.doc_source
relationships = []
result = await self.llm_instance.process_tokens(data)
if not result: return
@@ -280,11 +282,11 @@ async def process_tokens(self, data: IngestedTokens):
if not self.termination_event.is_set():
graph_json = json.dumps(TripleToJsonConverter.convert_graphjson(triple))
if graph_json:
- current_state = EventState(EventType.Graph,1.0, graph_json, file)
+ current_state = EventState(EventType.Graph,1.0, graph_json, file, doc_source=doc_source)
await self.set_state(new_state=current_state)
vector_json = json.dumps(TripleToJsonConverter.convert_vectorjson(triple))
if vector_json:
- current_state = EventState(EventType.Vector,1.0, vector_json, file)
+ current_state = EventState(EventType.Vector,1.0, vector_json, file, doc_source=doc_source)
await self.set_state(new_state=current_state)
else:
return
5 changes: 3 additions & 2 deletions querent/core/transformers/gpt_llm_gpt_ner.py
@@ -184,6 +184,7 @@ async def process_tokens(self, data: IngestedTokens):
self.set_termination_event()
return

+ doc_source = data.doc_source
if data.data:
clean_text = ' '.join(data.data)
#clean_text = unidecode(single_string)
@@ -246,14 +247,14 @@ async def process_tokens(self, data: IngestedTokens):
if not self.termination_event.is_set():
graph_json = json.dumps(triple)
if graph_json:
- current_state = EventState(EventType.Graph,1.0, graph_json, file)
+ current_state = EventState(EventType.Graph,1.0, graph_json, file, doc_source=doc_source)
await self.set_state(new_state=current_state)
context_embeddings = self.create_emb.get_embeddings([triple['sentence']])[0]
triple['context_embeddings'] = context_embeddings
triple['context'] = triple['sentence']
vector_json = json.dumps(TripleToJsonConverter.convert_vectorjson((triple['subject'],json.dumps(triple), triple['object'])))
if vector_json:
- current_state = EventState(EventType.Vector,1.0, vector_json, file)
+ current_state = EventState(EventType.Vector,1.0, vector_json, file, doc_source=doc_source)
await self.set_state(new_state=current_state)
else:
return
9 changes: 5 additions & 4 deletions querent/ingestors/audio/audio_ingestors.py
@@ -81,25 +81,26 @@ async def ingest(
async for text in self.extract_and_process_audio(
CollectedBytes(file=current_file, data=collected_bytes)
):
- yield IngestedTokens(file=current_file, data=[text], error=None)
+ yield IngestedTokens(file=current_file, data=[text], error=None, doc_source= chunk_bytes.doc_source)
yield IngestedTokens(
file=current_file,
data=None,
error=None,
+ doc_source= chunk_bytes.doc_source
)
collected_bytes = b""
current_file = chunk_bytes.file
collected_bytes += chunk_bytes.data
except Exception as e:
- yield IngestedTokens(file=current_file, data=None, error=f"Exception: {e}")
+ yield IngestedTokens(file=current_file, data=None, error=f"Exception: {e}", doc_source=chunk_bytes.doc_source)
finally:
# process the last file
async for text in self.extract_and_process_audio(
CollectedBytes(file=current_file, data=collected_bytes)
):
- yield IngestedTokens(file=current_file, data=[text], error=None)
+ yield IngestedTokens(file=current_file, data=[text], error=None, doc_source=chunk_bytes.doc_source)

- yield IngestedTokens(file=current_file, data=None, error=None)
+ yield IngestedTokens(file=current_file, data=None, error=None, doc_source=chunk_bytes.doc_source)

async def extract_and_process_audio(
self, collected_bytes: CollectedBytes
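Taken together, the ingestor side of the change mirrors the audio ingestor above: read doc_source off each incoming CollectedBytes and stamp it onto every IngestedTokens that is yielded, including error and end-of-file tokens. A condensed sketch of that flow (the helper name and the UTF-8 decoding are assumptions, not code from this PR):

from querent.common.types.ingested_tokens import IngestedTokens

async def forward_tokens(chunks):
    """Hypothetical pass-through ingestor: chunks is an async iterator of CollectedBytes."""
    async for cb in chunks:
        if cb.data is None:
            # end-of-file or error marker: keep the same provenance
            yield IngestedTokens(file=cb.file, data=None, error=cb.error,
                                 doc_source=cb.doc_source)
        else:
            yield IngestedTokens(file=cb.file,
                                 data=[cb.data.decode("utf-8", errors="ignore")],
                                 error=cb.error,
                                 doc_source=cb.doc_source)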