diff --git a/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java b/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java index c74ddfb49ee..b9e86710758 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java +++ b/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java @@ -317,7 +317,7 @@ public Stream getExternalCompactionFinalStates() { scanner.setRange(ExternalCompactionSection.getRange()); int pLen = ExternalCompactionSection.getRowPrefix().length(); - return scanner.stream() + return scanner.stream().onClose(scanner::close) .map(e -> ExternalCompactionFinalState.fromJson( ExternalCompactionId.of(e.getKey().getRowData().toString().substring(pLen)), e.getValue().toString())); diff --git a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionFinalizer.java b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionFinalizer.java index 80309ae2620..6f2d19ec083 100644 --- a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionFinalizer.java +++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionFinalizer.java @@ -216,9 +216,8 @@ private void processPending() { } private void notifyTservers() { - try { - Iterator finalStates = - context.getAmple().getExternalCompactionFinalStates().iterator(); + try (var finalStatesStream = context.getAmple().getExternalCompactionFinalStates()) { + Iterator finalStates = finalStatesStream.iterator(); while (finalStates.hasNext()) { ExternalCompactionFinalState state = finalStates.next(); LOG.debug("Found external compaction in final state: {}, queueing for tserver notification", diff --git a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/DeadCompactionDetector.java b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/DeadCompactionDetector.java index b58f06a31ee..7ee177179d5 100644 --- a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/DeadCompactionDetector.java +++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/DeadCompactionDetector.java @@ -31,6 +31,7 @@ import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.metadata.schema.Ample.DataLevel; +import org.apache.accumulo.core.metadata.schema.ExternalCompactionFinalState; import org.apache.accumulo.core.metadata.schema.ExternalCompactionId; import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType; import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil; @@ -106,15 +107,18 @@ private void detectDeadCompactions() { }); // Determine which compactions are currently committing and remove those - context.getAmple().getExternalCompactionFinalStates() - .map(ecfs -> ecfs.getExternalCompactionId()).forEach(ecid -> { - if (tabletCompactions.remove(ecid) != null) { - log.trace("Removed compaction {} that is committing", ecid); - } - if (this.deadCompactions.remove(ecid) != null) { - log.trace("Removed {} from the dead compaction map, it's committing", ecid); - } - }); + try ( + var externalCompactionFinalStates = context.getAmple().getExternalCompactionFinalStates()) { + externalCompactionFinalStates.map(ExternalCompactionFinalState::getExternalCompactionId) + .forEach(ecid -> { + if (tabletCompactions.remove(ecid) != null) { + log.trace("Removed compaction {} that is committing", ecid); + } + if (this.deadCompactions.remove(ecid) != null) { + log.trace("Removed {} from the dead compaction map, it's committing", ecid); + } + }); + } tabletCompactions.forEach((ecid, extent) -> { var count = this.deadCompactions.merge(ecid, 1L, Long::sum);