diff --git a/api/src/constants/lookup_constants.py b/api/src/constants/lookup_constants.py
index f413d6427..32e3f44e3 100644
--- a/api/src/constants/lookup_constants.py
+++ b/api/src/constants/lookup_constants.py
@@ -130,3 +130,9 @@ class OpportunityAttachmentType(StrEnum):
 
 class ExternalUserType(StrEnum):
     LOGIN_GOV = "login_gov"
+
+
+class JobStatus(StrEnum):
+    STARTED = "started"
+    COMPLETED = "completed"
+    FAILED = "failed"
diff --git a/api/src/db/migrations/versions/2025_01_16_rename_tables_and_create_job_table.py b/api/src/db/migrations/versions/2025_01_16_rename_tables_and_create_job_table.py
new file mode 100644
index 000000000..f2b8d449e
--- /dev/null
+++ b/api/src/db/migrations/versions/2025_01_16_rename_tables_and_create_job_table.py
@@ -0,0 +1,225 @@
+"""Rename tables and create job table
+
+Revision ID: dc04ce955a9a
+Revises: 99bb8e01ad38
+Create Date: 2025-01-16 18:34:48.013913
+
+"""
+
+import sqlalchemy as sa
+from alembic import op
+from sqlalchemy.dialects import postgresql
+from sqlalchemy.sql import text
+
+# revision identifiers, used by Alembic.
+revision = "dc04ce955a9a"
+down_revision = "fe052c05c757"
+branch_labels = None
+depends_on = None
+
+
+create_trigger_function = """
+CREATE OR REPLACE FUNCTION update_opportunity_search_queue()
+RETURNS TRIGGER AS $$
+DECLARE
+    opp_id bigint;
+BEGIN
+    -- Determine the opportunity_id based on the table
+    CASE TG_TABLE_NAME
+        WHEN 'link_opportunity_summary_funding_instrument' THEN
+            opp_id := (SELECT opportunity_id FROM api.opportunity_summary WHERE opportunity_summary_id = NEW.opportunity_summary_id);
+        WHEN 'link_opportunity_summary_funding_category' THEN
+            opp_id := (SELECT opportunity_id FROM api.opportunity_summary WHERE opportunity_summary_id = NEW.opportunity_summary_id);
+        WHEN 'link_opportunity_summary_applicant_type' THEN
+            opp_id := (SELECT opportunity_id FROM api.opportunity_summary WHERE opportunity_summary_id = NEW.opportunity_summary_id);
+        WHEN 'opportunity_summary' THEN
+            opp_id := NEW.opportunity_id;
+        WHEN 'current_opportunity_summary' THEN
+            opp_id := NEW.opportunity_id;
+        ELSE
+            opp_id := NEW.opportunity_id;
+    END CASE;
+
+    INSERT INTO api.opportunity_change_audit (opportunity_id)
+    VALUES (opp_id)
+    ON CONFLICT (opportunity_id)
+    DO UPDATE SET updated_at = CURRENT_TIMESTAMP;
+
+    RETURN NEW;
+END;
+$$ LANGUAGE plpgsql;
+"""
+
+
+def upgrade():
+    # ### commands auto generated by Alembic - please adjust! ###
+    op.create_table(
+        "job_log",
+        sa.Column("job_id", sa.UUID(), nullable=False),
+        sa.Column("job_type", sa.Text(), nullable=False),
+        sa.Column(
+            "job_status",
+            sa.Text(),
+            nullable=False,
+        ),
+        sa.Column(
+            "created_at",
+            sa.TIMESTAMP(timezone=True),
+            server_default=sa.text("now()"),
+            nullable=False,
+        ),
+        sa.Column(
+            "updated_at",
+            sa.TIMESTAMP(timezone=True),
+            server_default=sa.text("now()"),
+            nullable=False,
+        ),
+        sa.PrimaryKeyConstraint("job_id", name=op.f("job_pkey")),
+        schema="api",
+    )
+    op.create_table(
+        "opportunity_change_audit",
+        sa.Column("opportunity_id", sa.BigInteger(), nullable=False),
+        sa.Column(
+            "created_at",
+            sa.TIMESTAMP(timezone=True),
+            server_default=sa.text("now()"),
+            nullable=False,
+        ),
+        sa.Column(
+            "updated_at",
+            sa.TIMESTAMP(timezone=True),
+            server_default=sa.text("now()"),
+            nullable=False,
+        ),
+        sa.ForeignKeyConstraint(
+            ["opportunity_id"],
+            ["api.opportunity.opportunity_id"],
+            name=op.f("opportunity_change_audit_opportunity_id_opportunity_fkey"),
+        ),
+        sa.PrimaryKeyConstraint("opportunity_id", name=op.f("opportunity_change_audit_pkey")),
+        schema="api",
+    )
+    op.create_index(
+        op.f("opportunity_change_audit_opportunity_id_idx"),
+        "opportunity_change_audit",
+        ["opportunity_id"],
+        unique=False,
+        schema="api",
+    )
+
+    op.execute(create_trigger_function)
+
+    # Insert all existing opportunities into the audit table
+    op.execute(
+        text(
+            """
+            INSERT INTO api.opportunity_change_audit (opportunity_id, created_at, updated_at)
+            SELECT
+                opportunity_id,
+                CURRENT_TIMESTAMP as created_at,
+                CURRENT_TIMESTAMP as updated_at
+            FROM api.opportunity
+            ON CONFLICT (opportunity_id) DO NOTHING
+            """
+        )
+    )
+
+    op.drop_index(
+        "opportunity_search_index_queue_opportunity_id_idx",
+        table_name="opportunity_search_index_queue",
+        schema="api",
+    )
+    op.drop_table("opportunity_search_index_queue", schema="api")
+
+    op.create_table(
+        "lk_job_status",
+        sa.Column("job_status_id", sa.Integer(), nullable=False),
+        sa.Column("description", sa.Text(), nullable=False),
+        sa.Column(
+            "created_at",
+            sa.TIMESTAMP(timezone=True),
+            server_default=sa.text("now()"),
+            nullable=False,
+        ),
+        sa.Column(
+            "updated_at",
+            sa.TIMESTAMP(timezone=True),
+            server_default=sa.text("now()"),
+            nullable=False,
+        ),
+        sa.PrimaryKeyConstraint("job_status_id", name=op.f("lk_job_status_pkey")),
+        schema="api",
+    )
+    op.add_column("job_log", sa.Column("job_status_id", sa.Integer(), nullable=False), schema="api")
+    op.add_column(
+        "job_log",
+        sa.Column("metrics", postgresql.JSONB(astext_type=sa.Text()), nullable=True),
+        schema="api",
+    )
+    op.create_foreign_key(
+        op.f("job_log_job_status_id_lk_job_status_fkey"),
+        "job_log",
+        "lk_job_status",
+        ["job_status_id"],
+        ["job_status_id"],
+        source_schema="api",
+        referent_schema="api",
+    )
+    op.drop_column("job_log", "job_status", schema="api")
+    # ### end Alembic commands ###
+
+
+def downgrade():
+    # ### commands auto generated by Alembic - please adjust! ###
+    op.add_column(
+        "job_log",
+        sa.Column("job_status", sa.TEXT(), autoincrement=False, nullable=False),
+        schema="api",
+    )
+    op.drop_constraint(
+        op.f("job_job_status_id_lk_job_status_fkey"), "job_log", schema="api", type_="foreignkey"
+    )
+    op.drop_column("job_log", "metrics", schema="api")
+    op.drop_column("job_log", "job_status_id", schema="api")
+    op.drop_table("lk_job_status", schema="api")
+    op.create_table(
+        "opportunity_search_index_queue",
+        sa.Column("opportunity_id", sa.BIGINT(), autoincrement=False, nullable=False),
+        sa.Column(
+            "created_at",
+            postgresql.TIMESTAMP(timezone=True),
+            server_default=sa.text("now()"),
+            autoincrement=False,
+            nullable=False,
+        ),
+        sa.Column(
+            "updated_at",
+            postgresql.TIMESTAMP(timezone=True),
+            server_default=sa.text("now()"),
+            autoincrement=False,
+            nullable=False,
+        ),
+        sa.ForeignKeyConstraint(
+            ["opportunity_id"],
+            ["api.opportunity.opportunity_id"],
+            name="opportunity_search_index_queue_opportunity_id_opportunity_fkey",
+        ),
+        sa.PrimaryKeyConstraint("opportunity_id", name="opportunity_search_index_queue_pkey"),
+        schema="api",
+    )
+    op.create_index(
+        "opportunity_search_index_queue_opportunity_id_idx",
+        "opportunity_search_index_queue",
+        ["opportunity_id"],
+        unique=False,
+        schema="api",
+    )
+    op.drop_index(
+        op.f("opportunity_change_audit_opportunity_id_idx"),
+        table_name="opportunity_change_audit",
+        schema="api",
+    )
+    op.drop_table("opportunity_change_audit", schema="api")
+    op.drop_table("job_log", schema="api")
+    # ### end Alembic commands ###
diff --git a/api/src/db/models/__init__.py b/api/src/db/models/__init__.py
index 83ad80a9d..23d00f704 100644
--- a/api/src/db/models/__init__.py
+++ b/api/src/db/models/__init__.py
@@ -1,6 +1,14 @@
 import logging
 
