Recreated compaction coordinator unit test
dlmarion committed Jan 24, 2024
1 parent e468fd3 commit 9e25f56
Showing 4 changed files with 383 additions and 35 deletions.
Property.java
@@ -441,6 +441,8 @@ public enum Property {
"The number of threads used to inspect tablets files to find split points.", "4.0.0"),

MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_SIZE("manager.compaction.major.service.queue.size",
+ // ELASTICITY_TODO: It might be good to note that there is a priority queue per compactor
+ // resource group
"10000", PropertyType.COUNT, "The max size of the priority queue.", "4.0"),
// properties that are specific to scan server behavior
@Experimental
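As a side note on the property above: it is read with getCount(...) when the coordinator sizes its job queues (see the constructor hunk further down). Below is a minimal sketch of overriding it for a test, assuming the stock ConfigurationCopy/DefaultConfiguration helpers; the class name and the value 500 are made up for illustration.

import org.apache.accumulo.core.conf.ConfigurationCopy;
import org.apache.accumulo.core.conf.DefaultConfiguration;
import org.apache.accumulo.core.conf.Property;

class QueueSizePropertySketch {
  public static void main(String[] args) {
    // Start from the shipped defaults and override only the queue-size property.
    ConfigurationCopy conf = new ConfigurationCopy(DefaultConfiguration.getInstance());
    conf.set(Property.MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_SIZE, "500");
    // Same read path the coordinator uses when sizing its CompactionJobQueues.
    int queueSize = conf.getCount(Property.MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_SIZE);
    System.out.println("priority queue size = " + queueSize);
  }
}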
Manager.java
@@ -940,8 +940,7 @@ public void run() {
// Start the Manager's Fate Service
fateServiceHandler = new FateServiceHandler(this);
managerClientHandler = new ManagerClientServiceHandler(this);
- compactionCoordinator =
- new CompactionCoordinator(context, tserverSet, security, nextEvent, fateRefs);
+ compactionCoordinator = new CompactionCoordinator(context, security, fateRefs);
// Start the Manager's Client service
// Ensure that calls before the manager gets the lock fail
ManagerClientService.Iface haProxy =
CompactionCoordinator.java
@@ -86,6 +86,7 @@
import org.apache.accumulo.core.securityImpl.thrift.TCredentials;
import org.apache.accumulo.core.spi.compaction.CompactionJob;
import org.apache.accumulo.core.spi.compaction.CompactionKind;
+ import org.apache.accumulo.core.spi.compaction.CompactorGroupId;
import org.apache.accumulo.core.tabletserver.thrift.InputFile;
import org.apache.accumulo.core.tabletserver.thrift.IteratorConfig;
import org.apache.accumulo.core.tabletserver.thrift.TCompactionKind;
@@ -100,7 +101,6 @@
import org.apache.accumulo.core.util.threads.ThreadPools;
import org.apache.accumulo.core.util.threads.Threads;
import org.apache.accumulo.core.volume.Volume;
- import org.apache.accumulo.manager.EventCoordinator;
import org.apache.accumulo.manager.Manager;
import org.apache.accumulo.manager.compaction.coordinator.commit.CommitCompaction;
import org.apache.accumulo.manager.compaction.coordinator.commit.CompactionCommitData;
@@ -109,7 +109,6 @@
import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.compaction.CompactionConfigStorage;
import org.apache.accumulo.server.compaction.CompactionPluginUtils;
- import org.apache.accumulo.server.manager.LiveTServerSet;
import org.apache.accumulo.server.security.SecurityOperation;
import org.apache.accumulo.server.tablets.TabletNameGenerator;
import org.apache.hadoop.fs.FileStatus;
@@ -144,37 +143,34 @@ public class CompactionCoordinator
* is the most authoritative source of what external compactions are currently running, but it
* does not have the stats that this map has.
*/
- protected static final Map<ExternalCompactionId,RunningCompaction> RUNNING_CACHE =
+ protected final Map<ExternalCompactionId,RunningCompaction> RUNNING_CACHE =
new ConcurrentHashMap<>();

/* Map of group name to last time compactor called to get a compaction job */
// ELASTICITY_TODO need to clean out groups that are no longer configured..
- private static final Map<String,Long> TIME_COMPACTOR_LAST_CHECKED = new ConcurrentHashMap<>();
+ private final Map<CompactorGroupId,Long> TIME_COMPACTOR_LAST_CHECKED = new ConcurrentHashMap<>();

private final ServerContext ctx;
private final SecurityOperation security;
private final CompactionJobQueues jobQueues;
- private final EventCoordinator eventCoordinator;
private final AtomicReference<Map<FateInstanceType,Fate<Manager>>> fateInstances;
// Exposed for tests
protected volatile Boolean shutdown = false;

private final ScheduledThreadPoolExecutor schedExecutor;

private final Cache<ExternalCompactionId,RunningCompaction> completed;
- private LoadingCache<Long,CompactionConfig> compactionConfigCache;
- private final Cache<Path,Integer> checked_tablet_dir_cache;
+ private final LoadingCache<Long,CompactionConfig> compactionConfigCache;
+ private final Cache<Path,Integer> tabletDirCache;
private final DeadCompactionDetector deadCompactionDetector;

private final QueueMetrics queueMetrics;

- public CompactionCoordinator(ServerContext ctx, LiveTServerSet tservers,
- SecurityOperation security, EventCoordinator eventCoordinator,
+ public CompactionCoordinator(ServerContext ctx, SecurityOperation security,
AtomicReference<Map<FateInstanceType,Fate<Manager>>> fateInstances) {
this.ctx = ctx;
this.schedExecutor = this.ctx.getScheduledExecutor();
this.security = security;
- this.eventCoordinator = eventCoordinator;

this.jobQueues = new CompactionJobQueues(
ctx.getConfiguration().getCount(Property.MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_SIZE));
@@ -200,9 +196,8 @@ public CompactionCoordinator(ServerContext ctx, LiveTServerSet tservers,
return path.toUri().toString().length();
};

- checked_tablet_dir_cache =
- ctx.getCaches().createNewBuilder(CacheName.COMPACTION_DIR_CACHE, true)
- .maximumWeight(10485760L).weigher(weigher).build();
+ tabletDirCache = ctx.getCaches().createNewBuilder(CacheName.COMPACTION_DIR_CACHE, true)
+ .maximumWeight(10485760L).weigher(weigher).build();

deadCompactionDetector = new DeadCompactionDetector(this.ctx, this, schedExecutor);
// At this point the manager does not have its lock so no actions should be taken yet
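For readers unfamiliar with the cache being renamed here: the builder above produces a Caffeine cache whose entries are weighed by the length of the tablet directory's URI string and capped at roughly 10 MB of path text. A standalone sketch of the equivalent plain-Caffeine construction follows (the real code goes through ctx.getCaches() so the cache is registered under a CacheName; the class name here is illustrative only).

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.Weigher;
import org.apache.hadoop.fs.Path;

class TabletDirCacheSketch {
  // Same shape as tabletDirCache above: weigh each entry by how long the
  // directory URI is, and evict once the total weight exceeds ~10 MB.
  static Cache<Path,Integer> build() {
    Weigher<Path,Integer> weigher = (path, count) -> path.toUri().toString().length();
    return Caffeine.newBuilder()
        .maximumWeight(10485760L)
        .weigher(weigher)
        .build();
  }
}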
@@ -270,7 +265,7 @@ public void run() {
// tservers. It's no longer doing that. May be best to remove the loop and make the remaining
// task a scheduled one.

LOG.info("Starting loop to check tservers for compaction summaries");
LOG.info("Starting loop to check for compactors not checking in");
while (!shutdown) {
long start = System.currentTimeMillis();

@@ -330,13 +325,13 @@ public TExternalCompactionJob getCompactionJob(TInfo tinfo, TCredentials credent
throw new AccumuloSecurityException(credentials.getPrincipal(),
SecurityErrorCode.PERMISSION_DENIED).asThriftException();
}
- final String group = groupName.intern();
- LOG.trace("getCompactionJob called for group {} by compactor {}", group, compactorAddress);
- TIME_COMPACTOR_LAST_CHECKED.put(group, System.currentTimeMillis());
+ CompactorGroupId groupId = CompactorGroupIdImpl.groupId(groupName);
+ LOG.trace("getCompactionJob called for group {} by compactor {}", groupId, compactorAddress);
+ TIME_COMPACTOR_LAST_CHECKED.put(groupId, System.currentTimeMillis());

TExternalCompactionJob result = null;

- CompactionJobQueues.MetaJob metaJob = jobQueues.poll(CompactorGroupIdImpl.groupId(groupName));
+ CompactionJobQueues.MetaJob metaJob = jobQueues.poll(groupId);

while (metaJob != null) {

@@ -361,23 +356,24 @@ public TExternalCompactionJob getCompactionJob(TInfo tinfo, TCredentials credent
// It is possible that by the time this is added, the compactor that made this request
// is dead. In this case the compaction is not actually running.
RUNNING_CACHE.put(ExternalCompactionId.of(result.getExternalCompactionId()),
- new RunningCompaction(result, compactorAddress, group));
+ new RunningCompaction(result, compactorAddress, groupName));
LOG.debug("Returning external job {} to {} with {} files", result.externalCompactionId,
compactorAddress, ecm.getJobFiles().size());
break;
} else {
LOG.debug("Unable to reserve compaction job for {}, pulling another off the queue ",
metaJob.getTabletMetadata().getExtent());
LOG.debug(
"Unable to reserve compaction job for {}, pulling another off the queue for group {}",
metaJob.getTabletMetadata().getExtent(), groupName);
metaJob = jobQueues.poll(CompactorGroupIdImpl.groupId(groupName));
}
}

if (metaJob == null) {
LOG.debug("No jobs found in group {} ", group);
LOG.debug("No jobs found in group {} ", groupName);
}

if (result == null) {
LOG.trace("No jobs found for group {}, returning empty job to compactor {}", group,
LOG.trace("No jobs found for group {}, returning empty job to compactor {}", groupName,
compactorAddress);
result = new TExternalCompactionJob();
}
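The hunks above replace the old String.intern() keying of TIME_COMPACTOR_LAST_CHECKED with a CompactorGroupId key. A small sketch of why that is a drop-in change, assuming CompactorGroupIdImpl lives under org.apache.accumulo.core.util.compaction (its import is not visible in this diff) and that ids compare by their canonical group name:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.accumulo.core.spi.compaction.CompactorGroupId;
import org.apache.accumulo.core.util.compaction.CompactorGroupIdImpl; // package assumed

class GroupKeySketch {
  public static void main(String[] args) {
    Map<CompactorGroupId,Long> lastChecked = new ConcurrentHashMap<>();
    // Two lookups of the same group name produce equal keys, so the map behaves
    // the same as it did with interned String keys.
    lastChecked.put(CompactorGroupIdImpl.groupId("default"), System.currentTimeMillis());
    System.out.println(lastChecked.containsKey(CompactorGroupIdImpl.groupId("default"))); // true
  }
}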
@@ -433,7 +429,7 @@ private boolean canReserveCompaction(TabletMetadata tablet, CompactionJob job,

private void checkTabletDir(KeyExtent extent, Path path) {
try {
- if (checked_tablet_dir_cache.getIfPresent(path) == null) {
+ if (tabletDirCache.getIfPresent(path) == null) {
FileStatus[] files = null;
try {
files = ctx.getVolumeManager().listStatus(path);
@@ -446,14 +442,14 @@ private void checkTabletDir(KeyExtent extent, Path path) {

ctx.getVolumeManager().mkdirs(path);
}
- checked_tablet_dir_cache.put(path, 1);
+ tabletDirCache.put(path, 1);
}
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}

- private CompactionMetadata createExternalCompactionMetadata(CompactionJob job,
+ protected CompactionMetadata createExternalCompactionMetadata(CompactionJob job,
Set<StoredTabletFile> jobFiles, TabletMetadata tablet, String compactorAddress,
ExternalCompactionId externalCompactionId) {
boolean propDels;
@@ -487,7 +483,7 @@ private CompactionMetadata createExternalCompactionMetadata(CompactionJob job,

}

- private CompactionMetadata reserveCompaction(CompactionJobQueues.MetaJob metaJob,
+ protected CompactionMetadata reserveCompaction(CompactionJobQueues.MetaJob metaJob,
String compactorAddress, ExternalCompactionId externalCompactionId) {

Preconditions.checkArgument(metaJob.getJob().getKind() == CompactionKind.SYSTEM
Expand Down Expand Up @@ -542,8 +538,9 @@ private CompactionMetadata reserveCompaction(CompactionJobQueues.MetaJob metaJob
return null;
}

- TExternalCompactionJob createThriftJob(String externalCompactionId, CompactionMetadata ecm,
- CompactionJobQueues.MetaJob metaJob, Optional<CompactionConfig> compactionConfig) {
+ protected TExternalCompactionJob createThriftJob(String externalCompactionId,
+ CompactionMetadata ecm, CompactionJobQueues.MetaJob metaJob,
+ Optional<CompactionConfig> compactionConfig) {

Set<CompactableFile> selectedFiles;
if (metaJob.getJob().getKind() == CompactionKind.SYSTEM) {
@@ -850,7 +847,7 @@ protected Set<ExternalCompactionId> readExternalCompactionIds() {
* The RUNNING_CACHE set may contain external compactions that are not actually running. This
* method periodically cleans those up.
*/
- protected void cleanUpRunning() {
+ public void cleanUpRunning() {

// grab a snapshot of the ids in the set before reading the metadata table. This is done to
// avoid removing things that are added while reading the metadata.
Expand Down Expand Up @@ -946,6 +943,11 @@ public void cancel(TInfo tinfo, TCredentials credentials, String externalCompact
cancelCompactionOnCompactor(runningCompaction.getCompactorAddress(), externalCompactionId);
}

+ /* Method exists to be called from test */
+ public CompactionJobQueues getJobQueues() {
+ return jobQueues;
+ }

/* Method exists to be overridden in test to hide static method */
protected List<RunningCompaction> getCompactionsRunningOnCompactors() {
return ExternalCompactionUtil.getCompactionsRunningOnCompactors(this.ctx);
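Taken together, the changes in this file (instance-scoped RUNNING_CACHE, protected factory methods, a public cleanUpRunning(), and the new getJobQueues() accessor) are what let the recreated unit test, presumably the fourth changed file whose diff did not load here, drive the coordinator without real compactors. A rough sketch of the shape such a test subclass could take, written as if it sits in the same package as CompactionCoordinator; only the overridden members come from this diff, and the class name, stubbed return values, and import paths for classes not shown in the diff are assumptions.

import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.accumulo.core.fate.Fate;
import org.apache.accumulo.core.fate.FateInstanceType;
import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
import org.apache.accumulo.core.util.compaction.RunningCompaction;
import org.apache.accumulo.manager.Manager;
import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.security.SecurityOperation;

// Illustrative only: a test-scoped subclass that keeps the coordinator from
// reaching out to real compactor processes or the metadata table.
class TestCompactionCoordinator extends CompactionCoordinator {

  TestCompactionCoordinator(ServerContext ctx, SecurityOperation security,
      AtomicReference<Map<FateInstanceType,Fate<Manager>>> fateInstances) {
    super(ctx, security, fateInstances); // the smaller constructor from this commit
  }

  @Override
  protected List<RunningCompaction> getCompactionsRunningOnCompactors() {
    return List.of(); // no RPCs to compactors in a unit test
  }

  @Override
  protected Set<ExternalCompactionId> readExternalCompactionIds() {
    // pretend the metadata table agrees with the in-memory cache so that
    // cleanUpRunning() leaves RUNNING_CACHE alone
    return Set.copyOf(RUNNING_CACHE.keySet());
  }
}

From there a test can push synthetic jobs through getJobQueues(), call getCompactionJob(...), and invoke cleanUpRunning() directly now that it is public.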
[diff for the fourth changed file did not load]
