From 03c96163330bb20bfd01280d3ced77922f807c69 Mon Sep 17 00:00:00 2001 From: stroomdev66 Date: Wed, 23 Oct 2024 12:07:07 +0100 Subject: [PATCH] #4550 Fix datasource already in use issue --- .../processor/impl/db/ProcessorDaoImpl.java | 61 ++++++++++--------- .../impl/db/ProcessorFilterDaoImpl.java | 32 +++++----- .../stroom/security/impl/db/UserDaoImpl.java | 4 +- .../20241023_110006_453__4550.md | 24 ++++++++ 4 files changed, 77 insertions(+), 44 deletions(-) create mode 100644 unreleased_changes/20241023_110006_453__4550.md diff --git a/stroom-processor/stroom-processor-impl-db/src/main/java/stroom/processor/impl/db/ProcessorDaoImpl.java b/stroom-processor/stroom-processor-impl-db/src/main/java/stroom/processor/impl/db/ProcessorDaoImpl.java index 981c326ca8d..85fbf0ce23d 100644 --- a/stroom-processor/stroom-processor-impl-db/src/main/java/stroom/processor/impl/db/ProcessorDaoImpl.java +++ b/stroom-processor/stroom-processor-impl-db/src/main/java/stroom/processor/impl/db/ProcessorDaoImpl.java @@ -113,27 +113,27 @@ public Processor create(final Processor processor) { @Override public Processor update(final Processor processor) { return JooqUtil.contextResult( - processorDbConnProvider, - context -> { - final int count = context - .update(PROCESSOR) - .set(PROCESSOR.VERSION, PROCESSOR.VERSION.plus(1)) - .set(PROCESSOR.UPDATE_TIME_MS, processor.getUpdateTimeMs()) - .set(PROCESSOR.UPDATE_USER, processor.getUpdateUser()) - .set(PROCESSOR.ENABLED, processor.isEnabled()) - .set(PROCESSOR.DELETED, processor.isDeleted()) - .where(PROCESSOR.ID.eq(processor.getId())) - .and(PROCESSOR.VERSION.eq(processor.getVersion())) - .execute(); - - if (count == 0) { - throw new DataChangedException("Failed to update processor, " + - "it may have been updated by another user or deleted"); - } - - return fetch(processor.getId()).orElseThrow(() -> - new RuntimeException("Error fetching updated processor")); - }); + processorDbConnProvider, + context -> { + final int count = context + .update(PROCESSOR) + .set(PROCESSOR.VERSION, PROCESSOR.VERSION.plus(1)) + .set(PROCESSOR.UPDATE_TIME_MS, processor.getUpdateTimeMs()) + .set(PROCESSOR.UPDATE_USER, processor.getUpdateUser()) + .set(PROCESSOR.ENABLED, processor.isEnabled()) + .set(PROCESSOR.DELETED, processor.isDeleted()) + .where(PROCESSOR.ID.eq(processor.getId())) + .and(PROCESSOR.VERSION.eq(processor.getVersion())) + .execute(); + + if (count == 0) { + throw new DataChangedException("Failed to update processor, " + + "it may have been updated by another user or deleted"); + } + + return fetch(context, processor.getId()); + }).map(RECORD_TO_PROCESSOR_MAPPER) + .orElseThrow(() -> new RuntimeException("Error fetching updated processor")); } @Override @@ -149,8 +149,7 @@ public int logicalDeleteByProcessorId(final int processorId) { return JooqUtil.transactionResult(processorDbConnProvider, context -> { // Logically delete all the child filters first processorFilterDao.logicalDeleteByProcessorId(processorId, context); - final int count = logicalDeleteByProcessorId(processorId, context); - return count; + return logicalDeleteByProcessorId(processorId, context); }); } catch (final Exception e) { throw new RuntimeException("Error deleting filters and processor for processor id " + processorId, e); @@ -196,7 +195,7 @@ public int physicalDeleteOldProcessors(final Instant deleteThreshold) { totalCount.addAndGet(count); } catch (final DataAccessException e) { if (e.getCause() instanceof final SQLIntegrityConstraintViolationException sqlEx) { - LOGGER.debug("Expected constraint violation exception: " + sqlEx.getMessage(), e); + LOGGER.debug(() -> "Expected constraint violation exception: " + sqlEx.getMessage(), e); } else { throw e; } @@ -208,14 +207,18 @@ public int physicalDeleteOldProcessors(final Instant deleteThreshold) { @Override public Optional fetch(final int id) { - return JooqUtil.contextResult(processorDbConnProvider, context -> context - .select() - .from(PROCESSOR) - .where(PROCESSOR.ID.eq(id)) - .fetchOptional()) + return JooqUtil.contextResult(processorDbConnProvider, context -> fetch(context, id)) .map(RECORD_TO_PROCESSOR_MAPPER); } + private Optional fetch(final DSLContext context, final int id) { + return context + .select() + .from(PROCESSOR) + .where(PROCESSOR.ID.eq(id)) + .fetchOptional(); + } + @Override public Optional fetchByPipelineUuid(final String pipelineUuid) { Objects.requireNonNull(pipelineUuid); diff --git a/stroom-processor/stroom-processor-impl-db/src/main/java/stroom/processor/impl/db/ProcessorFilterDaoImpl.java b/stroom-processor/stroom-processor-impl-db/src/main/java/stroom/processor/impl/db/ProcessorFilterDaoImpl.java index 00a0ba6d76f..b01ae6ed266 100644 --- a/stroom-processor/stroom-processor-impl-db/src/main/java/stroom/processor/impl/db/ProcessorFilterDaoImpl.java +++ b/stroom-processor/stroom-processor-impl-db/src/main/java/stroom/processor/impl/db/ProcessorFilterDaoImpl.java @@ -117,7 +117,7 @@ private ProcessorFilterTracker createTracker(final DSLContext context) { final ProcessorFilterTracker tracker = new ProcessorFilterTracker(); tracker.setVersion(1); tracker.setStatus(ProcessorFilterTrackerStatus.CREATED); - final int id = context + final Integer id = context .insertInto(PROCESSOR_FILTER_TRACKER) .columns( PROCESSOR_FILTER_TRACKER.VERSION, @@ -147,6 +147,7 @@ private ProcessorFilterTracker createTracker(final DSLContext context) { NullSafe.get(tracker.getStatus(), ProcessorFilterTrackerStatus::getPrimitiveValue)) .returning(PROCESSOR_FILTER_TRACKER.ID) .fetchOne(PROCESSOR_FILTER_TRACKER.ID); + Objects.requireNonNull(id); tracker.setId(id); return tracker; } @@ -154,7 +155,7 @@ private ProcessorFilterTracker createTracker(final DSLContext context) { private ProcessorFilter createFilter(final DSLContext context, final ProcessorFilter filter) { filter.setVersion(1); final String data = queryDataXMLSerialiser.serialise(filter.getQueryData()); - final int id = context + final Integer id = context .insertInto(PROCESSOR_FILTER) .columns(PROCESSOR_FILTER.VERSION, PROCESSOR_FILTER.CREATE_TIME_MS, @@ -192,6 +193,7 @@ private ProcessorFilter createFilter(final DSLContext context, final ProcessorFi NullSafe.get(filter.getRunAsUser(), UserRef::getUuid)) .returning(PROCESSOR_FILTER.ID) .fetchOne(PROCESSOR_FILTER.ID); + Objects.requireNonNull(id); filter.setId(id); return filter; } @@ -221,7 +223,7 @@ private ProcessorFilter updateFilter(final DSLContext context, final ProcessorFi "it may have been updated by another user or deleted"); } - return fetch(filter.getId()).orElseThrow(() -> + return fetch(context, filter.getId()).map(this::mapRecord).orElseThrow(() -> new RuntimeException("Error fetching updated processor filter")); } @@ -379,17 +381,19 @@ public Set physicalDeleteOldProcessorFilters(final Instant deleteThresho @Override public Optional fetch(final int id) { - return JooqUtil.contextResult(processorDbConnProvider, context -> - context - .select() - .from(PROCESSOR_FILTER) - .join(PROCESSOR_FILTER_TRACKER) - .on(PROCESSOR_FILTER.FK_PROCESSOR_FILTER_TRACKER_ID.eq(PROCESSOR_FILTER_TRACKER.ID)) - .join(PROCESSOR) - .on(PROCESSOR_FILTER.FK_PROCESSOR_ID.eq(PROCESSOR.ID)) - .where(PROCESSOR_FILTER.ID.eq(id)) - .fetchOptional()) - .map(this::mapRecord); + return JooqUtil.contextResult(processorDbConnProvider, context -> fetch(context, id)).map(this::mapRecord); + } + + private Optional fetch(final DSLContext context, final int id) { + return context + .select() + .from(PROCESSOR_FILTER) + .join(PROCESSOR_FILTER_TRACKER) + .on(PROCESSOR_FILTER.FK_PROCESSOR_FILTER_TRACKER_ID.eq(PROCESSOR_FILTER_TRACKER.ID)) + .join(PROCESSOR) + .on(PROCESSOR_FILTER.FK_PROCESSOR_ID.eq(PROCESSOR.ID)) + .where(PROCESSOR_FILTER.ID.eq(id)) + .fetchOptional(); } @Override diff --git a/stroom-security/stroom-security-impl-db/src/main/java/stroom/security/impl/db/UserDaoImpl.java b/stroom-security/stroom-security-impl-db/src/main/java/stroom/security/impl/db/UserDaoImpl.java index 89dd8241cf4..13055464ff1 100644 --- a/stroom-security/stroom-security-impl-db/src/main/java/stroom/security/impl/db/UserDaoImpl.java +++ b/stroom-security/stroom-security-impl-db/src/main/java/stroom/security/impl/db/UserDaoImpl.java @@ -28,6 +28,7 @@ import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Objects; import java.util.Optional; import java.util.UUID; import java.util.function.Consumer; @@ -90,7 +91,7 @@ public User create(final User user) { private User create(final DSLContext context, final User user) { user.setVersion(1); user.setUuid(UUID.randomUUID().toString()); - final int id = context + final Integer id = context .insertInto(STROOM_USER) .columns(STROOM_USER.VERSION, STROOM_USER.CREATE_TIME_MS, @@ -116,6 +117,7 @@ private User create(final DSLContext context, final User user) { user.getFullName()) .returning(STROOM_USER.ID) .fetchOne(STROOM_USER.ID); + Objects.requireNonNull(id); user.setId(id); return user; } diff --git a/unreleased_changes/20241023_110006_453__4550.md b/unreleased_changes/20241023_110006_453__4550.md new file mode 100644 index 00000000000..108f0f32662 --- /dev/null +++ b/unreleased_changes/20241023_110006_453__4550.md @@ -0,0 +1,24 @@ +* Issue **#4550** : Fix datasource already in use issue. + + +```sh +# ******************************************************************************** +# Issue title: Datasource already in use +# Issue link: https://github.com/gchq/stroom/issues/4550 +# ******************************************************************************** + +# ONLY the top line will be included as a change entry in the CHANGELOG. +# The entry should be in GitHub flavour markdown and should be written on a SINGLE +# line with no hard breaks. You can have multiple change files for a single GitHub issue. +# The entry should be written in the imperative mood, i.e. 'Fix nasty bug' rather than +# 'Fixed nasty bug'. +# +# Examples of acceptable entries are: +# +# +# * Issue **123** : Fix bug with an associated GitHub issue in this repository +# +# * Issue **namespace/other-repo#456** : Fix bug with an associated GitHub issue in another repository +# +# * Fix bug with no associated GitHub issue. +```