-from . import agency_models, base, extract_models, lookup_models, opportunity_models, user_models
+from . import (
+    agency_models,
+    base,
+    extract_models,
+    lookup_models,
+    opportunity_models,
+    task_models,
+    user_models,
+)
 
 logger = logging.getLogger(__name__)
 
@@ -15,4 +23,5 @@
     "agency_models",
     "user_models",
     "extract_models",
+    "task_models",
 ]
diff --git a/api/src/db/models/lookup_models.py b/api/src/db/models/lookup_models.py
index b6d9ca3a4..76cb3bf08 100644
--- a/api/src/db/models/lookup_models.py
+++ b/api/src/db/models/lookup_models.py
@@ -8,6 +8,7 @@
     ExtractType,
     FundingCategory,
     FundingInstrument,
+    JobStatus,
     OpportunityAttachmentType,
     OpportunityCategory,
     OpportunityStatus,
@@ -116,6 +117,14 @@
     ]
 )
 
+JOB_STATUS_CONFIG = LookupConfig(
+    [
+        LookupStr(JobStatus.STARTED, 1),
+        LookupStr(JobStatus.COMPLETED, 2),
+        LookupStr(JobStatus.FAILED, 3),
+    ]
+)
+
 EXTERNAL_USER_TYPE_CONFIG = LookupConfig([LookupStr(ExternalUserType.LOGIN_GOV, 1)])
 
 EXTRACT_TYPE_CONFIG = LookupConfig(
@@ -266,3 +275,15 @@ def from_lookup(cls, lookup: Lookup) -> "LkExtractType":
         return LkExtractType(
             extract_type_id=lookup.lookup_val, description=lookup.get_description()
         )
+
+
+@LookupRegistry.register_lookup(JOB_STATUS_CONFIG)
+class LkJobStatus(LookupTable, TimestampMixin):
+    __tablename__ = "lk_job_status"
+
+    job_status_id: Mapped[int] = mapped_column(primary_key=True)
+    description: Mapped[str]
+
+    @classmethod
+    def from_lookup(cls, lookup: Lookup) -> "LkJobStatus":
+        return LkJobStatus(job_status_id=lookup.lookup_val, description=lookup.get_description())
diff --git a/api/src/db/models/opportunity_models.py b/api/src/db/models/opportunity_models.py
index 4e8b944fd..3bd6036e4 100644
--- a/api/src/db/models/opportunity_models.py
+++ b/api/src/db/models/opportunity_models.py
@@ -69,7 +69,7 @@ def agency(self) -> str | None:
         back_populates="opportunity", uselist=True, cascade="all, delete-orphan"
     )
 
