feat(frontend)(api): add frontend input for extract destination #138

Merged · 6 commits · Jul 4, 2024
29 changes: 21 additions & 8 deletions compose-dev.yml
@@ -4,7 +4,7 @@
version: '3'

networks:
ametnes:
nesis:
driver: overlay
attachable: true

@@ -14,14 +14,14 @@ services:
ports:
- "11211:11211"
networks:
- ametnes
- nesis
samba:
image: andyzhangx/samba:win-fix
command: ["-u", "username;password", "-s", "share;/smbshare/;yes;no;no;all;none", "-p"]
ports:
- '2445:445'
networks:
- ametnes
- nesis
volumes:
- 'samba_data2:/smbshare'
environment:
@@ -33,7 +33,7 @@ services:
- '59000:9000'
- '59001:9001'
networks:
- ametnes
- nesis
volumes:
- 'minio_data:/data'
environment:
@@ -52,7 +52,7 @@ services:
- postgres16_data:/var/lib/postgresql/data
restart: on-failure
networks:
- ametnes
- nesis
qdrant:
image: qdrant/qdrant:latest
restart: always
@@ -67,7 +67,7 @@ services:
volumes:
- ./qdrant_data:/qdrant_data
networks:
- ametnes
- nesis
chroma:
image: ghcr.io/chroma-core/chroma:latest
environment:
@@ -79,7 +79,7 @@ services:
ports:
- 18000:8000
networks:
- ametnes
- nesis

localstack:
image: localstack/localstack:latest
@@ -94,11 +94,24 @@ services:
volumes:
- '/var/run/docker.sock:/var/run/docker.sock'
- local_stack:/localstack/data

mssql:
container_name: sql-server
image: mcr.microsoft.com/mssql/server:2022-latest
restart: always
environment:
ACCEPT_EULA: "Y"
MSSQL_SA_PASSWORD: "Pa55woR.d12345"
ports:
- "11433:1433"
# volumes:
# - mssql_data:/var/opt/mssql
networks:
- nesis
volumes:
minio_data:
samba_data2:
postgres16_data:
qdrant_data:
chroma-data:
local_stack:
mssql_data:
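
The new mssql service publishes SQL Server 2022 on host port 11433 with the SA password set above. A minimal connectivity check could look like the following sketch; the SQLAlchemy/pymssql driver and the master database are assumptions about the local setup, not part of this change.

# Sketch only: confirm the compose-dev mssql service is reachable.
# Assumes SQLAlchemy plus the pymssql driver are installed locally;
# "master" is the default system database, not something this PR adds.
from sqlalchemy import create_engine, text

engine = create_engine("mssql+pymssql://sa:Pa55woR.d12345@localhost:11433/master")
with engine.connect() as conn:
    print(conn.execute(text("SELECT @@VERSION")).scalar())
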
@@ -1,8 +1,8 @@
"""add extract management fields to document
"""add datasource,rag processing info to document

Revision ID: ea840c8d9e58
Revision ID: 090822101cb5
Revises: 7cfa662dff86
Create Date: 2024-07-01 12:29:36.310411
Create Date: 2024-07-03 04:28:31.842226

"""

@@ -13,7 +13,7 @@
from sqlalchemy.dialects import postgresql

# revision identifiers, used by Alembic.
revision: str = "ea840c8d9e58"
revision: str = "090822101cb5"
down_revision: Union[str, None] = "7cfa662dff86"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
@@ -28,11 +28,9 @@ def upgrade() -> None:
document_status.create(op.get_bind())

op.add_column(
"document",
sa.Column(
"extract_metadata", postgresql.JSONB(astext_type=sa.Text()), nullable=True
),
"datasource", sa.Column("schedule", sa.Unicode(length=255), nullable=True)
)

op.add_column(
"document", sa.Column("datasource_id", sa.Unicode(length=255), nullable=True)
)
@@ -71,11 +69,20 @@ def upgrade() -> None:
existing_type=postgresql.JSONB(astext_type=sa.Text()),
nullable=True,
)

op.drop_constraint("uq_document_uuid_base_url_filename", "document", type_="unique")
op.create_unique_constraint(
"uq_document_uuid_datasource_id", "document", ["uuid", "datasource_id"]
)

op.create_index(op.f("idx_document_base_uri"), "document", ["base_uri"])
# ### end Alembic commands ###


def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.drop_column("datasource", "schedule")

op.alter_column(
"document",
"rag_metadata",
@@ -101,7 +108,16 @@ def downgrade() -> None:
op.drop_column("document", "last_modified")
op.drop_column("document", "status")
op.drop_column("document", "datasource_id")
op.drop_column("document", "extract_metadata")

document_status.drop(op.get_bind())

# op.drop_constraint("uq_document_uuid_datasource_id", "document", type_="unique")
op.create_unique_constraint(
"uq_document_uuid_base_url_filename",
"document",
["uuid", "base_uri", "filename"],
)

op.drop_index(op.f("idx_document_base_uri"), table_name="document")

# ### end Alembic commands ###
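
For local verification, revision 090822101cb5 can be applied and rolled back through Alembic's Python API; a hedged sketch, assuming a standard alembic.ini at the project root:

# Sketch only: drive this migration from Python instead of the CLI.
# The alembic.ini location is an assumption about the local checkout.
from alembic import command
from alembic.config import Config

cfg = Config("alembic.ini")
command.upgrade(cfg, "090822101cb5")    # adds datasource.schedule and the new document constraint
command.downgrade(cfg, "7cfa662dff86")  # reverts to the previous revision
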
56 changes: 33 additions & 23 deletions nesis/api/core/document_loaders/minio.py
@@ -5,7 +5,7 @@
import os
import queue
import tempfile
from typing import Dict, Any
from typing import Dict, Any, Optional

import memcache
import minio
@@ -46,10 +46,11 @@ def __init__(
self._cache_client = cache_client
self._datasource = datasource

_extract_runner = None
# This is left public for testing
self._extract_runner: ExtractRunner = Optional[None]
_ingest_runner = IngestRunner(config=config, http_client=http_client)
if self._datasource.connection.get("destination") is not None:
_extract_runner = ExtractRunner(
self._extract_runner = ExtractRunner(
config=config,
http_client=http_client,
destination=self._datasource.connection.get("destination"),
@@ -58,15 +59,17 @@

self._ingest_runners = [IngestRunner(config=config, http_client=http_client)]

mode = self._datasource.connection.get("mode") or "ingest"
self._mode = self._datasource.connection.get("mode") or "ingest"

match mode:
match self._mode:
case "ingest":
self._ingest_runners: list[RagRunner] = [_ingest_runner]
case "extract":
self._ingest_runners: list[RagRunner] = [_extract_runner]
self._ingest_runners: list[RagRunner] = [self._extract_runner]
case _:
raise ValueError(f"Invalid mode {mode}. Expected 'ingest' or 'extract'")
raise ValueError(
f"Invalid mode {self._mode}. Expected 'ingest' or 'extract'"
)

def run(self, metadata: Dict[str, Any]):
connection: Dict[str, str] = self._datasource.connection
@@ -179,7 +182,7 @@ def _sync_document(
tmp_file = tempfile.NamedTemporaryFile(delete=False)
try:
_LOG.info(
f"Starting syncing object {item.object_name} in bucket {bucket_name}"
f"Starting {self._mode}ing object {item.object_name} in bucket {bucket_name}"
)
file_path = f"{tmp_file.name}-{item.object_name}"

@@ -234,7 +237,9 @@ def _sync_document(
last_modified=item.last_modified,
)

_LOG.info(f"Done syncing object {item.object_name} in bucket {bucket_name}")
_LOG.info(
f"Done {self._mode}ing object {item.object_name} in bucket {bucket_name}"
)
except Exception as ex:
_LOG.warning(
f"Error when getting and ingesting document {item.object_name} - {ex}",
@@ -253,23 +258,28 @@ def _unsync_documents(
try:
endpoint = connection.get("endpoint")

documents = get_documents(base_uri=endpoint)
for document in documents:
store_metadata = document.store_metadata
rag_metadata = document.rag_metadata
bucket_name = store_metadata["bucket_name"]
object_name = store_metadata["object_name"]
try:
client.stat_object(bucket_name=bucket_name, object_name=object_name)
except minio.error.S3Error as ex:
str_ex = str(ex)
if "NoSuchKey" in str_ex and "does not exist" in str_ex:
for _ingest_runner in self._ingest_runners:
for _ingest_runner in self._ingest_runners:
documents = _ingest_runner.get(base_uri=endpoint)
for document in documents:
store_metadata = document.store_metadata
try:
rag_metadata = document.rag_metadata
except AttributeError:
rag_metadata = document.extract_metadata
bucket_name = store_metadata["bucket_name"]
object_name = store_metadata["object_name"]
try:
client.stat_object(
bucket_name=bucket_name, object_name=object_name
)
except Exception as ex:
str_ex = str(ex)
if "NoSuchKey" in str_ex and "does not exist" in str_ex:
_ingest_runner.delete(
document=document, rag_metadata=rag_metadata
)
else:
raise
else:
raise

except:
_LOG.warn("Error fetching and updating documents", exc_info=True)
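Runner selection in the constructor above is driven entirely by the datasource connection: a destination key instantiates an ExtractRunner, and mode decides which runner list is used. A hedged sketch of the connection shape (only the endpoint, mode, and destination keys appear in this diff; the values are illustrative placeholders):

# Illustrative payload; values are placeholders, not taken from this PR.
connection = {
    "endpoint": "http://localhost:59000",  # assumed local MinIO endpoint from compose-dev.yml
    "mode": "extract",                     # defaults to "ingest" when omitted
    "destination": {
        # the destination schema is defined elsewhere and is not shown in this diff
    },
}
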
42 changes: 26 additions & 16 deletions nesis/api/core/document_loaders/runners.py
@@ -6,11 +6,12 @@

import nesis.api.core.util.http as http
from nesis.api.core.document_loaders.stores import SqlDocumentStore
from nesis.api.core.models.entities import Document, Datasource, DocumentObject
from nesis.api.core.models.entities import Document, Datasource
from nesis.api.core.services.util import (
save_document,
get_document,
delete_document,
get_documents,
)
from nesis.api.core.util.dateutil import strptime

@@ -38,6 +39,10 @@ def save(self, **kwargs) -> Document:
def delete(self, document: Document, **kwargs) -> None:
pass

@abc.abstractmethod
def get(self, **kwargs) -> list:
pass


class ExtractRunner(RagRunner):

@@ -85,26 +90,29 @@ def run(
)
return json.loads(response)

def get(self, **kwargs) -> list:
return self._extraction_store.get(base_uri=kwargs.get("base_uri"))

def _is_modified(
self, document_id, last_modified: datetime.datetime
) -> Union[bool, None]:
"""
Here we check if this file has been updated.
If the file has been updated, we delete it from the vector store and re-ingest the new updated file
"""
document: DocumentObject = self._extraction_store.get(document_id=document_id)

if document is None or document.last_modified < last_modified:
return False
try:
self.delete(document=document)
except:
_LOG.warning(
f"Failed to delete document {document_id}'s record. Continuing anyway..."
)
documents = self._extraction_store.get(document_id=document_id)
for document in documents:
if document is None or document.last_modified < last_modified:
return False
try:
self.delete(document=document)
except:
_LOG.warning(
f"Failed to delete document {document_id}'s record. Continuing anyway..."
)
return True

def save(self, **kwargs) -> DocumentObject:
def save(self, **kwargs):
return self._extraction_store.save(
document_id=kwargs["document_id"],
datasource_id=kwargs["datasource_id"],
@@ -116,12 +124,16 @@ def save(self, **kwargs) -> DocumentObject:
filename=kwargs["filename"],
)

def delete(self, document: DocumentObject, **kwargs) -> None:
def delete(self, document, **kwargs) -> None:
self._extraction_store.delete(document_id=document.uuid)


class IngestRunner(RagRunner):

def get(self, **kwargs) -> list:
base_uri = kwargs.get("base_uri")
return get_documents(base_uri=base_uri)

def __init__(self, config, http_client):
self._config = config
self._http_client: http.HttpClient = http_client
@@ -180,9 +192,7 @@ def _is_modified(
):
return False
try:
self.delete(
document_id=document_id, rag_metadata=document.rag_metadata
)
self.delete(document=document)
except:
_LOG.warning(
f"Failed to delete document {document_id}'s record. Continuing anyway..."
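With get() added to the abstract base, every RagRunner now exposes document lookup alongside save and delete, which is what lets _unsync_documents in minio.py iterate runners uniformly. A minimal conforming stub, purely as a sketch (signatures beyond the method names, and any abstract methods outside the visible hunks such as run, are assumptions):

# Sketch of a runner satisfying the RagRunner contract after this change.
# Real implementations are ExtractRunner and IngestRunner above.
class NoopRunner(RagRunner):
    def run(self, **kwargs):
        return None

    def save(self, **kwargs):
        return None

    def delete(self, document, **kwargs) -> None:
        pass

    def get(self, **kwargs) -> list:
        return []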