Skip to content

Commit

Permalink
Close scanner in AmpleImpl.getExternalCompactionFinalStates() (apache…
Browse files Browse the repository at this point in the history
…#5283)

* AmpleImpl.getExternalCompactionFinalStates() creates a Scanner and returns a Stream off of that Scanner. This makes sure that the Scanner is closed once the Stream is closed and makes sure those Streams are closed in the calling code
  • Loading branch information
DomGarguilo authored Jan 30, 2025
1 parent 7679f0c commit c3df72e
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ public Stream<ExternalCompactionFinalState> getExternalCompactionFinalStates() {

scanner.setRange(ExternalCompactionSection.getRange());
int pLen = ExternalCompactionSection.getRowPrefix().length();
return scanner.stream()
return scanner.stream().onClose(scanner::close)
.map(e -> ExternalCompactionFinalState.fromJson(
ExternalCompactionId.of(e.getKey().getRowData().toString().substring(pLen)),
e.getValue().toString()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,9 +214,8 @@ private void processPending() {
}

private void notifyTservers() {
try {
Iterator<ExternalCompactionFinalState> finalStates =
context.getAmple().getExternalCompactionFinalStates().iterator();
try (var finalStatesStream = context.getAmple().getExternalCompactionFinalStates()) {
Iterator<ExternalCompactionFinalState> finalStates = finalStatesStream.iterator();
while (finalStates.hasNext()) {
ExternalCompactionFinalState state = finalStates.next();
LOG.debug("Found external compaction in final state: {}, queueing for tserver notification",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
import org.apache.accumulo.core.metadata.schema.ExternalCompactionFinalState;
import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType;
import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil;
Expand Down Expand Up @@ -106,15 +107,18 @@ private void detectDeadCompactions() {
});

// Determine which compactions are currently committing and remove those
context.getAmple().getExternalCompactionFinalStates()
.map(ecfs -> ecfs.getExternalCompactionId()).forEach(ecid -> {
if (tabletCompactions.remove(ecid) != null) {
log.trace("Removed compaction {} that is committing", ecid);
}
if (this.deadCompactions.remove(ecid) != null) {
log.trace("Removed {} from the dead compaction map, it's committing", ecid);
}
});
try (
var externalCompactionFinalStates = context.getAmple().getExternalCompactionFinalStates()) {
externalCompactionFinalStates.map(ExternalCompactionFinalState::getExternalCompactionId)
.forEach(ecid -> {
if (tabletCompactions.remove(ecid) != null) {
log.trace("Removed compaction {} that is committing", ecid);
}
if (this.deadCompactions.remove(ecid) != null) {
log.trace("Removed {} from the dead compaction map, it's committing", ecid);
}
});
}

tabletCompactions.forEach((ecid, extent) -> {
var count = this.deadCompactions.merge(ecid, 1L, Long::sum);
Expand Down

0 comments on commit c3df72e

Please sign in to comment.