Skip to content

Commit

Permalink
Handle concurrent requests to prevent multiple mirroring executions
Browse files Browse the repository at this point in the history
  • Loading branch information
ikhoon committed Oct 2, 2024
1 parent 20690ea commit ee8ccc7
Show file tree
Hide file tree
Showing 17 changed files with 364 additions and 85 deletions.
21 changes: 21 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{
"dependencies": {
"cronstrue": "^2.50.0"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletionException;
import java.util.function.Consumer;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -81,6 +82,7 @@
import com.linecorp.centraldogma.common.Entry;
import com.linecorp.centraldogma.common.EntryType;
import com.linecorp.centraldogma.common.Markup;
import com.linecorp.centraldogma.common.RedundantChangeException;
import com.linecorp.centraldogma.common.Revision;
import com.linecorp.centraldogma.internal.Jackson;
import com.linecorp.centraldogma.internal.Util;
Expand Down Expand Up @@ -255,13 +257,7 @@ MirrorResult mirrorRemoteToLocal(
final String mirrorStatePath = localPath() + MIRROR_STATE_FILE_NAME;
final Revision localRev = localRepo().normalizeNow(Revision.HEAD);
if (!needsFetch(headBranchRef, mirrorStatePath, localRev)) {
final String abbrId = headBranchRef.getObjectId().abbreviate(OBJECT_ID_ABBREV_STRING_LENGTH).name();
final String message = String.format("Repository '%s/%s' already at %s, %s#%s",
localRepo().parent().name(), localRepo().name(), abbrId,
remoteRepoUri(), remoteBranch());
// The local repository is up-to date.
logger.debug(message);
return newMirrorResult(MirrorStatus.UP_TO_DATE, message);
return newMirrorResultForUpToDate(headBranchRef);
}

// Update the head commit ID again because there's a chance a commit is pushed between the
Expand Down Expand Up @@ -357,11 +353,28 @@ MirrorResult mirrorRemoteToLocal(
}
});

final CommitResult commitResult = executor.execute(Command.push(
MIRROR_AUTHOR, localRepo().parent().name(), localRepo().name(),
Revision.HEAD, summary, detail, Markup.PLAINTEXT, changes.values())).join();
final String description = summary + ", Revision: " + commitResult.revision();
return newMirrorResult(MirrorStatus.SUCCESS, description);
try {
final CommitResult commitResult = executor.execute(Command.push(
MIRROR_AUTHOR, localRepo().parent().name(), localRepo().name(),
Revision.HEAD, summary, detail, Markup.PLAINTEXT, changes.values())).join();
final String description = summary + ", Revision: " + commitResult.revision();
return newMirrorResult(MirrorStatus.SUCCESS, description);
} catch (CompletionException e) {
if (e.getCause() instanceof RedundantChangeException) {
return newMirrorResultForUpToDate(headBranchRef);
}
throw e;
}
}

private MirrorResult newMirrorResultForUpToDate(Ref headBranchRef) {
final String abbrId = headBranchRef.getObjectId().abbreviate(OBJECT_ID_ABBREV_STRING_LENGTH).name();
final String message = String.format("Repository '%s/%s' already at %s, %s#%s",
localRepo().parent().name(), localRepo().name(), abbrId,
remoteRepoUri(), remoteBranch());
// The local repository is up-to date.
logger.debug(message);
return newMirrorResult(MirrorStatus.UP_TO_DATE, message);
}

