Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#4550 Fix datasource already in use issue #4557

Merged
merged 1 commit into from
Oct 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -113,27 +113,27 @@ public Processor create(final Processor processor) {
@Override
public Processor update(final Processor processor) {
return JooqUtil.contextResult(
processorDbConnProvider,
context -> {
final int count = context
.update(PROCESSOR)
.set(PROCESSOR.VERSION, PROCESSOR.VERSION.plus(1))
.set(PROCESSOR.UPDATE_TIME_MS, processor.getUpdateTimeMs())
.set(PROCESSOR.UPDATE_USER, processor.getUpdateUser())
.set(PROCESSOR.ENABLED, processor.isEnabled())
.set(PROCESSOR.DELETED, processor.isDeleted())
.where(PROCESSOR.ID.eq(processor.getId()))
.and(PROCESSOR.VERSION.eq(processor.getVersion()))
.execute();

if (count == 0) {
throw new DataChangedException("Failed to update processor, " +
"it may have been updated by another user or deleted");
}

return fetch(processor.getId()).orElseThrow(() ->
new RuntimeException("Error fetching updated processor"));
});
processorDbConnProvider,
context -> {
final int count = context
.update(PROCESSOR)
.set(PROCESSOR.VERSION, PROCESSOR.VERSION.plus(1))
.set(PROCESSOR.UPDATE_TIME_MS, processor.getUpdateTimeMs())
.set(PROCESSOR.UPDATE_USER, processor.getUpdateUser())
.set(PROCESSOR.ENABLED, processor.isEnabled())
.set(PROCESSOR.DELETED, processor.isDeleted())
.where(PROCESSOR.ID.eq(processor.getId()))
.and(PROCESSOR.VERSION.eq(processor.getVersion()))
.execute();

if (count == 0) {
throw new DataChangedException("Failed to update processor, " +
"it may have been updated by another user or deleted");
}

return fetch(context, processor.getId());
}).map(RECORD_TO_PROCESSOR_MAPPER)
.orElseThrow(() -> new RuntimeException("Error fetching updated processor"));
}

