Skip to content

Commit

Permalink
Fix minio update
Browse files Browse the repository at this point in the history
  • Loading branch information
mawandm committed Jul 16, 2024
1 parent 2817f0e commit ca1827f
Show file tree
Hide file tree
Showing 4 changed files with 132 additions and 17 deletions.
1 change: 0 additions & 1 deletion nesis/api/core/document_loaders/minio.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,6 @@ def _sync_documents(
) -> None:

try:

connection = datasource.connection
# Data objects allow us to specify bucket names
bucket_names = connection.get("dataobjects")
Expand Down
26 changes: 16 additions & 10 deletions nesis/api/core/document_loaders/runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ def _is_modified(
"""
documents = self._extraction_store.get(document_id=document_id)
for document in documents:
if document is None or document.last_modified < last_modified:
if document is None or document.last_modified == last_modified:
return False
try:
self.delete(document=document)
Expand Down Expand Up @@ -183,19 +183,24 @@ def _is_modified(
return False
elif document.base_uri == endpoint:
store_metadata = document.store_metadata
if store_metadata and store_metadata.get("last_modified"):
if (
not strptime(date_string=store_metadata["last_modified"]).replace(
tzinfo=None
)
< last_modified
):
document_last_modified = document.last_modified
if (
document_last_modified is None
and store_metadata
and store_metadata.get("last_modified")
):
document_last_modified = strptime(
date_string=store_metadata["last_modified"]
).replace(tzinfo=None)
if document_last_modified is not None:
if document_last_modified == last_modified:
return False
try:
self.delete(document=document)
self.delete(document=document, rag_metadata=document.rag_metadata)
except:
_LOG.warning(
f"Failed to delete document {document_id}'s record. Continuing anyway..."
f"Failed to delete document {document_id}'s record. Continuing anyway...",
exc_info=True,
)
return True
else:
Expand All @@ -209,6 +214,7 @@ def save(self, **kwargs) -> Document:
rag_metadata=kwargs["rag_metadata"],
store_metadata=kwargs["store_metadata"],
last_modified=kwargs["last_modified"],
datasource_id=kwargs["datasource_id"],
)

def delete(self, document: Document, **kwargs) -> None:
Expand Down
1 change: 1 addition & 0 deletions nesis/api/core/services/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ def save_document(**kwargs) -> Document:
store_metadata=kwargs["store_metadata"],
base_uri=kwargs["base_uri"],
last_modified=kwargs["last_modified"],
datasource_id=kwargs["datasource_id"],
)

session = kwargs.get("session")
Expand Down
121 changes: 115 additions & 6 deletions nesis/api/tests/core/document_loaders/test_minio.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
DatasourceType,
DatasourceStatus,
)
from nesis.api.core.util.dateutil import strptime


@pytest.fixture
Expand Down Expand Up @@ -53,8 +54,8 @@ def test_ingest_documents(
minio_instance: mock.MagicMock, cache: mock.MagicMock, session: Session
) -> None:
data = {
"name": "s3 documents",
"engine": "s3",
"name": "minio documents",
"engine": "minio",
"connection": {
"endpoint": "https://s3.endpoint",
"access_key": "",
Expand Down Expand Up @@ -274,10 +275,6 @@ def test_uningest_documents(
datasource=datasource,
)

# # No document records exist
# document_records = session.query(Document).all()
# assert 0 == len(document_records)

minio_ingestor.run(
metadata={"datasource": "documents"},
)
Expand Down Expand Up @@ -373,3 +370,115 @@ def test_unextract_documents(
minio_ingestor._extract_runner._extraction_store.Store
).all()
assert len(documents) == initial_count - 1


@mock.patch("nesis.api.core.document_loaders.minio.Minio")
def test_update_ingest_documents(
client: mock.MagicMock, cache: mock.MagicMock, session: Session
) -> None:
"""
Test updating documents if they have been updated at the minio bucket end.
"""

data = {
"name": "s3 documents",
"engine": "minio",
"connection": {
"endpoint": "http://localhost:4566",
"region": "us-east-1",
"dataobjects": "my-test-bucket",
},
}

datasource = Datasource(
name=data["name"],
connection=data["connection"],
source_type=DatasourceType.MINIO,
status=DatasourceStatus.ONLINE,
)

session.add(datasource)
session.commit()

# The document record

document = Document(
base_uri="http://localhost:4566",
document_id="d41d8cd98f00b204e9800998ecf8427e",
filename="invalid.pdf",
rag_metadata={"data": [{"doc_id": str(uuid.uuid4())}]},
store_metadata={
"bucket_name": "some-bucket",
"object_name": "file/path.pdf",
"last_modified": "2023-07-18 06:40:07",
},
last_modified=strptime("2023-07-19 06:40:07"),
datasource_id=datasource.uuid,
)

session.add(document)
session.commit()

http_client = mock.MagicMock()
http_client.upload.return_value = json.dumps({})
minio_client = mock.MagicMock()
bucket = mock.MagicMock()

client.return_value = minio_client
type(bucket).etag = mock.PropertyMock(
return_value="d41d8cd98f00b204e9800998ecf8427e"
)
last_modified = datetime.datetime.now()
type(bucket).bucket_name = mock.PropertyMock(return_value="SomeName")
type(bucket).object_name = mock.PropertyMock(return_value="SomeName")
type(bucket).last_modified = mock.PropertyMock(return_value=last_modified)
type(bucket).size = mock.PropertyMock(return_value=1000)
type(bucket).version_id = mock.PropertyMock(return_value="2")

minio_client.list_objects.return_value = [bucket]

minio_ingestor = minio.MinioProcessor(
config=tests.config,
http_client=http_client,
cache_client=cache,
datasource=datasource,
)

minio_ingestor.run(
metadata={"datasource": "documents"},
)

# The document would be deleted from the rag engine
_, upload_kwargs = http_client.deletes.call_args_list[0]
urls = upload_kwargs["urls"]

assert (
urls[0]
== f"http://localhost:8080/v1/ingest/documents/{document.rag_metadata['data'][0]['doc_id']}"
)
documents = session.query(Document).all()
assert len(documents) == 1
assert document.id != documents[0].id

# And then re-ingested
_, upload_kwargs = http_client.upload.call_args_list[0]
url = upload_kwargs["url"]
file_path = upload_kwargs["filepath"]
metadata = upload_kwargs["metadata"]
field = upload_kwargs["field"]

assert url == f"http://localhost:8080/v1/ingest/files"
assert field == "file"
ut.TestCase().assertDictEqual(
metadata,
{
"datasource": "documents",
"file_name": "my-test-bucket/SomeName",
"self_link": "http://localhost:4566/my-test-bucket/SomeName",
},
)

# The document has now been updated
documents = session.query(Document).all()
assert len(documents) == 1
assert documents[0].last_modified == last_modified

0 comments on commit ca1827f

Please sign in to comment.