private boolean needsFetch(Ref headBranchRef, String mirrorStatePath, Revision localRev)
Expand All @@ -377,7 +390,8 @@ private boolean needsFetch(Ref headBranchRef, String mirrorStatePath, Revision l

final ObjectId headCommitId = headBranchRef.getObjectId();
if (headCommitId.name().equals(localSourceRevision)) {
return false;
// TODO(ikhoon): Revert
return true;
}
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@

import io.micrometer.core.instrument.simple.SimpleMeterRegistry;

class DefaultMirroringServiceTest {
class MirrorSchedulingServiceTest {

@TempDir
static File temporaryFolder;
Expand Down Expand Up @@ -92,7 +92,7 @@ protected MirrorResult mirrorRemoteToLocal(File workDir, CommandExecutor executo

when(mr.mirrors()).thenReturn(CompletableFuture.completedFuture(ImmutableList.of(mirror)));

final DefaultMirroringService service = new DefaultMirroringService(
final MirrorSchedulingService service = new MirrorSchedulingService(
temporaryFolder, pm, new SimpleMeterRegistry(), 1, 1, 1);
final CommandExecutor executor = mock(CommandExecutor.class);
service.start(executor);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@
import com.linecorp.centraldogma.server.internal.api.auth.RequiresRoleDecorator.RequiresRoleDecoratorFactory;
import com.linecorp.centraldogma.server.internal.api.converter.HttpApiRequestConverter;
import com.linecorp.centraldogma.server.internal.mirror.DefaultMirroringServicePlugin;
import com.linecorp.centraldogma.server.internal.mirror.MirrorRunner;
import com.linecorp.centraldogma.server.internal.replication.ZooKeeperCommandExecutor;
import com.linecorp.centraldogma.server.internal.storage.project.DefaultProjectManager;
import com.linecorp.centraldogma.server.internal.storage.project.ProjectApiManager;
Expand Down Expand Up @@ -250,6 +251,8 @@ public static CentralDogma forConfig(File configFile) throws IOException {
private ServerStatusManager statusManager;
@Nullable
private InternalProjectInitializer projectInitializer;
@Nullable
private volatile MirrorRunner mirrorRunner;

CentralDogma(CentralDogmaConfig cfg, MeterRegistry meterRegistry) {
this.cfg = requireNonNull(cfg, "cfg");
Expand Down Expand Up @@ -438,7 +441,7 @@ private void doStart() throws Exception {
this.server = server;
this.sessionManager = sessionManager;
} else {
doStop(server, executor, pm, repositoryWorker, purgeWorker, sessionManager);
doStop(server, executor, pm, repositoryWorker, purgeWorker, sessionManager, mirrorRunner);
}
}
}
Expand Down Expand Up @@ -813,8 +816,9 @@ private void configureHttpApi(ServerBuilder sb,
.annotatedService(new RepositoryServiceV1(executor, mds));

if (GIT_MIRROR_ENABLED) {
mirrorRunner = new MirrorRunner(projectApiManager, executor, cfg, meterRegistry);
apiV1ServiceBuilder.annotatedService(
new MirroringServiceV1(projectApiManager, executor, cfg.dataDir()))
new MirroringServiceV1(projectApiManager, executor, mirrorRunner))
.annotatedService(new CredentialServiceV1(projectApiManager, executor));
}

Expand Down Expand Up @@ -1022,12 +1026,14 @@ private void doStop() {
final ExecutorService repositoryWorker = this.repositoryWorker;
final ExecutorService purgeWorker = this.purgeWorker;
final SessionManager sessionManager = this.sessionManager;
final MirrorRunner mirrorRunner = this.mirrorRunner;

this.server = null;
this.executor = null;
this.pm = null;
this.repositoryWorker = null;
this.sessionManager = null;
this.mirrorRunner = null;
projectInitializer = null;
if (meterRegistryToBeClosed != null) {
assert meterRegistry instanceof CompositeMeterRegistry;
Expand All @@ -1037,7 +1043,7 @@ private void doStop() {
}

logger.info("Stopping the Central Dogma ..");
if (!doStop(server, executor, pm, repositoryWorker, purgeWorker, sessionManager)) {
if (!doStop(server, executor, pm, repositoryWorker, purgeWorker, sessionManager, mirrorRunner)) {
logger.warn("Stopped the Central Dogma with failure.");
} else {
logger.info("Stopped the Central Dogma successfully.");
Expand All @@ -1048,7 +1054,7 @@ private static boolean doStop(
@Nullable Server server, @Nullable CommandExecutor executor,
@Nullable ProjectManager pm,
@Nullable ExecutorService repositoryWorker, @Nullable ExecutorService purgeWorker,
@Nullable SessionManager sessionManager) {
@Nullable SessionManager sessionManager, @Nullable MirrorRunner mirrorRunner) {

boolean success = true;
try {
Expand Down Expand Up @@ -1117,6 +1123,17 @@ private static boolean doStop(
success = false;
}

try {
if (mirrorRunner != null) {
logger.info("Stopping the mirror runner..");
mirrorRunner.close();
logger.info("Stopped the mirror runner.");
}
} catch (Throwable t) {
success = false;
logger.warn("Failed to stop the mirror runner:", t);
}

try {
if (server != null) {
logger.info("Stopping the RPC server ..");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,12 @@
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.collect.ImmutableList.toImmutableList;

import java.io.File;
import java.net.URI;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import com.cronutils.model.Cron;

import com.linecorp.armeria.server.annotation.Blocking;
import com.linecorp.armeria.server.annotation.ConsumesJson;
import com.linecorp.armeria.server.annotation.Get;
import com.linecorp.armeria.server.annotation.Param;
Expand All @@ -41,10 +38,10 @@
import com.linecorp.centraldogma.server.command.CommandExecutor;
import com.linecorp.centraldogma.server.internal.api.auth.RequiresReadPermission;
import com.linecorp.centraldogma.server.internal.api.auth.RequiresWritePermission;
import com.linecorp.centraldogma.server.internal.mirror.MirrorRunner;
import com.linecorp.centraldogma.server.internal.storage.project.ProjectApiManager;
import com.linecorp.centraldogma.server.mirror.Mirror;
import com.linecorp.centraldogma.server.mirror.MirrorResult;
import com.linecorp.centraldogma.server.mirror.MirroringServicePluginConfig;
import com.linecorp.centraldogma.server.storage.project.Project;
import com.linecorp.centraldogma.server.storage.repository.MetaRepository;

Expand All @@ -59,12 +56,13 @@ public class MirroringServiceV1 extends AbstractService {
// - Add Java APIs to the CentralDogma client

private final ProjectApiManager projectApiManager;
private final File workDir;
private final MirrorRunner mirrorRunner;

public MirroringServiceV1(ProjectApiManager projectApiManager, CommandExecutor executor, File dataDir) {
public MirroringServiceV1(ProjectApiManager projectApiManager, CommandExecutor executor,
MirrorRunner mirrorRunner) {
super(executor);
workDir = new File(dataDir, "_mirrors_manual");
this.projectApiManager = projectApiManager;
this.mirrorRunner = mirrorRunner;
}

/**
Expand Down Expand Up @@ -134,17 +132,8 @@ private CompletableFuture<PushResultDto> createOrUpdate(String projectName,
}

@Post("/projects/{projectName}/mirrors/{mirrorId}/run")
@Blocking
public MirrorResult runMirror(@Param String projectName, @Param String mirrorId) throws Exception {
final Mirror mirror = metaRepo(projectName).mirror(mirrorId).get(10, TimeUnit.SECONDS);
if (mirror.schedule() != null) {
throw new UnsupportedOperationException("The mirror is scheduled to run automatically.");
}

return mirror.mirror(workDir, executor(),
// TODO(ikhoon): Use cfg.pluginConfigMap().get(configType())
MirroringServicePluginConfig.INSTANCE.maxNumFilesPerMirror(),
MirroringServicePluginConfig.INSTANCE.maxNumBytesPerMirror());
public CompletableFuture<MirrorResult> runMirror(@Param String projectName, @Param String mirrorId) throws Exception {
return mirrorRunner.run(projectName, mirrorId);
}

private static MirrorDto convertToMirrorDto(String projectName, Mirror mirror) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,6 @@ protected final MirrorResult newMirrorResult(MirrorStatus mirrorStatus, @Nullabl
public String toString() {
final ToStringHelper helper = MoreObjects.toStringHelper("")
.omitNullValues()
.add("schedule", CronDescriptor.instance().describe(schedule))
.add("direction", direction)
.add("localProj", localRepo.parent().name())
.add("localRepo", localRepo.name())
Expand All @@ -220,7 +219,9 @@ public String toString() {
.add("remoteBranch", remoteBranch)
.add("gitignore", gitignore)
.add("credential", credential);

if (schedule != null) {
helper.add("schedule", CronDescriptor.instance().describe(schedule));
}
return helper.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
public final class DefaultMirroringServicePlugin implements Plugin {

@Nullable
private volatile DefaultMirroringService mirroringService;
private volatile MirrorSchedulingService mirroringService;

@Override
public PluginTarget target() {
Expand All @@ -45,7 +45,7 @@ public PluginTarget target() {
public synchronized CompletionStage<Void> start(PluginContext context) {
requireNonNull(context, "context");

DefaultMirroringService mirroringService = this.mirroringService;
MirrorSchedulingService mirroringService = this.mirroringService;
if (mirroringService == null) {
final CentralDogmaConfig cfg = context.config();
final MirroringServicePluginConfig mirroringServicePluginConfig =
Expand All @@ -63,7 +63,7 @@ public synchronized CompletionStage<Void> start(PluginContext context) {
maxNumFilesPerMirror = MirroringServicePluginConfig.INSTANCE.maxNumFilesPerMirror();
maxNumBytesPerMirror = MirroringServicePluginConfig.INSTANCE.maxNumBytesPerMirror();
}
mirroringService = new DefaultMirroringService(new File(cfg.dataDir(), "_mirrors"),
mirroringService = new MirrorSchedulingService(new File(cfg.dataDir(), "_mirrors"),
context.projectManager(),
context.meterRegistry(),
numThreads,
Expand All @@ -77,7 +77,7 @@ public synchronized CompletionStage<Void> start(PluginContext context) {

@Override
public synchronized CompletionStage<Void> stop(PluginContext context) {
final DefaultMirroringService mirroringService = this.mirroringService;
final MirrorSchedulingService mirroringService = this.mirroringService;
if (mirroringService != null && mirroringService.isStarted()) {
mirroringService.stop();
}
Expand All @@ -90,7 +90,7 @@ public Class<?> configType() {
}

@Nullable
public DefaultMirroringService mirroringService() {
public MirrorSchedulingService mirroringService() {
return mirroringService;
}

Expand Down
Loading

0 comments on commit ee8ccc7

Please sign in to comment.