From 534a4505f5c6e97255b96bb5c2a33f706f832736 Mon Sep 17 00:00:00 2001 From: Tomasz Bak Date: Thu, 27 Feb 2020 14:20:04 -0800 Subject: [PATCH] Share embedded stack for all JobCriteriaQueryTest tests + do not run in the security manager which add 30sec delay on startup due to slowness in java.io.UnixFileSystem.canonicalize0 (more detailed analysis available here: https://github.com/facebook/nailgun/issues/134) --- build.gradle | 8 +- .../jobmanager/model/job/JobGroupInfo.java | 2 +- .../titus/common/runtime/TitusRuntime.java | 5 + .../titus/common/runtime/TitusRuntimes.java | 5 + .../runtime/internal/DefaultTitusRuntime.java | 13 + .../startup/TitusFederationModule.java | 2 +- .../gateway/startup/TitusGatewayModule.java | 2 +- .../com/netflix/titus/master/TitusMaster.java | 2 +- .../titus/master/TitusRuntimeModule.java | 8 +- .../service/leader/GuiceLeaderActivator.java | 38 +- .../integration/BaseIntegrationTest.java | 30 - .../v3/job/JobCriteriaQueryTest.java | 611 ++++++++++-------- .../v3/job/JobDisruptionBudgetTest.java | 2 +- .../v3/scenario/JobsScenarioBuilder.java | 23 +- .../embedded/cell/EmbeddedTitusCells.java | 8 +- .../cell/gateway/EmbeddedTitusGateway.java | 6 + .../cell/master/EmbeddedTitusMaster.java | 2 +- .../embedded/cloud/SimulatedClouds.java | 10 + 18 files changed, 459 insertions(+), 318 deletions(-) diff --git a/build.gradle b/build.gradle index 644e6197f2..904022cd6e 100644 --- a/build.gradle +++ b/build.gradle @@ -145,8 +145,10 @@ subprojects { testCompile "org.assertj:assertj-core:${assertjVersion}" } + def processorTarget = Runtime.runtime.availableProcessors().intdiv(2) ?: 1 + test { - maxParallelForks = Runtime.runtime.availableProcessors().intdiv(2) ?: 1 + maxParallelForks = processorTarget useJUnit { excludeCategories 'com.netflix.titus.testkit.junit.category.IntegrationTest', 'com.netflix.titus.testkit.junit.category.IntegrationNotParallelizableTest', @@ -168,7 +170,7 @@ subprojects { } task integrationTest(type: Test) { - maxParallelForks = Runtime.runtime.availableProcessors().intdiv(2) ?: 1 + maxParallelForks = processorTarget useJUnit { includeCategories 'com.netflix.titus.testkit.junit.category.IntegrationTest' @@ -208,7 +210,7 @@ subprojects { } task remoteIntegrationTest(type: Test) { - maxParallelForks = Runtime.runtime.availableProcessors().intdiv(2) ?: 1 + maxParallelForks = processorTarget useJUnit { includeCategories 'com.netflix.titus.testkit.junit.category.RemoteIntegrationTest' diff --git a/titus-api/src/main/java/com/netflix/titus/api/jobmanager/model/job/JobGroupInfo.java b/titus-api/src/main/java/com/netflix/titus/api/jobmanager/model/job/JobGroupInfo.java index 979c4027d2..cf6c20bc4b 100644 --- a/titus-api/src/main/java/com/netflix/titus/api/jobmanager/model/job/JobGroupInfo.java +++ b/titus-api/src/main/java/com/netflix/titus/api/jobmanager/model/job/JobGroupInfo.java @@ -84,7 +84,7 @@ public String toString() { } public Builder toBuilder() { - return newBuilder(); + return newBuilder().withStack(stack).withDetail(detail).withSequence(sequence); } public static Builder newBuilder() { diff --git a/titus-common/src/main/java/com/netflix/titus/common/runtime/TitusRuntime.java b/titus-common/src/main/java/com/netflix/titus/common/runtime/TitusRuntime.java index eea8625ab1..5dc813902c 100644 --- a/titus-common/src/main/java/com/netflix/titus/common/runtime/TitusRuntime.java +++ b/titus-common/src/main/java/com/netflix/titus/common/runtime/TitusRuntime.java @@ -70,6 +70,11 @@ public interface TitusRuntime { */ LocalScheduler getLocalScheduler(); + /** + * If true, fatal errors cause JVM termination via System.exit. + */ + boolean isSystemExitOnFailure(); + /** * In a few places in TitusMaster a component may decide to break the bootstrap process or terminate the JVM process * abruptly. This method should be called before that happens, so the {@link SystemAbortListener} implementation can diff --git a/titus-common/src/main/java/com/netflix/titus/common/runtime/TitusRuntimes.java b/titus-common/src/main/java/com/netflix/titus/common/runtime/TitusRuntimes.java index be60a0c63a..caae488491 100644 --- a/titus-common/src/main/java/com/netflix/titus/common/runtime/TitusRuntimes.java +++ b/titus-common/src/main/java/com/netflix/titus/common/runtime/TitusRuntimes.java @@ -41,6 +41,7 @@ public static TitusRuntime internal(Duration localSchedulerLoopInterval) { new LoggingCodePointTracker(), LoggingCodeInvariants.getDefault(), LoggingSystemLogService.getInstance(), + false, LoggingSystemAbortListener.getDefault(), localSchedulerLoopInterval, new DefaultRegistry(), @@ -58,6 +59,7 @@ public static TitusRuntime internal(boolean fitEnabled) { new LoggingCodePointTracker(), LoggingCodeInvariants.getDefault(), LoggingSystemLogService.getInstance(), + false, LoggingSystemAbortListener.getDefault(), LOCAL_SCHEDULER_LOOP_INTERVAL_MS, new DefaultRegistry(), @@ -71,6 +73,7 @@ public static TitusRuntime test() { new LoggingCodePointTracker(), new RecordingCodeInvariants(), LoggingSystemLogService.getInstance(), + false, LoggingSystemAbortListener.getDefault(), LOCAL_SCHEDULER_LOOP_INTERVAL_MS, new DefaultRegistry(), @@ -84,6 +87,7 @@ public static TitusRuntime test(TestClock clock) { new LoggingCodePointTracker(), new RecordingCodeInvariants(), LoggingSystemLogService.getInstance(), + false, LoggingSystemAbortListener.getDefault(), LOCAL_SCHEDULER_LOOP_INTERVAL_MS, new DefaultRegistry(), @@ -97,6 +101,7 @@ public static TitusRuntime test(TestScheduler testScheduler) { new LoggingCodePointTracker(), new RecordingCodeInvariants(), LoggingSystemLogService.getInstance(), + false, LoggingSystemAbortListener.getDefault(), LOCAL_SCHEDULER_LOOP_INTERVAL_MS, new DefaultRegistry(), diff --git a/titus-common/src/main/java/com/netflix/titus/common/runtime/internal/DefaultTitusRuntime.java b/titus-common/src/main/java/com/netflix/titus/common/runtime/internal/DefaultTitusRuntime.java index b4ebfa4f7a..5105548188 100644 --- a/titus-common/src/main/java/com/netflix/titus/common/runtime/internal/DefaultTitusRuntime.java +++ b/titus-common/src/main/java/com/netflix/titus/common/runtime/internal/DefaultTitusRuntime.java @@ -22,6 +22,7 @@ import java.util.Optional; import java.util.concurrent.TimeUnit; import javax.inject.Inject; +import javax.inject.Named; import javax.inject.Singleton; import com.netflix.spectator.api.BasicTag; @@ -50,6 +51,8 @@ @Singleton public class DefaultTitusRuntime implements TitusRuntime { + public static final String SYSTEM_EXIT_ON_FAILURE = "systemExitOnFailure"; + private static final Logger logger = LoggerFactory.getLogger(DefaultTitusRuntime.class); public static final String FIT_ACTIVATION_PROPERTY = "titus.runtime.fit.enabled"; @@ -67,6 +70,7 @@ public class DefaultTitusRuntime implements TitusRuntime { private final CodePointTracker codePointTracker; private final CodeInvariants codeInvariants; private final SystemLogService systemLogService; + private final boolean systemExitOnFailure; private final SystemAbortListener systemAbortListener; private final Registry registry; private final Clock clock; @@ -76,12 +80,14 @@ public class DefaultTitusRuntime implements TitusRuntime { @Inject public DefaultTitusRuntime(CodeInvariants codeInvariants, SystemLogService systemLogService, + @Named(SYSTEM_EXIT_ON_FAILURE) boolean systemExitOnFailure, SystemAbortListener systemAbortListener, Registry registry) { this( new SpectatorCodePointTracker(registry), codeInvariants, systemLogService, + systemExitOnFailure, systemAbortListener, LOCAL_SCHEDULER_LOOP_INTERVAL, registry, @@ -93,6 +99,7 @@ public DefaultTitusRuntime(CodeInvariants codeInvariants, public DefaultTitusRuntime(CodePointTracker codePointTracker, CodeInvariants codeInvariants, SystemLogService systemLogService, + boolean systemExitOnFailure, SystemAbortListener systemAbortListener, Duration localSchedulerLoopInterval, Registry registry, @@ -101,6 +108,7 @@ public DefaultTitusRuntime(CodePointTracker codePointTracker, this.codePointTracker = codePointTracker; this.codeInvariants = codeInvariants; this.systemLogService = systemLogService; + this.systemExitOnFailure = systemExitOnFailure; this.systemAbortListener = systemAbortListener; this.registry = registry; this.clock = clock; @@ -175,6 +183,11 @@ public LocalScheduler getLocalScheduler() { return localScheduler; } + @Override + public boolean isSystemExitOnFailure() { + return systemExitOnFailure; + } + @Override public void beforeAbort(SystemAbortEvent event) { logger.error("System abort requested: {}", event); diff --git a/titus-server-federation/src/main/java/com/netflix/titus/federation/startup/TitusFederationModule.java b/titus-server-federation/src/main/java/com/netflix/titus/federation/startup/TitusFederationModule.java index 85d5ed04be..16244d9274 100644 --- a/titus-server-federation/src/main/java/com/netflix/titus/federation/startup/TitusFederationModule.java +++ b/titus-server-federation/src/main/java/com/netflix/titus/federation/startup/TitusFederationModule.java @@ -95,7 +95,7 @@ public TitusRuntime getTitusRuntime(SystemLogService systemLogService, SystemAbo LoggingCodeInvariants.getDefault(), new SpectatorCodeInvariants(registry.createId("titus.runtime.invariant.violations"), registry) ); - return new DefaultTitusRuntime(codeInvariants, systemLogService, systemAbortListener, registry); + return new DefaultTitusRuntime(codeInvariants, systemLogService, false, systemAbortListener, registry); } @Provides diff --git a/titus-server-gateway/src/main/java/com/netflix/titus/gateway/startup/TitusGatewayModule.java b/titus-server-gateway/src/main/java/com/netflix/titus/gateway/startup/TitusGatewayModule.java index 041947a226..6ccbe3c4ed 100644 --- a/titus-server-gateway/src/main/java/com/netflix/titus/gateway/startup/TitusGatewayModule.java +++ b/titus-server-gateway/src/main/java/com/netflix/titus/gateway/startup/TitusGatewayModule.java @@ -140,7 +140,7 @@ public TitusRuntime getTitusRuntime(SystemLogService systemLogService, SystemAbo LoggingCodeInvariants.getDefault(), new SpectatorCodeInvariants(registry.createId("titus.runtime.invariant.violations"), registry) ); - return new DefaultTitusRuntime(codeInvariants, systemLogService, systemAbortListener, registry); + return new DefaultTitusRuntime(codeInvariants, systemLogService, false, systemAbortListener, registry); } @Provides diff --git a/titus-server-master/src/main/java/com/netflix/titus/master/TitusMaster.java b/titus-server-master/src/main/java/com/netflix/titus/master/TitusMaster.java index bc68be15ee..7d667bfa57 100644 --- a/titus-server-master/src/main/java/com/netflix/titus/master/TitusMaster.java +++ b/titus-server-master/src/main/java/com/netflix/titus/master/TitusMaster.java @@ -55,7 +55,7 @@ public static void main(String[] args) { String resourceDir = TitusMaster.class.getClassLoader().getResource("static").toExternalForm(); LifecycleInjector injector = InjectorBuilder.fromModules( - Modules.override(new TitusRuntimeModule()).with( + Modules.override(new TitusRuntimeModule(true)).with( new AbstractModule() { @Override protected void configure() { diff --git a/titus-server-master/src/main/java/com/netflix/titus/master/TitusRuntimeModule.java b/titus-server-master/src/main/java/com/netflix/titus/master/TitusRuntimeModule.java index 8b247c9266..b1cc6250ce 100644 --- a/titus-server-master/src/main/java/com/netflix/titus/master/TitusRuntimeModule.java +++ b/titus-server-master/src/main/java/com/netflix/titus/master/TitusRuntimeModule.java @@ -71,6 +71,12 @@ public class TitusRuntimeModule extends AbstractModule { public static final String FIT_CONFIGURATION_PREFIX = "titusMaster.runtime.fitActions."; + private final boolean systemExitOnFailure; + + public TitusRuntimeModule(boolean systemExitOnFailure) { + this.systemExitOnFailure = systemExitOnFailure; + } + @Override protected void configure() { // Framework services @@ -95,7 +101,7 @@ public TitusRuntime getTitusRuntime(SystemLogService systemLogService, SystemAbo LoggingCodeInvariants.getDefault(), new SpectatorCodeInvariants(registry.createId("titus.runtime.invariant.violations"), registry) ); - DefaultTitusRuntime titusRuntime = new DefaultTitusRuntime(codeInvariants, systemLogService, systemAbortListener, registry); + DefaultTitusRuntime titusRuntime = new DefaultTitusRuntime(codeInvariants, systemLogService, systemExitOnFailure, systemAbortListener, registry); // Setup FIT component hierarchy FitFramework fitFramework = titusRuntime.getFitFramework(); diff --git a/titus-server-master/src/main/java/com/netflix/titus/master/supervisor/service/leader/GuiceLeaderActivator.java b/titus-server-master/src/main/java/com/netflix/titus/master/supervisor/service/leader/GuiceLeaderActivator.java index 0b39e4b307..80a3467538 100644 --- a/titus-server-master/src/main/java/com/netflix/titus/master/supervisor/service/leader/GuiceLeaderActivator.java +++ b/titus-server-master/src/main/java/com/netflix/titus/master/supervisor/service/leader/GuiceLeaderActivator.java @@ -63,6 +63,7 @@ private enum State { private final Injector injector; private final Clock clock; + private final TitusRuntime titusRuntime; private final ActivationLifecycle activationLifecycle; private volatile boolean leader; @@ -83,6 +84,7 @@ public GuiceLeaderActivator(Injector injector, this.injector = injector; this.activationLifecycle = activationLifecycle; this.clock = titusRuntime.getClock(); + this.titusRuntime = titusRuntime; Registry registry = titusRuntime.getRegistry(); @@ -177,12 +179,16 @@ public void stopBeingLeader() { leader = false; activated = false; - // Various services may have built in-memory state that is currently not easy to revert to initialization state. - // Until we create such a lifecycle feature for each service and all of their references, best thing to do is to - // exit the process and depend on a watcher process to restart us right away. Especially since restart isn't - // very expensive. - logger.error("Exiting due to losing leadership after running as leader"); - SystemExt.forcedProcessExit(1); + if (titusRuntime.isSystemExitOnFailure()) { + // Various services may have built in-memory state that is currently not easy to revert to initialization state. + // Until we create such a lifecycle feature for each service and all of their references, best thing to do is to + // exit the process and depend on a watcher process to restart us right away. Especially since restart isn't + // very expensive. + logger.error("Exiting due to losing leadership after running as leader"); + SystemExt.forcedProcessExit(1); + } else { + deactivate(); + } } @Override @@ -213,15 +219,29 @@ private void activate() { } catch (Exception e) { stopBeingLeader(); - // As stopBeingLeader method not always terminates the process, lets make sure it does. - SystemExt.forcedProcessExit(-1); + if (titusRuntime.isSystemExitOnFailure()) { + // As stopBeingLeader method not always terminates the process, lets make sure it does. + SystemExt.forcedProcessExit(-1); + } + throw e; } } catch (Throwable e) { - SystemExt.forcedProcessExit(-1); + if (titusRuntime.isSystemExitOnFailure()) { + SystemExt.forcedProcessExit(-1); + } + throw e; } this.activated = true; this.activationEndTimestamp = clock.wallTime(); this.activationTime = activationEndTimestamp - activationStartTimestamp; } + + private void deactivate() { + try { + activationLifecycle.deactivate(); + } catch (Exception e) { + e.printStackTrace(); + } + } } diff --git a/titus-server-master/src/test/java/com/netflix/titus/master/integration/BaseIntegrationTest.java b/titus-server-master/src/test/java/com/netflix/titus/master/integration/BaseIntegrationTest.java index 3b38817666..ec0b90dd50 100644 --- a/titus-server-master/src/test/java/com/netflix/titus/master/integration/BaseIntegrationTest.java +++ b/titus-server-master/src/test/java/com/netflix/titus/master/integration/BaseIntegrationTest.java @@ -16,10 +16,7 @@ package com.netflix.titus.master.integration; -import java.security.Permission; - import com.netflix.titus.testkit.junit.category.IntegrationTest; -import org.junit.BeforeClass; import org.junit.experimental.categories.Category; @Category(IntegrationTest.class) @@ -28,31 +25,4 @@ public class BaseIntegrationTest { protected static final long TEST_TIMEOUT_MS = 30_000; protected static final long LONG_TEST_TIMEOUT_MS = 60_000; - - static class PreventSystemExitSecurityManager extends SecurityManager { - @Override - public void checkPermission(Permission perm) { - } - - @Override - public void checkPermission(Permission perm, Object context) { - } - - @Override - public void checkExit(int status) { - if (status != 0) { - String message = "System exit requested with error " + status; - throw new IllegalStateException(message); - } - } - } - - private static final SecurityManager securityManager = new PreventSystemExitSecurityManager(); - - @BeforeClass - public static void setSecurityManager() { - if (System.getSecurityManager() != securityManager) { - System.setSecurityManager(securityManager); - } - } } diff --git a/titus-server-master/src/test/java/com/netflix/titus/master/integration/v3/job/JobCriteriaQueryTest.java b/titus-server-master/src/test/java/com/netflix/titus/master/integration/v3/job/JobCriteriaQueryTest.java index 10e90dbc16..90deb12286 100644 --- a/titus-server-master/src/test/java/com/netflix/titus/master/integration/v3/job/JobCriteriaQueryTest.java +++ b/titus-server-master/src/test/java/com/netflix/titus/master/integration/v3/job/JobCriteriaQueryTest.java @@ -16,19 +16,24 @@ package com.netflix.titus.master.integration.v3.job; +import java.util.ArrayList; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.function.Function; import java.util.stream.Collectors; import com.google.common.collect.ImmutableMap; +import com.netflix.titus.api.jobmanager.model.job.Image; import com.netflix.titus.api.jobmanager.model.job.JobDescriptor; import com.netflix.titus.api.jobmanager.model.job.JobFunctions; -import com.netflix.titus.api.jobmanager.model.job.JobModel; +import com.netflix.titus.api.jobmanager.model.job.JobGroupInfo; import com.netflix.titus.api.jobmanager.model.job.JobState; +import com.netflix.titus.api.jobmanager.model.job.Owner; import com.netflix.titus.api.jobmanager.model.job.ext.BatchJobExt; +import com.netflix.titus.api.jobmanager.model.job.ext.ServiceJobExt; import com.netflix.titus.common.util.CollectionsExt; -import com.netflix.titus.common.util.tuple.Triple; import com.netflix.titus.grpc.protogen.Job; import com.netflix.titus.grpc.protogen.JobManagementServiceGrpc; import com.netflix.titus.grpc.protogen.JobQuery; @@ -42,59 +47,151 @@ import com.netflix.titus.master.integration.BaseIntegrationTest; import com.netflix.titus.master.integration.v3.scenario.InstanceGroupScenarioTemplates; import com.netflix.titus.master.integration.v3.scenario.InstanceGroupsScenarioBuilder; +import com.netflix.titus.master.integration.v3.scenario.JobScenarioBuilder; import com.netflix.titus.master.integration.v3.scenario.JobsScenarioBuilder; import com.netflix.titus.master.integration.v3.scenario.ScenarioTemplates; import com.netflix.titus.master.integration.v3.scenario.TaskScenarioBuilder; +import com.netflix.titus.testkit.embedded.cell.EmbeddedTitusCell; import com.netflix.titus.testkit.embedded.cell.master.EmbeddedTitusMaster; +import com.netflix.titus.testkit.embedded.cell.master.EmbeddedTitusMasters; +import com.netflix.titus.testkit.embedded.cloud.SimulatedClouds; import com.netflix.titus.testkit.junit.category.IntegrationTest; import com.netflix.titus.testkit.junit.master.TitusStackResource; -import org.junit.Before; -import org.junit.Rule; +import org.junit.BeforeClass; +import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.rules.RuleChain; import static com.netflix.titus.master.integration.v3.job.CellAssertions.assertCellInfo; -import static com.netflix.titus.testkit.embedded.cell.EmbeddedTitusCells.basicCell; import static com.netflix.titus.testkit.model.job.JobDescriptorGenerator.batchJobDescriptors; import static com.netflix.titus.testkit.model.job.JobDescriptorGenerator.oneTaskBatchJobDescriptor; import static com.netflix.titus.testkit.model.job.JobDescriptorGenerator.oneTaskServiceJobDescriptor; import static org.assertj.core.api.Assertions.assertThat; /** - * TODO Error codes + * As queries are immutable, the tests share single stack which is preloaded with all the data required for all test cases. + * Furthermore each test may add its own data set. */ @Category(IntegrationTest.class) public class JobCriteriaQueryTest extends BaseIntegrationTest { private static final Page PAGE = Page.newBuilder().setPageNumber(0).setPageSize(100).build(); - private static final String V3_ENGINE_APP = TitusStackResource.V3_ENGINE_APP_PREFIX + 1; - private static final String V3_ENGINE_APP2 = TitusStackResource.V3_ENGINE_APP_PREFIX + 2; - - private final TitusStackResource titusStackResource = new TitusStackResource(basicCell(5)); - - private final JobsScenarioBuilder jobsScenarioBuilder = new JobsScenarioBuilder(titusStackResource); - - private final InstanceGroupsScenarioBuilder instanceGroupsScenarioBuilder = new InstanceGroupsScenarioBuilder(titusStackResource); + private final static TitusStackResource titusStackResource = new TitusStackResource( + EmbeddedTitusCell.aTitusCell() + .withMaster(EmbeddedTitusMasters.basicMaster(SimulatedClouds.basicCloudWithLargeInstances(20)).toBuilder() + .withCellName("embeddedCell") + .withProperty("titus.scheduler.globalTaskLaunchingConstraintEvaluatorEnabled", "false") + // Set to very high value as we do not want to expire it. + .withProperty("titusMaster.jobManager.taskInLaunchedStateTimeoutMs", "30000000") + .withProperty("titusMaster.jobManager.batchTaskInStartInitiatedStateTimeoutMs", "30000000") + .withProperty("titusMaster.jobManager.serviceTaskInStartInitiatedStateTimeoutMs", "30000000") + .build() + ) + .withDefaultGateway() + .build() + ); + + private final static JobsScenarioBuilder jobsScenarioBuilder = new JobsScenarioBuilder(titusStackResource); + + private final static InstanceGroupsScenarioBuilder instanceGroupsScenarioBuilder = new InstanceGroupsScenarioBuilder(titusStackResource); + + @ClassRule + public static final RuleChain ruleChain = RuleChain.outerRule(titusStackResource).around(instanceGroupsScenarioBuilder).around(jobsScenarioBuilder); + + private static final String BATCH_OWNER = "batchOwner@netflix.com"; + private static final String BATCH_APPLICATION = "batchApplication"; + private static final String BATCH_CAPACITY_GROUP = "batchCapacityGroup"; + private static final JobGroupInfo BATCH_JOB_GROUP_INFO = JobGroupInfo.newBuilder().withStack("batchStack").withDetail("batchDetail").withSequence("batch001").build(); + private static final String BATCH_IMAGE_NAME = "batchImageName"; + private static final String BATCH_IMAGE_TAG = "batchImageTag"; + + private static final String SERVICE_OWNER = "serviceOwner@netflix.com"; + private static final String SERVICE_APPLICATION = "serviceApplication"; + private static final String SERVICE_CAPACITY_GROUP = "serviceCapacityGroup"; + private static final JobGroupInfo SERVICE_JOB_GROUP_INFO = JobGroupInfo.newBuilder().withStack("serviceStack").withDetail("serviceDetail").withSequence("service001").build(); + private static final String SERVICE_IMAGE_NAME = "serviceImageName"; + private static final String SERVICE_IMAGE_TAG = "serviceImageTag"; + + /** + * Add to jobs created in the setup method. + */ + private static final String PRE_CREATED_JOBS_LABEL = "precreatedJob"; + + private static JobManagementServiceGrpc.JobManagementServiceBlockingStub client; + + private static final List batchJobsWithCreatedTasks = new ArrayList<>(); + private static final List batchTasks = new ArrayList<>(); + private static final List serviceJobsWithCreatedTasks = new ArrayList<>(); + private static final List serviceTasks = new ArrayList<>(); + + private static String finishedBatchJobWithFiveTasks; + + @BeforeClass + public static void setUp() throws Exception { + instanceGroupsScenarioBuilder.synchronizeWithCloud().template(InstanceGroupScenarioTemplates.basicCloudActivation()); + client = titusStackResource.getGateway().getV3BlockingGrpcClient(); - @Rule - public final RuleChain ruleChain = RuleChain.outerRule(titusStackResource).around(instanceGroupsScenarioBuilder).around(jobsScenarioBuilder); + // Jobs with launched tasks + JobDescriptor batchJobDescriptor = oneTaskBatchJobDescriptor().toBuilder() + .withOwner(Owner.newBuilder().withTeamEmail(BATCH_OWNER).build()) + .withApplicationName(BATCH_APPLICATION) + .withCapacityGroup(BATCH_CAPACITY_GROUP) + .withJobGroupInfo(BATCH_JOB_GROUP_INFO) + .withContainer(oneTaskBatchJobDescriptor().getContainer().toBuilder() + .withImage(Image.newBuilder().withName(BATCH_IMAGE_NAME).withTag(BATCH_IMAGE_TAG).build()) + .build() + ) + .withAttributes(Collections.singletonMap(PRE_CREATED_JOBS_LABEL, "true")) + .build(); + jobsScenarioBuilder.schedule(batchJobDescriptor, 3, + jobScenarioBuilder -> jobScenarioBuilder + .template(ScenarioTemplates.launchJob()) + .inJob(job -> batchJobsWithCreatedTasks.add(job.getId())) + ); + batchJobsWithCreatedTasks.forEach(jobId -> { + String taskId = jobsScenarioBuilder.takeJob(jobId).getTaskByIndex(0).getTask().getId(); + batchTasks.add(taskId); + }); + JobDescriptor serviceJobDescriptor = oneTaskServiceJobDescriptor().toBuilder() + .withOwner(Owner.newBuilder().withTeamEmail(SERVICE_OWNER).build()) + .withApplicationName(SERVICE_APPLICATION) + .withCapacityGroup(SERVICE_CAPACITY_GROUP) + .withJobGroupInfo(SERVICE_JOB_GROUP_INFO) + .withContainer(oneTaskServiceJobDescriptor().getContainer().toBuilder() + .withImage(Image.newBuilder().withName(SERVICE_IMAGE_NAME).withTag(SERVICE_IMAGE_TAG).build()) + .build() + ) + .withAttributes(Collections.singletonMap(PRE_CREATED_JOBS_LABEL, "true")) + .build(); + jobsScenarioBuilder.schedule(serviceJobDescriptor, 3, + jobScenarioBuilder -> jobScenarioBuilder + .template(ScenarioTemplates.launchJob()) + .inJob(job -> serviceJobsWithCreatedTasks.add(job.getId())) + ); + serviceJobsWithCreatedTasks.forEach(jobId -> { + String taskId = jobsScenarioBuilder.takeJob(jobId).getTaskByIndex(0).getTask().getId(); + serviceTasks.add(taskId); + }); - private JobManagementServiceGrpc.JobManagementServiceBlockingStub client; + // Finished job with 5 tasks + int numberOfTasks = 5; + JobDescriptor jobDescriptor = oneTaskBatchJobDescriptor() + .but(jd -> jd.getExtensions().toBuilder().withSize(numberOfTasks).build()); - @Before - public void setUp() throws Exception { - instanceGroupsScenarioBuilder.synchronizeWithCloud().template(InstanceGroupScenarioTemplates.basicCloudActivation()); - client = titusStackResource.getGateway().getV3BlockingGrpcClient(); + jobsScenarioBuilder.schedule(jobDescriptor, 1, jobScenarioBuilder -> jobScenarioBuilder + .template(ScenarioTemplates.launchJob()) + .allTasks(ScenarioTemplates.completeTask()) + .expectJobUpdateEvent(job -> job.getStatus().getState() == JobState.Finished, "Expected job to complete") + .inJob(job -> finishedBatchJobWithFiveTasks = job.getId()) + ); } @Test(timeout = 30_000) - public void testFindJobAndTaskByJobIdsV3() throws Exception { - jobsScenarioBuilder.schedule(oneTaskBatchJobDescriptor(), 3, jobScenarioBuilder -> jobScenarioBuilder.template(ScenarioTemplates.launchJob())); - - String job0 = jobsScenarioBuilder.takeJob(0).getJobId(); - String job2 = jobsScenarioBuilder.takeJob(2).getJobId(); + public void testFindJobAndTaskByJobIdsV3() { + String job0 = batchJobsWithCreatedTasks.get(0); + String job2 = batchJobsWithCreatedTasks.get(2); // Jobs JobQueryResult jobQueryResult = client.findJobs(JobQuery.newBuilder().putFilteringCriteria("jobIds", job0 + ',' + job2).setPage(PAGE).build()); @@ -107,11 +204,9 @@ public void testFindJobAndTaskByJobIdsV3() throws Exception { } @Test(timeout = 30_000) - public void testFindJobAndTaskByTaskIdsV3() throws Exception { - jobsScenarioBuilder.schedule(oneTaskBatchJobDescriptor(), 3, jobScenarioBuilder -> jobScenarioBuilder.template(ScenarioTemplates.launchJob())); - - String task0 = jobsScenarioBuilder.takeJob(0).getTaskByIndex(0).getTask().getId(); - String task2 = jobsScenarioBuilder.takeJob(2).getTaskByIndex(0).getTask().getId(); + public void testFindJobAndTaskByTaskIdsV3() { + String task0 = jobsScenarioBuilder.takeJob(batchJobsWithCreatedTasks.get(0)).getTaskByIndex(0).getTask().getId(); + String task2 = jobsScenarioBuilder.takeJob(batchJobsWithCreatedTasks.get(2)).getTaskByIndex(0).getTask().getId(); // Jobs JobQueryResult jobQueryResult = client.findJobs(JobQuery.newBuilder().putFilteringCriteria("taskIds", task0 + ',' + task2).setPage(PAGE).build()); @@ -124,61 +219,47 @@ public void testFindJobAndTaskByTaskIdsV3() throws Exception { } @Test(timeout = 60_000) - public void testFindArchivedTasksByTaskIdsV3() throws Exception { - int numberOfTasks = 5; - JobDescriptor jobDescriptor = oneTaskBatchJobDescriptor() - .but(jd -> jd.getExtensions().toBuilder().withSize(numberOfTasks).build()); + public void testFindArchivedTasksByTaskIdsV3() { + TaskQueryResult taskQueryResult = client.findTasks(TaskQuery.newBuilder() + .putFilteringCriteria("jobIds", finishedBatchJobWithFiveTasks) + .putFilteringCriteria("taskStates", com.netflix.titus.grpc.protogen.TaskStatus.TaskState.Finished.name()) + .setPage(PAGE) + .build() + ); - jobsScenarioBuilder.schedule(jobDescriptor, 1, jobScenarioBuilder -> jobScenarioBuilder - .template(ScenarioTemplates.launchJob()) - .allTasks(ScenarioTemplates.completeTask()) - .expectJobUpdateEvent(job -> job.getStatus().getState() == JobState.Finished, "Expected job to complete") - .findTasks(TaskQuery.newBuilder() - .putFilteringCriteria("jobIds", jobsScenarioBuilder.takeJobId(0)) - .putFilteringCriteria("taskStates", TaskStatus.TaskState.Finished.name()) - .setPage(PAGE) - .build(), - tasks -> tasks.size() == numberOfTasks && tasks.stream().allMatch(task -> task.getStatus().getState() == TaskStatus.TaskState.Finished))); + List tasks = taskQueryResult.getItemsList(); + assertThat(tasks).hasSize(5); + assertThat(tasks).allMatch(task -> task.getStatus().getState() == TaskStatus.TaskState.Finished); } @Test(timeout = 30_000) - public void testSearchByJobTypeV3() throws Exception { - jobsScenarioBuilder.schedule(oneTaskBatchJobDescriptor(), 1, jobScenarioBuilder -> jobScenarioBuilder.template(ScenarioTemplates.launchJob())); - jobsScenarioBuilder.schedule(oneTaskServiceJobDescriptor(), 1, jobScenarioBuilder -> jobScenarioBuilder.template(ScenarioTemplates.launchJob())); - - String batchJobId = jobsScenarioBuilder.takeJob(0).getJobId(); - String batchTaskId = jobsScenarioBuilder.takeJob(0).getTaskByIndex(0).getTask().getId(); - String serviceJobId = jobsScenarioBuilder.takeJob(1).getJobId(); - String serviceTaskId = jobsScenarioBuilder.takeJob(1).getTaskByIndex(0).getTask().getId(); - + public void testSearchByJobTypeV3() { // Batch only (jobs) JobQueryResult batchQueryJobs = client.findJobs(JobQuery.newBuilder().putFilteringCriteria("jobType", "batch").setPage(PAGE).build()); - assertThat(batchQueryJobs.getItemsList()).hasSize(1); - Job batchQueryJobsItem = batchQueryJobs.getItems(0); - assertThat(batchQueryJobsItem.getId()).isEqualTo(batchJobId); + Set batchJobIds = batchQueryJobs.getItemsList().stream().map(Job::getId).collect(Collectors.toSet()); + assertThat(batchJobIds).containsAll(batchJobsWithCreatedTasks); // Batch only (tasks) TaskQueryResult batchQueryTasks = client.findTasks(TaskQuery.newBuilder().putFilteringCriteria("jobType", "batch").setPage(PAGE).build()); - assertThat(batchQueryTasks.getItemsList()).hasSize(1); - assertThat(batchQueryTasks.getItems(0).getId()).isEqualTo(batchTaskId); + Set batchTaskIds = batchQueryTasks.getItemsList().stream().map(Task::getId).collect(Collectors.toSet()); + assertThat(batchTaskIds).containsAll(batchTasks); // Service only (jobs) JobQueryResult serviceQueryJobs = client.findJobs(JobQuery.newBuilder().putFilteringCriteria("jobType", "service").setPage(PAGE).build()); - assertThat(serviceQueryJobs.getItemsList()).hasSize(1); - Job serviceQueryJobsItem = serviceQueryJobs.getItems(0); - assertThat(serviceQueryJobsItem.getId()).isEqualTo(serviceJobId); + Set serviceJobIds = serviceQueryJobs.getItemsList().stream().map(Job::getId).collect(Collectors.toSet()); + assertThat(serviceJobIds).containsAll(serviceJobsWithCreatedTasks); // Service only (tasks) TaskQueryResult serviceQueryTasks = client.findTasks(TaskQuery.newBuilder().putFilteringCriteria("jobType", "service").setPage(PAGE).build()); - assertThat(serviceQueryTasks.getItemsList()).hasSize(1); - assertThat(serviceQueryTasks.getItems(0).getId()).isEqualTo(serviceTaskId); + Set serviceTaskIds = serviceQueryTasks.getItemsList().stream().map(Task::getId).collect(Collectors.toSet()); + assertThat(serviceTaskIds).containsAll(serviceTasks); } @Test(timeout = 30_000) public void testSearchByJobState() throws Exception { - JobDescriptor jobDescriptor = batchJobDescriptors().getValue().toBuilder().withApplicationName(V3_ENGINE_APP).build(); - jobsScenarioBuilder.schedule(jobDescriptor, jobScenarioBuilder -> jobScenarioBuilder.template(ScenarioTemplates.launchJob())); - jobsScenarioBuilder.schedule(jobDescriptor, jobScenarioBuilder -> jobScenarioBuilder + JobDescriptor jobDescriptor = batchJobDescriptors().getValue().toBuilder().withApplicationName("testSearchByJobState").build(); + String acceptedJobId = jobsScenarioBuilder.scheduleAndReturnJob(jobDescriptor, jobScenarioBuilder -> jobScenarioBuilder.template(ScenarioTemplates.launchJob())).getId(); + String killInitiatedJobId = jobsScenarioBuilder.scheduleAndReturnJob(jobDescriptor, jobScenarioBuilder -> jobScenarioBuilder .template(ScenarioTemplates.launchJob()) .allTasks(taskScenarioBuilder -> { taskScenarioBuilder.getTaskExecutionHolder().delayStateTransition(taskState -> Long.MAX_VALUE); @@ -186,383 +267,368 @@ public void testSearchByJobState() throws Exception { }) .killJob() .expectJobUpdateEvent(job -> job.getStatus().getState() == JobState.KillInitiated, "Expected state: " + JobState.KillInitiated) - ); + ).getId(); - String acceptedJobId = jobsScenarioBuilder.takeJob(0).getJobId(); - String acceptedTaskId = jobsScenarioBuilder.takeJob(0).getTaskByIndex(0).getTask().getId(); - String killInitiatedJobId = jobsScenarioBuilder.takeJob(1).getJobId(); - String killInitiatedTaskId = jobsScenarioBuilder.takeJob(1).getTaskByIndex(0).getTask().getId(); + String acceptedTaskId = jobsScenarioBuilder.takeJob(acceptedJobId).getTaskByIndex(0).getTask().getId(); + String killInitiatedTaskId = jobsScenarioBuilder.takeJob(killInitiatedJobId).getTaskByIndex(0).getTask().getId(); // Indexes are recomputed after events are sent, so if we run findJobs/findTasks immediately, they may use stale index. Thread.sleep(10); + JobQuery.Builder jobQueryBuilder = JobQuery.newBuilder() + .putFilteringCriteria("applicationName", "testSearchByJobState") + .setPage(PAGE); + TaskQuery.Builder taskQueryBuilder = TaskQuery.newBuilder() + .putFilteringCriteria("applicationName", "testSearchByJobState") + .setPage(PAGE); + // Jobs (Accepted) - JobQueryResult acceptedJobQueryResult = client.findJobs(JobQuery.newBuilder().putFilteringCriteria("jobState", "Accepted").setPage(PAGE).build()); + JobQueryResult acceptedJobQueryResult = client.findJobs(jobQueryBuilder.putFilteringCriteria("jobState", "Accepted").build()); assertThat(acceptedJobQueryResult.getItemsList()).hasSize(1); Job acceptedJobQueryResultItem = acceptedJobQueryResult.getItems(0); assertThat(acceptedJobQueryResultItem.getId()).isEqualTo(acceptedJobId); // Jobs (KillInitiated) - JobQueryResult killInitJobQueryResult = client.findJobs(JobQuery.newBuilder().putFilteringCriteria("jobState", "KillInitiated").setPage(PAGE).build()); + JobQueryResult killInitJobQueryResult = client.findJobs(jobQueryBuilder.putFilteringCriteria("jobState", "KillInitiated").setPage(PAGE).build()); assertThat(killInitJobQueryResult.getItemsList()).hasSize(1); Job killInitJobQueryResultItem = killInitJobQueryResult.getItems(0); assertThat(killInitJobQueryResultItem.getId()).isEqualTo(killInitiatedJobId); // Tasks (Accepted) - TaskQueryResult acceptedTaskQueryResult = client.findTasks(TaskQuery.newBuilder().putFilteringCriteria("jobState", "Accepted").setPage(PAGE).build()); + TaskQueryResult acceptedTaskQueryResult = client.findTasks(taskQueryBuilder.putFilteringCriteria("jobState", "Accepted").setPage(PAGE).build()); assertThat(acceptedTaskQueryResult.getItemsList()).hasSize(1); assertThat(acceptedTaskQueryResult.getItems(0).getId()).isEqualTo(acceptedTaskId); // Tasks (KillInitiated) - TaskQueryResult killInitTaskQueryResult = client.findTasks(TaskQuery.newBuilder().putFilteringCriteria("jobState", "KillInitiated").setPage(PAGE).build()); + TaskQueryResult killInitTaskQueryResult = client.findTasks(taskQueryBuilder.putFilteringCriteria("jobState", "KillInitiated").setPage(PAGE).build()); assertThat(killInitTaskQueryResult.getItemsList()).hasSize(1); assertThat(killInitTaskQueryResult.getItems(0).getId()).isEqualTo(killInitiatedTaskId); } @Test(timeout = 30_000) - public void testSearchByTaskStateV3() throws Exception { - jobsScenarioBuilder.schedule(oneTaskBatchJobDescriptor(), jobScenarioBuilder -> jobScenarioBuilder.template(ScenarioTemplates.startJob(TaskStatus.TaskState.Launched))); - jobsScenarioBuilder.schedule(oneTaskBatchJobDescriptor(), jobScenarioBuilder -> jobScenarioBuilder.template(ScenarioTemplates.startJob(TaskStatus.TaskState.StartInitiated))); - jobsScenarioBuilder.schedule(oneTaskBatchJobDescriptor(), jobScenarioBuilder -> jobScenarioBuilder.template(ScenarioTemplates.startJob(TaskStatus.TaskState.Started))); - jobsScenarioBuilder.schedule(oneTaskBatchJobDescriptor(), jobScenarioBuilder -> jobScenarioBuilder.template(ScenarioTemplates.startJobAndMoveTasksToKillInitiated(true))); - - testSearchByTaskState("Launched", jobsScenarioBuilder.takeJobId(0), jobsScenarioBuilder.takeTaskId(0, 0)); - testSearchByTaskState("StartInitiated", jobsScenarioBuilder.takeJobId(1), jobsScenarioBuilder.takeTaskId(1, 0)); - testSearchByTaskState("Started", jobsScenarioBuilder.takeJobId(2), jobsScenarioBuilder.takeTaskId(2, 0)); - testSearchByTaskState("KillInitiated", jobsScenarioBuilder.takeJobId(3), jobsScenarioBuilder.takeTaskId(3, 0)); + public void testSearchByTaskStateV3() { + Function, String> jobSubmitter = template -> + jobsScenarioBuilder.scheduleAndReturnJob( + oneTaskBatchJobDescriptor().toBuilder().withApplicationName("testSearchByTaskStateV3").build(), + jobScenarioBuilder -> jobScenarioBuilder.template(template) + ).getId(); + + String jobLaunchedId = jobSubmitter.apply(ScenarioTemplates.startJob(TaskStatus.TaskState.Launched)); + String startInitiatedJobId = jobSubmitter.apply(ScenarioTemplates.startJob(TaskStatus.TaskState.StartInitiated)); + String startedJobId = jobSubmitter.apply(ScenarioTemplates.startJob(TaskStatus.TaskState.Started)); + String killInitiatedJobId = jobSubmitter.apply(ScenarioTemplates.startJobAndMoveTasksToKillInitiated(true)); + + testSearchByTaskStateV3("Launched", jobLaunchedId, jobsScenarioBuilder.takeTaskId(jobLaunchedId, 0)); + testSearchByTaskStateV3("StartInitiated", startInitiatedJobId, jobsScenarioBuilder.takeTaskId(startInitiatedJobId, 0)); + testSearchByTaskStateV3("Started", startedJobId, jobsScenarioBuilder.takeTaskId(startedJobId, 0)); + testSearchByTaskStateV3("KillInitiated", killInitiatedJobId, jobsScenarioBuilder.takeTaskId(killInitiatedJobId, 0)); + } + + private void testSearchByTaskStateV3(String taskState, String expectedJobId, String expectedTaskId) { + // Job + JobQueryResult jobQueryResult = client.findJobs(JobQuery.newBuilder() + .putFilteringCriteria("applicationName", "testSearchByTaskStateV3") + .putFilteringCriteria("taskStates", taskState) + .setPage(PAGE) + .build() + ); + assertThat(jobQueryResult.getItemsList()).hasSize(1); + Job jobQueryResultItem = jobQueryResult.getItems(0); + assertThat(jobQueryResultItem.getId()).isEqualTo(expectedJobId); + + // Task + TaskQueryResult taskQueryResult = client.findTasks(TaskQuery.newBuilder() + .putFilteringCriteria("applicationName", "testSearchByTaskStateV3") + .putFilteringCriteria("taskStates", taskState) + .setPage(PAGE) + .build() + ); + assertThat(taskQueryResult.getItemsList()).hasSize(1); + assertThat(taskQueryResult.getItems(0).getId()).isEqualTo(expectedTaskId); } @Test(timeout = 30_000) - public void testSearchByTaskReasonInFinishedJobV3() throws Exception { + public void testSearchByTaskReasonInFinishedJobV3() { JobDescriptor jobDescriptor = JobFunctions.changeBatchJobSize(oneTaskBatchJobDescriptor(), 2); - jobsScenarioBuilder.schedule(jobDescriptor, 1, jobScenarioBuilder -> jobScenarioBuilder + String jobId = jobsScenarioBuilder.scheduleAndReturnJob(jobDescriptor, jobScenarioBuilder -> jobScenarioBuilder .template(ScenarioTemplates.launchJob()) .inTask(0, TaskScenarioBuilder::failTaskExecution) .inTask(1, taskScenarioBuilder -> taskScenarioBuilder.template(ScenarioTemplates.completeTask())) .expectJobUpdateEvent(job -> job.getStatus().getState() == JobState.Finished, "Expected job to complete") - .findTasks(TaskQuery.newBuilder() - .putFilteringCriteria("jobIds", jobsScenarioBuilder.takeJobId(0)) - .putFilteringCriteria("taskStates", TaskStatus.TaskState.Finished.name()) - .putFilteringCriteria("taskStateReasons", "normal") - .setPage(PAGE) - .build(), - tasks -> tasks.size() == 1 && tasks.get(0).getStatus().getReasonCode().equals("normal") - ) - .findTasks(TaskQuery.newBuilder() - .putFilteringCriteria("jobIds", jobsScenarioBuilder.takeJobId(0)) - .putFilteringCriteria("taskStates", TaskStatus.TaskState.Finished.name()) - .putFilteringCriteria("taskStateReasons", "failed") - .setPage(PAGE) - .build(), - tasks -> tasks.size() == 1 && tasks.get(0).getStatus().getReasonCode().equals("failed") - ) - ); - } + ).getId(); - private void testSearchByTaskState(String taskState, String expectedJobId, String expectedTaskId) { - // Job - JobQueryResult jobQueryResult = client.findJobs(JobQuery.newBuilder().putFilteringCriteria("taskStates", taskState).setPage(PAGE).build()); - assertThat(jobQueryResult.getItemsList()).hasSize(1); - Job jobQueryResultItem = jobQueryResult.getItems(0); - assertThat(jobQueryResultItem.getId()).isEqualTo(expectedJobId); + List task0List = client.findTasks(TaskQuery.newBuilder() + .putFilteringCriteria("jobIds", jobId) + .putFilteringCriteria("taskStates", TaskStatus.TaskState.Finished.name()) + .putFilteringCriteria("taskStateReasons", "failed") + .setPage(PAGE) + .build() + ).getItemsList(); + assertThat(task0List).hasSize(1); + assertThat(task0List.get(0).getStatus().getReasonCode()).isEqualTo("failed"); - // Task - TaskQueryResult taskQueryResult = client.findTasks(TaskQuery.newBuilder().putFilteringCriteria("taskStates", taskState).setPage(PAGE).build()); - assertThat(taskQueryResult.getItemsList()).hasSize(1); - assertThat(taskQueryResult.getItems(0).getId()).isEqualTo(expectedTaskId); + List task1List = client.findTasks(TaskQuery.newBuilder() + .putFilteringCriteria("jobIds", jobId) + .putFilteringCriteria("taskStates", TaskStatus.TaskState.Finished.name()) + .putFilteringCriteria("taskStateReasons", "normal") + .setPage(PAGE) + .build() + ).getItemsList(); + assertThat(task1List).hasSize(1); + assertThat(task1List.get(0).getStatus().getReasonCode()).isEqualTo("normal"); } @Test(timeout = 30_000) - public void testSearchByOwnerV3() throws Exception { - JobDescriptor jobDescriptor1 = oneTaskBatchJobDescriptor().toBuilder().withOwner( - JobModel.newOwner().withTeamEmail("user1@netflix.com").build() - ).build(); - JobDescriptor jobDescriptor2 = oneTaskBatchJobDescriptor().toBuilder().withOwner( - JobModel.newOwner().withTeamEmail("user2@netflix.com").build() - ).build(); - testSearchByAttributeValue(jobDescriptor1, jobDescriptor2, "owner", "user1@netflix.com", "user2@netflix.com"); + public void testSearchByOwnerV3() { + testBatchSearchBy("owner", BATCH_OWNER); + testServiceSearchBy("owner", SERVICE_OWNER); } @Test(timeout = 30_000) - public void testSearchByAppNameV3() throws Exception { - JobDescriptor jobDescriptor1 = batchJobDescriptors().getValue().toBuilder().withApplicationName(V3_ENGINE_APP).build(); - JobDescriptor jobDescriptor2 = batchJobDescriptors().getValue().toBuilder().withApplicationName(V3_ENGINE_APP2).build(); - testSearchByAttributeValue(jobDescriptor1, jobDescriptor2, "appName", V3_ENGINE_APP, V3_ENGINE_APP2); + public void testSearchByAppNameV3() { + testBatchSearchBy("appName", BATCH_APPLICATION); + testServiceSearchBy("appName", SERVICE_APPLICATION); } @Test(timeout = 30_000) - public void testSearchByApplicationNameV3() throws Exception { - JobDescriptor jobDescriptor1 = batchJobDescriptors().getValue().toBuilder().withApplicationName(V3_ENGINE_APP).build(); - JobDescriptor jobDescriptor2 = batchJobDescriptors().getValue().toBuilder().withApplicationName(V3_ENGINE_APP2).build(); - testSearchByAttributeValue(jobDescriptor1, jobDescriptor2, "applicationName", V3_ENGINE_APP, V3_ENGINE_APP2); + public void testSearchByApplicationNameV3() { + testBatchSearchBy("applicationName", BATCH_APPLICATION); + testServiceSearchBy("applicationName", SERVICE_APPLICATION); } @Test(timeout = 30_000) - public void testSearchByCapacityGroupV3() throws Exception { - JobDescriptor jobDescriptor1 = oneTaskBatchJobDescriptor().toBuilder().withCapacityGroup("capacity1").build(); - JobDescriptor jobDescriptor2 = oneTaskBatchJobDescriptor().toBuilder().withCapacityGroup("capacity2").build(); - testSearchByAttributeValue(jobDescriptor1, jobDescriptor2, "capacityGroup", "capacity1", "capacity2"); + public void testSearchByCapacityGroupV3() { + testBatchSearchBy("capacityGroup", BATCH_CAPACITY_GROUP); + testServiceSearchBy("capacityGroup", SERVICE_CAPACITY_GROUP); } @Test(timeout = 30_000) - public void testSearchByJobGroupInfoV3() throws Exception { - JobDescriptor jobDescriptor1 = oneTaskBatchJobDescriptor().toBuilder() - .withJobGroupInfo(JobModel.newJobGroupInfo() - .withStack("stack1") - .withDetail("detail1") - .withSequence("001") - .build()) - .build(); - JobDescriptor jobDescriptor2 = oneTaskBatchJobDescriptor().toBuilder() - .withJobGroupInfo(JobModel.newJobGroupInfo() - .withStack("stack2") - .withDetail("detail2") - .withSequence("002") - .build()) - .build(); - testSearchByAttributeValue( - jobDescriptor1, - jobDescriptor2, - Triple.of("jobGroupStack", "stack1", "stack2"), - Triple.of("jobGroupDetail", "detail1", "detail2"), - Triple.of("jobGroupSequence", "001", "002") - ); + public void testSearchByJobGroupInfoV3() { + testBatchSearchBy("jobGroupStack", BATCH_JOB_GROUP_INFO.getStack()); + testBatchSearchBy("jobGroupDetail", BATCH_JOB_GROUP_INFO.getDetail()); + + testServiceSearchBy("jobGroupStack", SERVICE_JOB_GROUP_INFO.getStack()); + testServiceSearchBy("jobGroupDetail", SERVICE_JOB_GROUP_INFO.getDetail()); + + for (String jobId : CollectionsExt.merge(batchJobsWithCreatedTasks, serviceJobsWithCreatedTasks)) { + testSearchByJobGroupSequence( + jobId, + jobsScenarioBuilder.takeJob(jobId).getJob().getJobDescriptor().getJobGroupInfo().getSequence() + ); + } + } + + private void testBatchSearchBy(String queryKey, String queryValue) { + List batchJobs = client.findJobs(newJobQuery(queryKey, queryValue)).getItemsList(); + assertThat(batchJobs).hasSize(batchJobsWithCreatedTasks.size()); + assertThat(batchJobs.stream().map(Job::getId)).containsAll(batchJobsWithCreatedTasks); + } + + private void testServiceSearchBy(String queryKey, String queryValue) { + List serviceJobs = client.findJobs(newJobQuery(queryKey, queryValue)).getItemsList(); + assertThat(serviceJobs).hasSize(serviceJobsWithCreatedTasks.size()); + assertThat(serviceJobs.stream().map(Job::getId)).containsAll(serviceJobsWithCreatedTasks); + } + + private void testSearchByJobGroupSequence(String expectedJobId, String sequence) { + List jobIds = client.findJobs(newJobQuery("jobGroupSequence", sequence)).getItemsList(); + assertThat(jobIds).hasSize(1); + assertThat(jobIds.get(0).getId()).isEqualTo(expectedJobId); } @Test(timeout = 30_000) - public void testSearchByImageV3() throws Exception { - JobDescriptor jobDescriptor1 = oneTaskBatchJobDescriptor().but(j -> j.getContainer().toBuilder().withImage( - JobModel.newImage().withName("image1").withTag("tag1").build() - )); - JobDescriptor jobDescriptor2 = oneTaskBatchJobDescriptor().but(j -> j.getContainer().toBuilder().withImage( - JobModel.newImage().withName("image2").withTag("tag2").build() - )); - testSearchByAttributeValue(jobDescriptor1, jobDescriptor2, - Triple.of("imageName", "image1", "image2"), - Triple.of("imageTag", "tag1", "tag2") - ); + public void testSearchByImageV3() { + testBatchSearchBy("imageName", BATCH_IMAGE_NAME); + testBatchSearchBy("imageTag", BATCH_IMAGE_TAG); + testServiceSearchBy("imageName", SERVICE_IMAGE_NAME); + testServiceSearchBy("imageTag", SERVICE_IMAGE_TAG); } @Test(timeout = 30_000) - public void testSearchByJobDescriptorAttributesV3() throws Exception { + public void testSearchByJobDescriptorAttributesV3() { + List jobIds = new ArrayList<>(); for (int i = 0; i < 3; i++) { JobDescriptor jobDescriptor = oneTaskBatchJobDescriptor().toBuilder() + .withApplicationName("testSearchByJobDescriptorAttributesV3") .withAttributes(CollectionsExt.asMap( String.format("job%d.key1", i), "value1", String.format("job%d.key2", i), "value2" )) .build(); - jobsScenarioBuilder.schedule(jobDescriptor, jobScenarioBuilder -> jobScenarioBuilder.template(ScenarioTemplates.launchJob())); + String jobId = jobsScenarioBuilder.scheduleAndReturnJob(jobDescriptor, jobScenarioBuilder -> jobScenarioBuilder.template(ScenarioTemplates.launchJob())).getId(); + jobIds.add(jobId); } - String job0 = jobsScenarioBuilder.takeJob(0).getJobId(); - String task0 = jobsScenarioBuilder.takeJob(0).getTaskByIndex(0).getTask().getId(); - String job1 = jobsScenarioBuilder.takeJob(1).getJobId(); - String task1 = jobsScenarioBuilder.takeJob(1).getTaskByIndex(0).getTask().getId(); + String job0 = jobIds.get(0); + String task0 = jobsScenarioBuilder.takeJob(job0).getTaskByIndex(0).getTask().getId(); + String job1 = jobIds.get(1); + String task1 = jobsScenarioBuilder.takeJob(job1).getTaskByIndex(0).getTask().getId(); // Jobs + JobQuery.Builder jobQueryBuilder = JobQuery.newBuilder() + .putFilteringCriteria("applicationName", "testSearchByJobDescriptorAttributesV3") + .setPage(PAGE); assertContainsJobs( - client.findJobs(JobQuery.newBuilder() + client.findJobs(jobQueryBuilder .putFilteringCriteria("attributes", "job0.key1,job1.key1") .putFilteringCriteria("attributes.op", "or") - .setPage(PAGE).build() + .build() ), job0, job1 ); assertContainsJobs( - client.findJobs(JobQuery.newBuilder() + client.findJobs(jobQueryBuilder .putFilteringCriteria("attributes", "job0.key1:value1,job0.key1:value2") .putFilteringCriteria("attributes.op", "or") - .setPage(PAGE).build() + .build() ), job0 ); assertContainsJobs( - client.findJobs(JobQuery.newBuilder() + client.findJobs(jobQueryBuilder .putFilteringCriteria("attributes", "job0.key1:value1,job0.key2:value2") .putFilteringCriteria("attributes.op", "and") - .setPage(PAGE).build() + .build() ), job0 ); // Tasks + TaskQuery.Builder taskQueryBuilder = TaskQuery.newBuilder() + .putFilteringCriteria("applicationName", "testSearchByJobDescriptorAttributesV3") + .setPage(PAGE); assertContainsTasks( - client.findTasks(TaskQuery.newBuilder() + client.findTasks(taskQueryBuilder .putFilteringCriteria("attributes", "job0.key1,job1.key1") .putFilteringCriteria("attributes.op", "or") - .setPage(PAGE).build() + .build() ), task0, task1 ); assertContainsTasks( - client.findTasks(TaskQuery.newBuilder() + client.findTasks(taskQueryBuilder .putFilteringCriteria("attributes", "job0.key1:value1,job0.key1:value2") .putFilteringCriteria("attributes.op", "or") - .setPage(PAGE).build() + .build() ), task0 ); assertContainsTasks( - client.findTasks(TaskQuery.newBuilder() + client.findTasks(taskQueryBuilder .putFilteringCriteria("attributes", "job0.key1:value1,job0.key2:value2") .putFilteringCriteria("attributes.op", "and") - .setPage(PAGE).build() + .build() ), task0 ); } @Test(timeout = 30_000) - public void testSearchByCellV3() throws Exception { + public void testSearchByCellV3() { final int numberOfJobs = 3; String[] expectedJobIds = new String[numberOfJobs]; String[] expectedTaskIds = new String[numberOfJobs]; for (int i = 0; i < numberOfJobs; i++) { - jobsScenarioBuilder.schedule(oneTaskBatchJobDescriptor(), jobScenarioBuilder -> jobScenarioBuilder.template(ScenarioTemplates.launchJob())); - expectedJobIds[i] = jobsScenarioBuilder.takeJob(i).getJobId(); - expectedTaskIds[i] = jobsScenarioBuilder.takeJob(i).getTaskByIndex(0).getTask().getId(); + String jobId = jobsScenarioBuilder.scheduleAndReturnJob( + oneTaskBatchJobDescriptor().toBuilder().withApplicationName("testSearchByCellV3").build(), + jobScenarioBuilder -> jobScenarioBuilder.template(ScenarioTemplates.launchJob()) + ).getId(); + expectedJobIds[i] = jobId; + expectedTaskIds[i] = jobsScenarioBuilder.takeJob(jobId).getTaskByIndex(0).getTask().getId(); } // Jobs - JobQueryResult jobs1 = client.findJobs(JobQuery.newBuilder() + JobQuery.Builder jobQueryBuilder = JobQuery.newBuilder() + .putFilteringCriteria("applicationName", "testSearchByCellV3") + .setPage(PAGE); + JobQueryResult jobs1 = client.findJobs(jobQueryBuilder .putFilteringCriteria("attributes", "titus.cell,titus.stack") .putFilteringCriteria("attributes.op", "or") - .setPage(PAGE) .build() ); assertContainsJobs(jobs1, expectedJobIds); jobs1.getItemsList().forEach(job -> assertCellInfo(job, EmbeddedTitusMaster.CELL_NAME)); - JobQueryResult jobs2 = client.findJobs(JobQuery.newBuilder() + JobQueryResult jobs2 = client.findJobs(jobQueryBuilder .putFilteringCriteria("attributes", "titus.cell") .putFilteringCriteria("attributes.op", "and") - .setPage(PAGE) .build() ); assertContainsJobs(jobs2, expectedJobIds); jobs2.getItemsList().forEach(job -> assertCellInfo(job, EmbeddedTitusMaster.CELL_NAME)); - JobQueryResult jobs3 = client.findJobs(JobQuery.newBuilder() + JobQueryResult jobs3 = client.findJobs(jobQueryBuilder .putFilteringCriteria("attributes", String.format("titus.cell:%1$s,titus.stack:%1$s", EmbeddedTitusMaster.CELL_NAME)) .putFilteringCriteria("attributes.op", "or") - .setPage(PAGE) .build() ); assertContainsJobs(jobs3, expectedJobIds); jobs3.getItemsList().forEach(job -> assertCellInfo(job, EmbeddedTitusMaster.CELL_NAME)); - JobQueryResult jobs4 = client.findJobs(JobQuery.newBuilder() + JobQueryResult jobs4 = client.findJobs(jobQueryBuilder .putFilteringCriteria("attributes", String.format("titus.cell:%1$s", EmbeddedTitusMaster.CELL_NAME)) .putFilteringCriteria("attributes.op", "and") - .setPage(PAGE) .build() ); assertContainsJobs(jobs4, expectedJobIds); jobs4.getItemsList().forEach(job -> assertCellInfo(job, EmbeddedTitusMaster.CELL_NAME)); // Tasks - TaskQueryResult tasks1 = client.findTasks(TaskQuery.newBuilder() + TaskQuery.Builder taskQueryBuilder = TaskQuery.newBuilder() + .putFilteringCriteria("applicationName", "testSearchByCellV3") + .setPage(PAGE); + TaskQueryResult tasks1 = client.findTasks(taskQueryBuilder .putFilteringCriteria("attributes", "titus.cell,titus.stack") .putFilteringCriteria("attributes.op", "or") - .setPage(PAGE) .build() ); assertContainsTasks(tasks1, expectedTaskIds); tasks1.getItemsList().forEach(task -> assertCellInfo(task, EmbeddedTitusMaster.CELL_NAME)); - TaskQueryResult tasks2 = client.findTasks(TaskQuery.newBuilder() + TaskQueryResult tasks2 = client.findTasks(taskQueryBuilder .putFilteringCriteria("attributes", "titus.cell") .putFilteringCriteria("attributes.op", "and") - .setPage(PAGE) .build() ); assertContainsTasks(tasks2, expectedTaskIds); tasks2.getItemsList().forEach(task -> assertCellInfo(task, EmbeddedTitusMaster.CELL_NAME)); - TaskQueryResult tasks3 = client.findTasks(TaskQuery.newBuilder() + TaskQueryResult tasks3 = client.findTasks(taskQueryBuilder .putFilteringCriteria("attributes", String.format("titus.cell:%1$s,titus.stack:%1$s", EmbeddedTitusMaster.CELL_NAME)) .putFilteringCriteria("attributes.op", "or") - .setPage(PAGE).build() + .build() ); assertContainsTasks(tasks3, expectedTaskIds); tasks3.getItemsList().forEach(task -> assertCellInfo(task, EmbeddedTitusMaster.CELL_NAME)); - final TaskQueryResult tasks4 = client.findTasks(TaskQuery.newBuilder() + final TaskQueryResult tasks4 = client.findTasks(taskQueryBuilder .putFilteringCriteria("attributes", String.format("titus.cell:%1$s", EmbeddedTitusMaster.CELL_NAME)) .putFilteringCriteria("attributes.op", "and") - .setPage(PAGE).build() + .build() ); assertContainsTasks(tasks4, expectedTaskIds); tasks4.getItemsList().forEach(task -> assertCellInfo(task, EmbeddedTitusMaster.CELL_NAME)); } - private void testSearchByAttributeValue(JobDescriptor jobDescriptor1, - JobDescriptor jobDescriptor2, - String attributeName, - String job1Value, - String job2Value) throws Exception { - testSearchByAttributeValue(jobDescriptor1, jobDescriptor2, Triple.of(attributeName, job1Value, job2Value)); - } - - private void testSearchByAttributeValue(JobDescriptor jobDescriptor1, - JobDescriptor jobDescriptor2, - Triple... attributeValue1Value2Triples) throws Exception { - jobsScenarioBuilder.schedule(jobDescriptor1, jobScenarioBuilder -> jobScenarioBuilder.template(ScenarioTemplates.launchJob())); - jobsScenarioBuilder.schedule(jobDescriptor2, jobScenarioBuilder -> jobScenarioBuilder.template(ScenarioTemplates.launchJob())); - - String job0 = jobsScenarioBuilder.takeJob(0).getJobId(); - String task0 = jobsScenarioBuilder.takeJob(0).getTaskByIndex(0).getTask().getId(); - String job1 = jobsScenarioBuilder.takeJob(1).getJobId(); - String task1 = jobsScenarioBuilder.takeJob(1).getTaskByIndex(0).getTask().getId(); - - for (Triple next : attributeValue1Value2Triples) { - String attributeName = next.getFirst(); - String job1Value = next.getSecond(); - String job2Value = next.getThird(); - - // Jobs - JobQueryResult jobQueryResult1 = client.findJobs(JobQuery.newBuilder().putFilteringCriteria(attributeName, job1Value).setPage(PAGE).build()); - assertThat(jobQueryResult1.getItemsList()).hasSize(1); - final Job jobQueryResult1Item = jobQueryResult1.getItems(0); - assertThat(jobQueryResult1Item.getId()).isEqualTo(job0); - - JobQueryResult jobQueryResult2 = client.findJobs(JobQuery.newBuilder().putFilteringCriteria(attributeName, job2Value).setPage(PAGE).build()); - assertThat(jobQueryResult2.getItemsList()).hasSize(1); - final Job jobQueryResult2Item = jobQueryResult2.getItems(0); - assertThat(jobQueryResult2Item.getId()).isEqualTo(job1); - - // Tasks - TaskQueryResult taskQueryResult1 = client.findTasks(TaskQuery.newBuilder().putFilteringCriteria(attributeName, job1Value).setPage(PAGE).build()); - assertThat(taskQueryResult1.getItemsList()).hasSize(1); - assertThat(taskQueryResult1.getItems(0).getId()).isEqualTo(task0); - - TaskQueryResult taskQueryResult2 = client.findTasks(TaskQuery.newBuilder().putFilteringCriteria(attributeName, job2Value).setPage(PAGE).build()); - assertThat(taskQueryResult2.getItemsList()).hasSize(1); - assertThat(taskQueryResult2.getItems(0).getId()).isEqualTo(task1); - } - } - @Test(timeout = 30_000) - public void testPagination() throws Exception { - // Create a mix of batch and service jobs. - jobsScenarioBuilder.schedule(oneTaskBatchJobDescriptor(), 3, jobScenarioBuilder -> jobScenarioBuilder.template(ScenarioTemplates.startJob(TaskStatus.TaskState.Started))); - jobsScenarioBuilder.schedule(oneTaskServiceJobDescriptor(), 3, jobScenarioBuilder -> jobScenarioBuilder.template(ScenarioTemplates.startJob(TaskStatus.TaskState.Started))); - + public void testPagination() { + // We have 3 batch and 3 service jobs. Page firstPageOf5 = Page.newBuilder().setPageNumber(0).setPageSize(5).build(); Page secondPageOf5 = Page.newBuilder().setPageNumber(1).setPageSize(5).build(); // Jobs - JobQueryResult jobQueryResult = client.findJobs(JobQuery.newBuilder().setPage(firstPageOf5).build()); + JobQuery.Builder jobQueryBuilder = JobQuery.newBuilder().putFilteringCriteria("attributes", PRE_CREATED_JOBS_LABEL); + + JobQueryResult jobQueryResult = client.findJobs(jobQueryBuilder.setPage(firstPageOf5).build()); assertThat(jobQueryResult.getItemsList()).hasSize(5); checkPage(jobQueryResult.getPagination(), firstPageOf5, 2, 6, true); - JobQueryResult jobQueryResult2 = client.findJobs(JobQuery.newBuilder().setPage(secondPageOf5).build()); + JobQueryResult jobQueryResult2 = client.findJobs(jobQueryBuilder.setPage(secondPageOf5).build()); assertThat(jobQueryResult2.getItemsList()).hasSize(1); checkPage(jobQueryResult2.getPagination(), secondPageOf5, 2, 6, false); @@ -572,11 +638,12 @@ public void testPagination() throws Exception { assertThat(foundJobIds).hasSize(6); // Tasks - TaskQueryResult taskQueryResult = client.findTasks(TaskQuery.newBuilder().setPage(firstPageOf5).build()); + TaskQuery.Builder taskQueryBuilder = TaskQuery.newBuilder().putFilteringCriteria("attributes", PRE_CREATED_JOBS_LABEL); + TaskQueryResult taskQueryResult = client.findTasks(taskQueryBuilder.setPage(firstPageOf5).build()); assertThat(taskQueryResult.getItemsList()).hasSize(5); checkPage(taskQueryResult.getPagination(), firstPageOf5, 2, 6, true); - TaskQueryResult taskQueryResult2 = client.findTasks(TaskQuery.newBuilder().setPage(secondPageOf5).build()); + TaskQueryResult taskQueryResult2 = client.findTasks(taskQueryBuilder.setPage(secondPageOf5).build()); assertThat(taskQueryResult2.getItemsList()).hasSize(1); checkPage(taskQueryResult2.getPagination(), secondPageOf5, 2, 6, false); @@ -594,29 +661,34 @@ private void checkPage(Pagination pagination, Page current, int totalPages, int } @Test(timeout = 30_000) - public void testFieldsFiltering() throws Exception { + public void testFieldsFiltering() { JobDescriptor jobDescriptor = oneTaskBatchJobDescriptor().toBuilder() + .withApplicationName("testFieldsFiltering") .withAttributes(ImmutableMap.of("keyA", "valueA", "keyB", "valueB")) .build(); jobsScenarioBuilder.schedule(jobDescriptor, jobScenarioBuilder -> jobScenarioBuilder.template(ScenarioTemplates.startTasks())); // Check jobs - List foundJobs = client.findJobs(JobQuery.newBuilder().setPage(PAGE) + List foundJobs = client.findJobs(JobQuery.newBuilder() + .putFilteringCriteria("applicationName", "testFieldsFiltering") .addFields("status") .addFields("jobDescriptor.attributes.keyA") + .setPage(PAGE) .build() ).getItemsList(); assertThat(foundJobs).hasSize(1); assertThat(foundJobs.get(0).getId()).isNotEmpty(); // Always present assertThat(foundJobs.get(0).getStatus().getReasonMessage()).isNotEmpty(); - final com.netflix.titus.grpc.protogen.JobDescriptor foundJobDescriptor = foundJobs.get(0).getJobDescriptor(); + com.netflix.titus.grpc.protogen.JobDescriptor foundJobDescriptor = foundJobs.get(0).getJobDescriptor(); assertThat(foundJobDescriptor.getAttributesMap()).isNotEmpty(); assertThat(foundJobDescriptor.getAttributesMap()).containsEntry("keyA", "valueA"); // Check tasks - List foundTasks = client.findTasks(TaskQuery.newBuilder().setPage(PAGE) + List foundTasks = client.findTasks(TaskQuery.newBuilder() + .putFilteringCriteria("applicationName", "testFieldsFiltering") .addFields("status") .addFields("statusHistory") + .setPage(PAGE) .build() ).getItemsList(); assertThat(foundTasks).hasSize(1); @@ -626,6 +698,13 @@ public void testFieldsFiltering() throws Exception { assertThat(foundTasks.get(0).getTaskContextMap()).isEmpty(); } + private JobQuery newJobQuery(String... criteria) { + return JobQuery.newBuilder() + .putAllFilteringCriteria(CollectionsExt.asMap(criteria)) + .setPage(PAGE) + .build(); + } + private void assertContainsJobs(JobQueryResult queryResult, String... jobIds) { assertThat(queryResult.getItemsCount()).isEqualTo(jobIds.length); Set returnedJobIds = queryResult.getItemsList().stream().map(Job::getId).collect(Collectors.toSet()); diff --git a/titus-server-master/src/test/java/com/netflix/titus/master/integration/v3/job/JobDisruptionBudgetTest.java b/titus-server-master/src/test/java/com/netflix/titus/master/integration/v3/job/JobDisruptionBudgetTest.java index 78f6d86c2d..f54e30bd34 100644 --- a/titus-server-master/src/test/java/com/netflix/titus/master/integration/v3/job/JobDisruptionBudgetTest.java +++ b/titus-server-master/src/test/java/com/netflix/titus/master/integration/v3/job/JobDisruptionBudgetTest.java @@ -69,7 +69,7 @@ public void setUp() throws Exception { } @Test(timeout = TEST_TIMEOUT_MS) - public void testDisruptionBudgetUpdate() throws Exception { + public void testDisruptionBudgetUpdate() { JobDescriptor jobWithSelfManaged = changeDisruptionBudget(oneTaskServiceJobDescriptor(), NUMBER_OF_HEALTHY); jobsScenarioBuilder.schedule(jobWithSelfManaged, jobScenarioBuilder -> jobScenarioBuilder diff --git a/titus-server-master/src/test/java/com/netflix/titus/master/integration/v3/scenario/JobsScenarioBuilder.java b/titus-server-master/src/test/java/com/netflix/titus/master/integration/v3/scenario/JobsScenarioBuilder.java index e526e0060f..262b8c9fd8 100644 --- a/titus-server-master/src/test/java/com/netflix/titus/master/integration/v3/scenario/JobsScenarioBuilder.java +++ b/titus-server-master/src/test/java/com/netflix/titus/master/integration/v3/scenario/JobsScenarioBuilder.java @@ -19,6 +19,7 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.function.Predicate; import java.util.stream.Collectors; @@ -110,11 +111,16 @@ public void stop() { } public JobsScenarioBuilder schedule(JobDescriptor jobDescriptor, - Function jobScenario) throws Exception { + Function jobScenario) { TestStreamObserver responseObserver = new TestStreamObserver<>(); client.createJob(GrpcJobManagementModelConverters.toGrpcJobDescriptor(jobDescriptor), responseObserver); - JobId jobId = responseObserver.takeNext(TIMEOUT_MS, TimeUnit.MILLISECONDS); + JobId jobId; + try { + jobId = responseObserver.takeNext(TIMEOUT_MS, TimeUnit.MILLISECONDS); + } catch (Exception e) { + throw new IllegalStateException(e); + } Preconditions.checkNotNull(jobId, "Job create operation not completed in time"); TestStreamObserver eventStream = new TestStreamObserver<>(); @@ -142,6 +148,15 @@ public JobsScenarioBuilder schedule(JobDescriptor jobDescriptor, return this; } + public Job scheduleAndReturnJob(JobDescriptor jobDescriptor, + Function jobScenario) { + AtomicReference jobRef = new AtomicReference<>(); + schedule(jobDescriptor, js -> jobScenario.apply(js).inJob(jobRef::set)); + + Preconditions.checkNotNull(jobRef.get(), "Job not set after scheduling"); + return jobRef.get(); + } + public JobScenarioBuilder takeJob(String jobId) { return jobScenarioBuilders.stream().filter(j -> j.getJobId().equals(jobId)).findFirst().orElseThrow(() -> new IllegalArgumentException("Job not found: " + jobId)); } @@ -159,6 +174,10 @@ public String takeTaskId(int jobIdx, int taskIdx) { return takeJob(jobIdx).getTaskByIndex(taskIdx).getTask().getId(); } + public String takeTaskId(String jobId, int taskIdx) { + return takeJob(jobId).getTaskByIndex(taskIdx).getTask().getId(); + } + public JobsScenarioBuilder assertJobs(Predicate> predicate) { List jobs = jobScenarioBuilders.stream().map(JobScenarioBuilder::getJob).collect(Collectors.toList()); Preconditions.checkState(predicate.test(jobs), "Jobs collection predicate evaluation fails (job size=%s)", jobs.size()); diff --git a/titus-testkit/src/main/java/com/netflix/titus/testkit/embedded/cell/EmbeddedTitusCells.java b/titus-testkit/src/main/java/com/netflix/titus/testkit/embedded/cell/EmbeddedTitusCells.java index ec17925b48..90b3e871d8 100644 --- a/titus-testkit/src/main/java/com/netflix/titus/testkit/embedded/cell/EmbeddedTitusCells.java +++ b/titus-testkit/src/main/java/com/netflix/titus/testkit/embedded/cell/EmbeddedTitusCells.java @@ -34,9 +34,15 @@ public static EmbeddedTitusCell basicCell(int desired) { return basicCell(EmbeddedTitusMaster.CELL_NAME, desired); } + public static EmbeddedTitusCell basicCell(SimulatedCloud simulatedCloud) { + return basicCell(EmbeddedTitusMaster.CELL_NAME, simulatedCloud); + } + public static EmbeddedTitusCell basicCell(String cellName, int desired) { - SimulatedCloud simulatedCloud = SimulatedClouds.basicCloud(desired); + return basicCell(cellName, SimulatedClouds.basicCloud(desired)); + } + private static EmbeddedTitusCell basicCell(String cellName, SimulatedCloud simulatedCloud) { return EmbeddedTitusCell.aTitusCell() .withMaster(EmbeddedTitusMasters.basicMaster(simulatedCloud).toBuilder() .withCellName(cellName) diff --git a/titus-testkit/src/main/java/com/netflix/titus/testkit/embedded/cell/gateway/EmbeddedTitusGateway.java b/titus-testkit/src/main/java/com/netflix/titus/testkit/embedded/cell/gateway/EmbeddedTitusGateway.java index 30c7cf5dd1..79b9cd76ba 100644 --- a/titus-testkit/src/main/java/com/netflix/titus/testkit/embedded/cell/gateway/EmbeddedTitusGateway.java +++ b/titus-testkit/src/main/java/com/netflix/titus/testkit/embedded/cell/gateway/EmbeddedTitusGateway.java @@ -18,8 +18,10 @@ import java.util.Collections; import java.util.Properties; +import java.util.concurrent.TimeUnit; import com.google.common.base.Preconditions; +import com.google.common.base.Stopwatch; import com.google.inject.AbstractModule; import com.google.inject.Module; import com.google.inject.Provides; @@ -125,6 +127,7 @@ public int getGrpcPort() { } public EmbeddedTitusGateway boot() { + Stopwatch timer = Stopwatch.createStarted(); logger.info("Starting Titus Gateway"); injector = InjectorBuilder.fromModules( @@ -164,6 +167,9 @@ public AdmissionSanitizer getJobSanitizer(TitusValidatorConfigura } }) ).createInjector(); + + logger.info("Embedded TitusGateway started in {}ms", timer.elapsed(TimeUnit.MILLISECONDS)); + return this; } diff --git a/titus-testkit/src/main/java/com/netflix/titus/testkit/embedded/cell/master/EmbeddedTitusMaster.java b/titus-testkit/src/main/java/com/netflix/titus/testkit/embedded/cell/master/EmbeddedTitusMaster.java index a3d14bf301..7bc3ceeb5d 100644 --- a/titus-testkit/src/main/java/com/netflix/titus/testkit/embedded/cell/master/EmbeddedTitusMaster.java +++ b/titus-testkit/src/main/java/com/netflix/titus/testkit/embedded/cell/master/EmbeddedTitusMaster.java @@ -201,7 +201,7 @@ public EmbeddedTitusMaster boot() { opportunisticCpuAvailability.clear(); injector = InjectorBuilder.fromModules( - Modules.override(new TitusRuntimeModule()).with(new AbstractModule() { + Modules.override(new TitusRuntimeModule(false)).with(new AbstractModule() { @Override protected void configure() { bind(Archaius2ConfigurationLogger.class).asEagerSingleton(); diff --git a/titus-testkit/src/main/java/com/netflix/titus/testkit/embedded/cloud/SimulatedClouds.java b/titus-testkit/src/main/java/com/netflix/titus/testkit/embedded/cloud/SimulatedClouds.java index 54dc71b98f..3bb61bc4fc 100644 --- a/titus-testkit/src/main/java/com/netflix/titus/testkit/embedded/cloud/SimulatedClouds.java +++ b/titus-testkit/src/main/java/com/netflix/titus/testkit/embedded/cloud/SimulatedClouds.java @@ -34,6 +34,16 @@ public static SimulatedCloud basicCloud(int desired) { return simulatedCloud; } + public static SimulatedCloud basicCloudWithLargeInstances(int desired) { + SimulatedCloud simulatedCloud = new SimulatedCloud(); + simulatedCloud.createAgentInstanceGroups( + SimulatedAgentGroupDescriptor.awsInstanceGroup("critical1", AwsInstanceType.M5_Metal, desired), + SimulatedAgentGroupDescriptor.awsInstanceGroup("flex1", AwsInstanceType.R5_Metal, desired), + SimulatedAgentGroupDescriptor.awsInstanceGroup("flexGpu", AwsInstanceType.P3_16XLarge, desired) + ); + return simulatedCloud; + } + public static SimulatedCloud twoPartitionsPerTierStack(int partitionDesired) { SimulatedCloud simulatedCloud = new SimulatedCloud(); simulatedCloud.createAgentInstanceGroups(