Skip to content

Commit

Permalink
Cleanup batch processing (#34)
Browse files Browse the repository at this point in the history
<!-- begin-generated-description -->

This pull request removes the batch processing functionality from the
Compass SDK.

- The `BatchPutDocumentsInput` class is removed from `__init__.py`.
- The `BatchProcessFilesParameters` class is removed from `__init__.py`.
- The `BatchPutDocumentsInput` import is removed from `compass.py`.
- The `put_documents_batch` session-handler entry and the
`put_documents_batch` URL mapping are removed from `compass.py`.
- The `insert_docs_batch` and `batch_status` functions are removed from
`compass.py`.
- The `BatchProcessFilesParameters`, `CompassDocument`,
`MetadataConfig`, `ParserConfig`, `ProcessFileParameters`, and `logger`
imports are removed from `parser.py`.
- The `batch_upload`, `batch_status`, and `batch_run` functions are
removed from `parser.py`.

<!-- end-generated-description -->
  • Loading branch information
ankush-cohere authored Nov 6, 2024
1 parent 2f5ff59 commit 769ddb8
Show file tree
Hide file tree
Showing 3 changed files with 1 addition and 136 deletions.
9 changes: 0 additions & 9 deletions compass_sdk/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -397,10 +397,6 @@ class PutDocumentsInput(BaseModel):
merge_groups_on_conflict: bool = False


class BatchPutDocumentsInput(BaseModel):
    """Request body for the batch put-documents endpoint.

    Carries only the identifier of a previously uploaded batch; the server
    resolves the batch contents from this uuid.
    """

    # uuid of the batch to insert (returned by the batch upload endpoint)
    uuid: str


class ProcessFileParameters(ValidatedModel):
parser_config: ParserConfig
metadata_config: MetadataConfig
Expand All @@ -413,11 +409,6 @@ class ProcessFilesParameters(ValidatedModel):
metadata_config: MetadataConfig


class BatchProcessFilesParameters(ProcessFilesParameters):
    """Parameters for running processing on a previously uploaded batch.

    Extends ProcessFilesParameters with the batch identifier and an optional
    file-name to document-id mapping.
    """

    # uuid of the uploaded batch to process
    uuid: str
    # optional mapping from file name inside the batch to the document id to
    # assign; when None the server chooses ids — presumably derived from the
    # file names (TODO confirm against server behavior)
    file_name_to_doc_ids: Optional[Dict[str, str]] = None


class GroupAuthorizationActions(str, Enum):
ADD = "add"
REMOVE = "remove"
Expand Down
33 changes: 0 additions & 33 deletions compass_sdk/compass.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
from tqdm import tqdm

from compass_sdk import (
BatchPutDocumentsInput,
Chunk,
CompassDocument,
CompassDocumentStatus,
Expand Down Expand Up @@ -93,7 +92,6 @@ def __init__(
"delete_document": self.session.delete,
"get_document": self.session.get,
"put_documents": self.session.put,
"put_documents_batch": self.session.post,
"search_documents": self.session.post,
"add_context": self.session.post,
"refresh": self.session.post,
Expand All @@ -107,7 +105,6 @@ def __init__(
"delete_document": "/api/v1/indexes/{index_name}/documents/{doc_id}",
"get_document": "/api/v1/indexes/{index_name}/documents/{doc_id}",
"put_documents": "/api/v1/indexes/{index_name}/documents",
"put_documents_batch": "/api/v1/batch/indexes/{index_name}",
"search_documents": "/api/v1/indexes/{index_name}/documents/search",
"add_context": "/api/v1/indexes/{index_name}/documents/add_context/{doc_id}",
"refresh": "/api/v1/indexes/{index_name}/refresh",
Expand Down Expand Up @@ -251,36 +248,6 @@ def insert_doc(
merge_groups_on_conflict=merge_groups_on_conflict,
)

def insert_docs_batch(self, *, uuid: str, index_name: str):
    """
    Insert a previously parsed batch of documents into a Compass index.

    :param uuid: the uuid of the batch
    :param index_name: the name of the index
    :return: the result of the underlying request
    """
    payload = BatchPutDocumentsInput(uuid=uuid)
    return self._send_request(
        function="put_documents_batch",
        index_name=index_name,
        data=payload,
        max_retries=DEFAULT_MAX_RETRIES,
        sleep_retry_seconds=DEFAULT_SLEEP_RETRY_SECONDS,
    )

def batch_status(self, *, uuid: str):
    """
    Fetch the processing status of a batch from the index server.

    :param uuid: the uuid of the batch
    :return: the JSON status payload returned by the server
    :raises Exception: if the server responds with a non-2xx status
    """
    if self.username and self.password:
        auth = (self.username, self.password)
    else:
        auth = None

    resp = self.session.get(
        url=f"{self.index_url}/api/v1/batch/status/{uuid}",
        auth=auth,
    )

    # Guard clause: surface server-side failures immediately.
    if not resp.ok:
        raise Exception(f"Failed to get batch status: {resp.status_code} {resp.text}")
    return resp.json()

def push_document(
self,
*,
Expand Down
95 changes: 1 addition & 94 deletions compass_sdk/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,7 @@

import requests

from compass_sdk import (
BatchProcessFilesParameters,
CompassDocument,
MetadataConfig,
ParserConfig,
ProcessFileParameters,
logger,
)
from compass_sdk import CompassDocument, MetadataConfig, ParserConfig, ProcessFileParameters, logger
from compass_sdk.constants import DEFAULT_MAX_ACCEPTED_FILE_SIZE_BYTES
from compass_sdk.utils import imap_queued, open_document, scan_folder

Expand Down Expand Up @@ -227,89 +220,3 @@ def process_file(
logger.error(f"Error processing file: {res.text}")

return docs

def batch_upload(self, *, zip_file_path: str) -> str:
    """
    Upload a zip file for offline processing.

    The zip file should contain the files to process. The server processes
    each file in the zip using the default parser and metadata configurations
    passed when creating the client.

    :param zip_file_path: the path to the zip file to upload
    :return: uuid for the uploaded zip file
    :raises Exception: if the path is not a .zip file or the upload fails
    """
    # Only zip archives are accepted by the batch endpoint.
    if not zip_file_path.endswith(".zip"):
        raise Exception(f"Allowed file type is .zip; got: {zip_file_path}")

    auth = (self.username, self.password) if self.username and self.password else None

    # Read the archive before issuing the request so the file handle is not
    # held open for the duration of the upload.
    with open(zip_file_path, "rb") as zip_file:
        zip_data = zip_file.read()

    res = self.session.post(
        url=f"{self.parser_url}/v1/batch/upload",
        files={"file": ("data.zip", zip_data)},
        auth=auth,
    )

    if res.ok:
        return res.json()

    logger.error(f"Error uploading file: {res.text}")
    raise Exception(f"Error uploading zip file: {res.text}")

def batch_status(self, uuid: str) -> str:
    """
    Return the status of the batch processing job with the given uuid.

    Possible statuses:
    - "PROCESSING": the job is being processed
    - "DONE": the job has been processed successfully
    - "ERROR": the job has failed to process

    :param uuid: the uuid of the batch processing job
    :return: the status of the batch processing job
    :raises Exception: if the status request fails
    """
    if self.username and self.password:
        auth = (self.username, self.password)
    else:
        auth = None

    res = self.session.get(
        url=f"{self.parser_url}/v1/batch/status",
        params={"uuid": uuid},
        auth=auth,
    )

    if not res.ok:
        logger.error(f"Error getting batch status: {res.text}")
        raise Exception(f"Error getting batch status: {res.text}")
    return res.json()

def batch_run(
    self,
    *,
    uuid: str,
    file_name_to_doc_ids: Optional[Dict[str, str]] = None,
    parser_config: Optional[ParserConfig] = None,
    metadata_config: Optional[MetadataConfig] = None,
) -> List[CompassDocument]:
    """
    Trigger server-side processing of a previously uploaded batch.

    :param uuid: the uuid of the uploaded batch
    :param file_name_to_doc_ids: optional mapping from file name to document id
    :param parser_config: overrides the client's default parser config when given
    :param metadata_config: overrides the client's default metadata config when given
    :return: the server's JSON response on success, or an empty list on failure
    """
    # Fall back to the client-level configurations when no overrides are given.
    request_params = BatchProcessFilesParameters(
        uuid=uuid,
        file_name_to_doc_ids=file_name_to_doc_ids,
        parser_config=parser_config or self.parser_config,
        metadata_config=metadata_config or self.metadata_config,
    )

    auth = None
    if self.username and self.password:
        auth = (self.username, self.password)

    res = self.session.post(
        url=f"{self.parser_url}/v1/batch/run",
        data={"data": json.dumps(request_params.model_dump())},
        auth=auth,
    )

    if not res.ok:
        logger.error(f"Error processing file: {res.text}")
        # Best-effort behavior: an empty list is returned rather than raising.
        return []

    return res.json()

0 comments on commit 769ddb8

Please sign in to comment.