-    opportunity_search_index_queue: Mapped["OpportunitySearchIndexQueue | None"] = relationship(
+    opportunity_change_audit: Mapped["OpportunityChangeAudit | None"] = relationship(
         back_populates="opportunity", single_parent=True, cascade="all, delete-orphan"
     )
 
@@ -452,8 +452,8 @@ class OpportunityAttachment(ApiSchemaTable, TimestampMixin):
     legacy_folder_id: Mapped[int | None] = mapped_column(BigInteger)
 
 
-class OpportunitySearchIndexQueue(ApiSchemaTable, TimestampMixin):
-    __tablename__ = "opportunity_search_index_queue"
+class OpportunityChangeAudit(ApiSchemaTable, TimestampMixin):
+    __tablename__ = "opportunity_change_audit"
 
     opportunity_id: Mapped[int] = mapped_column(
         BigInteger, ForeignKey(Opportunity.opportunity_id), primary_key=True, index=True
diff --git a/api/src/db/models/task_models.py b/api/src/db/models/task_models.py
new file mode 100644
index 000000000..1abecebe8
--- /dev/null
+++ b/api/src/db/models/task_models.py
@@ -0,0 +1,22 @@
+import uuid
+
+from sqlalchemy import ForeignKey
+from sqlalchemy.dialects.postgresql import JSONB, UUID
+from sqlalchemy.orm import Mapped, mapped_column
+
+from src.adapters.db.type_decorators.postgres_type_decorators import LookupColumn
+from src.db.models.base import ApiSchemaTable, TimestampMixin
+from src.db.models.lookup_models import JobStatus, LkJobStatus
+
+
+class JobLog(ApiSchemaTable, TimestampMixin):
+    __tablename__ = "job_log"
+
+    job_id: Mapped[uuid.UUID] = mapped_column(UUID, primary_key=True, default=uuid.uuid4)
+    job_type: Mapped[str]
+    job_status: Mapped[JobStatus] = mapped_column(
+        "job_status_id",
+        LookupColumn(LkJobStatus),
+        ForeignKey(LkJobStatus.job_status_id),
+    )
+    metrics: Mapped[dict | None] = mapped_column(JSONB)
diff --git a/api/src/search/backend/load_opportunities_to_index.py b/api/src/search/backend/load_opportunities_to_index.py
index 5b0e5ba80..d91941c91 100644
--- a/api/src/search/backend/load_opportunities_to_index.py
+++ b/api/src/search/backend/load_opportunities_to_index.py
@@ -6,7 +6,7 @@
 from opensearchpy.exceptions import ConnectionTimeout, TransportError
 from pydantic import Field
 from pydantic_settings import SettingsConfigDict
-from sqlalchemy import delete, select
+from sqlalchemy import select, update
 from sqlalchemy.orm import noload, selectinload
 from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_fixed
 
@@ -14,14 +14,16 @@
 import src.adapters.search as search
 from src.api.opportunities_v1.opportunity_schemas import OpportunityV1Schema
 from src.db.models.agency_models import Agency
+from src.db.models.lookup_models import JobStatus
 from src.db.models.opportunity_models import (
     CurrentOpportunitySummary,
     Opportunity,
     OpportunityAttachment,
-    OpportunitySearchIndexQueue,
+    OpportunityChangeAudit,
 )
+from src.db.models.task_models import JobLog
 from src.task.task import Task
-from src.util import file_util
+from src.util import datetime_util, file_util
 from src.util.datetime_util import get_now_us_eastern_datetime
 from src.util.env_config import PydanticBaseEnvConfig
 
@@ -121,22 +123,35 @@ def incremental_updates_and_deletes(self) -> None:
 
     def _handle_incremental_upserts(self, existing_opportunity_ids: set[int]) -> None:
         """Handle updates/inserts of opportunities into the search index when running incrementally"""
+        # Get last successful job timestamp
+        last_successful_job = (
+            self.db_session.query(JobLog)
+            .filter(
+                JobLog.job_type == self.cls_name(),
+                JobLog.job_status == JobStatus.COMPLETED,
+            )
+            .order_by(JobLog.created_at.desc())
+            .first()
+        )
+
         # Fetch opportunities that need processing from the queue
-        queued_opportunities = (
-            self.db_session.execute(
-                select(Opportunity)
-                .join(OpportunitySearchIndexQueue)
-                .join(CurrentOpportunitySummary)
-                .where(
-                    Opportunity.is_draft.is_(False),
-                    CurrentOpportunitySummary.opportunity_status.isnot(None),
-                )
-                .options(selectinload("*"), noload(Opportunity.all_opportunity_summaries))
+        query = (
+            select(Opportunity)
+            .join(OpportunityChangeAudit)
+            .join(CurrentOpportunitySummary)
+            .where(
+                Opportunity.is_draft.is_(False),
+                CurrentOpportunitySummary.opportunity_status.isnot(None),
             )
-            .scalars()
-            .all()
+            .options(selectinload("*"), noload(Opportunity.all_opportunity_summaries))
         )
+
+        # Add timestamp filter
+        if last_successful_job:
+            query = query.where(OpportunityChangeAudit.updated_at > last_successful_job.created_at)
+
+        queued_opportunities = self.db_session.execute(query).scalars().all()
+
         # Process updates and inserts
         processed_opportunity_ids = set()
         opportunities_to_index = []
@@ -163,12 +178,12 @@ def _handle_incremental_upserts(self, existing_opportunity_ids: set[int]) -> Non
         loaded_ids = self.load_records(opportunities_to_index)
         logger.info(f"Indexed {len(loaded_ids)} opportunities")
 
-        # Clear processed / skipped entries from the queue
-        self.db_session.execute(
-            delete(OpportunitySearchIndexQueue).where(
-                OpportunitySearchIndexQueue.opportunity_id.in_(processed_opportunity_ids)
+        # Update updated_at timestamp instead of deleting records
+        self.db_session.execute(
+            update(OpportunityChangeAudit)
+            .where(OpportunityChangeAudit.opportunity_id.in_(processed_opportunity_ids))
+            .values(updated_at=datetime_util.utcnow())
         )
-        )
 
     def _handle_incremental_delete(self, existing_opportunity_ids: set[int]) -> None:
         """Handle deletion of opportunities when running incrementally
diff --git a/api/src/task/task.py b/api/src/task/task.py
index f1619d0ac..679af0202 100644
--- a/api/src/task/task.py
+++ b/api/src/task/task.py
@@ -5,6 +5,7 @@
 from typing import Any
 
 import src.adapters.db as db
+from src.db.models.task_models import JobLog, JobStatus
 
 logger = logging.getLogger(__name__)
 
@@ -28,12 +29,20 @@ class Metrics(StrEnum):
     def __init__(self, db_session: db.Session) -> None:
         self.db_session = db_session
         self.metrics: dict[str, Any] = {}
+        self.job: JobLog | None = None
 
     def run(self) -> None:
+        job_succeeded = True
+
         try:
             logger.info("Starting %s", self.cls_name())
             start = time.perf_counter()
 
+            # Create initial job record
+            self.job = JobLog(job_type=self.cls_name(), job_status=JobStatus.STARTED)
+            self.db_session.add(self.job)
+            self.db_session.commit()
+
             # Initialize the metrics
             self.initialize_metrics()
 
@@ -47,8 +56,17 @@ def run(self) -> None:
             logger.info("Completed %s in %s seconds", self.cls_name(), duration, extra=self.metrics)
 
         except Exception:
-            logger.exception("Failed to run task %s", self.cls_name())
+            job_succeeded = False
             raise
+        finally:
+            job_status = JobStatus.COMPLETED if job_succeeded else JobStatus.FAILED
+            # If the session is active, we can commit the job update
+            if job_succeeded:
+                self.update_job(job_status, metrics=self.metrics)
+            else:
+                # If the session is not active due to an error upstream, we need to begin a new transaction
+                with self.db_session.begin():
+                    self.update_job(job_status, metrics=self.metrics)
 
     def initialize_metrics(self) -> None:
         zero_metrics_dict: dict[str, Any] = {metric: 0 for metric in self.Metrics}
@@ -70,6 +88,14 @@ def increment(self, name: str, value: int = 1, prefix: str | None = None) -> Non
     def cls_name(self) -> str:
         return self.__class__.__name__
 
+    def update_job(self, job_status: JobStatus, metrics: dict[str, Any] | None = None) -> None:
+        if self.job is None:
+            raise ValueError("Job is not initialized")
+
+        self.job.job_status = job_status
+        self.job.metrics = metrics
+        self.db_session.commit()
+
     @abc.abstractmethod
     def run_task(self) -> None:
         """Override to define the task logic"""
diff --git a/api/tests/src/db/models/factories.py b/api/tests/src/db/models/factories.py
index b707bcd44..c6bca6156 100644
--- a/api/tests/src/db/models/factories.py
+++ b/api/tests/src/db/models/factories.py
@@ -1949,9 +1949,9 @@ def create_tgroups_agency(
     return groups
 
 
-class OpportunitySearchIndexQueueFactory(BaseFactory):
+class OpportunityChangeAuditFactory(BaseFactory):
     class Meta:
-        model = opportunity_models.OpportunitySearchIndexQueue
+        model = opportunity_models.OpportunityChangeAudit
 
     opportunity = factory.SubFactory(OpportunityFactory)
     opportunity_id = factory.LazyAttribute(lambda s: s.opportunity.opportunity_id)
diff --git a/api/tests/src/search/backend/test_load_opportunities_to_index.py b/api/tests/src/search/backend/test_load_opportunities_to_index.py
index 4f2310a54..ac3be474a 100644
--- a/api/tests/src/search/backend/test_load_opportunities_to_index.py
+++ b/api/tests/src/search/backend/test_load_opportunities_to_index.py
@@ -1,8 +1,9 @@
 import itertools
 
 import pytest
+from sqlalchemy import select
-from src.db.models.opportunity_models import OpportunitySearchIndexQueue
+from src.db.models.opportunity_models import OpportunityChangeAudit
 from src.search.backend.load_opportunities_to_index import (
     LoadOpportunitiesToIndex,
     LoadOpportunitiesToIndexConfig,
 )
@@ -13,8 +14,8 @@ from tests.src.db.models.factories import (
     AgencyFactory,
     OpportunityAttachmentFactory,
+    OpportunityChangeAuditFactory,
     OpportunityFactory,
-    OpportunitySearchIndexQueueFactory,
 )
 
 
@@ -81,7 +82,7 @@ def test_load_opportunities_to_index(
         )
 
         for opportunity in opportunities:
-            OpportunitySearchIndexQueueFactory.create(
+            OpportunityChangeAuditFactory.create(
                 opportunity=opportunity,
             )
 
@@ -253,9 +254,7 @@ def test_load_opportunities_to_index(
         )
 
         for opportunity in itertools.chain(opportunities, test_opps):
-            OpportunitySearchIndexQueueFactory.create(
-                opportunity=opportunity,
-            )
+            OpportunityChangeAuditFactory.create(opportunity=opportunity, updated_at=None)
 
         load_opportunities_to_index.run()
 
@@ -286,11 +285,6 @@ def test_load_opportunities_to_index(
         for opportunity in opportunities_now_with_test_agency:
             opportunity.agency_code = "MY-TEST-AGENCY-123"
 
-        for opportunity in opportunities:
-            OpportunitySearchIndexQueueFactory.create(
-                opportunity=opportunity,
-            )
-
         db_session.commit()
         db_session.expunge_all()
         load_opportunities_to_index.run()
@@ -314,17 +308,33 @@ def test_load_opportunities_to_index_index_does_not_exist(self, db_session, sear
         with pytest.raises(RuntimeError, match="please run the full refresh job"):
             load_opportunities_to_index.run()
 
-    def test_new_opportunity_gets_indexed(self, db_session, load_opportunities_to_index):
+    def test_new_opportunity_gets_indexed(
+        self,
+        db_session,
+        load_opportunities_to_index,
+    ):
         """Test that a new opportunity in the queue gets indexed"""
-        test_opportunity = OpportunityFactory.create(opportunity_attachments=[])
+        test_opportunity = OpportunityFactory.create(
+            opportunity_attachments=[],
+            is_draft=False,
+        )
 
         # Add to queue
-        OpportunitySearchIndexQueueFactory.create(opportunity=test_opportunity)
+        OpportunityChangeAuditFactory.create(opportunity=test_opportunity, updated_at=None)
 
         load_opportunities_to_index.run()
 
         # Verify queue was cleared
-        remaining_queue = db_session.query(OpportunitySearchIndexQueue).all()
+        remaining_queue = (
+            db_session.execute(
+                select(OpportunityChangeAudit).where(
+                    OpportunityChangeAudit.opportunity_id == test_opportunity.opportunity_id,
+                    OpportunityChangeAudit.updated_at.is_(None),
+                )
+            )
+            .scalars()
+            .all()
+        )
         assert len(remaining_queue) == 0
@@ -332,10 +342,20 @@ def test_draft_opportunity_not_indexed(self, db_session, load_opportunities_to_i
         test_opportunity = OpportunityFactory.create(is_draft=True, opportunity_attachments=[])
 
         # Add to queue
-        OpportunitySearchIndexQueueFactory.create(opportunity=test_opportunity)
-
-        load_opportunities_to_index.run()
+        OpportunityChangeAuditFactory.create(opportunity=test_opportunity, updated_at=None)
+        now = get_now_us_eastern_datetime()
+
+        load_opportunities_to_index.run()
 
         # Verify queue was not cleared
-        remaining_queue = db_session.query(OpportunitySearchIndexQueue).all()
+        remaining_queue = (
+            db_session.execute(
+                select(OpportunityChangeAudit).where(
+                    OpportunityChangeAudit.opportunity_id == test_opportunity.opportunity_id,
+                    OpportunityChangeAudit.updated_at <= now,
+                )
+            )
+            .scalars()
+            .all()
+        )
         assert len(remaining_queue) == 1
diff --git a/api/tests/src/task/test_task.py b/api/tests/src/task/test_task.py
new file mode 100644
index 000000000..85251a5eb
--- /dev/null
+++ b/api/tests/src/task/test_task.py
@@ -0,0 +1,69 @@
+import pytest
+from sqlalchemy.exc import InvalidRequestError
+
+from src.db.models.task_models import JobStatus
+from src.task.task import Task
+
+
+class SimpleTask(Task):
+    """Test implementation of Task"""
+
+    def run_task(self) -> None:
+        pass
+
+
+class FailingTask(Task):
+    """Test implementation that fails during run_task"""
+
+    def run_task(self) -> None:
+        raise ValueError("Task failed")
+
+
+class DBFailingTask(Task):
+    """Test implementation that fails during DB operation"""
+
+    def run_task(self) -> None:
+        # Simulate DB operation failing
+        raise InvalidRequestError("DB Error", None, None)
+
+
+def test_task_handles_general_error(db_session):
+    """Test that task properly handles non-DB errors and rolls back session"""
+    task = FailingTask(db_session)
+
+    with pytest.raises(ValueError):
+        task.run()
+
+    # Verify job was created and updated to failed status
+    assert task.job is not None
+    assert task.job.job_status == JobStatus.FAILED
+
+    # Verify session is still usable
+    db_session.begin()  # Start a new transaction
+    assert db_session.is_active  # Session should be active with new transaction
+
+
+def test_task_handles_db_error(db_session):
+    """Test that task properly handles DB errors"""
+    task = DBFailingTask(db_session)
+
+    with pytest.raises(InvalidRequestError):
+        task.run()
+
+    # Verify session was rolled back and is usable
+    db_session.begin()  # Start a new transaction
+    assert db_session.is_active  # Session should be active with new transaction
+
+
+def test_successful_task_completion(db_session):
+    """Test that task completes successfully and updates job status"""
+    task = SimpleTask(db_session)
+    task.run()
+
+    assert task.job is not None
+    assert task.job.job_status == JobStatus.COMPLETED
+    assert "task_duration_sec" in task.metrics
+
+    # Verify session is still usable by starting a new transaction
+    db_session.begin()  # Start a new transaction
+    assert db_session.is_active  # Session should be active with new transaction
diff --git a/documentation/api/database/erds/api-schema.png b/documentation/api/database/erds/api-schema.png
index ea81146c5..fec3f5188 100644
Binary files a/documentation/api/database/erds/api-schema.png and b/documentation/api/database/erds/api-schema.png differ