Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/2.1' into 3.1
Browse files Browse the repository at this point in the history
  • Loading branch information
DomGarguilo committed Jan 30, 2025
2 parents 23810d2 + c3df72e commit e0478b4
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ public Stream<ExternalCompactionFinalState> getExternalCompactionFinalStates() {

scanner.setRange(ExternalCompactionSection.getRange());
int pLen = ExternalCompactionSection.getRowPrefix().length();
return scanner.stream()
return scanner.stream().onClose(scanner::close)
.map(e -> ExternalCompactionFinalState.fromJson(
ExternalCompactionId.of(e.getKey().getRowData().toString().substring(pLen)),
e.getValue().toString()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,9 +216,8 @@ private void processPending() {
}

private void notifyTservers() {
try {
Iterator<ExternalCompactionFinalState> finalStates =
context.getAmple().getExternalCompactionFinalStates().iterator();
try (var finalStatesStream = context.getAmple().getExternalCompactionFinalStates()) {
Iterator<ExternalCompactionFinalState> finalStates = finalStatesStream.iterator();
while (finalStates.hasNext()) {
ExternalCompactionFinalState state = finalStates.next();
LOG.debug("Found external compaction in final state: {}, queueing for tserver notification",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
import org.apache.accumulo.core.metadata.schema.ExternalCompactionFinalState;
import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType;
import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil;
Expand Down Expand Up @@ -106,15 +107,18 @@ private void detectDeadCompactions() {
});

// Determine which compactions are currently committing and remove those
context.getAmple().getExternalCompactionFinalStates()
.map(ecfs -> ecfs.getExternalCompactionId()).forEach(ecid -> {
if (tabletCompactions.remove(ecid) != null) {
log.trace("Removed compaction {} that is committing", ecid);
}
if (this.deadCompactions.remove(ecid) != null) {
log.trace("Removed {} from the dead compaction map, it's committing", ecid);
}
});
try (
var externalCompactionFinalStates = context.getAmple().getExternalCompactionFinalStates()) {
externalCompactionFinalStates.map(ExternalCompactionFinalState::getExternalCompactionId)
.forEach(ecid -> {
if (tabletCompactions.remove(ecid) != null) {
log.trace("Removed compaction {} that is committing", ecid);
}
if (this.deadCompactions.remove(ecid) != null) {
log.trace("Removed {} from the dead compaction map, it's committing", ecid);
}
});
}

tabletCompactions.forEach((ecid, extent) -> {
var count = this.deadCompactions.merge(ecid, 1L, Long::sum);
Expand Down

0 comments on commit e0478b4

Please sign in to comment.