@Override
Expand All @@ -149,8 +149,7 @@ public int logicalDeleteByProcessorId(final int processorId) {
return JooqUtil.transactionResult(processorDbConnProvider, context -> {
// Logically delete all the child filters first
processorFilterDao.logicalDeleteByProcessorId(processorId, context);
final int count = logicalDeleteByProcessorId(processorId, context);
return count;
return logicalDeleteByProcessorId(processorId, context);
});
} catch (final Exception e) {
throw new RuntimeException("Error deleting filters and processor for processor id " + processorId, e);
Expand Down Expand Up @@ -196,7 +195,7 @@ public int physicalDeleteOldProcessors(final Instant deleteThreshold) {
totalCount.addAndGet(count);
} catch (final DataAccessException e) {
if (e.getCause() instanceof final SQLIntegrityConstraintViolationException sqlEx) {
LOGGER.debug("Expected constraint violation exception: " + sqlEx.getMessage(), e);
LOGGER.debug(() -> "Expected constraint violation exception: " + sqlEx.getMessage(), e);
} else {
throw e;
}
Expand All @@ -208,14 +207,18 @@ public int physicalDeleteOldProcessors(final Instant deleteThreshold) {

@Override
public Optional<Processor> fetch(final int id) {
return JooqUtil.contextResult(processorDbConnProvider, context -> context
.select()
.from(PROCESSOR)
.where(PROCESSOR.ID.eq(id))
.fetchOptional())
return JooqUtil.contextResult(processorDbConnProvider, context -> fetch(context, id))
.map(RECORD_TO_PROCESSOR_MAPPER);
}

private Optional<Record> fetch(final DSLContext context, final int id) {
return context
.select()
.from(PROCESSOR)
.where(PROCESSOR.ID.eq(id))
.fetchOptional();
}

@Override
public Optional<Processor> fetchByPipelineUuid(final String pipelineUuid) {
Objects.requireNonNull(pipelineUuid);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ private ProcessorFilterTracker createTracker(final DSLContext context) {
final ProcessorFilterTracker tracker = new ProcessorFilterTracker();
tracker.setVersion(1);
tracker.setStatus(ProcessorFilterTrackerStatus.CREATED);
final int id = context
final Integer id = context
.insertInto(PROCESSOR_FILTER_TRACKER)
.columns(
PROCESSOR_FILTER_TRACKER.VERSION,
Expand Down Expand Up @@ -147,14 +147,15 @@ private ProcessorFilterTracker createTracker(final DSLContext context) {
NullSafe.get(tracker.getStatus(), ProcessorFilterTrackerStatus::getPrimitiveValue))
.returning(PROCESSOR_FILTER_TRACKER.ID)
.fetchOne(PROCESSOR_FILTER_TRACKER.ID);
Objects.requireNonNull(id);
tracker.setId(id);
return tracker;
}

private ProcessorFilter createFilter(final DSLContext context, final ProcessorFilter filter) {
filter.setVersion(1);
final String data = queryDataXMLSerialiser.serialise(filter.getQueryData());
final int id = context
final Integer id = context
.insertInto(PROCESSOR_FILTER)
.columns(PROCESSOR_FILTER.VERSION,
PROCESSOR_FILTER.CREATE_TIME_MS,
Expand Down Expand Up @@ -192,6 +193,7 @@ private ProcessorFilter createFilter(final DSLContext context, final ProcessorFi
NullSafe.get(filter.getRunAsUser(), UserRef::getUuid))
.returning(PROCESSOR_FILTER.ID)
.fetchOne(PROCESSOR_FILTER.ID);
Objects.requireNonNull(id);
filter.setId(id);
return filter;
}
Expand Down Expand Up @@ -221,7 +223,7 @@ private ProcessorFilter updateFilter(final DSLContext context, final ProcessorFi
"it may have been updated by another user or deleted");
}

return fetch(filter.getId()).orElseThrow(() ->
return fetch(context, filter.getId()).map(this::mapRecord).orElseThrow(() ->
new RuntimeException("Error fetching updated processor filter"));
}

Expand Down Expand Up @@ -379,17 +381,19 @@ public Set<String> physicalDeleteOldProcessorFilters(final Instant deleteThresho

@Override
public Optional<ProcessorFilter> fetch(final int id) {
return JooqUtil.contextResult(processorDbConnProvider, context ->
context
.select()
.from(PROCESSOR_FILTER)
.join(PROCESSOR_FILTER_TRACKER)
.on(PROCESSOR_FILTER.FK_PROCESSOR_FILTER_TRACKER_ID.eq(PROCESSOR_FILTER_TRACKER.ID))
.join(PROCESSOR)
.on(PROCESSOR_FILTER.FK_PROCESSOR_ID.eq(PROCESSOR.ID))
.where(PROCESSOR_FILTER.ID.eq(id))
.fetchOptional())
.map(this::mapRecord);
return JooqUtil.contextResult(processorDbConnProvider, context -> fetch(context, id)).map(this::mapRecord);
}

private Optional<Record> fetch(final DSLContext context, final int id) {
return context
.select()
.from(PROCESSOR_FILTER)
.join(PROCESSOR_FILTER_TRACKER)
.on(PROCESSOR_FILTER.FK_PROCESSOR_FILTER_TRACKER_ID.eq(PROCESSOR_FILTER_TRACKER.ID))
.join(PROCESSOR)
.on(PROCESSOR_FILTER.FK_PROCESSOR_ID.eq(PROCESSOR.ID))
.where(PROCESSOR_FILTER.ID.eq(id))
.fetchOptional();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.function.Consumer;
Expand Down Expand Up @@ -90,7 +91,7 @@ public User create(final User user) {
private User create(final DSLContext context, final User user) {
user.setVersion(1);
user.setUuid(UUID.randomUUID().toString());
final int id = context
final Integer id = context
.insertInto(STROOM_USER)
.columns(STROOM_USER.VERSION,
STROOM_USER.CREATE_TIME_MS,
Expand All @@ -116,6 +117,7 @@ private User create(final DSLContext context, final User user) {
user.getFullName())
.returning(STROOM_USER.ID)
.fetchOne(STROOM_USER.ID);
Objects.requireNonNull(id);
user.setId(id);
return user;
}
Expand Down
24 changes: 24 additions & 0 deletions unreleased_changes/20241023_110006_453__4550.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
* Issue **#4550** : Fix datasource already in use issue.


```sh
# ********************************************************************************
# Issue title: Datasource already in use
# Issue link: https://github.com/gchq/stroom/issues/4550
# ********************************************************************************

# ONLY the top line will be included as a change entry in the CHANGELOG.
# The entry should be in GitHub flavour markdown and should be written on a SINGLE
# line with no hard breaks. You can have multiple change files for a single GitHub issue.
# The entry should be written in the imperative mood, i.e. 'Fix nasty bug' rather than
# 'Fixed nasty bug'.
#
# Examples of acceptable entries are:
#
#
# * Issue **123** : Fix bug with an associated GitHub issue in this repository
#
# * Issue **namespace/other-repo#456** : Fix bug with an associated GitHub issue in another repository
#
# * Fix bug with no associated GitHub issue.
```
Loading