Skip to content

Commit

Permalink
Update elasticity MergeTablets to use MERGED marker (apache#3975)
Browse files Browse the repository at this point in the history
The MERGED marker is set on the last tablet of the merged range after
metadata has been updated so that it is possible to know whether or not
the files were already fenced. If the marker exists then the fencing for
the last tablet can be skipped as the process was restarted.
  • Loading branch information
cshannon authored Nov 28, 2023
1 parent 51004f6 commit c615abf
Show file tree
Hide file tree
Showing 6 changed files with 56 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,10 @@ interface TabletUpdates<T> {
* is not empty
*/
T deleteAll(Set<Key> keys);

T setMerged();

T deleteMerged();
}

interface TabletMutator extends TabletUpdates<TabletMutator> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOADED;
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOCATION;
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOGS;
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.MERGED;
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.OPID;
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.PREV_ROW;
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.SCANS;
Expand Down Expand Up @@ -273,6 +274,18 @@ public TabletMetadataBuilder deleteAll(Set<Key> keys) {
throw new UnsupportedOperationException();
}

@Override
public TabletMetadataBuilder setMerged() {
fetched.add(MERGED);
internalBuilder.setMerged();
return this;
}

@Override
public TabletMetadataBuilder deleteMerged() {
throw new UnsupportedOperationException();
}

/**
* @param extraFetched Anything that was put on the builder will automatically be added to the
* fetched set. However, for the case where something was not put and it needs to be
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.HostingColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LastLocationColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.MergedColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ScanFileColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.SuspendLocationColumn;
Expand Down Expand Up @@ -329,6 +330,18 @@ public T deleteAll(Set<Key> keys) {
return getThis();
}

@Override
public T setMerged() {
MergedColumnFamily.MERGED_COLUMN.put(mutation, MergedColumnFamily.MERGED_VALUE);
return getThis();
}

@Override
public T deleteMerged() {
MergedColumnFamily.MERGED_COLUMN.putDelete(mutation);
return getThis();
}

public void setCloseAfterMutate(AutoCloseable closeable) {
this.closeAfterMutate = closeable;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.HOSTING_REQUESTED;
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LAST;
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOCATION;
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.MERGED;
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.SUSPEND;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
Expand Down Expand Up @@ -287,13 +288,13 @@ public void testMergedColumn() {
Mutation mutation = TabletColumnFamily.createPrevRowMutation(extent);
MERGED_COLUMN.put(mutation, MERGED_VALUE);
TabletMetadata tm = TabletMetadata.convertRow(toRowMap(mutation).entrySet().iterator(),
EnumSet.of(ColumnType.MERGED), true, false);
EnumSet.of(MERGED), true, false);
assertTrue(tm.hasMerged());

// Column not set
mutation = TabletColumnFamily.createPrevRowMutation(extent);
tm = TabletMetadata.convertRow(toRowMap(mutation).entrySet().iterator(),
EnumSet.of(ColumnType.MERGED), true, false);
tm = TabletMetadata.convertRow(toRowMap(mutation).entrySet().iterator(), EnumSet.of(MERGED),
true, false);
assertFalse(tm.hasMerged());

// MERGED Column not fetched
Expand All @@ -309,9 +310,8 @@ public void testUnkownColFamily() {
Mutation mutation = TabletColumnFamily.createPrevRowMutation(extent);

mutation.put("1234567890abcdefg", "xyz", "v1");
assertThrows(IllegalStateException.class,
() -> TabletMetadata.convertRow(toRowMap(mutation).entrySet().iterator(),
EnumSet.of(ColumnType.MERGED), true, false));
assertThrows(IllegalStateException.class, () -> TabletMetadata
.convertRow(toRowMap(mutation).entrySet().iterator(), EnumSet.of(MERGED), true, false));
}

private SortedMap<Key,Value> toRowMap(Mutation mutation) {
Expand Down Expand Up @@ -353,7 +353,7 @@ public void testBuilder() {
.putLocation(Location.future(ser1)).putFile(sf1, dfv1).putFile(sf2, dfv2)
.putCompactionId(23).putBulkFile(rf1, 25).putBulkFile(rf2, 35).putFlushId(27)
.putDirName("dir1").putScan(sf3).putScan(sf4).putCompacted(17).putCompacted(23)
.build(ECOMP, HOSTING_REQUESTED);
.build(ECOMP, HOSTING_REQUESTED, MERGED);

assertEquals(extent, tm.getExtent());
assertEquals(TabletHostingGoal.NEVER, tm.getHostingGoal());
Expand All @@ -367,6 +367,7 @@ public void testBuilder() {
assertEquals(Set.of(), tm.getExternalCompactions().keySet());
assertEquals(Set.of(17L, 23L), tm.getCompacted());
assertFalse(tm.getHostingRequested());
assertFalse(tm.hasMerged());
assertThrows(IllegalStateException.class, tm::getOperationId);
assertThrows(IllegalStateException.class, tm::getSuspend);
assertThrows(IllegalStateException.class, tm::getTime);
Expand Down Expand Up @@ -402,7 +403,7 @@ public void testBuilder() {

TabletMetadata tm3 = TabletMetadata.builder(extent).putExternalCompaction(ecid1, ecm)
.putSuspension(ser1, 45L).putTime(new MetadataTime(479, TimeType.LOGICAL)).putWal(le1)
.putWal(le2).setHostingRequested().putSelectedFiles(selFiles).build();
.putWal(le2).setHostingRequested().putSelectedFiles(selFiles).setMerged().build();

assertEquals(Set.of(ecid1), tm3.getExternalCompactions().keySet());
assertEquals(Set.of(sf1, sf2), tm3.getExternalCompactions().get(ecid1).getJobFiles());
Expand All @@ -416,6 +417,7 @@ public void testBuilder() {
assertEquals(159L, tm3.getSelectedFiles().getFateTxId());
assertFalse(tm3.getSelectedFiles().initiallySelectedAll());
assertEquals(selFiles.getMetadataValue(), tm3.getSelectedFiles().getMetadataValue());
assertTrue(tm3.hasMerged());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -89,14 +89,18 @@ public Repo<Manager> call(long tid, Manager manager) throws Exception {
for (var tabletMeta : tabletsMetadata) {
MergeTablets.validateTablet(tabletMeta, fateStr, opid, data.tableId);

var tabletMutator = tabletsMutator.mutateTablet(tabletMeta.getExtent())
.requireOperation(opid).requireAbsentLocation();

// do not delete the last tablet
if (Objects.equals(tabletMeta.getExtent().endRow(), lastEndRow)) {
// Clear the merged marker after we are finished on the last tablet
tabletMutator.deleteMerged();
tabletMutator.submit((tm) -> !tm.hasMerged());
submitted++;
break;
}

var tabletMutator = tabletsMutator.mutateTablet(tabletMeta.getExtent())
.requireOperation(opid).requireAbsentLocation();

tabletMeta.getKeyValues().keySet().forEach(key -> {
log.trace("{} deleting {}", fateStr, key);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,13 @@
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.HOSTING_GOAL;
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOCATION;
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOGS;
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.MERGED;
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.OPID;
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.PREV_ROW;
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.TIME;
import static org.apache.accumulo.manager.tableOps.merge.DeleteRows.verifyAccepted;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
Expand All @@ -56,7 +56,6 @@
import org.apache.accumulo.manager.tableOps.ManagerRepo;
import org.apache.accumulo.server.gc.AllVolumesDirectory;
import org.apache.accumulo.server.tablets.TabletTime;
import org.apache.hadoop.io.Text;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -90,7 +89,8 @@ public Repo<Manager> call(long tid, Manager manager) throws Exception {

try (var tabletsMetadata = manager.getContext().getAmple().readTablets()
.forTable(range.tableId()).overlapping(range.prevEndRow(), range.endRow())
.fetch(OPID, LOCATION, HOSTING_GOAL, FILES, TIME, DIR, ECOMP, PREV_ROW, LOGS).build()) {
.fetch(OPID, LOCATION, HOSTING_GOAL, FILES, TIME, DIR, ECOMP, PREV_ROW, LOGS, MERGED)
.build()) {

int tabletsSeen = 0;

Expand Down Expand Up @@ -149,10 +149,8 @@ public Repo<Manager> call(long tid, Manager manager) throws Exception {
// Check if the last tablet was already updated, this could happen if a process died and this
// code is running a 2nd time. If running a 2nd time it possible the last tablet was updated and
// only a subset of the other tablets were deleted. If the last tablet was never updated, then
// its prev row should be the greatest.
Comparator<Text> prevRowComparator = Comparator.nullsFirst(Text::compareTo);
if (prevRowComparator.compare(firstTabletMeta.getPrevEndRow(), lastTabletMeta.getPrevEndRow())
< 0) {
// the merged marker should not exist
if (!lastTabletMeta.hasMerged()) {
// update the last tablet
try (var tabletsMutator = manager.getContext().getAmple().conditionallyMutateTablets()) {
var lastExtent = lastTabletMeta.getExtent();
Expand All @@ -177,6 +175,10 @@ public Repo<Manager> call(long tid, Manager manager) throws Exception {
tabletMutator.putHostingGoal(DeleteRows.getMergeHostingGoal(range, goals));
tabletMutator.putPrevEndRow(firstTabletMeta.getPrevEndRow());

// Set merged marker on the last tablet when we are finished
// so we know that we already updated metadata if the process restarts
tabletMutator.setMerged();

// if the tablet no longer exists (because changed prev end row, then the update was
// successful.
tabletMutator.submit(Ample.RejectionHandler.acceptAbsentTablet());
Expand Down

0 comments on commit c615abf

Please sign in to comment.