fix(api): ingest runner modified checker - minio (#140)
The MinIO update doesn't work well with the last_modified date. This fixes the update process for MinIO.
mawandm authored Aug 1, 2024
1 parent 69430ef commit d6135df
Showing 14 changed files with 247 additions and 89 deletions.
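
In short, the bug is one of timestamp precision: the stored document record and the last_modified value MinIO reports for the same object can differ only in their sub-second part, so a strict comparison gives the wrong answer to "has this object changed?". The commit makes the modified check compare at whole-second precision. A minimal sketch of the effect in Python — the timestamps are hypothetical, but the replace(microsecond=0) pattern is the one introduced in the runners.py hunks below:

    from datetime import datetime

    # Hypothetical values: the stored record lost its microseconds when the
    # last_modified string was parsed back, while MinIO reports sub-second
    # precision on the next scan of the bucket.
    stored_last_modified = datetime(2024, 8, 1, 10, 15, 30)
    reported_last_modified = datetime(2024, 8, 1, 10, 15, 30, 123456)

    # Strict comparison: the unchanged object looks newer than its record,
    # so it is deleted and re-ingested on every run.
    print(reported_last_modified > stored_last_modified)  # True

    # The fix truncates both sides to whole seconds before comparing.
    print(
        reported_last_modified.replace(microsecond=0)
        > stored_last_modified.replace(microsecond=0)
    )  # False -> object is treated as unchanged
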
2 changes: 1 addition & 1 deletion nesis/api/core/config.py
@@ -29,7 +29,7 @@
"memcache": {
"hosts": [os.environ.get("NESIS_MEMCACHE_HOSTS", "127.0.0.1:11211")],
"session": {
"expiry": os.environ.get("NESIS_API_MEMCACHE_SESSION_EXPIRY") or 1800
"expiry": os.environ.get("NESIS_API_MEMCACHE_SESSION_EXPIRY") or 21600
},
"cache": {
"timeout_default": 300,
6 changes: 4 additions & 2 deletions nesis/api/core/document_loaders/minio.py
@@ -106,7 +106,6 @@ def _sync_documents(
) -> None:

try:

connection = datasource.connection
# Data objects allow us to specify bucket names
bucket_names = connection.get("dataobjects")
@@ -213,11 +212,14 @@ def _sync_document(
)
except ValueError:
_LOG.warning(f"File {file_path} ingestion failed", exc_info=True)
response_json = {}
response_json = None
except UserWarning:
_LOG.debug(f"File {file_path} is already processing")
return

if response_json is None:
return

_ingest_runner.save(
document_id=item.etag,
datasource_id=datasource.uuid,
67 changes: 37 additions & 30 deletions nesis/api/core/document_loaders/runners.py
@@ -102,15 +102,17 @@ def _is_modified(
"""
documents = self._extraction_store.get(document_id=document_id)
for document in documents:
if document is None or document.last_modified < last_modified:
return False
try:
self.delete(document=document)
except:
_LOG.warning(
f"Failed to delete document {document_id}'s record. Continuing anyway..."
)
return True
if document is not None and last_modified.replace(
microsecond=0
) > document.last_modified.replace(microsecond=0):
try:
self.delete(document=document)
except:
_LOG.warning(
f"Failed to delete document {document_id}'s record. Continuing anyway..."
)
return True
return False

def save(self, **kwargs):
return self._extraction_store.save(
@@ -181,25 +183,28 @@ def _is_modified(
document: Document = get_document(document_id=document_id)
if document is None or document.base_uri != endpoint:
return False
elif document.base_uri == endpoint:
store_metadata = document.store_metadata
if store_metadata and store_metadata.get("last_modified"):
if (
not strptime(date_string=store_metadata["last_modified"]).replace(
tzinfo=None
)
< last_modified
):
return False
try:
self.delete(document=document)
except:
_LOG.warning(
f"Failed to delete document {document_id}'s record. Continuing anyway..."
)
return True
else:
return None
store_metadata = document.store_metadata
document_last_modified = document.last_modified
if (
document_last_modified is None
and store_metadata is not None
and store_metadata.get("last_modified")
):
document_last_modified = strptime(
date_string=store_metadata["last_modified"]
).replace(tzinfo=None)
if document_last_modified is not None and last_modified.replace(
microsecond=0
) > document_last_modified.replace(microsecond=0):
try:
self.delete(document=document, rag_metadata=document.rag_metadata)
except:
_LOG.warning(
f"Failed to delete document {document_id}'s record. Continuing anyway...",
exc_info=True,
)
return True
return False

def save(self, **kwargs) -> Document:
return save_document(
@@ -209,6 +214,7 @@ def save(self, **kwargs) -> Document:
rag_metadata=kwargs["rag_metadata"],
store_metadata=kwargs["store_metadata"],
last_modified=kwargs["last_modified"],
datasource_id=kwargs["datasource_id"],
)

def delete(self, document: Document, **kwargs) -> None:
@@ -222,10 +228,11 @@ def delete(self, document: Document, **kwargs) -> None:
for document_data in rag_metadata.get("data") or []
]
)
_LOG.info(f"Deleting document {document.filename}")
delete_document(document_id=document.id)
except:
_LOG.warning(
f"Failed to delete document {document.filename}",
exc_info=True,
)

_LOG.info(f"Deleting document {document.filename}")
delete_document(document_id=document.id)
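
Taken together, the runners.py changes move both modified checks to whole-second comparisons and, in delete(), only remove the database record (and log it) inside the same try block that cleans up the document's rag_metadata entries, instead of unconditionally afterwards. A condensed, self-contained sketch of the decision the two _is_modified hunks now implement — the real code pulls stored values via get_document()/the extraction store and strptime(), which are assumed here to behave as in the surrounding module:

    from datetime import datetime
    from typing import Optional

    def is_modified(
        stored_last_modified: Optional[datetime],
        metadata_last_modified: Optional[datetime],
        reported_last_modified: datetime,
    ) -> bool:
        # Prefer the record's own column; fall back to the value kept in
        # store_metadata (parsed and made timezone-naive by the caller).
        last_known = stored_last_modified or metadata_last_modified
        if last_known is None:
            return False
        # Compare at whole-second precision so microsecond noise never
        # triggers a spurious delete-and-re-ingest cycle.
        return reported_last_modified.replace(
            microsecond=0
        ) > last_known.replace(microsecond=0)

When this returns True, the runner deletes the stale record before the caller re-ingests and saves the new version; when it returns False, the object is skipped.
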
16 changes: 10 additions & 6 deletions nesis/api/core/document_loaders/s3.py
@@ -8,7 +8,7 @@
import memcache

import nesis.api.core.util.http as http
from nesis.api.core.models.entities import Document
from nesis.api.core.models.entities import Document, Datasource
from nesis.api.core.services import util
from nesis.api.core.services.util import (
save_document,
@@ -24,13 +24,14 @@


def fetch_documents(
connection: Dict[str, str],
datasource: Datasource,
rag_endpoint: str,
http_client: http.HttpClient,
cache_client: memcache.Client,
metadata: Dict[str, Any],
) -> None:
try:
connection = datasource.connection
endpoint = connection.get("endpoint")
access_key = connection.get("user")
secret_key = connection.get("password")
@@ -61,7 +62,7 @@ def fetch_documents(

_sync_documents(
client=s3_client,
connection=connection,
datasource=datasource,
rag_endpoint=rag_endpoint,
http_client=http_client,
cache_client=cache_client,
@@ -79,7 +80,7 @@

def _sync_documents(
client,
connection: dict,
datasource: Datasource,
rag_endpoint: str,
http_client: http.HttpClient,
cache_client: memcache.Client,
@@ -89,6 +90,7 @@ def _sync_documents(
try:

# Data objects allow us to specify bucket names
connection = datasource.connection
bucket_paths = connection.get("dataobjects")
if bucket_paths is None:
_LOG.warning("No bucket names supplied, so I can't do much")
@@ -138,7 +140,7 @@
try:
_sync_document(
client=client,
connection=connection,
datasource=datasource,
rag_endpoint=rag_endpoint,
http_client=http_client,
metadata=_metadata,
@@ -158,13 +160,14 @@

def _sync_document(
client,
connection: dict,
datasource: Datasource,
rag_endpoint: str,
http_client: http.HttpClient,
metadata: dict,
bucket_name: str,
item,
):
connection = datasource.connection
endpoint = connection["endpoint"]
_metadata = metadata

@@ -235,6 +238,7 @@ def _sync_document(
save_document(
document_id=item["ETag"],
filename=item["Key"],
datasource_id=datasource.uuid,
base_uri=endpoint,
rag_metadata=response_json,
store_metadata={
31 changes: 20 additions & 11 deletions nesis/api/core/document_loaders/sharepoint.py
@@ -12,7 +12,7 @@

from nesis.api.core.util import http, clean_control, isblank
import logging
from nesis.api.core.models.entities import Document
from nesis.api.core.models.entities import Document, Datasource
from nesis.api.core.services.util import (
save_document,
get_document,
@@ -29,14 +29,15 @@


def fetch_documents(
connection: Dict[str, str],
datasource: Datasource,
rag_endpoint: str,
http_client: http.HttpClient,
metadata: Dict[str, Any],
cache_client: memcache.Client,
) -> None:
try:

connection = datasource.connection
site_url = connection.get("endpoint")
client_id = connection.get("client_id")
tenant = connection.get("tenant_id")
@@ -55,7 +56,7 @@ def fetch_documents(

_sync_sharepoint_documents(
sp_context=_sharepoint_context,
connection=connection,
datasource=datasource,
rag_endpoint=rag_endpoint,
http_client=http_client,
metadata=metadata,
@@ -72,7 +73,7 @@


def _sync_sharepoint_documents(
sp_context, connection, rag_endpoint, http_client, metadata, cache_client
sp_context, datasource, rag_endpoint, http_client, metadata, cache_client
):
try:
_LOG.info(f"Initializing sharepoint syncing to endpoint {rag_endpoint}")
@@ -83,6 +84,7 @@ def _sync_sharepoint_documents(
)

# Data objects allow us to specify folder names
connection = datasource.connection
sharepoint_folders = connection.get("dataobjects")
if sharepoint_folders is None:
_LOG.warning("Sharepoint folders are specified, so I can't do much")
@@ -102,7 +104,7 @@

_process_folder_files(
sharepoint_folder,
connection=connection,
datasource=datasource,
rag_endpoint=rag_endpoint,
http_client=http_client,
metadata=metadata,
@@ -130,7 +132,10 @@ def _sync_sharepoint_documents(
)


def _process_file(file, connection, rag_endpoint, http_client, metadata, cache_client):
def _process_file(
file, datasource: Datasource, rag_endpoint, http_client, metadata, cache_client
):
connection = datasource.connection
site_url = connection.get("endpoint")
parsed_site_url = urlparse(site_url)
site_root_url = "{uri.scheme}://{uri.netloc}".format(uri=parsed_site_url)
@@ -149,7 +154,7 @@ def _process_file(file, connection, rag_endpoint, http_client, metadata, cache_c
if cache_client.add(key=_lock_key, val=_lock_key, time=30 * 60):
try:
_sync_document(
connection=connection,
datasource=datasource,
rag_endpoint=rag_endpoint,
http_client=http_client,
metadata=_metadata,
Expand All @@ -162,14 +167,14 @@ def _process_file(file, connection, rag_endpoint, http_client, metadata, cache_c


def _process_folder_files(
folder, connection, rag_endpoint, http_client, metadata, cache_client
folder, datasource, rag_endpoint, http_client, metadata, cache_client
):
# process files in folder
_files = folder.get_files(False).execute_query()
for file in _files:
_process_file(
file=file,
connection=connection,
datasource=datasource,
rag_endpoint=rag_endpoint,
http_client=http_client,
metadata=metadata,
@@ -178,12 +183,13 @@


def _sync_document(
connection: dict,
datasource: Datasource,
rag_endpoint: str,
http_client: http.HttpClient,
metadata: dict,
file,
):
connection = datasource.connection
site_url = connection["endpoint"]
_metadata = metadata

@@ -260,6 +266,7 @@ def _sync_document(
filename=file.serverRelativeUrl,
base_uri=site_url,
rag_metadata=response_json,
datasource_id=datasource.uuid,
store_metadata={
"file_name": file.name,
"file_url": file.serverRelativeUrl,
Expand All @@ -274,7 +281,9 @@ def _sync_document(
)
_LOG.info(f"Done syncing object {file.name} in at {file.serverRelativeUrl}")
except Exception as ex:
_LOG.warning(f"Error when getting and ingesting file {file.name} - {ex}")
_LOG.warning(
f"Error when getting and ingesting file {file.name}", exc_info=True
)


def _unsync_sharepoint_documents(sp_context, http_client, rag_endpoint, connection):
1 change: 1 addition & 0 deletions nesis/api/core/services/util.py
@@ -73,6 +73,7 @@ def save_document(**kwargs) -> Document:
store_metadata=kwargs["store_metadata"],
base_uri=kwargs["base_uri"],
last_modified=kwargs["last_modified"],
datasource_id=kwargs["datasource_id"],
)

session = kwargs.get("session")
11 changes: 2 additions & 9 deletions nesis/api/core/tasks/document_management.py
@@ -52,19 +52,12 @@ def ingest_datasource(**kwargs) -> None:

case DatasourceType.SHAREPOINT:
sharepoint.fetch_documents(
connection=datasource.connection,
datasource=datasource,
rag_endpoint=rag_endpoint,
http_client=http_client,
cache_client=cache_client,
metadata={"datasource": datasource.name},
)
case DatasourceType.GOOGLE_DRIVE:
google_drive.fetch_documents(
connection=datasource.connection,
rag_endpoint=rag_endpoint,
http_client=http_client,
metadata={"datasource": datasource.name},
)
case DatasourceType.WINDOWS_SHARE:
samba.fetch_documents(
connection=datasource.connection,
@@ -75,7 +68,7 @@
)
case DatasourceType.S3:
s3.fetch_documents(
connection=datasource.connection,
datasource=datasource,
rag_endpoint=rag_endpoint,
http_client=http_client,
metadata={"datasource": datasource.name},