Skip to content

Commit

Permalink
Fixes in workflow
Browse files Browse the repository at this point in the history
  • Loading branch information
Ansh5461 committed Apr 17, 2024
1 parent 9da2a19 commit 0cca9a3
Show file tree
Hide file tree
Showing 5 changed files with 3 additions and 156 deletions.
4 changes: 2 additions & 2 deletions querent/common/types/querent_event.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
from typing import Any
+from enum import Enum


-class EventType:
+class EventType(Enum):
Graph = "Graph"
Vector = "Vector"
Terminate="Terminate"
Expand All @@ -14,5 +15,4 @@ def __init__(self, event_type: EventType, timestamp: float, payload: Any, file:
self.timestamp = timestamp
self.payload = payload
self.file = file
-self.file = file
self.doc_source = doc_source
2 changes: 1 addition & 1 deletion querent/core/base_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ async def _inner_worker():
none_counter = 0
while not self.termination_event.is_set():
retries = 0
-await asyncio.sleep(30)
+await asyncio.sleep(5)
data = await self.input_queue.get()
try:
if isinstance(data, IngestedMessages):
Expand Down
14 changes: 0 additions & 14 deletions querent/ingestors/ingestor_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,20 +159,6 @@ async def chunk_generator() -> AsyncGenerator[CollectedBytes, None]:
async for chunk_tokens in ingestor.ingest(chunk_generator()):
if self.result_queue is not None:
self.result_queue.put_nowait(chunk_tokens)
# elif tokens_feader is not None:
# tokens_feader.send_tokens_in_rust(
# {
# "data": (
# chunk_tokens.data
# if type(chunk_tokens) == IngestedTokens
# else chunk_tokens.ocr_text
# ),
# "file": chunk_tokens.file,
# "is_token_stream": True,
# "doc_source": chunk_tokens.doc_source,
# }
# )
# await asyncio.sleep(0.1)
else:
self.logger.warning(
f"Unsupported file extension {file_extension} for file {collected_bytes_list[0].file}"
Expand Down
78 changes: 0 additions & 78 deletions querent/storage/graph-traversal.py

This file was deleted.

61 changes: 0 additions & 61 deletions querent/workflow/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,64 +90,3 @@ async def start_workflow(config_dict: dict):
await asyncio.gather(collector_tasks, engine_tasks)
logger.info("Workflow is finished. All events have been released.")


# async def start_ingestion(config_dict: dict):
# if not config_dict:
# return
# collectors = []
# collector_configs = config_dict.get("collectors", [])
# for collector_config in collector_configs:
# collectors.append(CollectorConfig(config_source=collector_config).resolve())
# workflow_config = config_dict.get("workflow")
# workflow = WorkflowConfig(config_source=workflow_config)
# config_dict["collectors"] = collectors
# config_dict["workflow"] = workflow
# config = Config(config_source=config_dict)
# collectors = []
# for collector_config in config.collectors:
# uri = Uri(collector_config.uri)
# collectors.append(CollectorResolver().resolve(uri=uri, config=collector_config))

# for collector in collectors:
# await collector.connect()

# ingestor_factory_manager = IngestorFactoryManager(
# collectors=collectors,
# result_queue=None,
# tokens_feader=config.workflow.tokens_feader,
# )

# resource_manager = ResourceManager()

# ingest_task = asyncio.create_task(ingestor_factory_manager.ingest_all_async())
# check_message_states_task = asyncio.create_task(
# check_message_states(config, resource_manager, [ingest_task])
# )
# await asyncio.gather(ingest_task, check_message_states_task)

# async def start_workflow_engine(config_dict: Config):
# if not config_dict:
# return
# workflow_config = config_dict.get("workflow")
# workflow = WorkflowConfig(config_source=workflow_config)
# engine_configs = config_dict.get("engines", [])
# engines = []
# for engine_config in engine_configs:
# engine_config_source = engine_config.get("config", {})
# if engine_config["name"] == "knowledge_graph_using_openai":
# engines.append(GPTConfig(config_source=engine_config_source))
# elif engine_config["name"] == "knowledge_graph_using_llama2_v1":
# engines.append(LLM_Config(config_source=engine_config_source))
# config_dict["engines"] = engines
# config_dict["collectors"] = None
# config_dict["workflow"] = workflow
# config = Config(config_source=config_dict)
# workflows = {
# "knowledge_graph_using_openai": start_gpt_workflow,
# "knowledge_graph_using_llama2_v1": start_llama_workflow,
# }
# workflow = workflows.get(config.workflow.name)
# result_queue = QuerentQueue()
# resource_manager = ResourceManager()

# await workflow(resource_manager, config, result_queue)

0 comments on commit 0cca9a3

Please sign in to comment.