diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java index 888775aed5c..9528ec9974f 100644 --- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java @@ -21,6 +21,7 @@ import java.util.Collection; import java.util.Iterator; import java.util.Map; +import java.util.Set; import java.util.function.Consumer; import java.util.function.Predicate; @@ -514,6 +515,11 @@ ConditionalTabletMutator requireSame(TabletMetadata tabletMetadata, ColumnType t ConditionalTabletMutator requireAbsentLogs(); + /** + * Require that a tablet contain all the files in the set + */ + ConditionalTabletMutator requireFiles(Set files); + /** *

* Ample provides the following features on top of the conditional writer to help automate diff --git a/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletMutatorImpl.java b/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletMutatorImpl.java index 9978dbbe7ca..da8e050504a 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletMutatorImpl.java +++ b/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletMutatorImpl.java @@ -45,7 +45,9 @@ import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.iterators.SortedFilesIterator; import org.apache.accumulo.core.lock.ServiceLock; +import org.apache.accumulo.core.metadata.StoredTabletFile; import org.apache.accumulo.core.metadata.schema.Ample; +import org.apache.accumulo.core.metadata.schema.Ample.ConditionalTabletMutator; import org.apache.accumulo.core.metadata.schema.ExternalCompactionId; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.BulkFileColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.CompactedColumnFamily; @@ -330,6 +332,18 @@ public Ample.ConditionalTabletMutator requireAbsentLogs() { return this; } + @Override + public ConditionalTabletMutator requireFiles(Set files) { + Preconditions.checkState(updatesEnabled, "Cannot make updates after calling mutate."); + IteratorSetting is = new IteratorSetting(INITIAL_ITERATOR_PRIO, PresentIterator.class); + for (StoredTabletFile file : files) { + Condition c = new Condition(DataFileColumnFamily.STR_NAME, file.getMetadata()) + .setValue(PresentIterator.VALUE).setIterators(is); + mutation.addCondition(c); + } + return this; + } + @Override public void submit(Ample.RejectionHandler rejectionCheck) { Preconditions.checkState(updatesEnabled, "Cannot make updates after calling mutate."); diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/CommitCompaction.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/CommitCompaction.java index 0daa221f1f6..cb9492d7e30 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/CommitCompaction.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/CommitCompaction.java @@ -110,7 +110,7 @@ private TabletMetadata commitCompaction(ServerContext ctx, ExternalCompactionId while (canCommitCompaction(ecid, tablet)) { CompactionMetadata ecm = tablet.getExternalCompactions().get(ecid); - // the compacted files should not exists in the tablet already + // the compacted files should not exist in the tablet already var tablet2 = tablet; newDatafile.ifPresent( newFile -> Preconditions.checkState(!tablet2.getFiles().contains(newFile.insert()), @@ -118,7 +118,8 @@ private TabletMetadata commitCompaction(ServerContext ctx, ExternalCompactionId try (var tabletsMutator = ctx.getAmple().conditionallyMutateTablets()) { var tabletMutator = tabletsMutator.mutateTablet(getExtent()).requireAbsentOperation() - .requireCompaction(ecid).requireSame(tablet, FILES, LOCATION); + .requireCompaction(ecid).requireSame(tablet, LOCATION) + .requireFiles(commitData.getJobFiles()); if (ecm.getKind() == CompactionKind.USER) { tabletMutator.requireSame(tablet, SELECTED, COMPACTED); diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/CompactionCommitData.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/CompactionCommitData.java index 23b293c25e9..0e7587d6336 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/CompactionCommitData.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/CompactionCommitData.java @@ -19,7 +19,6 @@ package org.apache.accumulo.manager.compaction.coordinator.commit; import java.io.Serializable; -import java.util.Collection; import java.util.Set; import java.util.stream.Collectors; @@ -56,7 +55,7 @@ public TableId getTableId() { return KeyExtent.fromThrift(textent).tableId(); } - public Collection getJobFiles() { - return inputPaths.stream().map(StoredTabletFile::of).collect(Collectors.toList()); + public Set getJobFiles() { + return inputPaths.stream().map(StoredTabletFile::of).collect(Collectors.toSet()); } } diff --git a/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java b/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java index a343b94854c..ca0baf7435c 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java @@ -1746,4 +1746,55 @@ public void testErrors() { } } } + + @Test + public void testRequiresFiles() { + var context = cluster.getServerContext(); + + var stf1 = StoredTabletFile + .of(new Path("hdfs://localhost:8020/accumulo/tables/2a/default_tablet/F0000070.rf")); + var stf2 = StoredTabletFile + .of(new Path("hdfs://localhost:8020/accumulo/tables/2a/default_tablet/F0000071.rf")); + var stf3 = StoredTabletFile + .of(new Path("hdfs://localhost:8020/accumulo/tables/2a/default_tablet/F0000072.rf")); + var stf4 = StoredTabletFile + .of(new Path("hdfs://localhost:8020/accumulo/tables/2a/default_tablet/C0000073.rf")); + var dfv = new DataFileValue(100, 100); + + // Add 3 of the files, skip the 4th file + try (var ctmi = new ConditionalTabletsMutatorImpl(context)) { + ctmi.mutateTablet(e1).requireAbsentOperation().putFile(stf1, dfv).putFile(stf2, dfv) + .putFile(stf3, dfv).submit(tm -> false); + assertEquals(Status.ACCEPTED, ctmi.process().get(e1).getStatus()); + } + assertEquals(Set.of(stf1, stf2, stf3), context.getAmple().readTablet(e1).getFiles()); + + // Test mutation is accepted when given all files + var time1 = MetadataTime.parse("L50"); + try (var ctmi = new ConditionalTabletsMutatorImpl(context)) { + ctmi.mutateTablet(e1).requireAbsentOperation().requireFiles(Set.of(stf1, stf2, stf3)) + .putTime(time1).submit(tm -> false); + assertEquals(Status.ACCEPTED, ctmi.process().get(e1).getStatus()); + } + assertEquals(time1, context.getAmple().readTablet(e1).getTime()); + + // Test mutation is accepted when a subset of files is given + var time2 = MetadataTime.parse("L60"); + try (var ctmi = new ConditionalTabletsMutatorImpl(context)) { + ctmi.mutateTablet(e1).requireAbsentOperation().requireFiles(Set.of(stf1, stf3)).putTime(time2) + .submit(tm -> false); + assertEquals(Status.ACCEPTED, ctmi.process().get(e1).getStatus()); + } + assertEquals(time2, context.getAmple().readTablet(e1).getTime()); + + // Test mutation is rejected when a file is given that the tablet does not have + var time3 = MetadataTime.parse("L60"); + try (var ctmi = new ConditionalTabletsMutatorImpl(context)) { + ctmi.mutateTablet(e1).requireAbsentOperation().requireFiles(Set.of(stf1, stf4)).putTime(time3) + .submit(tm -> false); + assertEquals(Status.REJECTED, ctmi.process().get(e1).getStatus()); + } + // Should be previous time still as the mutation was rejected + assertEquals(time2, context.getAmple().readTablet(e1).getTime()); + } }