From 56d694318436fe9f8187763354a0e297c5fe3522 Mon Sep 17 00:00:00 2001 From: Hauke Hund Date: Mon, 16 Dec 2024 17:50:30 +0100 Subject: [PATCH] unique draft Task rules, db constraint, duplicate delete query and tests --- .../authorization/TaskAuthorizationRule.java | 119 ++++++++++++--- .../db.constraint_trigger.changelog-1.6.1.xml | 4 + ...te_duplicate_resources.changelog-1.6.1.xml | 18 +++ .../unique_trigger_functions/tasks_unique.sql | 27 ++++ .../ParallelCreateIntegrationTest.java | 139 ++++++++++++++++++ 5 files changed, 289 insertions(+), 18 deletions(-) create mode 100644 dsf-fhir/dsf-fhir-server/src/main/resources/db/unique_trigger_functions/tasks_unique.sql diff --git a/dsf-fhir/dsf-fhir-server/src/main/java/dev/dsf/fhir/authorization/TaskAuthorizationRule.java b/dsf-fhir/dsf-fhir-server/src/main/java/dev/dsf/fhir/authorization/TaskAuthorizationRule.java index 96fb15633..e23b70492 100644 --- a/dsf-fhir/dsf-fhir-server/src/main/java/dev/dsf/fhir/authorization/TaskAuthorizationRule.java +++ b/dsf-fhir/dsf-fhir-server/src/main/java/dev/dsf/fhir/authorization/TaskAuthorizationRule.java @@ -4,6 +4,7 @@ import java.sql.SQLException; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.function.Predicate; @@ -15,6 +16,7 @@ import org.hl7.fhir.r4.model.ActivityDefinition; import org.hl7.fhir.r4.model.CanonicalType; import org.hl7.fhir.r4.model.Coding; +import org.hl7.fhir.r4.model.Identifier; import org.hl7.fhir.r4.model.Organization; import org.hl7.fhir.r4.model.Resource; import org.hl7.fhir.r4.model.StringType; @@ -34,6 +36,10 @@ import dev.dsf.fhir.dao.TaskDao; import dev.dsf.fhir.dao.provider.DaoProvider; import dev.dsf.fhir.help.ParameterConverter; +import dev.dsf.fhir.search.PageAndCount; +import dev.dsf.fhir.search.PartialResult; +import dev.dsf.fhir.search.SearchQuery; +import dev.dsf.fhir.search.SearchQueryParameterError; import dev.dsf.fhir.service.ReferenceResolver; import dev.dsf.fhir.service.ResourceReference; @@ -88,10 +94,20 @@ public Optional reasonCreateAllowed(Connection connection, Identity iden Optional errors = draftTaskOk(connection, identity, newResource); if (errors.isEmpty()) { - logger.info("Create of Task authorized for local organization identity '{}', Task.status draft", - identity.getName()); + if (!draftTaskExists(connection, newResource)) + { + logger.info( + "Create of Task authorized for local organization identity '{}', Task.status draft", + identity.getName()); + + return Optional.of("Local identity, Task.status draft"); + } + else + { + logger.warn("Create of Task unauthorized, unique resource already exists"); - return Optional.of("Local identity, Task.status draft"); + return Optional.empty(); + } } else { @@ -284,9 +300,16 @@ private Optional draftTaskOk(Connection connection, Identity identity, T { List errors = new ArrayList<>(); - if (newResource.getIdentifier().stream().noneMatch(i -> NAMING_SYSTEM_TASK_IDENTIFIER.equals(i.getSystem()))) + if (newResource.getIdentifier().stream().filter(i -> NAMING_SYSTEM_TASK_IDENTIFIER.equals(i.getSystem())) + .count() != 1) + { + errors.add("Task.identifier[" + NAMING_SYSTEM_TASK_IDENTIFIER + "] not defined or more than once"); + } + else if (newResource.getIdentifier().stream().filter(i -> NAMING_SYSTEM_TASK_IDENTIFIER.equals(i.getSystem())) + .findFirst().filter(Identifier::hasValueElement).map(Identifier::getValueElement) + .filter(StringType::hasValue).isEmpty()) { - errors.add("Task.identifier[" + NAMING_SYSTEM_TASK_IDENTIFIER + "] missing"); + errors.add("Task.identifier[" + NAMING_SYSTEM_TASK_IDENTIFIER + "] value not defined"); } if (newResource.hasRequester()) @@ -399,6 +422,49 @@ private Stream getMessageNames(Task newResource) .filter(s -> !s.isBlank()); } + /** + * A draft {@link Task} with identifier (system {@link #NAMING_SYSTEM_TASK_IDENTIFIER}) may not exist + * + * @param connection + * not null + * @param newResource + * not null + * @return true if the given draft Task is unique + */ + private boolean draftTaskExists(Connection connection, Task newResource) + { + SearchQuery query = getDao().createSearchQueryWithoutUserFilter(PageAndCount.exists()) + .configureParameters(Map.of("identifier", + List.of(NAMING_SYSTEM_TASK_IDENTIFIER + "|" + getDraftTaskIdentifierValue(newResource)))); + + List uQp = query.getUnsupportedQueryParameters(); + if (!uQp.isEmpty()) + { + logger.warn("Unable to search for Task: Unsupported query parameters: {}", uQp); + + throw new IllegalStateException("Unable to search for Task: Unsupported query parameters"); + } + + try + { + PartialResult result = getDao().searchWithTransaction(connection, query); + return result.getTotal() >= 1; + } + catch (SQLException e) + { + logger.debug("Unable to search for Task", e); + logger.warn("Unable to search for Task: {} - {}", e.getClass().getName(), e.getMessage()); + + throw new RuntimeException("Unable to search for Task", e); + } + } + + private String getDraftTaskIdentifierValue(Task newResource) + { + return newResource.getIdentifier().stream().filter(i -> NAMING_SYSTEM_TASK_IDENTIFIER.equals(i.getSystem())) + .findFirst().map(Identifier::getValue).get(); + } + private boolean taskAllowedForRequesterAndRecipient(Connection connection, Identity requester, Task newResource) { Optional recipientOpt = organizationProvider.getLocalOrganizationAsIdentity(); @@ -549,24 +615,24 @@ public Optional reasonReadAllowed(Connection connection, Identity identi if (identity.hasDsfRole(FhirServerRole.READ)) { - if (isCurrentIdentityPartOfReferencedOrganization(connection, identity, "Task.requester", - existingResource.getRequester())) + if (identity.isLocalIdentity() && isCurrentIdentityPartOfReferencedOrganization(connection, identity, + "Task.restriction.recipient", existingResource.getRestriction().getRecipientFirstRep())) { logger.info( - "Read of Task/{}/_history/{} authorized for identity '{}', Task.requester reference could be resolved and current identity part of referenced organization", + "Read of Task/{}/_history/{} authorized for identity '{}', Task.restriction.recipient reference could be resolved and current identity part of referenced organization", resourceId, resourceVersion, identity.getName()); - return Optional.of("Task.requester resolved and identity part of referenced organization"); + return Optional + .of("Task.restriction.recipient resolved and local identity part of referenced organization"); } - else if (identity.isLocalIdentity() && isCurrentIdentityPartOfReferencedOrganization(connection, identity, - "Task.restriction.recipient", existingResource.getRestriction().getRecipientFirstRep())) + else if (isCurrentIdentityPartOfReferencedOrganization(connection, identity, "Task.requester", + existingResource.getRequester())) { logger.info( - "Read of Task/{}/_history/{} authorized for identity '{}', Task.restriction.recipient reference could be resolved and current identity part of referenced organization", + "Read of Task/{}/_history/{} authorized for identity '{}', Task.requester reference could be resolved and current identity part of referenced organization", resourceId, resourceVersion, identity.getName()); - return Optional - .of("Task.restriction.recipient resolved and local identity part of referenced organization"); + return Optional.of("Task.requester resolved and identity part of referenced organization"); } else { @@ -605,11 +671,23 @@ public Optional reasonUpdateAllowed(Connection connection, Identity iden Optional errors = draftTaskOk(connection, identity, newResource); if (errors.isEmpty()) { - logger.info("Update of Task/{}/_history/{} ({} -> {}) authorized for local identity '{}'", - oldResourceId, oldResourceVersion, TaskStatus.DRAFT.toCode(), TaskStatus.DRAFT.toCode(), - identity.getName()); + if (draftTaskIdentifierSame(oldResource, newResource)) + { + logger.info("Update of Task/{}/_history/{} ({} -> {}) authorized for local identity '{}'", + oldResourceId, oldResourceVersion, TaskStatus.DRAFT.toCode(), + TaskStatus.DRAFT.toCode(), identity.getName()); - return Optional.of("Local identity, old Task.status draft, new Task.status draft"); + return Optional.of("Local identity, old Task.status draft, new Task.status draft"); + } + else + { + logger.warn( + "Update of Task/{}/_history/{} ({} -> {}) unauthorized for identity '{}' - identifier modified", + oldResourceId, oldResourceVersion, TaskStatus.DRAFT.toCode(), + TaskStatus.DRAFT.toCode(), identity.getName()); + + return Optional.empty(); + } } else { @@ -790,6 +868,11 @@ else if (TaskStatus.INPROGRESS.equals(oldResource.getStatus()) } } + private boolean draftTaskIdentifierSame(Task oldResource, Task newResource) + { + return Objects.equals(getDraftTaskIdentifierValue(oldResource), getDraftTaskIdentifierValue(newResource)); + } + private Optional reasonNotSame(Task oldResource, Task newResource) { List errors = new ArrayList<>(); diff --git a/dsf-fhir/dsf-fhir-server/src/main/resources/db/db.constraint_trigger.changelog-1.6.1.xml b/dsf-fhir/dsf-fhir-server/src/main/resources/db/db.constraint_trigger.changelog-1.6.1.xml index e96e21fe4..0ba9ae4eb 100644 --- a/dsf-fhir/dsf-fhir-server/src/main/resources/db/db.constraint_trigger.changelog-1.6.1.xml +++ b/dsf-fhir/dsf-fhir-server/src/main/resources/db/db.constraint_trigger.changelog-1.6.1.xml @@ -30,6 +30,9 @@ + + + @@ -44,6 +47,7 @@ CREATE CONSTRAINT TRIGGER organization_affiliations_unique AFTER INSERT ON organization_affiliations FOR EACH ROW EXECUTE PROCEDURE organization_affiliations_unique(); CREATE CONSTRAINT TRIGGER structure_definitions_unique AFTER INSERT ON structure_definitions FOR EACH ROW EXECUTE PROCEDURE structure_definitions_unique(); CREATE CONSTRAINT TRIGGER subscriptions_unique AFTER INSERT ON subscriptions FOR EACH ROW EXECUTE PROCEDURE subscriptions_unique(); + CREATE CONSTRAINT TRIGGER tasks_unique AFTER INSERT ON tasks FOR EACH ROW EXECUTE PROCEDURE tasks_unique(); CREATE CONSTRAINT TRIGGER value_sets_unique AFTER INSERT ON value_sets FOR EACH ROW EXECUTE PROCEDURE value_sets_unique(); diff --git a/dsf-fhir/dsf-fhir-server/src/main/resources/db/db.delete_duplicate_resources.changelog-1.6.1.xml b/dsf-fhir/dsf-fhir-server/src/main/resources/db/db.delete_duplicate_resources.changelog-1.6.1.xml index 75f575e74..6a15cf8a6 100644 --- a/dsf-fhir/dsf-fhir-server/src/main/resources/db/db.delete_duplicate_resources.changelog-1.6.1.xml +++ b/dsf-fhir/dsf-fhir-server/src/main/resources/db/db.delete_duplicate_resources.changelog-1.6.1.xml @@ -235,6 +235,24 @@ ) + + + DELETE FROM tasks + WHERE task->>'status' = 'draft' + AND jsonb_path_exists(task, '$.identifier[*] ? (@.system == "http://dsf.dev/sid/task-identifier").value') + AND task_id IN ( + SELECT task_id FROM ( + SELECT + row_number() OVER ( + PARTITION BY jsonb_path_query_array(task, '$.identifier[*] ? (@.system == "http://dsf.dev/sid/task-identifier").value') + ORDER BY jsonb_path_query_array(task, '$.identifier[*] ? (@.system == "http://dsf.dev/sid/task-identifier").value'), (task->'meta'->>'lastUpdated')::timestamp DESC + ) AS rn + , task_id + FROM current_tasks + ) AS t WHERE rn > 1 + ) + + DELETE FROM value_sets diff --git a/dsf-fhir/dsf-fhir-server/src/main/resources/db/unique_trigger_functions/tasks_unique.sql b/dsf-fhir/dsf-fhir-server/src/main/resources/db/unique_trigger_functions/tasks_unique.sql new file mode 100644 index 000000000..564ebd92a --- /dev/null +++ b/dsf-fhir/dsf-fhir-server/src/main/resources/db/unique_trigger_functions/tasks_unique.sql @@ -0,0 +1,27 @@ +CREATE OR REPLACE FUNCTION tasks_unique() RETURNS TRIGGER AS $$ +BEGIN + IF NEW.task->>'status' = 'draft' THEN + PERFORM pg_advisory_xact_lock(hashtext(jsonb_path_query_array(NEW.task, '$.identifier[*] ? (@.system == "http://dsf.dev/sid/task-identifier").value')::text)); + IF EXISTS (SELECT 1 FROM current_tasks WHERE task_id <> NEW.task_id + AND (( + jsonb_path_exists(NEW.task, '$.identifier[*] ? (@.system == "http://dsf.dev/sid/task-identifier").value') + AND + jsonb_path_query_array(task, '$.identifier[*] ? (@.system == "http://dsf.dev/sid/task-identifier").value') @> + jsonb_path_query_array(NEW.task, '$.identifier[*] ? (@.system == "http://dsf.dev/sid/task-identifier").value') + ) OR ( + jsonb_path_exists(task, '$.identifier[*] ? (@.system == "http://dsf.dev/sid/task-identifier").value') + AND + jsonb_path_query_array(NEW.task, '$.identifier[*] ? (@.system == "http://dsf.dev/sid/task-identifier").value') @> + jsonb_path_query_array(task, '$.identifier[*] ? (@.system == "http://dsf.dev/sid/task-identifier").value') + ) + )) THEN + RAISE EXCEPTION 'Conflict: Not inserting Task with identifier.value %, resource already exists with given identifier.value', + jsonb_path_query_array(NEW.task, '$.identifier[*] ? (@.system == "http://dsf.dev/sid/task-identifier").value') USING ERRCODE = 'unique_violation'; + ELSE + RETURN NEW; + END IF; + ELSE + RETURN NEW; + END IF; +END; +$$ LANGUAGE PLPGSQL \ No newline at end of file diff --git a/dsf-fhir/dsf-fhir-server/src/test/java/dev/dsf/fhir/integration/ParallelCreateIntegrationTest.java b/dsf-fhir/dsf-fhir-server/src/test/java/dev/dsf/fhir/integration/ParallelCreateIntegrationTest.java index bcf347cbb..0574bfedb 100644 --- a/dsf-fhir/dsf-fhir-server/src/test/java/dev/dsf/fhir/integration/ParallelCreateIntegrationTest.java +++ b/dsf-fhir/dsf-fhir-server/src/test/java/dev/dsf/fhir/integration/ParallelCreateIntegrationTest.java @@ -48,6 +48,9 @@ import org.hl7.fhir.r4.model.Subscription.SubscriptionChannelComponent; import org.hl7.fhir.r4.model.Subscription.SubscriptionChannelType; import org.hl7.fhir.r4.model.Subscription.SubscriptionStatus; +import org.hl7.fhir.r4.model.Task; +import org.hl7.fhir.r4.model.Task.TaskIntent; +import org.hl7.fhir.r4.model.Task.TaskStatus; import org.hl7.fhir.r4.model.ValueSet; import org.junit.Test; import org.slf4j.Logger; @@ -63,6 +66,7 @@ import dev.dsf.fhir.dao.OrganizationDao; import dev.dsf.fhir.dao.ResourceDao; import dev.dsf.fhir.dao.SubscriptionDao; +import dev.dsf.fhir.dao.TaskDao; import dev.dsf.fhir.dao.ValueSetDao; import dev.dsf.fhir.dao.jdbc.StructureDefinitionDaoJdbc; import jakarta.ws.rs.WebApplicationException; @@ -93,6 +97,10 @@ public class ParallelCreateIntegrationTest extends AbstractIntegrationTest private static final SubscriptionChannelType SUBSCRIPTION_CHANNEL_TYPE = SubscriptionChannelType.WEBSOCKET; private static final String SUBSCRIPTION_CHANNEL_PAYLOAD = "application/fhir+json"; + private static final String NAMING_SYSTEM_TASK_IDENTIFIER = "http://dsf.dev/sid/task-identifier"; + private static final String TASK_IDENTIFIER_VALUE = ACTIVITY_DEFINITION_URL + "/" + ACTIVITY_DEFINITION_VERSION + + "/test"; + private static final String VALUE_SET_URL = "http://test.com/fhir/ValueSet/test"; private static final String VALUE_SET_VERSION = "test-version"; @@ -318,6 +326,30 @@ public void testCreateDuplicateSubscriptionsViaBatchBundle() throws Exception checkReturnBatchBundle(getWebserviceClient().postBundle(bundle)); } + @Test + public void testCreateDuplicateTasksViaTransactionBundle() throws Exception + { + ActivityDefinitionDao activityDefinitionDao = getSpringWebApplicationContext() + .getBean(ActivityDefinitionDao.class); + activityDefinitionDao.create(createActivityDefinition()); + + Bundle bundle = createBundle(BundleType.TRANSACTION, createTask(), null, 2); + + expectForbidden(() -> getWebserviceClient().postBundle(bundle)); + } + + @Test + public void testCreateDuplicateTasksViaBatchBundle() throws Exception + { + ActivityDefinitionDao activityDefinitionDao = getSpringWebApplicationContext() + .getBean(ActivityDefinitionDao.class); + activityDefinitionDao.create(createActivityDefinition()); + + Bundle bundle = createBundle(BundleType.BATCH, createTask(), null, 2); + + checkReturnBatchBundle(getWebserviceClient().postBundle(bundle)); + } + @Test public void testCreateDuplicateValueSetsViaTransactionBundle() throws Exception { @@ -574,6 +606,34 @@ public void testCreateDuplicateSubscriptionsViaBatchBundleWithIfNoneExists() thr testCreateDuplicatesViaBundleWithIfNoneExists(bundle, BundleType.BATCHRESPONSE); } + @Test + public void testCreateDuplicateTasksViaTransactionBundleWithIfNoneExists() throws Exception + { + ActivityDefinitionDao activityDefinitionDao = getSpringWebApplicationContext() + .getBean(ActivityDefinitionDao.class); + activityDefinitionDao.create(createActivityDefinition()); + + Bundle bundle = createBundle(BundleType.TRANSACTION, createTask(), + (t, r) -> r.setIfNoneExist("identifier=" + NAMING_SYSTEM_TASK_IDENTIFIER + "|" + TASK_IDENTIFIER_VALUE), + 2); + + testCreateDuplicatesViaBundleWithIfNoneExists(bundle, BundleType.TRANSACTIONRESPONSE); + } + + @Test + public void testCreateDuplicateTasksViaBatchBundleWithIfNoneExists() throws Exception + { + ActivityDefinitionDao activityDefinitionDao = getSpringWebApplicationContext() + .getBean(ActivityDefinitionDao.class); + activityDefinitionDao.create(createActivityDefinition()); + + Bundle bundle = createBundle(BundleType.BATCH, createTask(), + (t, r) -> r.setIfNoneExist("identifier=" + NAMING_SYSTEM_TASK_IDENTIFIER + "|" + TASK_IDENTIFIER_VALUE), + 2); + + testCreateDuplicatesViaBundleWithIfNoneExists(bundle, BundleType.BATCHRESPONSE); + } + @Test public void testCreateDuplicateValueSetsViaTransactionBundleWithIfNoneExists() throws Exception { @@ -777,6 +837,23 @@ public void testCreateDuplicateSubscriptionsParallelDirect() throws Exception && SUBSCRIPTION_CHANNEL_PAYLOAD.equals(s.getChannel().getPayload())); } + @Test + public void testCreateDuplicateTasksParallelDirect() throws Exception + { + ActivityDefinitionDao activityDefinitionDao = getSpringWebApplicationContext() + .getBean(ActivityDefinitionDao.class); + activityDefinitionDao.create(createActivityDefinition()); + + testCreateDuplicatesParallel(() -> + { + Task returnT = getWebserviceClient().create(createTask()); + assertNotNull(returnT); + }, TaskDao.class, + t -> TASK_IDENTIFIER_VALUE.equals( + t.getIdentifier().stream().filter(i -> NAMING_SYSTEM_TASK_IDENTIFIER.equals(i.getSystem())) + .findFirst().map(Identifier::getValue).get())); + } + @Test public void testCreateDuplicateValueSetsParallelDirect() throws Exception { @@ -1193,6 +1270,50 @@ public void testCreateDuplicateSubscriptionsParallelBatchBundle() throws Excepti && SUBSCRIPTION_CHANNEL_PAYLOAD.equals(s.getChannel().getPayload())); } + @Test + public void testCreateDuplicateTasksParallelTransactionBundle() throws Exception + { + ActivityDefinitionDao activityDefinitionDao = getSpringWebApplicationContext() + .getBean(ActivityDefinitionDao.class); + activityDefinitionDao.create(createActivityDefinition()); + + testCreateDuplicatesParallel(() -> + { + Bundle returnBundle = getWebserviceClient() + .postBundle(createBundle(BundleType.TRANSACTION, createTask(), null, 1)); + assertNotNull(returnBundle); + }, TaskDao.class, + t -> TASK_IDENTIFIER_VALUE.equals( + t.getIdentifier().stream().filter(i -> NAMING_SYSTEM_TASK_IDENTIFIER.equals(i.getSystem())) + .findFirst().map(Identifier::getValue).get())); + } + + @Test + public void testCreateDuplicateTasksParallelBatchBundle() throws Exception + { + ActivityDefinitionDao activityDefinitionDao = getSpringWebApplicationContext() + .getBean(ActivityDefinitionDao.class); + activityDefinitionDao.create(createActivityDefinition()); + + testCreateDuplicatesParallel(() -> + { + Bundle returnBundle = getWebserviceClient() + .postBundle(createBundle(BundleType.BATCH, createTask(), null, 1)); + assertNotNull(returnBundle); + + assertNotNull(returnBundle.getEntry()); + assertEquals(1, returnBundle.getEntry().size()); + assertNotNull(returnBundle.getEntry().get(0).getResponse()); + assertNotNull(returnBundle.getEntry().get(0).getResponse().getStatus()); + + if ("403 Forbidden".equals(returnBundle.getEntry().get(0).getResponse().getStatus())) + throw new WebApplicationException(403); + }, TaskDao.class, + t -> TASK_IDENTIFIER_VALUE.equals( + t.getIdentifier().stream().filter(i -> NAMING_SYSTEM_TASK_IDENTIFIER.equals(i.getSystem())) + .findFirst().map(Identifier::getValue).get())); + } + @Test public void testCreateDuplicateValueSetsParallelTransactionBundle() throws Exception { @@ -1464,6 +1585,24 @@ private Subscription createSubscription() return s; } + private Task createTask() + { + Task t = new Task(); + t.setStatus(TaskStatus.DRAFT); + t.setIntent(TaskIntent.ORDER); + t.setAuthoredOn(new Date()); + t.addIdentifier().setSystem(NAMING_SYSTEM_TASK_IDENTIFIER).setValue(TASK_IDENTIFIER_VALUE); + t.setInstantiatesCanonical(ACTIVITY_DEFINITION_URL + "|" + ACTIVITY_DEFINITION_VERSION); + t.getRequester().setType("Organization").getIdentifier().setSystem("http://dsf.dev/sid/organization-identifier") + .setValue("Test_Organization"); + t.getRestriction().getRecipientFirstRep().setType("Organization").getIdentifier() + .setSystem("http://dsf.dev/sid/organization-identifier").setValue("Test_Organization"); + t.getInputFirstRep().setValue(new StringType("test")).getType().getCodingFirstRep() + .setSystem("http://dsf.dev/fhir/CodeSystem/bpmn-message").setCode("message-name"); + + return t; + } + private ValueSet createValueSet() { ValueSet vS = new ValueSet().setUrl(VALUE_SET_URL).setVersion(VALUE_SET_VERSION)