diff --git a/bbconf/modules/bedfiles.py b/bbconf/modules/bedfiles.py index c04a547..047d1ec 100644 --- a/bbconf/modules/bedfiles.py +++ b/bbconf/modules/bedfiles.py @@ -14,6 +14,7 @@ from sqlalchemy.exc import IntegrityError from sqlalchemy.orm import Session, aliased from tqdm import tqdm +from qdrant_client.http.models import PointStruct from bbconf.config_parser.bedbaseconfig import BedBaseConfig from geniml.search.backends import QdrantBackend @@ -1257,7 +1258,7 @@ def _sql_search_count(self, query: str) -> int: count = session.execute(statement).one() return count[0] - def reindex_qdrant(self) -> None: + def reindex_qdrant(self, batch: int = 1000) -> None: """ Re-upload all files to quadrant. !Warning: only hg38 genome can be added to qdrant! @@ -1265,6 +1266,8 @@ def reindex_qdrant(self) -> None: If you want to fully reindex/reupload to qdrant, first delete collection and create new one. Upload all files to qdrant. + + :param batch: number of files to upload in one batch """ bb_client = BBClient() @@ -1276,6 +1279,7 @@ def reindex_qdrant(self) -> None: results = annotation_result.results with tqdm(total=len(results), position=0, leave=True) as pbar: + points_list = [] for record in results: try: bed_region_set_obj = GRegionSet(bb_client.seek(record.id)) @@ -1284,14 +1288,28 @@ def reindex_qdrant(self) -> None: pbar.set_description(f"Processing file: {record.id}") - self.upload_file_qdrant( - bed_id=record.id, - bed_file=bed_region_set_obj, - payload=record.annotation.model_dump() if record.annotation else {}, + file_embedding = self._embed_file(bed_region_set_obj) + points_list.append( + PointStruct( + id=record.id, + vector=file_embedding.tolist()[0], + payload=( + record.annotation.model_dump() if record.annotation else {} + ), + ) ) - pbar.write(f"File: {record.id} uploaded to qdrant successfully.") + pbar.write(f"File: {record.id} successfully indexed.") pbar.update(1) + _LOGGER.info(f"Uploading points to qdrant using batches...") + for i in range(0, len(points_list), batch): + operation_info = self._config.qdrant_engine.qd_client.upsert( + collection_name=self._config.config.qdrant.file_collection, + points=points_list[i : i + batch], + ) + + assert operation_info.status == "completed" + return None def delete_qdrant_point(self, identifier: str) -> None: