Skip to content

Commit

Permalink
fix modified ingestion
Browse files Browse the repository at this point in the history
  • Loading branch information
emrgnt-cmplxty committed Jan 11, 2025
1 parent 84f5273 commit 3e583ce
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 24 deletions.
23 changes: 10 additions & 13 deletions py/core/main/api/v3/documents_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -524,6 +524,7 @@ async def create_document(
),
"user": auth_user.model_dump_json(),
"size_in_bytes": content_length,
"version": "v0",
}

file_name = file_data["filename"]
Expand All @@ -534,21 +535,17 @@ async def create_document(
file_data["content_type"],
)

self.services.ingestion.ingest_file_ingress(
file_data=workflow_input["file_data"],
user=auth_user,
document_id=workflow_input["document_id"],
size_in_bytes=workflow_input["size_in_bytes"],
metadata=workflow_input["metadata"],
version=workflow_input["version"],
)

if run_with_orchestration:
# TODO - Modify create_chunks so that we can add chunks to existing document
document_info = (
self.services.ingestion._create_document_info_from_file(
document_id,
auth_user,
file_name,
workflow_input["metadata"],
"v0",
workflow_input["size_in_bytes"],
)
)
await self.providers.database.documents_handler.upsert_documents_overview(
document_info
)

raw_message: dict[str, str | None] = await self.providers.orchestration.run_workflow( # type: ignore
"ingest-files",
Expand Down
20 changes: 15 additions & 5 deletions py/core/main/orchestration/hatchet/ingestion_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,14 +65,24 @@ async def parse(self, context: Context) -> dict:
input_data
)

ingestion_result = (
await self.ingestion_service.ingest_file_ingress(
**parsed_data
# ingestion_result = (
# await self.ingestion_service.ingest_file_ingress(
# **parsed_data
# )
# )

# document_info = ingestion_result["info"]
document_info = (
self.ingestion_service.create_document_info_from_file(
parsed_data["document_id"],
parsed_data["user"],
parsed_data["file_data"]["filename"],
parsed_data["metadata"],
parsed_data["version"],
parsed_data["size_in_bytes"],
)
)

document_info = ingestion_result["info"]

await self.ingestion_service.update_document_status(
document_info,
status=IngestionStatus.PARSING,
Expand Down
13 changes: 11 additions & 2 deletions py/core/main/orchestration/simple/ingestion_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,17 @@ async def ingest_files(input_data):
parsed_data = IngestionServiceAdapter.parse_ingest_file_input(
input_data
)
ingestion_result = await service.ingest_file_ingress(**parsed_data)
document_info = ingestion_result["info"]
# ingestion_result = await service.ingest_file_ingress(**parsed_data)
# document_info = ingestion_result["info"]

document_info = service.create_document_info_from_file(
parsed_data["document_id"],
parsed_data["user"],
parsed_data["file_data"]["filename"],
parsed_data["metadata"],
parsed_data["version"],
parsed_data["size_in_bytes"],
)

await service.update_document_status(
document_info, status=IngestionStatus.PARSING
Expand Down
8 changes: 4 additions & 4 deletions py/core/main/services/ingestion_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ async def ingest_file_ingress(
size_in_bytes,
metadata: Optional[dict] = None,
version: Optional[str] = None,
collection_ids: Optional[list[UUID]] = None,
# collection_ids: Optional[list[UUID]] = None,
*args: Any,
**kwargs: Any,
) -> dict:
Expand All @@ -91,7 +91,7 @@ async def ingest_file_ingress(
metadata = metadata or {}

version = version or STARTING_VERSION
document_info = self._create_document_info_from_file(
document_info = self.create_document_info_from_file(
document_id,
user,
file_data["filename"],
Expand Down Expand Up @@ -121,7 +121,7 @@ async def ingest_file_ingress(
status_code=409,
message=f"Document {document_id} is currently ingesting with status {existing_doc.ingestion_status}.",
)

document_info.ingestion_status = IngestionStatus.PARSING
await self.providers.database.documents_handler.upsert_documents_overview(
document_info
)
Expand All @@ -137,7 +137,7 @@ async def ingest_file_ingress(
status_code=500, detail=f"Error during ingestion: {str(e)}"
)

def _create_document_info_from_file(
def create_document_info_from_file(
self,
document_id: UUID,
user: User,
Expand Down

0 comments on commit 3e583ce

Please sign in to comment.