Skip to content

Commit

Permalink
improved reindexing
Browse files Browse the repository at this point in the history
  • Loading branch information
khoroshevskyi committed Dec 20, 2024
1 parent 2d7403b commit a6d5119
Showing 1 changed file with 24 additions and 6 deletions.
30 changes: 24 additions & 6 deletions bbconf/modules/bedfiles.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import Session, aliased
from tqdm import tqdm
from qdrant_client.http.models import PointStruct

from bbconf.config_parser.bedbaseconfig import BedBaseConfig
from geniml.search.backends import QdrantBackend
Expand Down Expand Up @@ -1257,14 +1258,16 @@ def _sql_search_count(self, query: str) -> int:
count = session.execute(statement).one()
return count[0]

def reindex_qdrant(self) -> None:
def reindex_qdrant(self, batch: int = 1000) -> None:
"""
Re-upload all files to quadrant.
!Warning: only hg38 genome can be added to qdrant!
If you want to fully reindex/reupload to qdrant, first delete collection and create new one.
Upload all files to qdrant.
:param batch: number of files to upload in one batch
"""
bb_client = BBClient()

Expand All @@ -1276,6 +1279,7 @@ def reindex_qdrant(self) -> None:
results = annotation_result.results

with tqdm(total=len(results), position=0, leave=True) as pbar:
points_list = []
for record in results:
try:
bed_region_set_obj = GRegionSet(bb_client.seek(record.id))
Expand All @@ -1284,14 +1288,28 @@ def reindex_qdrant(self) -> None:

pbar.set_description(f"Processing file: {record.id}")

self.upload_file_qdrant(
bed_id=record.id,
bed_file=bed_region_set_obj,
payload=record.annotation.model_dump() if record.annotation else {},
file_embedding = self._embed_file(bed_region_set_obj)
points_list.append(
PointStruct(
id=record.id,
vector=file_embedding.tolist()[0],
payload=(
record.annotation.model_dump() if record.annotation else {}
),
)
)
pbar.write(f"File: {record.id} uploaded to qdrant successfully.")
pbar.write(f"File: {record.id} successfully indexed.")
pbar.update(1)

_LOGGER.info(f"Uploading points to qdrant using batches...")
for i in range(0, len(points_list), batch):
operation_info = self._config.qdrant_engine.qd_client.upsert(
collection_name=self._config.config.qdrant.file_collection,
points=points_list[i : i + batch],
)

assert operation_info.status == "completed"

return None

def delete_qdrant_point(self, identifier: str) -> None:
Expand Down

0 comments on commit a6d5119

Please sign in to comment.