From 504b01aaa0dfb9081ba7be53a26cc0035d880dea Mon Sep 17 00:00:00 2001 From: Michael Sekamanya Date: Mon, 1 Jul 2024 19:27:46 -0700 Subject: [PATCH 1/6] Add frontend input for extract destination --- nesis/api/core/document_loaders/minio.py | 14 ++++-- nesis/api/core/document_loaders/validators.py | 22 +++++++++ nesis/api/core/models/entities.py | 10 +++- nesis/api/core/services/datasources.py | 23 ++++++++-- .../core/controllers/test_datasources.py | 36 +++++++++++++++ .../Datasources/DatasourcesDetailPage.js | 46 +++++++++++++++++++ nesis/frontend/server/profile.js | 4 +- 7 files changed, 142 insertions(+), 13 deletions(-) diff --git a/nesis/api/core/document_loaders/minio.py b/nesis/api/core/document_loaders/minio.py index 4fc4bf6..52068e4 100644 --- a/nesis/api/core/document_loaders/minio.py +++ b/nesis/api/core/document_loaders/minio.py @@ -58,15 +58,17 @@ def __init__( self._ingest_runners = [IngestRunner(config=config, http_client=http_client)] - mode = self._datasource.connection.get("mode") or "ingest" + self._mode = self._datasource.connection.get("mode") or "ingest" - match mode: + match self._mode: case "ingest": self._ingest_runners: list[RagRunner] = [_ingest_runner] case "extract": self._ingest_runners: list[RagRunner] = [_extract_runner] case _: - raise ValueError(f"Invalid mode {mode}. Expected 'ingest' or 'extract'") + raise ValueError( + f"Invalid mode {self._mode}. Expected 'ingest' or 'extract'" + ) def run(self, metadata: Dict[str, Any]): connection: Dict[str, str] = self._datasource.connection @@ -179,7 +181,7 @@ def _sync_document( tmp_file = tempfile.NamedTemporaryFile(delete=False) try: _LOG.info( - f"Starting syncing object {item.object_name} in bucket {bucket_name}" + f"Starting {self._mode}ing object {item.object_name} in bucket {bucket_name}" ) file_path = f"{tmp_file.name}-{item.object_name}" @@ -234,7 +236,9 @@ def _sync_document( last_modified=item.last_modified, ) - _LOG.info(f"Done syncing object {item.object_name} in bucket {bucket_name}") + _LOG.info( + f"Done {self._mode}ing object {item.object_name} in bucket {bucket_name}" + ) except Exception as ex: _LOG.warning( f"Error when getting and ingesting document {item.object_name} - {ex}", diff --git a/nesis/api/core/document_loaders/validators.py b/nesis/api/core/document_loaders/validators.py index f98964b..9cc4092 100644 --- a/nesis/api/core/document_loaders/validators.py +++ b/nesis/api/core/document_loaders/validators.py @@ -1,3 +1,5 @@ +from sqlalchemy import engine, create_engine + from nesis.api.core.document_loaders import samba from nesis.api.core.document_loaders import s3 from nesis.api.core.document_loaders import minio @@ -13,6 +15,10 @@ def validate_datasource_connection(datasource) -> dict: if source_type is None: raise ValueError("source_type must be supplied") + mode = datasource.get("mode") + if mode not in ["ingest", "extract", None]: + raise ValueError(f"Mode must be 'ingest' or 'extract'") + try: datasource_type = DatasourceType[source_type.upper()] except KeyError: @@ -22,6 +28,22 @@ def validate_datasource_connection(datasource) -> dict: if connection is None: raise ValueError("Missing connection") + destination_url = ((connection.get("destination") or {}).get("sql") or {}).get( + "url" + ) + if mode == "extract": + if destination_url is None: + raise ValueError("Missing url for destination target") + else: + try: + _engine = create_engine( + url=destination_url, connect_args={"connect_timeout": 10} + ) + with _engine.connect() as conn: + pass + except Exception as ex: + raise 
ValueError(f"Failed to connect to the destination - {ex}") + match datasource_type: case DatasourceType.WINDOWS_SHARE: return samba.validate_connection_info(connection=connection) diff --git a/nesis/api/core/models/entities.py b/nesis/api/core/models/entities.py index 6361f4a..930dedf 100644 --- a/nesis/api/core/models/entities.py +++ b/nesis/api/core/models/entities.py @@ -100,6 +100,7 @@ class Datasource(Base): uuid = Column(Unicode(255), unique=True, nullable=False) type = Column(Enum(objects.DatasourceType, name="datasource_type"), nullable=False) name = Column(Unicode(255), nullable=False) + schedule = Column(Unicode(255)) enabled = Column(Boolean, default=True, nullable=False) status = Column( Enum(objects.DatasourceStatus, name="datasource_status"), nullable=False @@ -133,6 +134,7 @@ def to_dict(self, **kwargs) -> dict: "enabled": self.enabled, "connection": connection, "status": self.status.name.lower(), + "schedule": self.schedule, } return dict_value @@ -147,6 +149,7 @@ class DocumentObject: filename = Column(Unicode(4096), nullable=False) rag_metadata = Column(JSONB) extract_metadata = Column(JSONB) + # We leave this as nullable to allow for external database tables that will not have the Datasource entity datasource_id = Column(Unicode(255)) store_metadata = Column(JSONB) status = Column( @@ -160,8 +163,13 @@ class DocumentObject: __table_args__ = ( UniqueConstraint( - "uuid", "base_uri", "filename", name="uq_document_uuid_base_url_filename" + "uuid", + "base_uri", + "filename", + "datasource_id", + name="uq_document_uuid_datasource_id", ), + # Index("idx_document_base_uri", "base_uri"), ) def __init__( diff --git a/nesis/api/core/services/datasources.py b/nesis/api/core/services/datasources.py index 4fe67b4..58ecef7 100644 --- a/nesis/api/core/services/datasources.py +++ b/nesis/api/core/services/datasources.py @@ -113,6 +113,8 @@ def create(self, **kwargs): # We validate the schedule (if supplied), before we create the datasource self._validate_schedule(datasource) + entity.schedule = schedule + session.add(entity) session.commit() session.refresh(entity) @@ -301,13 +303,14 @@ def update(self, **kwargs): raise ServiceException(ve) # We validate the schedule (if supplied), before we create the datasource - self._validate_schedule(datasource) + schedule = self._validate_schedule(datasource) + datasource_record.schedule = schedule or datasource_record.schedule session.merge(datasource_record) session.commit() # if we do have a schedule on this datasource, we update it - schedule = datasource.get("schedule") + schedule = datasource.get("schedule") or "" if all([schedule, self._task_service]): task = { "schedule": schedule, @@ -326,9 +329,16 @@ def update(self, **kwargs): ) == 1 ): - self._task_service.update( - token=kwargs["token"], task=task, task_id=task_records[0].uuid - ) + if schedule != "": + self._task_service.update( + token=kwargs["token"], + task=task, + task_id=task_records[0].uuid, + ) + else: + self._task_service.delete( + token=kwargs["token"], task_id=task_records[0].uuid + ) return datasource_record except Exception as e: @@ -346,3 +356,6 @@ def _validate_schedule(self, datasource): validate_schedule(schedule) except Exception as e: raise ServiceException(e) + return schedule + else: + return None diff --git a/nesis/api/tests/core/controllers/test_datasources.py b/nesis/api/tests/core/controllers/test_datasources.py index 9fc8aea..60aebff 100644 --- a/nesis/api/tests/core/controllers/test_datasources.py +++ b/nesis/api/tests/core/controllers/test_datasources.py 
@@ -162,6 +162,42 @@ def test_create_datasource(client, tc): assert 404 == response.status_code, response.json +def test_create_datasource_destination(client, tc): + payload = { + "type": "minio", + "name": "finance6", + "mode": "extract", + "connection": { + "user": "caikuodda", + "password": "some.password", + "endpoint": "localhost", + "dataobjects": "initdb", + "destination": { + "sql": {"url": "postgresql://localhost"}, + }, + }, + } + + admin_session = get_admin_session(client=client) + + # Test invalid destination url + response = client.post( + f"/v1/datasources", + headers=tests.get_header(token=admin_session["token"]), + data=json.dumps(payload), + ) + assert 400 == response.status_code, response.json + + # Test valid destination url + payload["connection"]["destination"]["sql"]["url"] = tests.config["database"]["url"] + response = client.post( + f"/v1/datasources", + headers=tests.get_header(token=admin_session["token"]), + data=json.dumps(payload), + ) + assert 200 == response.status_code, response.json + + def test_update_datasource(client, tc): # Create a datasource payload = { diff --git a/nesis/frontend/client/src/pages/Settings/Datasources/DatasourcesDetailPage.js b/nesis/frontend/client/src/pages/Settings/Datasources/DatasourcesDetailPage.js index 06c2229..4e5b570 100644 --- a/nesis/frontend/client/src/pages/Settings/Datasources/DatasourcesDetailPage.js +++ b/nesis/frontend/client/src/pages/Settings/Datasources/DatasourcesDetailPage.js @@ -112,6 +112,11 @@ function DataSourceForm({ port: datasource?.connection?.port, region: datasource?.connection?.region, dataobjects: datasource?.connection?.dataobjects, + destination: { + sql: { + url: datasource?.connection?.destination?.sql?.url, + }, + }, }, }, }) { @@ -120,6 +125,14 @@ function DataSourceForm({ const addToast = useAddToast(); function handleSubmit(values, actions) { + values.connection.mode = + !values?.connection?.destination?.sql?.url || + values?.connection?.destination?.sql?.url.trim() == '' + ? 'ingest' + : 'extract'; + if (values.connection.mode == 'ingest') { + values.connection.destination = null; + } client .post(`datasources`, values) .then(() => { @@ -178,6 +191,39 @@ function DataSourceForm({ values?.type, ) &&
          }
+         {['s3', 'minio', 'windows_share', 'sharepoint'].includes(
+           values?.type,
+         ) && (
+           <>
+             <h3>Extract to</h3>
+             <p>Enter a connection string to extract text and data to</p>
+             <Table>
+               <thead>
+                 <tr>
+                   <th>Attribute</th>
+                   <th>Value</th>
+                 </tr>
+               </thead>
+               <tbody>
+                 <tr>
+                   <td>Connection String</td>
+                   <td>
+                     <TextField
+                       type="text"
+                       name="connection.destination.sql.url"
+                     />
+                   </td>
+                 </tr>
+               </tbody>
+             </Table>
+           </>
+ )} Date: Tue, 2 Jul 2024 07:32:29 -0700 Subject: [PATCH 2/6] Add mssql support --- compose-dev.yml | 29 ++++++-- ...5801a2_use_json_type_for_document_table.py | 74 +++++++++++++++++++ nesis/api/core/models/entities.py | 13 ++-- nesis/api/requirements.txt | 4 + .../tests/core/document_loaders/test_minio.py | 9 ++- 5 files changed, 111 insertions(+), 18 deletions(-) create mode 100644 nesis/api/alembic/versions/96fce45801a2_use_json_type_for_document_table.py diff --git a/compose-dev.yml b/compose-dev.yml index 66be307..450d310 100644 --- a/compose-dev.yml +++ b/compose-dev.yml @@ -4,7 +4,7 @@ version: '3' networks: - ametnes: + nesis: driver: overlay attachable: true @@ -14,14 +14,14 @@ services: ports: - "11211:11211" networks: - - ametnes + - nesis samba: image: andyzhangx/samba:win-fix command: ["-u", "username;password", "-s", "share;/smbshare/;yes;no;no;all;none", "-p"] ports: - '2445:445' networks: - - ametnes + - nesis volumes: - 'samba_data2:/smbshare' environment: @@ -33,7 +33,7 @@ services: - '59000:9000' - '59001:9001' networks: - - ametnes + - nesis volumes: - 'minio_data:/data' environment: @@ -52,7 +52,7 @@ services: - postgres16_data:/var/lib/postgresql/data restart: on-failure networks: - - ametnes + - nesis qdrant: image: qdrant/qdrant:latest restart: always @@ -67,7 +67,7 @@ services: volumes: - ./qdrant_data:/qdrant_data networks: - - ametnes + - nesis chroma: image: ghcr.io/chroma-core/chroma:latest environment: @@ -79,7 +79,7 @@ services: ports: - 18000:8000 networks: - - ametnes + - nesis localstack: image: localstack/localstack:latest @@ -94,7 +94,19 @@ services: volumes: - '/var/run/docker.sock:/var/run/docker.sock' - local_stack:/localstack/data - + mssql: + container_name: sql-server + image: mcr.microsoft.com/mssql/server:2022-latest + restart: always + environment: + ACCEPT_EULA: "Y" + MSSQL_SA_PASSWORD: "Pa55woR.d12345" + ports: + - "11433:1433" +# volumes: +# - mssql_data:/var/opt/mssql + networks: + - nesis volumes: minio_data: samba_data2: @@ -102,3 +114,4 @@ volumes: qdrant_data: chroma-data: local_stack: + mssql_data: diff --git a/nesis/api/alembic/versions/96fce45801a2_use_json_type_for_document_table.py b/nesis/api/alembic/versions/96fce45801a2_use_json_type_for_document_table.py new file mode 100644 index 0000000..0ce5e7c --- /dev/null +++ b/nesis/api/alembic/versions/96fce45801a2_use_json_type_for_document_table.py @@ -0,0 +1,74 @@ +"""use json type for document table + +Revision ID: 96fce45801a2 +Revises: ea840c8d9e58 +Create Date: 2024-07-02 07:26:03.614939 + +""" +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa +from sqlalchemy.dialects import postgresql + +# revision identifiers, used by Alembic. +revision: str = '96fce45801a2' +down_revision: Union[str, None] = 'ea840c8d9e58' +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + # ### commands auto generated by Alembic - please adjust! 
### + op.add_column('datasource', sa.Column('schedule', sa.Unicode(length=255), nullable=True)) + op.alter_column('document', 'base_uri', + existing_type=sa.VARCHAR(length=4096), + type_=sa.Unicode(length=4000), + existing_nullable=False) + op.alter_column('document', 'filename', + existing_type=sa.VARCHAR(length=4096), + type_=sa.Unicode(length=4000), + existing_nullable=False) + op.alter_column('document', 'rag_metadata', + existing_type=postgresql.JSONB(astext_type=sa.Text()), + type_=sa.JSON(), + existing_nullable=True) + op.alter_column('document', 'extract_metadata', + existing_type=postgresql.JSONB(astext_type=sa.Text()), + type_=sa.JSON(), + existing_nullable=True) + op.alter_column('document', 'store_metadata', + existing_type=postgresql.JSONB(astext_type=sa.Text()), + type_=sa.JSON(), + existing_nullable=True) + op.drop_constraint('uq_document_uuid_base_url_filename', 'document', type_='unique') + op.create_unique_constraint('uq_document_uuid_datasource_id', 'document', ['uuid', 'datasource_id']) + # ### end Alembic commands ### + + +def downgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + op.drop_constraint('uq_document_uuid_datasource_id', 'document', type_='unique') + op.create_unique_constraint('uq_document_uuid_base_url_filename', 'document', ['uuid', 'base_uri', 'filename']) + op.alter_column('document', 'store_metadata', + existing_type=sa.JSON(), + type_=postgresql.JSONB(astext_type=sa.Text()), + existing_nullable=True) + op.alter_column('document', 'extract_metadata', + existing_type=sa.JSON(), + type_=postgresql.JSONB(astext_type=sa.Text()), + existing_nullable=True) + op.alter_column('document', 'rag_metadata', + existing_type=sa.JSON(), + type_=postgresql.JSONB(astext_type=sa.Text()), + existing_nullable=True) + op.alter_column('document', 'filename', + existing_type=sa.Unicode(length=4000), + type_=sa.VARCHAR(length=4096), + existing_nullable=False) + op.alter_column('document', 'base_uri', + existing_type=sa.Unicode(length=4000), + type_=sa.VARCHAR(length=4096), + existing_nullable=False) + op.drop_column('datasource', 'schedule') + # ### end Alembic commands ### diff --git a/nesis/api/core/models/entities.py b/nesis/api/core/models/entities.py index 930dedf..d68a23e 100644 --- a/nesis/api/core/models/entities.py +++ b/nesis/api/core/models/entities.py @@ -22,6 +22,7 @@ Enum, ForeignKeyConstraint, Text, + JSON, ) DEFAULT_DATETIME_FORMAT = "%Y-%m-%d %H:%M:%S" @@ -145,13 +146,13 @@ class DocumentObject: id = Column(BigInteger, primary_key=True, autoincrement=True) uuid = Column(Unicode(255), nullable=False) # This is likely the endpoint e.g. 
hostname, URL, SambaShare e.t.c - base_uri = Column(Unicode(4096), nullable=False) - filename = Column(Unicode(4096), nullable=False) - rag_metadata = Column(JSONB) - extract_metadata = Column(JSONB) + base_uri = Column(Unicode(4000), nullable=False) + filename = Column(Unicode(4000), nullable=False) + rag_metadata = Column(JSON) + extract_metadata = Column(JSON) # We leave this as nullable to allow for external database tables that will not have the Datasource entity datasource_id = Column(Unicode(255)) - store_metadata = Column(JSONB) + store_metadata = Column(JSON) status = Column( Enum(objects.DocumentStatus, name="document_status"), nullable=False, @@ -164,8 +165,6 @@ class DocumentObject: __table_args__ = ( UniqueConstraint( "uuid", - "base_uri", - "filename", "datasource_id", name="uq_document_uuid_datasource_id", ), diff --git a/nesis/api/requirements.txt b/nesis/api/requirements.txt index 9c50d44..694a496 100644 --- a/nesis/api/requirements.txt +++ b/nesis/api/requirements.txt @@ -32,3 +32,7 @@ dropbox==11.36.2 injector==0.21.0 apscheduler==3.10.4 + +# sql engines as text extract destination +pymssql==2.3.0 +oracledb==2.2.1 diff --git a/nesis/api/tests/core/document_loaders/test_minio.py b/nesis/api/tests/core/document_loaders/test_minio.py index e587772..c3c90be 100644 --- a/nesis/api/tests/core/document_loaders/test_minio.py +++ b/nesis/api/tests/core/document_loaders/test_minio.py @@ -128,6 +128,8 @@ def test_ingest_documents( def test_extract_documents( minio_instance: mock.MagicMock, cache: mock.MagicMock, session: Session ) -> None: + destination_sql_url = tests.config["database"]["url"] + # destination_sql_url = "mssql+pymssql://sa:Pa55woR.d12345@localhost:11433/master" data = { "name": "s3 documents", "engine": "s3", @@ -138,7 +140,7 @@ def test_extract_documents( "dataobjects": "buckets", "mode": "extract", "destination": { - "sql": {"url": tests.config["database"]["url"]}, + "sql": {"url": destination_sql_url}, }, }, } @@ -154,7 +156,7 @@ def test_extract_documents( session.commit() http_client = mock.MagicMock() - http_client.upload.return_value = json.dumps({}) + http_client.upload.return_value = json.dumps({"data": {}}) minio_client = mock.MagicMock() bucket = mock.MagicMock() @@ -202,4 +204,5 @@ def test_extract_documents( ) with Session(extract_store._engine) as session: - assert len(session.query(Document).filter().all()) == initial_count + 1 + all_documents = session.query(Document).filter().all() + assert len(all_documents) == initial_count + 1 From 6a4744c6c789f33add86653f71edb63bf062c41b Mon Sep 17 00:00:00 2001 From: Michael Sekamanya Date: Tue, 2 Jul 2024 10:31:26 -0700 Subject: [PATCH 3/6] Fix formating for migrations --- ...5801a2_use_json_type_for_document_table.py | 135 +++++++++++------- 1 file changed, 87 insertions(+), 48 deletions(-) diff --git a/nesis/api/alembic/versions/96fce45801a2_use_json_type_for_document_table.py b/nesis/api/alembic/versions/96fce45801a2_use_json_type_for_document_table.py index 0ce5e7c..940a915 100644 --- a/nesis/api/alembic/versions/96fce45801a2_use_json_type_for_document_table.py +++ b/nesis/api/alembic/versions/96fce45801a2_use_json_type_for_document_table.py @@ -5,6 +5,7 @@ Create Date: 2024-07-02 07:26:03.614939 """ + from typing import Sequence, Union from alembic import op @@ -12,63 +13,101 @@ from sqlalchemy.dialects import postgresql # revision identifiers, used by Alembic. 
-revision: str = '96fce45801a2' -down_revision: Union[str, None] = 'ea840c8d9e58' +revision: str = "96fce45801a2" +down_revision: Union[str, None] = "ea840c8d9e58" branch_labels: Union[str, Sequence[str], None] = None depends_on: Union[str, Sequence[str], None] = None def upgrade() -> None: # ### commands auto generated by Alembic - please adjust! ### - op.add_column('datasource', sa.Column('schedule', sa.Unicode(length=255), nullable=True)) - op.alter_column('document', 'base_uri', - existing_type=sa.VARCHAR(length=4096), - type_=sa.Unicode(length=4000), - existing_nullable=False) - op.alter_column('document', 'filename', - existing_type=sa.VARCHAR(length=4096), - type_=sa.Unicode(length=4000), - existing_nullable=False) - op.alter_column('document', 'rag_metadata', - existing_type=postgresql.JSONB(astext_type=sa.Text()), - type_=sa.JSON(), - existing_nullable=True) - op.alter_column('document', 'extract_metadata', - existing_type=postgresql.JSONB(astext_type=sa.Text()), - type_=sa.JSON(), - existing_nullable=True) - op.alter_column('document', 'store_metadata', - existing_type=postgresql.JSONB(astext_type=sa.Text()), - type_=sa.JSON(), - existing_nullable=True) - op.drop_constraint('uq_document_uuid_base_url_filename', 'document', type_='unique') - op.create_unique_constraint('uq_document_uuid_datasource_id', 'document', ['uuid', 'datasource_id']) + op.add_column( + "datasource", sa.Column("schedule", sa.Unicode(length=255), nullable=True) + ) + op.alter_column( + "document", + "base_uri", + existing_type=sa.VARCHAR(length=4096), + type_=sa.Unicode(length=4000), + existing_nullable=False, + ) + op.alter_column( + "document", + "filename", + existing_type=sa.VARCHAR(length=4096), + type_=sa.Unicode(length=4000), + existing_nullable=False, + ) + op.alter_column( + "document", + "rag_metadata", + existing_type=postgresql.JSONB(astext_type=sa.Text()), + type_=sa.JSON(), + existing_nullable=True, + ) + op.alter_column( + "document", + "extract_metadata", + existing_type=postgresql.JSONB(astext_type=sa.Text()), + type_=sa.JSON(), + existing_nullable=True, + ) + op.alter_column( + "document", + "store_metadata", + existing_type=postgresql.JSONB(astext_type=sa.Text()), + type_=sa.JSON(), + existing_nullable=True, + ) + op.drop_constraint("uq_document_uuid_base_url_filename", "document", type_="unique") + op.create_unique_constraint( + "uq_document_uuid_datasource_id", "document", ["uuid", "datasource_id"] + ) # ### end Alembic commands ### def downgrade() -> None: # ### commands auto generated by Alembic - please adjust! 
### - op.drop_constraint('uq_document_uuid_datasource_id', 'document', type_='unique') - op.create_unique_constraint('uq_document_uuid_base_url_filename', 'document', ['uuid', 'base_uri', 'filename']) - op.alter_column('document', 'store_metadata', - existing_type=sa.JSON(), - type_=postgresql.JSONB(astext_type=sa.Text()), - existing_nullable=True) - op.alter_column('document', 'extract_metadata', - existing_type=sa.JSON(), - type_=postgresql.JSONB(astext_type=sa.Text()), - existing_nullable=True) - op.alter_column('document', 'rag_metadata', - existing_type=sa.JSON(), - type_=postgresql.JSONB(astext_type=sa.Text()), - existing_nullable=True) - op.alter_column('document', 'filename', - existing_type=sa.Unicode(length=4000), - type_=sa.VARCHAR(length=4096), - existing_nullable=False) - op.alter_column('document', 'base_uri', - existing_type=sa.Unicode(length=4000), - type_=sa.VARCHAR(length=4096), - existing_nullable=False) - op.drop_column('datasource', 'schedule') + op.drop_constraint("uq_document_uuid_datasource_id", "document", type_="unique") + op.create_unique_constraint( + "uq_document_uuid_base_url_filename", + "document", + ["uuid", "base_uri", "filename"], + ) + op.alter_column( + "document", + "store_metadata", + existing_type=sa.JSON(), + type_=postgresql.JSONB(astext_type=sa.Text()), + existing_nullable=True, + ) + op.alter_column( + "document", + "extract_metadata", + existing_type=sa.JSON(), + type_=postgresql.JSONB(astext_type=sa.Text()), + existing_nullable=True, + ) + op.alter_column( + "document", + "rag_metadata", + existing_type=sa.JSON(), + type_=postgresql.JSONB(astext_type=sa.Text()), + existing_nullable=True, + ) + op.alter_column( + "document", + "filename", + existing_type=sa.Unicode(length=4000), + type_=sa.VARCHAR(length=4096), + existing_nullable=False, + ) + op.alter_column( + "document", + "base_uri", + existing_type=sa.Unicode(length=4000), + type_=sa.VARCHAR(length=4096), + existing_nullable=False, + ) + op.drop_column("datasource", "schedule") # ### end Alembic commands ### From 5ed017fb2b77c7adc61a398954410a1a6772d7c4 Mon Sep 17 00:00:00 2001 From: Michael Sekamanya Date: Tue, 2 Jul 2024 11:36:29 -0700 Subject: [PATCH 4/6] Fix update datasource schedule --- nesis/api/core/services/datasources.py | 18 +++++++++--------- .../Datasources/DatasourcesDetailPage.js | 2 +- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/nesis/api/core/services/datasources.py b/nesis/api/core/services/datasources.py index 58ecef7..32ab417 100644 --- a/nesis/api/core/services/datasources.py +++ b/nesis/api/core/services/datasources.py @@ -304,17 +304,13 @@ def update(self, **kwargs): # We validate the schedule (if supplied), before we create the datasource schedule = self._validate_schedule(datasource) - datasource_record.schedule = schedule or datasource_record.schedule + datasource_record.schedule = schedule session.merge(datasource_record) session.commit() # if we do have a schedule on this datasource, we update it - schedule = datasource.get("schedule") or "" - if all([schedule, self._task_service]): - task = { - "schedule": schedule, - } + if self._task_service: task_records: List[Task] = self._task_service.get( token=kwargs["token"], parent_id=datasource_id ) @@ -329,7 +325,10 @@ def update(self, **kwargs): ) == 1 ): - if schedule != "": + if schedule is not None: + task = { + "schedule": schedule, + } self._task_service.update( token=kwargs["token"], task=task, @@ -349,9 +348,10 @@ def update(self, **kwargs): if session: session.close() - def 
_validate_schedule(self, datasource): + @staticmethod + def _validate_schedule(datasource): schedule = datasource.get("schedule") - if schedule is not None: + if schedule is not None and schedule.strip() != "": try: validate_schedule(schedule) except Exception as e: diff --git a/nesis/frontend/client/src/pages/Settings/Datasources/DatasourcesDetailPage.js b/nesis/frontend/client/src/pages/Settings/Datasources/DatasourcesDetailPage.js index 4e5b570..0187eee 100644 --- a/nesis/frontend/client/src/pages/Settings/Datasources/DatasourcesDetailPage.js +++ b/nesis/frontend/client/src/pages/Settings/Datasources/DatasourcesDetailPage.js @@ -104,6 +104,7 @@ function DataSourceForm({ id: datasource?.id, name: datasource?.name, type: datasource?.type, + schedule: datasource?.schedule, connection: { user: datasource?.connection?.user, client_id: datasource?.connection?.client_id, @@ -370,7 +371,6 @@ function minioConnection() { id="password" placeholder="password" name="connection.password" - validate={required} /> From 8f044c07826f140f6a46c0cee9bd431fc4b39bca Mon Sep 17 00:00:00 2001 From: Michael Sekamanya Date: Wed, 3 Jul 2024 04:58:27 -0700 Subject: [PATCH 5/6] Revert to JSONB for document table --- ...add_datasource_rag_processing_info_to_.py} | 34 +++-- ...5801a2_use_json_type_for_document_table.py | 113 ----------------- nesis/api/core/document_loaders/minio.py | 9 +- nesis/api/core/document_loaders/runners.py | 12 +- nesis/api/core/document_loaders/stores.py | 117 +++++++++++++----- nesis/api/core/models/entities.py | 17 +-- nesis/api/core/services/task_service.py | 7 +- .../tests/core/document_loaders/test_minio.py | 13 +- nesis/rag/requirements.txt | 1 + version.txt | 2 +- 10 files changed, 147 insertions(+), 178 deletions(-) rename nesis/api/alembic/versions/{ea840c8d9e58_add_extract_management_fields_to_.py => 090822101cb5_add_datasource_rag_processing_info_to_.py} (76%) delete mode 100644 nesis/api/alembic/versions/96fce45801a2_use_json_type_for_document_table.py diff --git a/nesis/api/alembic/versions/ea840c8d9e58_add_extract_management_fields_to_.py b/nesis/api/alembic/versions/090822101cb5_add_datasource_rag_processing_info_to_.py similarity index 76% rename from nesis/api/alembic/versions/ea840c8d9e58_add_extract_management_fields_to_.py rename to nesis/api/alembic/versions/090822101cb5_add_datasource_rag_processing_info_to_.py index d600b48..38d1e6a 100644 --- a/nesis/api/alembic/versions/ea840c8d9e58_add_extract_management_fields_to_.py +++ b/nesis/api/alembic/versions/090822101cb5_add_datasource_rag_processing_info_to_.py @@ -1,8 +1,8 @@ -"""add extract management fields to document +"""add datasource,rag processing info to document -Revision ID: ea840c8d9e58 +Revision ID: 090822101cb5 Revises: 7cfa662dff86 -Create Date: 2024-07-01 12:29:36.310411 +Create Date: 2024-07-03 04:28:31.842226 """ @@ -13,7 +13,7 @@ from sqlalchemy.dialects import postgresql # revision identifiers, used by Alembic. 
-revision: str = "ea840c8d9e58" +revision: str = "090822101cb5" down_revision: Union[str, None] = "7cfa662dff86" branch_labels: Union[str, Sequence[str], None] = None depends_on: Union[str, Sequence[str], None] = None @@ -28,11 +28,9 @@ def upgrade() -> None: document_status.create(op.get_bind()) op.add_column( - "document", - sa.Column( - "extract_metadata", postgresql.JSONB(astext_type=sa.Text()), nullable=True - ), + "datasource", sa.Column("schedule", sa.Unicode(length=255), nullable=True) ) + op.add_column( "document", sa.Column("datasource_id", sa.Unicode(length=255), nullable=True) ) @@ -71,11 +69,20 @@ def upgrade() -> None: existing_type=postgresql.JSONB(astext_type=sa.Text()), nullable=True, ) + + op.drop_constraint("uq_document_uuid_base_url_filename", "document", type_="unique") + op.create_unique_constraint( + "uq_document_uuid_datasource_id", "document", ["uuid", "datasource_id"] + ) + + op.create_index(op.f("idx_document_base_uri"), "document", ["base_uri"]) # ### end Alembic commands ### def downgrade() -> None: # ### commands auto generated by Alembic - please adjust! ### + op.drop_column("datasource", "schedule") + op.alter_column( "document", "rag_metadata", @@ -101,7 +108,16 @@ def downgrade() -> None: op.drop_column("document", "last_modified") op.drop_column("document", "status") op.drop_column("document", "datasource_id") - op.drop_column("document", "extract_metadata") document_status.drop(op.get_bind()) + + # op.drop_constraint("uq_document_uuid_datasource_id", "document", type_="unique") + op.create_unique_constraint( + "uq_document_uuid_base_url_filename", + "document", + ["uuid", "base_uri", "filename"], + ) + + op.drop_index(op.f("idx_document_base_uri"), table_name="document") + # ### end Alembic commands ### diff --git a/nesis/api/alembic/versions/96fce45801a2_use_json_type_for_document_table.py b/nesis/api/alembic/versions/96fce45801a2_use_json_type_for_document_table.py deleted file mode 100644 index 940a915..0000000 --- a/nesis/api/alembic/versions/96fce45801a2_use_json_type_for_document_table.py +++ /dev/null @@ -1,113 +0,0 @@ -"""use json type for document table - -Revision ID: 96fce45801a2 -Revises: ea840c8d9e58 -Create Date: 2024-07-02 07:26:03.614939 - -""" - -from typing import Sequence, Union - -from alembic import op -import sqlalchemy as sa -from sqlalchemy.dialects import postgresql - -# revision identifiers, used by Alembic. -revision: str = "96fce45801a2" -down_revision: Union[str, None] = "ea840c8d9e58" -branch_labels: Union[str, Sequence[str], None] = None -depends_on: Union[str, Sequence[str], None] = None - - -def upgrade() -> None: - # ### commands auto generated by Alembic - please adjust! 
### - op.add_column( - "datasource", sa.Column("schedule", sa.Unicode(length=255), nullable=True) - ) - op.alter_column( - "document", - "base_uri", - existing_type=sa.VARCHAR(length=4096), - type_=sa.Unicode(length=4000), - existing_nullable=False, - ) - op.alter_column( - "document", - "filename", - existing_type=sa.VARCHAR(length=4096), - type_=sa.Unicode(length=4000), - existing_nullable=False, - ) - op.alter_column( - "document", - "rag_metadata", - existing_type=postgresql.JSONB(astext_type=sa.Text()), - type_=sa.JSON(), - existing_nullable=True, - ) - op.alter_column( - "document", - "extract_metadata", - existing_type=postgresql.JSONB(astext_type=sa.Text()), - type_=sa.JSON(), - existing_nullable=True, - ) - op.alter_column( - "document", - "store_metadata", - existing_type=postgresql.JSONB(astext_type=sa.Text()), - type_=sa.JSON(), - existing_nullable=True, - ) - op.drop_constraint("uq_document_uuid_base_url_filename", "document", type_="unique") - op.create_unique_constraint( - "uq_document_uuid_datasource_id", "document", ["uuid", "datasource_id"] - ) - # ### end Alembic commands ### - - -def downgrade() -> None: - # ### commands auto generated by Alembic - please adjust! ### - op.drop_constraint("uq_document_uuid_datasource_id", "document", type_="unique") - op.create_unique_constraint( - "uq_document_uuid_base_url_filename", - "document", - ["uuid", "base_uri", "filename"], - ) - op.alter_column( - "document", - "store_metadata", - existing_type=sa.JSON(), - type_=postgresql.JSONB(astext_type=sa.Text()), - existing_nullable=True, - ) - op.alter_column( - "document", - "extract_metadata", - existing_type=sa.JSON(), - type_=postgresql.JSONB(astext_type=sa.Text()), - existing_nullable=True, - ) - op.alter_column( - "document", - "rag_metadata", - existing_type=sa.JSON(), - type_=postgresql.JSONB(astext_type=sa.Text()), - existing_nullable=True, - ) - op.alter_column( - "document", - "filename", - existing_type=sa.Unicode(length=4000), - type_=sa.VARCHAR(length=4096), - existing_nullable=False, - ) - op.alter_column( - "document", - "base_uri", - existing_type=sa.Unicode(length=4000), - type_=sa.VARCHAR(length=4096), - existing_nullable=False, - ) - op.drop_column("datasource", "schedule") - # ### end Alembic commands ### diff --git a/nesis/api/core/document_loaders/minio.py b/nesis/api/core/document_loaders/minio.py index 52068e4..2eb13a9 100644 --- a/nesis/api/core/document_loaders/minio.py +++ b/nesis/api/core/document_loaders/minio.py @@ -5,7 +5,7 @@ import os import queue import tempfile -from typing import Dict, Any +from typing import Dict, Any, Optional import memcache import minio @@ -46,10 +46,11 @@ def __init__( self._cache_client = cache_client self._datasource = datasource - _extract_runner = None + # This is left public for testing + self._extract_runner: ExtractRunner = Optional[None] _ingest_runner = IngestRunner(config=config, http_client=http_client) if self._datasource.connection.get("destination") is not None: - _extract_runner = ExtractRunner( + self._extract_runner = ExtractRunner( config=config, http_client=http_client, destination=self._datasource.connection.get("destination"), @@ -64,7 +65,7 @@ def __init__( case "ingest": self._ingest_runners: list[RagRunner] = [_ingest_runner] case "extract": - self._ingest_runners: list[RagRunner] = [_extract_runner] + self._ingest_runners: list[RagRunner] = [self._extract_runner] case _: raise ValueError( f"Invalid mode {self._mode}. 
Expected 'ingest' or 'extract'" diff --git a/nesis/api/core/document_loaders/runners.py b/nesis/api/core/document_loaders/runners.py index 641b9fe..9e29ea4 100644 --- a/nesis/api/core/document_loaders/runners.py +++ b/nesis/api/core/document_loaders/runners.py @@ -6,7 +6,7 @@ import nesis.api.core.util.http as http from nesis.api.core.document_loaders.stores import SqlDocumentStore -from nesis.api.core.models.entities import Document, Datasource, DocumentObject +from nesis.api.core.models.entities import Document, Datasource from nesis.api.core.services.util import ( save_document, get_document, @@ -92,7 +92,7 @@ def _is_modified( Here we check if this file has been updated. If the file has been updated, we delete it from the vector store and re-ingest the new updated file """ - document: DocumentObject = self._extraction_store.get(document_id=document_id) + document = self._extraction_store.get(document_id=document_id) if document is None or document.last_modified < last_modified: return False @@ -104,7 +104,7 @@ def _is_modified( ) return True - def save(self, **kwargs) -> DocumentObject: + def save(self, **kwargs): return self._extraction_store.save( document_id=kwargs["document_id"], datasource_id=kwargs["datasource_id"], @@ -116,7 +116,7 @@ def save(self, **kwargs) -> DocumentObject: filename=kwargs["filename"], ) - def delete(self, document: DocumentObject, **kwargs) -> None: + def delete(self, document, **kwargs) -> None: self._extraction_store.delete(document_id=document.uuid) @@ -180,9 +180,7 @@ def _is_modified( ): return False try: - self.delete( - document_id=document_id, rag_metadata=document.rag_metadata - ) + self.delete(document=document) except: _LOG.warning( f"Failed to delete document {document_id}'s record. Continuing anyway..." diff --git a/nesis/api/core/document_loaders/stores.py b/nesis/api/core/document_loaders/stores.py index 68bc314..8d181de 100644 --- a/nesis/api/core/document_loaders/stores.py +++ b/nesis/api/core/document_loaders/stores.py @@ -1,10 +1,25 @@ import json import logging +import datetime as dt +import os +import uuid import sqlalchemy as sa -from sqlalchemy.orm import registry, Session - -from nesis.api.core.models.entities import Document, DocumentObject +from sqlalchemy import ( + Column, + UniqueConstraint, + Index, + BigInteger, + Unicode, + JSON, + DateTime, + Text, + Enum, +) +from sqlalchemy.exc import OperationalError +from sqlalchemy.orm import Session, declarative_base + +from nesis.api.core.models import objects _LOG = logging.getLogger(__name__) @@ -45,38 +60,82 @@ def __init__(self, url, echo=False, pool_size=10): pool_size=pool_size, ) - mapper_registry = registry() + Base = declarative_base() + + """ + 1. Create a dynamic class/type so as to allow for multiple database connections. + For example, we can extract data to a SQL Server or Postgres instance and avoid the class already mapped exception + + 2. 
We use JSON database to allow for other database + """ + self.Store = type( + f"Store{str(uuid.uuid4()).split('-')[0]}", + (Base,), + { + "__tablename__": "document_extract", + "id": Column(BigInteger, primary_key=True, autoincrement=True), + "uuid": Column(Unicode(255), nullable=False), + "base_uri": Column(Unicode(4000), nullable=False), + "filename": Column(Unicode(4000), nullable=False), + "extract_metadata": Column(JSON), + "datasource_id": Column(Unicode(255)), + "store_metadata": Column(JSON), + "status": Column( + Enum(objects.DocumentStatus, name="document_status"), + nullable=False, + default=objects.DocumentStatus.PROCESSING, + ), + "last_modified": Column( + DateTime, default=dt.datetime.utcnow, nullable=False + ), + "last_processed": Column( + DateTime, default=dt.datetime.utcnow, nullable=False + ), + "last_processed_message": Column(Text), + "__table_args__": ( + UniqueConstraint( + "uuid", + "datasource_id", + name="uq_document_extract_uuid_datasource_id", + ), + Index("idx_document_extract_base_uri", "base_uri"), + ), + }, + ) try: - mapper_registry.map_declaratively(DocumentObject) - - except sa.exc.ArgumentError as ex: - # Table is already mapped - _LOG.warning("Error mapping store", exc_info=True) - pass - mapper_registry.metadata.create_all(self._engine) - - def get(self, document_id) -> DocumentObject: + Base.metadata.create_all(self._engine) + except Exception as ex: + ext_str = str(ex) + if ( + "uq_document_extract_uuid_datasource_id" in ext_str + or "idx_document_extract_base_uri" in ext_str + ): + # This would be an error about the index or unique key + pass + else: + raise ex + + def get(self, document_id): with Session(self._engine) as session: return ( - session.query(DocumentObject) - .filter(DocumentObject.uuid == document_id) - .first() + session.query(self.Store).filter(self.Store.uuid == document_id).first() ) - def save(self, **kwargs) -> DocumentObject: + def save(self, **kwargs): with Session(self._engine) as session: session.expire_on_commit = False - store_record = Document( - document_id=kwargs["document_id"], - datasource_id=kwargs["datasource_id"], - extract_metadata=kwargs["extract_metadata"], - store_metadata=kwargs["store_metadata"], - last_modified=kwargs["last_modified"], - base_uri=kwargs["base_uri"], - rag_metadata=kwargs["rag_metadata"], - filename=kwargs["filename"], - ) + store_record = self.Store() + + store_record.uuid = kwargs["document_id"] + store_record.datasource_id = kwargs["datasource_id"] + store_record.extract_metadata = kwargs["extract_metadata"] + store_record.store_metadata = kwargs["store_metadata"] + store_record.last_modified = kwargs["last_modified"] + store_record.base_uri = kwargs["base_uri"] + store_record.rag_metadata = kwargs["rag_metadata"] + store_record.filename = kwargs["filename"] + session.add(store_record) session.commit() @@ -85,8 +144,8 @@ def save(self, **kwargs) -> DocumentObject: def delete(self, document_id): with Session(self._engine) as session: store_record = ( - session.query(DocumentObject) - .filter(DocumentObject.document_id == document_id) + session.query(self.Store) + .filter(self.Store.document_id == document_id) .first() ) session.delete(store_record) diff --git a/nesis/api/core/models/entities.py b/nesis/api/core/models/entities.py index d68a23e..abd8277 100644 --- a/nesis/api/core/models/entities.py +++ b/nesis/api/core/models/entities.py @@ -141,18 +141,16 @@ def to_dict(self, **kwargs) -> dict: return dict_value -class DocumentObject: +class Document(Base): __tablename__ = "document" id = 
Column(BigInteger, primary_key=True, autoincrement=True) uuid = Column(Unicode(255), nullable=False) # This is likely the endpoint e.g. hostname, URL, SambaShare e.t.c base_uri = Column(Unicode(4000), nullable=False) filename = Column(Unicode(4000), nullable=False) - rag_metadata = Column(JSON) - extract_metadata = Column(JSON) - # We leave this as nullable to allow for external database tables that will not have the Datasource entity - datasource_id = Column(Unicode(255)) - store_metadata = Column(JSON) + rag_metadata = Column(JSONB) + datasource_id = Column(Unicode(length=255)) + store_metadata = Column(JSONB) status = Column( Enum(objects.DocumentStatus, name="document_status"), nullable=False, @@ -168,7 +166,7 @@ class DocumentObject: "datasource_id", name="uq_document_uuid_datasource_id", ), - # Index("idx_document_base_uri", "base_uri"), + Index("idx_document_base_uri", "base_uri"), ) def __init__( @@ -213,11 +211,6 @@ def to_dict(self, **kwargs) -> dict: return dict_value -class Document(Base, DocumentObject): - def __init__(self, **kwargs): - DocumentObject.__init__(self, **kwargs) - - # RBAC class User(Base): __tablename__ = "users" diff --git a/nesis/api/core/services/task_service.py b/nesis/api/core/services/task_service.py index 203df9e..296a881 100644 --- a/nesis/api/core/services/task_service.py +++ b/nesis/api/core/services/task_service.py @@ -11,6 +11,7 @@ EVENT_JOB_ADDED, ) from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor +from apscheduler.jobstores.base import JobLookupError from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.triggers.date import DateTrigger @@ -327,7 +328,11 @@ def delete(self, **kwargs): session.delete(task) session.commit() - self._scheduler.remove_job(job_id=task.uuid) + try: + self._scheduler.remove_job(job_id=task.uuid) + except JobLookupError: + # Ignore the error if job does not exist for some reason + pass except Exception: session.rollback() diff --git a/nesis/api/tests/core/document_loaders/test_minio.py b/nesis/api/tests/core/document_loaders/test_minio.py index c3c90be..8768119 100644 --- a/nesis/api/tests/core/document_loaders/test_minio.py +++ b/nesis/api/tests/core/document_loaders/test_minio.py @@ -180,8 +180,13 @@ def test_extract_documents( extract_store = SqlDocumentStore( url=data["connection"]["destination"]["sql"]["url"] ) + with Session(extract_store._engine) as session: - initial_count = len(session.query(Document).filter().all()) + initial_count = len( + session.query(minio_ingestor._extract_runner._extraction_store.Store) + .filter() + .all() + ) minio_ingestor.run( metadata={"datasource": "documents"}, @@ -204,5 +209,9 @@ def test_extract_documents( ) with Session(extract_store._engine) as session: - all_documents = session.query(Document).filter().all() + all_documents = ( + session.query(minio_ingestor._extract_runner._extraction_store.Store) + .filter() + .all() + ) assert len(all_documents) == initial_count + 1 diff --git a/nesis/rag/requirements.txt b/nesis/rag/requirements.txt index 2e1a89d..27fc738 100644 --- a/nesis/rag/requirements.txt +++ b/nesis/rag/requirements.txt @@ -37,6 +37,7 @@ pydub==0.25.1 odfpy==1.4.1 EbookLib==0.18 html2text==2024.2.26 +pikepdf==8.15.0 # Dependency for video/audio encoders openai-whisper @ git+https://github.com/openai/whisper.git diff --git a/version.txt b/version.txt index 12023bf..a03a18d 100644 --- a/version.txt +++ b/version.txt @@ -1 +1 @@ -0.1.3-rc11 
+0.1.3-rc14 From f15978ddd1d5e257fbf2d4f7ff73c4052c8f53a0 Mon Sep 17 00:00:00 2001 From: Michael Sekamanya Date: Wed, 3 Jul 2024 11:33:21 -0700 Subject: [PATCH 6/6] Fix extract data store delete --- nesis/api/core/document_loaders/minio.py | 33 ++-- nesis/api/core/document_loaders/runners.py | 32 ++-- nesis/api/core/document_loaders/stores.py | 18 +- .../tests/core/document_loaders/test_minio.py | 158 ++++++++++++++++++ version.txt | 2 +- 5 files changed, 211 insertions(+), 32 deletions(-) diff --git a/nesis/api/core/document_loaders/minio.py b/nesis/api/core/document_loaders/minio.py index 2eb13a9..83572a6 100644 --- a/nesis/api/core/document_loaders/minio.py +++ b/nesis/api/core/document_loaders/minio.py @@ -258,23 +258,28 @@ def _unsync_documents( try: endpoint = connection.get("endpoint") - documents = get_documents(base_uri=endpoint) - for document in documents: - store_metadata = document.store_metadata - rag_metadata = document.rag_metadata - bucket_name = store_metadata["bucket_name"] - object_name = store_metadata["object_name"] - try: - client.stat_object(bucket_name=bucket_name, object_name=object_name) - except minio.error.S3Error as ex: - str_ex = str(ex) - if "NoSuchKey" in str_ex and "does not exist" in str_ex: - for _ingest_runner in self._ingest_runners: + for _ingest_runner in self._ingest_runners: + documents = _ingest_runner.get(base_uri=endpoint) + for document in documents: + store_metadata = document.store_metadata + try: + rag_metadata = document.rag_metadata + except AttributeError: + rag_metadata = document.extract_metadata + bucket_name = store_metadata["bucket_name"] + object_name = store_metadata["object_name"] + try: + client.stat_object( + bucket_name=bucket_name, object_name=object_name + ) + except Exception as ex: + str_ex = str(ex) + if "NoSuchKey" in str_ex and "does not exist" in str_ex: _ingest_runner.delete( document=document, rag_metadata=rag_metadata ) - else: - raise + else: + raise except: _LOG.warn("Error fetching and updating documents", exc_info=True) diff --git a/nesis/api/core/document_loaders/runners.py b/nesis/api/core/document_loaders/runners.py index 9e29ea4..01e4167 100644 --- a/nesis/api/core/document_loaders/runners.py +++ b/nesis/api/core/document_loaders/runners.py @@ -11,6 +11,7 @@ save_document, get_document, delete_document, + get_documents, ) from nesis.api.core.util.dateutil import strptime @@ -38,6 +39,10 @@ def save(self, **kwargs) -> Document: def delete(self, document: Document, **kwargs) -> None: pass + @abc.abstractmethod + def get(self, **kwargs) -> list: + pass + class ExtractRunner(RagRunner): @@ -85,6 +90,9 @@ def run( ) return json.loads(response) + def get(self, **kwargs) -> list: + return self._extraction_store.get(base_uri=kwargs.get("base_uri")) + def _is_modified( self, document_id, last_modified: datetime.datetime ) -> Union[bool, None]: @@ -92,16 +100,16 @@ def _is_modified( Here we check if this file has been updated. If the file has been updated, we delete it from the vector store and re-ingest the new updated file """ - document = self._extraction_store.get(document_id=document_id) - - if document is None or document.last_modified < last_modified: - return False - try: - self.delete(document=document) - except: - _LOG.warning( - f"Failed to delete document {document_id}'s record. Continuing anyway..." 
- ) + documents = self._extraction_store.get(document_id=document_id) + for document in documents: + if document is None or document.last_modified < last_modified: + return False + try: + self.delete(document=document) + except: + _LOG.warning( + f"Failed to delete document {document_id}'s record. Continuing anyway..." + ) return True def save(self, **kwargs): @@ -122,6 +130,10 @@ def delete(self, document, **kwargs) -> None: class IngestRunner(RagRunner): + def get(self, **kwargs) -> list: + base_uri = kwargs.get("base_uri") + return get_documents(base_uri=base_uri) + def __init__(self, config, http_client): self._config = config self._http_client: http.HttpClient = http_client diff --git a/nesis/api/core/document_loaders/stores.py b/nesis/api/core/document_loaders/stores.py index 8d181de..cb3fe0c 100644 --- a/nesis/api/core/document_loaders/stores.py +++ b/nesis/api/core/document_loaders/stores.py @@ -116,11 +116,17 @@ def __init__(self, url, echo=False, pool_size=10): else: raise ex - def get(self, document_id): + def get(self, **kwargs): + document_id = kwargs.get("document_id") + base_uri = kwargs.get("base_uri") with Session(self._engine) as session: - return ( - session.query(self.Store).filter(self.Store.uuid == document_id).first() - ) + query = session.query(self.Store) + if document_id is not None: + query = query.filter(self.Store.uuid == document_id) + if base_uri is not None: + query = query.filter(self.Store.base_uri == base_uri) + + return query.all() def save(self, **kwargs): with Session(self._engine) as session: @@ -144,9 +150,7 @@ def save(self, **kwargs): def delete(self, document_id): with Session(self._engine) as session: store_record = ( - session.query(self.Store) - .filter(self.Store.document_id == document_id) - .first() + session.query(self.Store).filter(self.Store.uuid == document_id).first() ) session.delete(store_record) session.commit() diff --git a/nesis/api/tests/core/document_loaders/test_minio.py b/nesis/api/tests/core/document_loaders/test_minio.py index 8768119..3f09b3f 100644 --- a/nesis/api/tests/core/document_loaders/test_minio.py +++ b/nesis/api/tests/core/document_loaders/test_minio.py @@ -215,3 +215,161 @@ def test_extract_documents( .all() ) assert len(all_documents) == initial_count + 1 + + +@mock.patch("nesis.api.core.document_loaders.minio.Minio") +def test_uningest_documents( + client: mock.MagicMock, cache: mock.MagicMock, session: Session +) -> None: + """ + Test deleting of s3 documents from the rag engine if they have been deleted from the s3 bucket + """ + data = { + "name": "s3 documents", + "engine": "s3", + "connection": { + "endpoint": "http://localhost:4566", + # "user": "test", + # "password": "test", + "region": "us-east-1", + "dataobjects": "some-non-existing-bucket", + }, + } + + datasource = Datasource( + name=data["name"], + connection=data["connection"], + source_type=DatasourceType.MINIO, + status=DatasourceStatus.ONLINE, + ) + + session.add(datasource) + session.commit() + + document = Document( + base_uri="http://localhost:4566", + document_id=str(uuid.uuid4()), + filename="invalid.pdf", + rag_metadata={"data": [{"doc_id": str(uuid.uuid4())}]}, + store_metadata={"bucket_name": "some-bucket", "object_name": "file/path.pdf"}, + last_modified=datetime.datetime.utcnow(), + ) + + session.add(document) + session.commit() + + http_client = mock.MagicMock() + minio_client = mock.MagicMock() + + client.return_value = minio_client + minio_client.stat_object.side_effect = Exception("NoSuchKey - does not exist") + + documents = 
session.query(Document).all() + assert len(documents) == 1 + + minio_ingestor = minio.MinioProcessor( + config=tests.config, + http_client=http_client, + cache_client=cache, + datasource=datasource, + ) + + # # No document records exist + # document_records = session.query(Document).all() + # assert 0 == len(document_records) + + minio_ingestor.run( + metadata={"datasource": "documents"}, + ) + + _, upload_kwargs = http_client.deletes.call_args_list[0] + urls = upload_kwargs["urls"] + + assert ( + urls[0] + == f"http://localhost:8080/v1/ingest/documents/{document.rag_metadata['data'][0]['doc_id']}" + ) + documents = session.query(Document).all() + assert len(documents) == 0 + + +@mock.patch("nesis.api.core.document_loaders.minio.Minio") +def test_unextract_documents( + client: mock.MagicMock, cache: mock.MagicMock, session: Session +) -> None: + """ + Test deleting of s3 documents from the rag engine if they have been deleted from the s3 bucket + """ + destination_sql_url = tests.config["database"]["url"] + data = { + "name": "s3 documents", + "engine": "s3", + "connection": { + "endpoint": "https://s3.endpoint", + "access_key": "", + "secret_key": "", + "dataobjects": "buckets", + "mode": "extract", + "destination": { + "sql": {"url": destination_sql_url}, + }, + }, + } + datasource = Datasource( + name=data["name"], + connection=data["connection"], + source_type=DatasourceType.MINIO, + status=DatasourceStatus.ONLINE, + ) + + session.add(datasource) + session.commit() + + http_client = mock.MagicMock() + minio_client = mock.MagicMock() + + client.return_value = minio_client + minio_client.stat_object.side_effect = Exception("NoSuchKey - does not exist") + + minio_ingestor = minio.MinioProcessor( + config=tests.config, + http_client=http_client, + cache_client=cache, + datasource=datasource, + ) + + extract_store = SqlDocumentStore( + url=data["connection"]["destination"]["sql"]["url"] + ) + + with Session(extract_store._engine) as session: + session.query(minio_ingestor._extract_runner._extraction_store.Store).delete() + document = minio_ingestor._extract_runner._extraction_store.Store() + document.base_uri = data["connection"]["endpoint"] + document.uuid = str(uuid.uuid4()) + document.filename = "invalid.pdf" + document.extract_metadata = {"data": [{"doc_id": str(uuid.uuid4())}]} + document.store_metadata = { + "bucket_name": "some-bucket", + "object_name": "file/path.pdf", + } + document.last_modified = datetime.datetime.utcnow() + + session.add(document) + session.commit() + + initial_count = len( + session.query(minio_ingestor._extract_runner._extraction_store.Store) + .filter() + .all() + ) + + minio_ingestor.run( + metadata={"datasource": "documents"}, + ) + + with Session(extract_store._engine) as session: + documents = session.query( + minio_ingestor._extract_runner._extraction_store.Store + ).all() + assert len(documents) == initial_count - 1 diff --git a/version.txt b/version.txt index a03a18d..12023bf 100644 --- a/version.txt +++ b/version.txt @@ -1 +1 @@ -0.1.3-rc14 +0.1.3-rc11
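
Reviewer note on trying the change end to end: the sketch below shows the shape of an extract-mode datasource and the connectivity check introduced in patch 1, pulled together in one place. It is a minimal sketch based on the validator and the test payload above; the endpoint, credentials and destination URL are illustrative, not fixtures from this repository.

    from sqlalchemy import create_engine

    datasource = {
        "type": "minio",
        "name": "finance-documents",
        "mode": "extract",  # omit or set to "ingest" to keep the default RAG ingestion path
        "connection": {
            "endpoint": "http://localhost:59000",
            "user": "admin",
            "password": "password",
            "dataobjects": "documents",
            "destination": {"sql": {"url": "postgresql://nesis:password@localhost:5432/nesis"}},
        },
    }

    # Mirrors validate_datasource_connection(): an extract datasource must name a
    # reachable destination database before it is accepted.
    destination_url = ((datasource["connection"].get("destination") or {}).get("sql") or {}).get("url")
    if datasource.get("mode") == "extract":
        if destination_url is None:
            raise ValueError("Missing url for destination target")
        engine = create_engine(url=destination_url, connect_args={"connect_timeout": 10})
        with engine.connect():
            pass  # raises, and the API returns 400, if the destination cannot be reached

The same URL is what the new "Extract to" table in DatasourcesDetailPage.js collects; when it is left blank, the frontend falls back to mode "ingest" and drops the destination block before posting.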
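
Patch 5's SqlDocumentStore builds its mapped class dynamically so that several extract destinations (Postgres, SQL Server via pymssql, etc.) can be mapped in one process without SQLAlchemy's "class already mapped" error, and uses the portable JSON type instead of Postgres-only JSONB. The sketch below shows that pattern on its own, with a trimmed-down column set and SQLite URLs purely for illustration.

    import uuid
    from sqlalchemy import create_engine, Column, Integer, Unicode, JSON
    from sqlalchemy.orm import declarative_base, Session

    def make_store(url: str):
        Base = declarative_base()
        # type() returns a brand-new class object on each call, so every destination
        # engine gets its own mapper and metadata.
        Store = type(
            f"Store{uuid.uuid4().hex[:8]}",
            (Base,),
            {
                "__tablename__": "document_extract",
                "id": Column(Integer, primary_key=True, autoincrement=True),
                "uuid": Column(Unicode(255), nullable=False),
                "extract_metadata": Column(JSON),  # portable JSON instead of JSONB
            },
        )
        engine = create_engine(url)
        Base.metadata.create_all(engine)
        return engine, Store

    # Two destinations side by side, no mapper clash:
    engine_a, StoreA = make_store("sqlite:///extract_a.db")
    engine_b, StoreB = make_store("sqlite:///extract_b.db")
    with Session(engine_a) as session:
        session.add(StoreA(uuid="doc-1", extract_metadata={"data": []}))
        session.commit()

In the patch, the real class keeps the full document columns plus a per-destination unique constraint (uq_document_extract_uuid_datasource_id), and the create_all error handling simply tolerates the constraint or index already existing on a re-run.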