diff --git a/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/DataDroppedPayloadSetter.java b/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/DataDroppedPayloadSetter.java index d8efd9cd8..45bb82da0 100644 --- a/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/DataDroppedPayloadSetter.java +++ b/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/DataDroppedPayloadSetter.java @@ -79,17 +79,17 @@ protected void setPayload(final long intervalSecs) { if (dropped != null) totalDropped += dropped.value(); else - logger.warn("Unexpected to get null dropped counter for metric " + m.getMetricGroupId().id()); + logger.warn("Unexpected to get null dropped counter for metric {}", m.getMetricGroupId().id()); if (onNext != null) totalOnNext += onNext.value(); else - logger.warn("Unexpected to get null onNext counter for metric " + m.getMetricGroupId().id()); + logger.warn("Unexpected to get null onNext counter for metric {}", m.getMetricGroupId().id()); } final StatusPayloads.DataDropCounts dataDrop = new StatusPayloads.DataDropCounts(totalOnNext, totalDropped); try { heartbeat.addSingleUsePayload("" + StatusPayloads.Type.IncomingDataDrop, objectMapper.writeValueAsString(dataDrop)); } catch (JsonProcessingException e) { - logger.warn("Error writing json for dataDrop payload: " + e.getMessage()); + logger.warn("Error writing json for dataDrop payload: {}", e.getMessage()); } dropCountGauge.set(dataDrop.getDroppedCount()); onNextCountGauge.set(dataDrop.getOnNextCount()); diff --git a/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/DownloadJob.java b/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/DownloadJob.java index 87670fda8..740eef51d 100644 --- a/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/DownloadJob.java +++ b/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/DownloadJob.java @@ -30,9 +30,9 @@ public class DownloadJob { private static final Logger logger = LoggerFactory.getLogger(DownloadJob.class); - private URL jobArtifactUrl; - private String jobName; - private String locationToStore; + private final URL jobArtifactUrl; + private final String jobName; + private final String locationToStore; public DownloadJob( URL jobArtifactUrl, String jobName, @@ -49,9 +49,9 @@ public static void main(String[] args) throws MalformedURLException { System.exit(1); } - logger.info("parameters, jobArtifactUrl: " + args[0]); - logger.info("parameters, jobName: " + args[1]); - logger.info("parameters, locationToStore: " + args[2]); + logger.info("parameters, jobArtifactUrl: {}", args[0]); + logger.info("parameters, jobName: {}", args[1]); + logger.info("parameters, locationToStore: {}", args[2]); new DownloadJob(new URL(args[0]), args[1], args[2]).execute(); } @@ -63,21 +63,21 @@ public void execute() { Path path = Paths.get(locationToStore, jobName, "lib"); - logger.info("Started writing job to tmp directory: " + path); + logger.info("Started writing job to tmp directory: {}", path); // download file to /tmp, then add file location try (InputStream is = jobArtifactUrl.openStream()) { Files.createDirectories(path); try (OutputStream os = Files.newOutputStream(Paths.get(path.toString(), jarName))) { byte[] bytes = new byte[2048]; - int read = 0; + int read; while ((read = is.read(bytes)) >= 0) { os.write(bytes, 0, read); } } } catch (IOException e1) { - logger.error("Failed to write job to local store at path: " + path, e1); + logger.error("Failed to write job to local store at path: {}", path, e1); throw new RuntimeException(e1); } - logger.info("Finished writing job to tmp directory: " + path); + logger.info("Finished writing job to tmp directory: {}", path); } } diff --git a/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/ExecuteStageRequestService.java b/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/ExecuteStageRequestService.java index 7bbb57a43..7f7faef54 100644 --- a/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/ExecuteStageRequestService.java +++ b/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/ExecuteStageRequestService.java @@ -70,14 +70,10 @@ public ExecuteStageRequestService( public void start() { subscription = executeStageRequestObservable // map to request with status observer - .map(new Func1() { - @Override - public TrackedExecuteStageRequest call( - WrappedExecuteStageRequest executeRequest) { - PublishSubject statusSubject = PublishSubject.create(); - tasksStatusObserver.onNext(statusSubject); - return new TrackedExecuteStageRequest(executeRequest, statusSubject); - } + .map(executeRequest -> { + PublishSubject statusSubject = PublishSubject.create(); + tasksStatusObserver.onNext(statusSubject); + return new TrackedExecuteStageRequest(executeRequest, statusSubject); }) // get provider from jar, return tracked MantisJob .flatMap(new Func1>() { @@ -98,7 +94,7 @@ public Observable call(TrackedExecuteStageRequest executeReque cl = userCodeClassLoader.asClassLoader(); if (jobProviderClass.isPresent()) { - logger.info("loading job main class " + jobProviderClass.get()); + logger.info("loading job main class {}", jobProviderClass.get()); final MantisJobProvider jobProvider = InstantiationUtil.instantiate( jobProviderClass.get(), MantisJobProvider.class, cl); mantisJob = jobProvider.getJobInstance(); @@ -143,7 +139,7 @@ public void onError(Throwable e) { @Override public void onNext(final ExecutionDetails executionDetails) { - logger.info("Executing stage for job ID: " + executionDetails.getExecuteStageRequest().getRequest().getJobId()); + logger.info("Executing stage for job ID: {}", executionDetails.getExecuteStageRequest().getRequest().getJobId()); Thread t = new Thread("mantis-worker-thread-" + executionDetails.getExecuteStageRequest().getRequest().getJobId()) { @Override public void run() { diff --git a/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/ExecutionDetails.java b/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/ExecutionDetails.java index f109aba60..6724d0e94 100644 --- a/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/ExecutionDetails.java +++ b/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/ExecutionDetails.java @@ -27,11 +27,11 @@ @SuppressWarnings("rawtypes") // suppressed due to unknown mantis job typ public class ExecutionDetails { - private ClassLoader classLoader; - private WrappedExecuteStageRequest executeStageRequest; - private Observer status; - private Job mantisJob; - private List parameters; + private final ClassLoader classLoader; + private final WrappedExecuteStageRequest executeStageRequest; + private final Observer status; + private final Job mantisJob; + private final List parameters; public ExecutionDetails(WrappedExecuteStageRequest executeStageRequest, Observer status, Job mantisJob, ClassLoader classLoader, List parameters) { diff --git a/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/Heartbeat.java b/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/Heartbeat.java index 3de7348c2..f6fa9e535 100644 --- a/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/Heartbeat.java +++ b/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/Heartbeat.java @@ -57,7 +57,7 @@ class Heartbeat { } void setPayload(String name, String value) { - logger.info("Setting payload " + name); + logger.info("Setting payload {}", name); if (name != null && !name.isEmpty() && value != null) payloads.put(name, value); } @@ -73,9 +73,9 @@ void addSingleUsePayload(String name, String value) { Status getCurrentHeartbeatStatus() { List payloadList = new ArrayList<>(); - logger.debug("#Payloads = " + payloads.size()); + logger.debug("#Payloads = {}", payloads.size()); for (Map.Entry entry : payloads.entrySet()) { - logger.debug("Adding payload " + entry.getKey() + " with value " + entry.getValue()); + logger.debug("Adding payload {} with value {}", entry.getKey(), entry.getValue()); payloadList.add(new Status.Payload(entry.getKey(), entry.getValue())); } List singleUsePlds = new ArrayList<>(); diff --git a/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/MantisWorker.java b/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/MantisWorker.java index 96cf00bfb..5345b33c0 100644 --- a/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/MantisWorker.java +++ b/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/MantisWorker.java @@ -60,12 +60,12 @@ public class MantisWorker extends BaseService { private static final Logger logger = LoggerFactory.getLogger(MantisWorker.class); @Argument(alias = "p", description = "Specify a configuration file", required = false) private static String propFile = "worker.properties"; - private CountDownLatch blockUntilShutdown = new CountDownLatch(1); + private final CountDownLatch blockUntilShutdown = new CountDownLatch(1); + private final List mantisServices = new LinkedList<>(); // static { // RxNetty.useNativeTransportIfApplicable(); // } - private List mantisServices = new LinkedList(); public MantisWorker(ConfigurationFactory configFactory, io.mantisrx.server.master.client.config.ConfigurationFactory coreConfigFactory) { this(configFactory, Optional.empty()); @@ -102,12 +102,7 @@ public String toString() { final MantisMasterGateway gateway = highAvailabilityServices.getMasterClientApi(); // shutdown hook - Thread t = new Thread() { - @Override - public void run() { - shutdown(); - } - }; + Thread t = new Thread(this::shutdown); t.setDaemon(true); Runtime.getRuntime().addShutdownHook(t); @@ -209,7 +204,7 @@ private static Properties loadProperties(String propFile) { * * @param resourceName the name of the resource. It can either be a file name, or a path. * @return An {@link java.io.InputStream} instance that represents the found resource. Null otherwise. - * @throws java.io.FileNotFoundException + * @throws java.io.FileNotFoundException if the resource is not found. */ private static InputStream findResourceAsStream(String resourceName) throws FileNotFoundException { File resource = new File(resourceName); @@ -242,7 +237,7 @@ public static void main(String[] args) { worker.start(); } catch (Exception e) { // unexpected to get runtime exception, will exit - logger.error("Unexpected error: " + e.getMessage(), e); + logger.error("Unexpected error: {}", e.getMessage(), e); System.exit(2); } } @@ -264,14 +259,14 @@ public void startUp() { logger.info("Starting Mantis Worker"); RxNetty.useMetricListenersFactory(new MantisNettyEventsListenerFactory()); for (Service service : mantisServices) { - logger.info("Starting service: " + service); + logger.info("Starting service: {}", service); try { service.start(); } catch (Throwable e) { - logger.error(String.format("Failed to start service %s: %s", service, e.getMessage()), e); + logger.error("Failed to start service {}: {}", service, e.getMessage(), e); throw e; } - logger.info("Started service: " + service); + logger.info("Started service: {}", service); } logger.info("Started Mantis Worker successfully"); diff --git a/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/ResourceUsagePayloadSetter.java b/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/ResourceUsagePayloadSetter.java index 8f4a735f0..5a0b168f3 100644 --- a/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/ResourceUsagePayloadSetter.java +++ b/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/ResourceUsagePayloadSetter.java @@ -145,7 +145,7 @@ private void setPayloadAndMetrics() { try { heartbeat.addSingleUsePayload("" + StatusPayloads.Type.ResourceUsage, objectMapper.writeValueAsString(usage)); } catch (JsonProcessingException e) { - logger.warn("Error writing json for resourceUsage payload: " + e.getMessage()); + logger.warn("Error writing json for resourceUsage payload: {}", e.getMessage()); } cpuLimitGauge.set(Math.round(usage.getCpuLimit() * 100.0)); cpuUsageCurrGauge.set(Math.round(usage.getCpuUsageCurrent() * 100.0)); @@ -168,7 +168,7 @@ private void setPayloadAndMetrics() { } catch (Exception e) { logger.error("Failed to compute resource usage", e); } finally { - logger.debug("scheduling next metrics report with delay=" + delay); + logger.debug("scheduling next metrics report with delay = {}", delay); executor.schedule(this::setPayloadAndMetrics, delay, TimeUnit.SECONDS); } diff --git a/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/RunningWorker.java b/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/RunningWorker.java index 0c02cc730..a30709f0e 100644 --- a/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/RunningWorker.java +++ b/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/RunningWorker.java @@ -44,15 +44,15 @@ public class RunningWorker { private static final Logger logger = LoggerFactory.getLogger(RunningWorker.class); private final int totalStagesNet; - private Action0 onTerminateCallback; - private Action0 onCompleteCallback; - private Action1 onErrorCallback; - private CountDownLatch blockUntilTerminate = new CountDownLatch(1); - private Job job; - private SchedulingInfo schedulingInfo; - private StageConfig stage; - private Observer jobStatus; - private String jobId; + private final Action0 onTerminateCallback; + private final Action0 onCompleteCallback; + private final Action1 onErrorCallback; + private final CountDownLatch blockUntilTerminate = new CountDownLatch(1); + private final Job job; + private final SchedulingInfo schedulingInfo; + private final StageConfig stage; + private final Observer jobStatus; + private final String jobId; private final int stageNum; private final int workerNum; private final int workerIndex; @@ -88,37 +88,21 @@ public RunningWorker(Builder builder) { this.stageTotalWorkersObservable = builder.stageTotalWorkersObservable; this.jobSchedulingInfoObservable = builder.jobSchedulingInfoObservable; - this.onTerminateCallback = new Action0() { - @Override - public void call() { - blockUntilTerminate.countDown(); - } - }; - this.onCompleteCallback = new Action0() { - @Override - public void call() { - logger.info("JobId: " + jobId + " stage: " + stageNum + ", completed"); - // setup a timeout to call forced exit as sure way to exit - new Thread() { - @Override - public void run() { - try { - sleep(3000); - System.exit(1); - } catch (Exception e) { - logger.error("Ignoring exception during exit: " + e.getMessage(), e); - } - } - }.start(); - signalCompleted(); - } - }; - this.onErrorCallback = new Action1() { - @Override - public void call(Throwable t) { - signalFailed(t); - } + this.onTerminateCallback = blockUntilTerminate::countDown; + this.onCompleteCallback = () -> { + logger.info("JobId: {} stage: {}, completed", jobId, stageNum); + // setup a timeout to call forced exit as sure way to exit + new Thread(() -> { + try { + Thread.sleep(3000); + System.exit(1); + } catch (Exception e) { + logger.error("Ignoring exception during exit: {}", e.getMessage(), e); + } + }).start(); + signalCompleted(); }; + this.onErrorCallback = this::signalFailed; } private String getWorkerStringPrefix(int stageNum, int index, int number) { @@ -126,8 +110,7 @@ private String getWorkerStringPrefix(int stageNum, int index, int number) { } public void signalStartedInitiated() { - logger.info("JobId: " + jobId + ", stage: " + stageNum + " workerIndex: " + workerIndex + " workerNumber: " + workerNum + "," - + " signaling started initiated"); + logger.info("JobId: {}, stage: {} workerIndex: {} workerNumber: {}, signaling started initiated", jobId, stageNum, workerIndex, workerNum); vmTaskStatusObserver.onNext(new VirtualMachineTaskStatus( new WorkerId(jobId, workerIndex, workerNum).getId(), VirtualMachineTaskStatus.TYPE.STARTED, jobName + ", " + @@ -141,16 +124,14 @@ public void signalStartedInitiated() { } public void signalStarted() { - logger.info("JobId: " + jobId + ", " + getWorkerStringPrefix(stageNum, workerIndex, workerNum) - + " signaling started"); + logger.info("JobId: {}, {} signaling started", jobId, getWorkerStringPrefix(stageNum, workerIndex, workerNum)); jobStatus.onNext(new Status(jobId, stageNum, workerIndex, workerNum, TYPE.INFO, getWorkerStringPrefix(stageNum, workerIndex, workerNum) + " running", MantisJobState.Started)); } public void signalCompleted() { - logger.info("JobId: " + jobId + ", stage: " + stageNum + " workerIndex: " + workerIndex + " workerNumber: " + workerNum + "," - + " signaling completed"); + logger.info("JobId: {}, stage: {} workerIndex: {} workerNumber: {}, signaling completed", jobId, stageNum, workerIndex, workerNum); jobStatus.onNext(new Status(jobId, stageNum, workerIndex, workerNum, TYPE.INFO, getWorkerStringPrefix(stageNum, workerIndex, workerNum) + " completed", MantisJobState.Completed)); @@ -164,8 +145,7 @@ TYPE.INFO, getWorkerStringPrefix(stageNum, workerIndex, workerNum) + " completed } public void signalFailed(Throwable t) { - logger.info("JobId: " + jobId + ", stage: " + stageNum + " workerIndex: " + workerIndex + " workerNumber: " + workerNum + "," - + " signaling failed"); + logger.info("JobId: {}, stage: {} workerIndex: {} workerNumber: {}, signaling failed", jobId, stageNum, workerIndex, workerNum); logger.error("Worker failure detected, shutting down job", t); jobStatus.onNext(new Status(jobId, stageNum, workerIndex, workerNum, TYPE.INFO, getWorkerStringPrefix(stageNum, workerIndex, workerNum) + " failed. error: " + t.getMessage(), diff --git a/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/TrackedExecuteStageRequest.java b/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/TrackedExecuteStageRequest.java index 348f53a1f..8e5f6aaf0 100644 --- a/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/TrackedExecuteStageRequest.java +++ b/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/TrackedExecuteStageRequest.java @@ -23,8 +23,8 @@ public class TrackedExecuteStageRequest { - private WrappedExecuteStageRequest executeRequest; - private Observer status; + private final WrappedExecuteStageRequest executeRequest; + private final Observer status; public TrackedExecuteStageRequest(WrappedExecuteStageRequest executeRequest, Observer status) { diff --git a/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/VirtualMachineWorkerServiceLocalImpl.java b/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/VirtualMachineWorkerServiceLocalImpl.java index 46d3487eb..726657236 100644 --- a/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/VirtualMachineWorkerServiceLocalImpl.java +++ b/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/VirtualMachineWorkerServiceLocalImpl.java @@ -40,7 +40,6 @@ import java.util.Optional; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import org.apache.mesos.MesosExecutorDriver; import org.slf4j.Logger; @@ -48,7 +47,6 @@ import rx.Observable; import rx.Observer; import rx.functions.Action0; -import rx.functions.Action1; import rx.schedulers.Schedulers; import rx.subjects.PublishSubject; @@ -59,9 +57,9 @@ public class VirtualMachineWorkerServiceLocalImpl extends BaseService implements private static final Logger logger = LoggerFactory.getLogger(VirtualMachineWorkerServiceLocalImpl.class); private final WorkerTopologyInfo.Data workerInfo; private MesosExecutorDriver mesosDriver; - private ExecutorService executor; - private Observer executeStageRequestObserver; - private Observable vmTaskStatusObservable; + private final ExecutorService executor; + private final Observer executeStageRequestObserver; + private final Observable vmTaskStatusObservable; public VirtualMachineWorkerServiceLocalImpl(final WorkerTopologyInfo.Data workerInfo, Observer executeStageRequestObserver, @@ -69,13 +67,10 @@ public VirtualMachineWorkerServiceLocalImpl(final WorkerTopologyInfo.Data worker this.workerInfo = workerInfo; this.executeStageRequestObserver = executeStageRequestObserver; this.vmTaskStatusObservable = vmTaskStatusObservable; - executor = Executors.newSingleThreadExecutor(new ThreadFactory() { - @Override - public Thread newThread(Runnable r) { - Thread t = new Thread(r, "vm_worker_mesos_executor_thread"); - t.setDaemon(true); - return t; - } + executor = Executors.newSingleThreadExecutor(r -> { + Thread t = new Thread(r, "vm_worker_mesos_executor_thread"); + t.setDaemon(true); + return t; }); } @@ -113,7 +108,7 @@ private WrappedExecuteStageRequest createExecuteStageRequest() throws MalformedU jobJarUrl, workerInfo.getStageNumber(), workerInfo.getNumStages(), ports, timeoutToReportStartSec, workerInfo.getMetricsPort(), params, schedInfo, MantisJobDurationType.Transient, 0L, 0L, new WorkerPorts(Arrays.asList(7151, 7152, 7153, 7154, 7155)), Optional.empty()); - return new WrappedExecuteStageRequest(PublishSubject.create(), executeStageRequest); + return new WrappedExecuteStageRequest(PublishSubject.create(), executeStageRequest); } private void setupRequestFailureHandler(long waitSeconds, Observable requestObservable, @@ -134,8 +129,7 @@ public void onError(Throwable e) { @Override public void onNext(List booleans) { - logger.info("onNext called for request failure handler with items: " + - ((booleans == null) ? "-1" : booleans.size())); + logger.info("onNext called for request failure handler with items: {}", (booleans == null) ? "-1" : booleans.size()); if ((booleans == null) || booleans.isEmpty()) errorHandler.call(); } @@ -145,38 +139,30 @@ public void onNext(List booleans) { @Override public void start() { logger.info("Starting VirtualMachineWorkerServiceLocalImpl"); - Schedulers.newThread().createWorker().schedule(new Action0() { - @Override - public void call() { - try { - WrappedExecuteStageRequest request = null; - request = createExecuteStageRequest(); - setupRequestFailureHandler(request.getRequest().getTimeoutToReportStart(), request.getRequestSubject(), - new Action0() { - @Override - public void call() { - logger.error("launch error"); - } - }); - logger.info("onNext'ing WrappedExecuteStageRequest: {}", request.toString()); - executeStageRequestObserver.onNext(request); - } catch (MalformedURLException e) { - e.printStackTrace(); - } + Schedulers.newThread().createWorker().schedule(() -> { + try { + WrappedExecuteStageRequest request; + request = createExecuteStageRequest(); + setupRequestFailureHandler(request.getRequest().getTimeoutToReportStart(), request.getRequestSubject(), + () -> logger.error("launch error")); + logger.info("onNext'ing WrappedExecuteStageRequest: {}", request); + executeStageRequestObserver.onNext(request); + } catch (MalformedURLException e) { + logger.error("Error creating WrappedExecuteStageRequest: {}", e.getMessage(), e); } }, 2, TimeUnit.SECONDS); // subscribe to vm task updates on current thread - vmTaskStatusObservable.subscribe(new Action1() { - @Override - public void call(VirtualMachineTaskStatus vmTaskStatus) { - TYPE type = vmTaskStatus.getType(); - if (type == TYPE.COMPLETED) { - logger.info("Got COMPLETED state for " + vmTaskStatus.getTaskId()); - } else if (type == TYPE.STARTED) { - logger.info("Would send RUNNING state to mesos, worker started for " + vmTaskStatus.getTaskId()); - } + vmTaskStatusObservable.subscribe(vmTaskStatus -> { + TYPE type = vmTaskStatus.getType(); + switch (type) { + case COMPLETED: + logger.info("Got COMPLETED state for {}", vmTaskStatus.getTaskId()); + break; + case STARTED: + logger.info("Would send RUNNING state to mesos, worker started for {}", vmTaskStatus.getTaskId()); + break; } }); } diff --git a/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/WorkerExecutionOperationsNetworkStage.java b/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/WorkerExecutionOperationsNetworkStage.java index 5db757d2f..49fd0babd 100644 --- a/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/WorkerExecutionOperationsNetworkStage.java +++ b/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/WorkerExecutionOperationsNetworkStage.java @@ -129,7 +129,7 @@ public WorkerExecutionOperationsNetworkStage( String locateSpectatorRegistry = ServiceRegistry.INSTANCE.getPropertiesService().getStringValue("mantis.worker.locate.spectator.registry", "true"); - lookupSpectatorRegistry = Boolean.valueOf(locateSpectatorRegistry); + lookupSpectatorRegistry = Boolean.parseBoolean(locateSpectatorRegistry); scheduledExecutorService = new ScheduledThreadPoolExecutor(1); } @@ -137,12 +137,12 @@ public WorkerExecutionOperationsNetworkStage( * Converts a JobSchedulingInfo object to a simple WorkerMap to be used from within the context. * Static for easier testing. * - * @param jobName - * @param jobId - * @param durationType - * @param js + * @param jobName name of the job. + * @param jobId id of the job. + * @param durationType duration type of the job. + * @param js job scheduling info. * - * @return + * @return WorkerMap to be used within the context. */ static WorkerMap convertJobSchedulingInfoToWorkerMap(String jobName, String jobId, MantisJobDurationType durationType, JobSchedulingInfo js) { Map> stageToWorkerInfoMap = new HashMap<>(); @@ -169,17 +169,16 @@ static WorkerMap convertJobSchedulingInfoToWorkerMap(String jobName, String jobI if (hosts != null) { - List workerInfoList = hosts.values().stream().map((workerHost) -> { - - return generateWorkerInfo(jobName, jobId, stageNo, workerHost.getWorkerIndex(), workerHost.getWorkerNumber(), durationType, workerHost.getHost(), workerHost); - }).collect(Collectors.toList()); + List workerInfoList = hosts.values().stream() + .map((workerHost) -> generateWorkerInfo(jobName, jobId, stageNo, workerHost.getWorkerIndex(), workerHost.getWorkerNumber(), durationType, workerHost.getHost(), workerHost)) + .collect(Collectors.toList()); stageToWorkerInfoMap.put(stageNo, workerInfoList); } } workerMap = new WorkerMap(stageToWorkerInfoMap); } catch (Exception e) { - logger.warn("Exception converting JobSchedulingInfo " + js + " to worker Map " + e.getMessage()); + logger.warn("Exception converting JobSchedulingInfo {} to worker Map {}", js, e.getMessage()); return workerMap; } return workerMap; @@ -230,12 +229,12 @@ private Closeable startSendingHeartbeats(final Observer jobStatus, doubl /** * Converts JobSchedulingInfo to a simpler WorkerMap object to be used within Context * - * @param selfSchedulingInfo - * @param jobName - * @param jobId - * @param durationType + * @param selfSchedulingInfo Observable of JobSchedulingInfo. + * @param jobName name of the job. + * @param jobId id of the job. + * @param durationType duration type of the job. * - * @return + * @return Observable of WorkerMap to be used within the context. */ private Observable createWorkerMapObservable(Observable selfSchedulingInfo, String jobName, String jobId, MantisJobDurationType durationType) { @@ -317,7 +316,7 @@ public void executeStage(final ExecutionDetails setup) throws IOException { setupSubscriptionStateHandler(setup.getExecuteStageRequest().getRequest()); } - logger.info("Running worker info: " + rw); + logger.info("Running worker info: {}", rw); rw.signalStartedInitiated(); @@ -346,7 +345,7 @@ public void executeStage(final ExecutionDetails setup) throws IOException { rw.signalCompleted(); // wait for completion signal to go to the master and us getting killed. Upon timeout, exit. try {Thread.sleep(60000);} catch (InterruptedException ie) { - logger.warn("Unexpected exception sleeping: " + ie.getMessage()); + logger.warn("Unexpected exception sleeping: {}", ie.getMessage()); } System.exit(0); }, createWorkerMapObservable(selfSchedulingInfo, executionRequest.getJobName(), executionRequest.getJobId(), executionRequest.getDurationType()), @@ -364,7 +363,7 @@ public void executeStage(final ExecutionDetails setup) throws IOException { // execute stage if (rw.getStageNum() == 0) { - logger.info("JobId: " + rw.getJobId() + ", executing Job Master"); + logger.info("JobId: {}, executing Job Master", rw.getJobId()); final AutoScaleMetricsConfig autoScaleMetricsConfig = new AutoScaleMetricsConfig(); @@ -384,7 +383,7 @@ public void executeStage(final ExecutionDetails setup) throws IOException { } catch (IllegalArgumentException e) { final String errorMsg = String.format("ERROR: Invalid algorithm value %s for param %s (algo should be one of %s)", autoScaleMetricsConfig, JOB_MASTER_AUTOSCALE_METRIC_SYSTEM_PARAM, - Arrays.stream(AutoScaleMetricsConfig.AggregationAlgo.values()).map(a -> a.name()).collect(Collectors.toList())); + Arrays.stream(AutoScaleMetricsConfig.AggregationAlgo.values()).map(Enum::name).collect(Collectors.toList())); logger.error(errorMsg); throw new RuntimeException(errorMsg); } @@ -406,14 +405,9 @@ public void executeStage(final ExecutionDetails setup) throws IOException { // block until worker terminates rw.waitUntilTerminate(); } else if (rw.getStageNum() == 1 && rw.getTotalStagesNet() == 1) { - logger.info("JobId: " + rw.getJobId() + ", single stage job, executing entire job"); + logger.info("JobId: {}, single stage job, executing entire job", rw.getJobId()); // single stage, execute entire job on this machine - PortSelector portSelector = new PortSelector() { - @Override - public int acquirePort() { - return rw.getPorts().next(); - } - }; + PortSelector portSelector = () -> rw.getPorts().next(); RxMetrics rxMetrics = new RxMetrics(); closeables.add(StageExecutors.executeSingleStageJob(rw.getJob().getSource(), rw.getStage(), rw.getJob().getSink(), portSelector, rxMetrics, rw.getContext(), @@ -425,7 +419,7 @@ public int acquirePort() { // block until worker terminates rw.waitUntilTerminate(); } else { - logger.info("JobId: " + rw.getJobId() + ", executing a multi-stage job, stage: " + rw.getStageNum()); + logger.info("JobId: {}, executing a multi-stage job, stage: {}", rw.getJobId(), rw.getStageNum()); if (rw.getStageNum() == 1) { // execute source stage @@ -440,13 +434,13 @@ remoteObservableName, numWorkersAtStage(selfSchedulingInfo, rw.getJobId(), rw.ge closeables.add(StageExecutors.executeSource(rw.getWorkerIndex(), rw.getJob().getSource(), rw.getStage(), publisher, rw.getContext(), rw.getSourceStageTotalWorkersObservable())); - logger.info("JobId: " + rw.getJobId() + " stage: " + rw.getStageNum() + ", serving remote observable for source with name: " + remoteObservableName); + logger.info("JobId: {} stage: {}, serving remote observable for source with name: {}", rw.getJobId(), rw.getStageNum(), remoteObservableName); RemoteRxServer server = publisher.getServer(); RxMetrics rxMetrics = server.getMetrics(); MetricsRegistry.getInstance().registerAndGet(rxMetrics.getCountersAndGauges()); signalStarted(rw); - logger.info("JobId: " + rw.getJobId() + " stage: " + rw.getStageNum() + ", blocking until source observable completes"); + logger.info("JobId: {} stage: {}, blocking until source observable completes", rw.getJobId(), rw.getStageNum()); server.blockUntilServerShutdown(); } else { // execute intermediate stage or last stage plus sink @@ -506,21 +500,11 @@ private void executeNonSourceStage(Observable selfSchedulingI TYPE.INFO, getWorkerStringPrefix(rw.getStageNum(), rw.getWorkerIndex(), rw.getWorkerNum()) + " running", MantisJobState.Started)); - PortSelector portSelector = new PortSelector() { - @Override - public int acquirePort() { - return workerPort; - } - }; + PortSelector portSelector = () -> workerPort; RxMetrics rxMetrics = new RxMetrics(); MetricsRegistry.getInstance().registerAndGet(rxMetrics.getCountersAndGauges()); final CountDownLatch blockUntilComplete = new CountDownLatch(1); - Action0 countDownLatch = new Action0() { - @Override - public void call() { - blockUntilComplete.countDown(); - } - }; + Action0 countDownLatch = blockUntilComplete::countDown; closeables.add(StageExecutors.executeSink(consumer, rw.getStage(), rw.getJob().getSink(), portSelector, rxMetrics, rw.getContext(), countDownLatch, onSinkSubscribe, onSinkUnsubscribe, @@ -534,7 +518,7 @@ public void call() { acceptSchedulingChanges.set(false); } else { // intermediate stage - logger.info("JobId: " + rw.getJobId() + ", executing intermediate stage: " + rw.getStageNum()); + logger.info("JobId: {}, executing intermediate stage: {}", rw.getJobId(), rw.getStageNum()); int stageNumToExecute = rw.getStageNum(); @@ -548,12 +532,12 @@ public void call() { rw.getContext())); RemoteRxServer server = publisher.getServer(); - logger.info("JobId: " + jobId + " stage: " + stageNumToExecute + ", serving intermediate remote observable with name: " + remoteObservableName); + logger.info("JobId: {} stage: {}, serving intermediate remote observable with name: {}", jobId, stageNumToExecute, remoteObservableName); RxMetrics rxMetrics = server.getMetrics(); MetricsRegistry.getInstance().registerAndGet(rxMetrics.getCountersAndGauges()); // send running signal only after server is started signalStarted(rw); - logger.info("JobId: " + jobId + " stage: " + stageNumToExecute + ", blocking until intermediate observable completes"); + logger.info("JobId: {} stage: {}, blocking until intermediate observable completes", jobId, stageNumToExecute); server.blockUntilServerShutdown(); acceptSchedulingChanges.set(false); } @@ -563,7 +547,7 @@ public void call() { private Observable numWorkersAtStage(Observable selfSchedulingInfo, String jobId, final int stageNum) { //return mantisMasterApi.schedulingChanges(jobId) return selfSchedulingInfo - .distinctUntilChanged((prevJobSchedInfo, currentJobSchedInfo) -> (!prevJobSchedInfo.equals(currentJobSchedInfo)) ? false : true) + .distinctUntilChanged(JobSchedulingInfo::equals) .flatMap((Func1>) schedulingChange -> { Map assignments = schedulingChange.getWorkerAssignments(); if (assignments != null && !assignments.isEmpty()) { @@ -601,11 +585,11 @@ private WorkerConsumer connectToObservableAtPreviousStages(Observable endpoints = new LinkedList<>(); for (WorkerHost host : assignments.getHosts().values()) { if (host.getState() == MantisJobState.Started) { - logger.info("Received scheduling update from master, connect request for host: " + host.getHost() + " port: " + host.getPort() + " state: " + host.getState() + - " adding: " + connectionsPerEndpoint + " connections to host"); + logger.info("Received scheduling update from master, connect request for host: {} port: {} state: {} adding: {} connections to host", + host.getHost(), host.getPort(), host.getState(), connectionsPerEndpoint); for (int i = 1; i <= connectionsPerEndpoint; i++) { - final String endpointId = "stage_" + stageNumToExecute + "_index_" + Integer.toString(workerIndex) + "_partition_" + i; - logger.info("Adding endpoint to endpoint injector to be considered for add, with id: " + endpointId); + final String endpointId = "stage_" + stageNumToExecute + "_index_" + workerIndex + "_partition_" + i; + logger.info("Adding endpoint to endpoint injector to be considered for add, with id: {}", endpointId); endpoints.add(new Endpoint(host.getHost(), host.getPort().get(0), endpointId)); } diff --git a/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/WorkerIndexHistory.java b/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/WorkerIndexHistory.java index 5bd2b182c..8865d5a52 100644 --- a/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/WorkerIndexHistory.java +++ b/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/WorkerIndexHistory.java @@ -22,8 +22,8 @@ public class WorkerIndexHistory { - final Set runningWorkerIndex = new HashSet(); - final Set terminalWorkerIndex = new HashSet(); + private final Set runningWorkerIndex = new HashSet<>(); + private final Set terminalWorkerIndex = new HashSet<>(); public synchronized void addToRunningIndex(int workerIndex) { runningWorkerIndex.add(workerIndex); diff --git a/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/jobmaster/AutoScaleMetricsConfig.java b/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/jobmaster/AutoScaleMetricsConfig.java index b03638a8b..69718ec25 100644 --- a/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/jobmaster/AutoScaleMetricsConfig.java +++ b/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/jobmaster/AutoScaleMetricsConfig.java @@ -22,6 +22,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.regex.Pattern; @@ -166,7 +167,7 @@ public Set generateSourceJobMetricGroups(Set clientIds) { public boolean isSourceJobDropMetric(String metricGroupName, String metricName) { for (Map.Entry entry : sourceJobMetricsPatterns.entrySet()) { if (entry.getValue().matcher(metricGroupName).matches()) { - return sourceJobMetrics.get(entry.getKey()).keySet().contains(metricName); + return sourceJobMetrics.get(entry.getKey()).containsKey(metricName); } } return false; @@ -183,7 +184,7 @@ public boolean equals(Object o) { AutoScaleMetricsConfig that = (AutoScaleMetricsConfig) o; - return userDefinedAutoScaleMetrics != null ? userDefinedAutoScaleMetrics.equals(that.userDefinedAutoScaleMetrics) : that.userDefinedAutoScaleMetrics == null; + return Objects.equals(userDefinedAutoScaleMetrics, that.userDefinedAutoScaleMetrics); } diff --git a/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/jobmaster/GaugeData.java b/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/jobmaster/GaugeData.java index 5cb625be0..9e2e8a928 100644 --- a/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/jobmaster/GaugeData.java +++ b/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/jobmaster/GaugeData.java @@ -30,7 +30,7 @@ class GaugeData { GaugeData(final long when, final List gauges) { this.when = when; for (GaugeMeasurement gauge : gauges) { - this.gauges.put(gauge.getEvent(), (double) gauge.getValue()); + this.gauges.put(gauge.getEvent(), gauge.getValue()); } } diff --git a/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/jobmaster/JobAutoScaler.java b/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/jobmaster/JobAutoScaler.java index 9c83bde8f..ca76e1a67 100644 --- a/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/jobmaster/JobAutoScaler.java +++ b/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/jobmaster/JobAutoScaler.java @@ -42,6 +42,7 @@ import io.vavr.control.Try; import java.util.HashMap; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.concurrent.TimeUnit; @@ -99,7 +100,7 @@ public class JobAutoScaler { public static void main(String[] args) { Observable.interval(1, TimeUnit.DAYS) - .doOnNext(x -> System.out.println(x)) + .doOnNext(System.out::println) .take(1) .toBlocking() .last(); @@ -116,19 +117,17 @@ private com.netflix.control.clutch.Event mantisEventToClutchEvent(StageSchedulin void start() { subject - .onBackpressureBuffer(100, () -> { - logger.info("onOverflow triggered, dropping old events"); - }, BackpressureOverflow.ON_OVERFLOW_DROP_OLDEST) + .onBackpressureBuffer( + 100, () -> logger.info("onOverflow triggered, dropping old events"), + BackpressureOverflow.ON_OVERFLOW_DROP_OLDEST) .doOnRequest(x -> logger.info("Scaler requested {} metrics.", x)) - .groupBy(event -> event.getStage()) + .groupBy(Event::getStage) .flatMap(go -> { Integer stage = Optional.ofNullable(go.getKey()).orElse(-1); final StageSchedulingInfo stageSchedulingInfo = schedulingInfo.forStage(stage); logger.info("System Environment:"); - System.getenv().forEach((key, value) -> { - logger.info("{} = {}", key, value); - }); + System.getenv().forEach((key, value) -> logger.info("{} = {}", key, value)); Optional clutchCustomConfiguration = Optional.ofNullable( @@ -253,9 +252,7 @@ void start() { .doOnCompleted(() -> logger.info("onComplete on JobAutoScaler subject")) .doOnError(t -> logger.error("got onError in JobAutoScaler", t)) .doOnSubscribe(() -> logger.info("onSubscribe JobAutoScaler")) - .doOnUnsubscribe(() -> { - logger.info("Unsubscribing for JobAutoScaler of job " + jobId); - }) + .doOnUnsubscribe(() -> logger.info("Unsubscribing for JobAutoScaler of job {}", jobId)) .retry() .subscribe(); } @@ -270,7 +267,7 @@ void start() { * @return A map of stage -> config for Clutch. */ protected Map getClutchConfiguration(String jsonConfig) { - return Try.>of(() -> objectMapper.readValue(jsonConfig, new TypeReference>() {})) + return Try.of(() -> objectMapper.readValue(jsonConfig, new TypeReference>() {})) .getOrElseGet(t -> Try.of(() -> { ClutchConfiguration config = objectMapper.readValue(jsonConfig, new TypeReference() {}); Map configs = new HashMap<>(); @@ -326,7 +323,7 @@ public boolean equals(Object o) { if (Double.compare(event.value, value) != 0) return false; if (numWorkers != event.numWorkers) return false; if (type != event.type) return false; - return message != null ? message.equals(event.message) : event.message == null; + return Objects.equals(message, event.message); } @@ -366,7 +363,7 @@ public class StageScaler { .zipWith(Observable.range(1, Integer.MAX_VALUE), (Func2) (t1, integer) -> integer) .flatMap((Func1>) integer -> { long delay = 2 * (integer > 5 ? 10 : integer); - logger.info("retrying scaleJobStage request after sleeping for " + delay + " secs"); + logger.info("retrying scaleJobStage request after sleeping for {} secs", delay); return Observable.timer(delay, TimeUnit.SECONDS); }); @@ -389,7 +386,7 @@ private void setOutstandingScalingRequest(final Subscription subscription) { public int getDesiredWorkersForScaleUp(final int increment, final int numCurrentWorkers) { final int desiredWorkers; if (!stageSchedulingInfo.getScalingPolicy().isEnabled()) { - logger.warn("Job " + jobId + " stage " + stage + " is not scalable, can't increment #workers by " + increment); + logger.warn("Job {} stage {} is not scalable, can't increment #workers by {}", jobId, stage, increment); return numCurrentWorkers; } if (numCurrentWorkers < 0 || increment < 1) { @@ -418,7 +415,7 @@ public void scaleUpStage(final int numCurrentWorkers, final int desiredWorkers, public int getDesiredWorkersForScaleDown(final int decrement, final int numCurrentWorkers) { final int desiredWorkers; if (!stageSchedulingInfo.getScalingPolicy().isEnabled()) { - logger.warn("Job " + jobId + " stage " + stage + " is not scalable, can't decrement #workers by " + decrement); + logger.warn("Job {} stage {} is not scalable, can't decrement #workers by {}", jobId, stage, decrement); return numCurrentWorkers; } if (numCurrentWorkers < 0 || decrement < 1) { @@ -478,7 +475,7 @@ public void onCompleted() { @Override public void onError(Throwable e) { - logger.error("Unexpected error: " + e.getMessage(), e); + logger.error("Unexpected error: {}", e.getMessage(), e); } @Override @@ -486,8 +483,8 @@ public void onNext(Event event) { final StageScalingPolicy scalingPolicy = stageSchedulingInfo.getScalingPolicy(); long coolDownSecs = scalingPolicy == null ? Long.MAX_VALUE : scalingPolicy.getCoolDownSecs(); boolean scalable = stageSchedulingInfo.getScalable() && scalingPolicy != null && scalingPolicy.isEnabled(); - logger.debug("Will check for autoscaling job " + jobId + " stage " + stage + " due to event: " + event); - if (scalable && scalingPolicy != null) { + logger.debug("Will check for autoscaling job {} stage {} due to event: {}", jobId, stage, event); + if (scalable) { final StageScalingPolicy.Strategy strategy = scalingPolicy.getStrategies().get(event.getType()); if (strategy != null) { double effectiveValue = Util.getEffectiveValue(stageSchedulingInfo, event.getType(), event.getValue()); @@ -499,14 +496,11 @@ public void onNext(Event event) { } stats.add(effectiveValue); if (lastScaledAt < (System.currentTimeMillis() - coolDownSecs * 1000)) { - logger.info(jobId + ", stage " + stage + ": eff=" + - String.format(PercentNumberFormat, effectiveValue) + ", thresh=" + strategy.getScaleUpAbovePct()); + logger.info("{}, stage {}: eff={}, thresh={}", + jobId, stage, String.format(PercentNumberFormat, effectiveValue), strategy.getScaleUpAbovePct()); if (stats.getHighThreshTriggered()) { - logger.info("Attempting to scale up stage " + stage + " of job " + jobId + " by " + - scalingPolicy.getIncrement() + " workers, because " + - event.type + " exceeded scaleUpThreshold of " + - String.format(PercentNumberFormat, strategy.getScaleUpAbovePct()) + " " + - stats.getCurrentHighCount() + " times"); + logger.info("Attempting to scale up stage {} of job {} by {} workers, because {} exceeded scaleUpThreshold of {} {} times", + stage, jobId, scalingPolicy.getIncrement(), event.type, String.format(PercentNumberFormat, strategy.getScaleUpAbovePct()), stats.getCurrentHighCount()); final int numCurrWorkers = event.getNumWorkers(); final int desiredWorkers = scaler.getDesiredWorkersForScaleUp(scalingPolicy.getIncrement(), numCurrWorkers); if (desiredWorkers > numCurrWorkers) { @@ -519,10 +513,8 @@ public void onNext(Event event) { logger.debug("scale up NOOP: desiredWorkers same as current workers"); } } else if (stats.getLowThreshTriggered()) { - logger.info("Attempting to scale down stage " + stage + " of job " + jobId + " by " + - scalingPolicy.getDecrement() + " workers because " + event.getType() + - " is below scaleDownThreshold of " + strategy.getScaleDownBelowPct() + - " " + stats.getCurrentLowCount() + " times"); + logger.info("Attempting to scale down stage {} of job {} by {} workers because {} is below scaleDownThreshold of {} {} times", + stage, jobId, scalingPolicy.getDecrement(), event.getType(), strategy.getScaleDownBelowPct(), stats.getCurrentLowCount()); final int numCurrentWorkers = event.getNumWorkers(); final int desiredWorkers = scaler.getDesiredWorkersForScaleDown(scalingPolicy.getDecrement(), numCurrentWorkers); if (desiredWorkers < numCurrentWorkers) { diff --git a/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/jobmaster/SourceJobWorkerMetricsSubscription.java b/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/jobmaster/SourceJobWorkerMetricsSubscription.java index 27990a783..fe9fd55fc 100644 --- a/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/jobmaster/SourceJobWorkerMetricsSubscription.java +++ b/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/jobmaster/SourceJobWorkerMetricsSubscription.java @@ -67,11 +67,7 @@ protected Observable> getResultsForJobId(Strin protected Map> getSourceJobToClientMap() { Map> results = new HashMap<>(); for (SourceJobParameters.TargetInfo info : targetInfos) { - Set clientIds = results.get(info.sourceJobName); - if (clientIds == null) { - clientIds = new HashSet<>(); - results.put(info.sourceJobName, clientIds); - } + Set clientIds = results.computeIfAbsent(info.sourceJobName, k -> new HashSet<>()); clientIds.add(info.clientId); } return results; diff --git a/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/jobmaster/WorkerMetricHandler.java b/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/jobmaster/WorkerMetricHandler.java index c42d0987a..692bcdb87 100644 --- a/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/jobmaster/WorkerMetricHandler.java +++ b/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/jobmaster/WorkerMetricHandler.java @@ -43,7 +43,6 @@ import rx.Observer; import rx.Subscriber; import rx.Subscription; -import rx.functions.Action0; import rx.functions.Action1; import rx.functions.Func1; import rx.observers.SerializedObserver; @@ -126,7 +125,7 @@ private class StageMetricDataOperator implements Observable.Operator numStageWorkersFn, final AutoScaleMetricsConfig autoScaleMetricsConfig) { - logger.debug("setting operator for stage " + stage); + logger.debug("setting operator for stage {}", stage); this.stage = stage; this.numStageWorkersFn = numStageWorkersFn; this.autoScaleMetricsConfig = autoScaleMetricsConfig; @@ -164,7 +163,7 @@ public StageMetricDataOperator(final int stage, List candidates = workerHostsByStage.get(stage); if (candidates != null) { candidates.stream().filter(h -> h.getWorkerIndex() == workerIndex).map(WorkerHost::getHost).findFirst().ifPresent(host -> - lookupWorkersByHost(host).stream().forEach(i -> workerResubmitFunc.call(i))); + lookupWorkersByHost(host).stream().forEach(workerResubmitFunc::call)); } }); } @@ -175,7 +174,7 @@ private boolean resubmitOutlierWorkerEnabled() { final String enableOutlierWorkerResubmit = "true"; final boolean resubmitOutlierWorker = - Boolean.valueOf( + Boolean.parseBoolean( ServiceRegistry.INSTANCE.getPropertiesService() .getStringValue(resubmitOutlierWorkerProp, enableOutlierWorkerResubmit)); return resubmitOutlierWorker; @@ -261,114 +260,110 @@ private void addSourceJobDataPoint(final MetricData datapoint) { @Override public Subscriber call(final Subscriber child) { child.add(Schedulers.computation().createWorker().schedulePeriodically( - new Action0() { - @Override - public void call() { + () -> { - List> listofAggregates = new ArrayList<>(); + List> listofAggregates = new ArrayList<>(); - synchronized (workersMap) { - for (Map.Entry entry : workersMap.entrySet()) { - // get the aggregate metric values by metric group per worker - listofAggregates.add(metricAggregator.getAggregates(entry.getValue().getGaugesByMetricGrp())); - } - } - final int numWorkers = numStageWorkersFn.call(stage); - // get the aggregate metric values by metric group for all workers in stage - Map allWorkerAggregates = getAggregates(listofAggregates); - logger.info("Job stage " + stage + " avgResUsage from " + - workersMap.size() + " workers: " + allWorkerAggregates.toString()); - - for (Map.Entry> userDefinedMetric : autoScaleMetricsConfig.getUserDefinedMetrics().entrySet()) { - final String metricGrp = userDefinedMetric.getKey(); - for (String metric : userDefinedMetric.getValue()) { - if (!allWorkerAggregates.containsKey(metricGrp) || !allWorkerAggregates.get(metricGrp).getGauges().containsKey(metric)) { - logger.debug("no gauge data found for UserDefined (metric={})", userDefinedMetric); - } else { - jobAutoScaleObserver.onNext( - new JobAutoScaler.Event(StageScalingPolicy.ScalingReason.UserDefined, stage, - allWorkerAggregates.get(metricGrp).getGauges().get(metric), numWorkers, "")); - } - } - } - if (allWorkerAggregates.containsKey(KAFKA_CONSUMER_FETCH_MGR_METRIC_GROUP)) { - final Map gauges = allWorkerAggregates.get(KAFKA_CONSUMER_FETCH_MGR_METRIC_GROUP).getGauges(); - if (gauges.containsKey(KAFKA_LAG)) { - jobAutoScaleObserver.onNext( - new JobAutoScaler.Event(StageScalingPolicy.ScalingReason.KafkaLag, stage, - gauges.get(KAFKA_LAG), numWorkers, "") - ); - } - if (gauges.containsKey(KAFKA_PROCESSED)) { - jobAutoScaleObserver.onNext( - new JobAutoScaler.Event(StageScalingPolicy.ScalingReason.KafkaProcessed, stage, - gauges.get(KAFKA_PROCESSED), numWorkers, "")); - } - } - if (allWorkerAggregates.containsKey(RESOURCE_USAGE_METRIC_GROUP)) { - // cpuPctUsageCurr is Published as (cpuUsageCurr * 100.0) from ResourceUsagePayloadSetter, reverse transform to retrieve curr cpu usage - double cpuUsageCurr = allWorkerAggregates.get(RESOURCE_USAGE_METRIC_GROUP).getGauges().get(MetricStringConstants.CPU_PCT_USAGE_CURR) / 100.0; - jobAutoScaleObserver.onNext( - new JobAutoScaler.Event(StageScalingPolicy.ScalingReason.CPU, stage, - cpuUsageCurr, numWorkers, "")); - jobAutoScaleObserver.onNext( - new JobAutoScaler.Event(StageScalingPolicy.ScalingReason.Memory, stage, - allWorkerAggregates.get(RESOURCE_USAGE_METRIC_GROUP).getGauges().get(MetricStringConstants.TOT_MEM_USAGE_CURR), numWorkers, "")); - jobAutoScaleObserver.onNext( - new JobAutoScaler.Event(StageScalingPolicy.ScalingReason.Network, stage, - allWorkerAggregates.get(RESOURCE_USAGE_METRIC_GROUP).getGauges().get(MetricStringConstants.NW_BYTES_USAGE_CURR), numWorkers, "")); + synchronized (workersMap) { + for (Map.Entry entry : workersMap.entrySet()) { + // get the aggregate metric values by metric group per worker + listofAggregates.add(metricAggregator.getAggregates(entry.getValue().getGaugesByMetricGrp())); + } + } + final int numWorkers = numStageWorkersFn.call(stage); + // get the aggregate metric values by metric group for all workers in stage + Map allWorkerAggregates = getAggregates(listofAggregates); + logger.info("Job stage {} avgResUsage from {} workers: {}", stage, workersMap.size(), allWorkerAggregates.toString()); + + for (Map.Entry> userDefinedMetric : autoScaleMetricsConfig.getUserDefinedMetrics().entrySet()) { + final String metricGrp = userDefinedMetric.getKey(); + for (String metric : userDefinedMetric.getValue()) { + if (!allWorkerAggregates.containsKey(metricGrp) || !allWorkerAggregates.get(metricGrp).getGauges().containsKey(metric)) { + logger.debug("no gauge data found for UserDefined (metric={})", userDefinedMetric); + } else { jobAutoScaleObserver.onNext( - new JobAutoScaler.Event(StageScalingPolicy.ScalingReason.JVMMemory, stage, - allWorkerAggregates.get(RESOURCE_USAGE_METRIC_GROUP).getGauges().get("jvmMemoryUsedBytes"), numWorkers, "") - ); + new JobAutoScaler.Event(StageScalingPolicy.ScalingReason.UserDefined, stage, + allWorkerAggregates.get(metricGrp).getGauges().get(metric), numWorkers, "")); } + } + } + if (allWorkerAggregates.containsKey(KAFKA_CONSUMER_FETCH_MGR_METRIC_GROUP)) { + final Map gauges = allWorkerAggregates.get(KAFKA_CONSUMER_FETCH_MGR_METRIC_GROUP).getGauges(); + if (gauges.containsKey(KAFKA_LAG)) { + jobAutoScaleObserver.onNext( + new JobAutoScaler.Event(StageScalingPolicy.ScalingReason.KafkaLag, stage, + gauges.get(KAFKA_LAG), numWorkers, "") + ); + } + if (gauges.containsKey(KAFKA_PROCESSED)) { + jobAutoScaleObserver.onNext( + new JobAutoScaler.Event(StageScalingPolicy.ScalingReason.KafkaProcessed, stage, + gauges.get(KAFKA_PROCESSED), numWorkers, "")); + } + } + if (allWorkerAggregates.containsKey(RESOURCE_USAGE_METRIC_GROUP)) { + // cpuPctUsageCurr is Published as (cpuUsageCurr * 100.0) from ResourceUsagePayloadSetter, reverse transform to retrieve curr cpu usage + double cpuUsageCurr = allWorkerAggregates.get(RESOURCE_USAGE_METRIC_GROUP).getGauges().get(MetricStringConstants.CPU_PCT_USAGE_CURR) / 100.0; + jobAutoScaleObserver.onNext( + new JobAutoScaler.Event(StageScalingPolicy.ScalingReason.CPU, stage, + cpuUsageCurr, numWorkers, "")); + jobAutoScaleObserver.onNext( + new JobAutoScaler.Event(StageScalingPolicy.ScalingReason.Memory, stage, + allWorkerAggregates.get(RESOURCE_USAGE_METRIC_GROUP).getGauges().get(MetricStringConstants.TOT_MEM_USAGE_CURR), numWorkers, "")); + jobAutoScaleObserver.onNext( + new JobAutoScaler.Event(StageScalingPolicy.ScalingReason.Network, stage, + allWorkerAggregates.get(RESOURCE_USAGE_METRIC_GROUP).getGauges().get(MetricStringConstants.NW_BYTES_USAGE_CURR), numWorkers, "")); + jobAutoScaleObserver.onNext( + new JobAutoScaler.Event(StageScalingPolicy.ScalingReason.JVMMemory, stage, + allWorkerAggregates.get(RESOURCE_USAGE_METRIC_GROUP).getGauges().get("jvmMemoryUsedBytes"), numWorkers, "") + ); + } - if (allWorkerAggregates.containsKey(DATA_DROP_METRIC_GROUP)) { - final GaugeData gaugeData = allWorkerAggregates.get(DATA_DROP_METRIC_GROUP); - final Map gauges = gaugeData.getGauges(); - if (gauges.containsKey(DROP_PERCENT)) { - jobAutoScaleObserver.onNext( - new JobAutoScaler.Event(StageScalingPolicy.ScalingReason.DataDrop, stage, - gauges.get(DROP_PERCENT), numWorkers, "")); - } - } + if (allWorkerAggregates.containsKey(DATA_DROP_METRIC_GROUP)) { + final GaugeData gaugeData = allWorkerAggregates.get(DATA_DROP_METRIC_GROUP); + final Map gauges = gaugeData.getGauges(); + if (gauges.containsKey(DROP_PERCENT)) { + jobAutoScaleObserver.onNext( + new JobAutoScaler.Event(StageScalingPolicy.ScalingReason.DataDrop, stage, + gauges.get(DROP_PERCENT), numWorkers, "")); + } + } - if (allWorkerAggregates.containsKey(WORKER_STAGE_INNER_INPUT)) { - final GaugeData gaugeData = allWorkerAggregates.get(WORKER_STAGE_INNER_INPUT); - final Map gauges = gaugeData.getGauges(); - if (gauges.containsKey(ON_NEXT_GAUGE)) { - // Divide by 6 to account for 6 second reset by Atlas on counter metric. - jobAutoScaleObserver.onNext( - new JobAutoScaler.Event(StageScalingPolicy.ScalingReason.RPS, stage, - gauges.get(ON_NEXT_GAUGE) / 6.0, numWorkers, "")); - } - } + if (allWorkerAggregates.containsKey(WORKER_STAGE_INNER_INPUT)) { + final GaugeData gaugeData = allWorkerAggregates.get(WORKER_STAGE_INNER_INPUT); + final Map gauges = gaugeData.getGauges(); + if (gauges.containsKey(ON_NEXT_GAUGE)) { + // Divide by 6 to account for 6 second reset by Atlas on counter metric. + jobAutoScaleObserver.onNext( + new JobAutoScaler.Event(StageScalingPolicy.ScalingReason.RPS, stage, + gauges.get(ON_NEXT_GAUGE) / 6.0, numWorkers, "")); + } + } - double sourceJobDrops = 0; - boolean hasSourceJobDropsMetric = false; - Map sourceMetricsRecent = sourceJobMetricsRecent.asMap(); - for (Map.Entry worker : sourceJobWorkersMap.entrySet()) { - Map metricGroups = metricAggregator.getAggregates(worker.getValue().getGaugesByMetricGrp()); - for (Map.Entry group : metricGroups.entrySet()) { - String metricKey = worker.getKey() + ":" + group.getKey(); - for (Map.Entry gauge : group.getValue().getGauges().entrySet()) { - if (sourceMetricsRecent.containsKey(metricKey) && - autoScaleMetricsConfig.isSourceJobDropMetric(group.getKey(), gauge.getKey())) { - sourceJobDrops += gauge.getValue(); - hasSourceJobDropsMetric = true; - } - } + double sourceJobDrops = 0; + boolean hasSourceJobDropsMetric = false; + Map sourceMetricsRecent = sourceJobMetricsRecent.asMap(); + for (Map.Entry worker : sourceJobWorkersMap.entrySet()) { + Map metricGroups = metricAggregator.getAggregates(worker.getValue().getGaugesByMetricGrp()); + for (Map.Entry group : metricGroups.entrySet()) { + String metricKey = worker.getKey() + ":" + group.getKey(); + for (Map.Entry gauge : group.getValue().getGauges().entrySet()) { + if (sourceMetricsRecent.containsKey(metricKey) && + autoScaleMetricsConfig.isSourceJobDropMetric(group.getKey(), gauge.getKey())) { + sourceJobDrops += gauge.getValue(); + hasSourceJobDropsMetric = true; } } - if (hasSourceJobDropsMetric) { - logger.info("Job stage {}, source job drop metrics: {}", stage, sourceJobDrops); - // Divide by 6 to account for 6 second reset by Atlas on counter metric. - jobAutoScaleObserver.onNext( - new JobAutoScaler.Event(StageScalingPolicy.ScalingReason.SourceJobDrop, stage, - sourceJobDrops / 6.0 / numWorkers, numWorkers, "")); - } } - }, metricsIntervalSeconds, metricsIntervalSeconds, TimeUnit.SECONDS + } + if (hasSourceJobDropsMetric) { + logger.info("Job stage {}, source job drop metrics: {}", stage, sourceJobDrops); + // Divide by 6 to account for 6 second reset by Atlas on counter metric. + jobAutoScaleObserver.onNext( + new JobAutoScaler.Event(StageScalingPolicy.ScalingReason.SourceJobDrop, stage, + sourceJobDrops / 6.0 / numWorkers, numWorkers, "")); + } + }, metricsIntervalSeconds, metricsIntervalSeconds, TimeUnit.SECONDS )); return new Subscriber() { @Override @@ -378,13 +373,14 @@ public void onCompleted() { @Override public void onError(Throwable e) { - logger.error("Unexpected error: " + e.getMessage(), e); + logger.error("Unexpected error: {}", e.getMessage(), e); } @Override public void onNext(MetricData metricData) { - logger.debug("Got metric metricData for job " + jobId + " stage " + stage + - ", worker " + metricData.getWorkerNumber() + ": " + metricData); + logger.debug("Got metric metricData for job {} stage {}, worker {}: {}", + jobId, stage, metricData.getWorkerNumber(), metricData); + if (jobId.equals(metricData.getJobId())) { addDataPoint(metricData); } else { @@ -410,7 +406,7 @@ private void start() { logger.info("Starting worker metric handler with autoscale config {}", autoScaleMetricsConfig); metricDataSubject - .groupBy(metricData -> metricData.getStage()) + .groupBy(MetricData::getStage) .lift(new DropOperator<>(WorkerMetricHandler.class.getName())) .doOnNext(go -> { final Integer stage = go.getKey(); diff --git a/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/jobmaster/WorkerMetricSubscription.java b/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/jobmaster/WorkerMetricSubscription.java index 3a83a6477..f9d620fed 100644 --- a/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/jobmaster/WorkerMetricSubscription.java +++ b/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/jobmaster/WorkerMetricSubscription.java @@ -29,7 +29,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import rx.Observer; -import rx.functions.Action1; public class WorkerMetricSubscription { @@ -54,15 +53,12 @@ public WorkerMetricSubscription(final String jobId, WorkerMetricsClient workerMe } metricsClient = workerMetricsClient.getMetricsClientByJobId(jobId, - new SseWorkerConnectionFunction(true, new Action1() { - @Override - public void call(Throwable throwable) { - logger.error("Metric connection error: " + throwable.getMessage()); - try { - Thread.sleep(500); - } catch (InterruptedException ie) { - logger.error("Interrupted waiting for retrying connection"); - } + new SseWorkerConnectionFunction(true, throwable -> { + logger.error("Metric connection error: {}", throwable.getMessage()); + try { + Thread.sleep(500); + } catch (InterruptedException ie) { + logger.error("Interrupted waiting for retrying connection"); } }, metricNamesFilter), new Observer() { diff --git a/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/jobmaster/clutch/ClutchAutoScaler.java b/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/jobmaster/clutch/ClutchAutoScaler.java index 064a53de4..441473891 100644 --- a/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/jobmaster/clutch/ClutchAutoScaler.java +++ b/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/jobmaster/clutch/ClutchAutoScaler.java @@ -53,7 +53,7 @@ public class ClutchAutoScaler implements Observable.Transformer attributes = new FastVector<>(); static { attributes.add(new Attribute("cpu")); @@ -71,7 +71,7 @@ public class ClutchAutoScaler implements Observable.Transformer actionCache = CacheBuilder.newBuilder() .maximumSize(12) @@ -85,7 +85,7 @@ public ClutchAutoScaler(StageSchedulingInfo stageSchedulingInfo, JobAutoScaler.S this.initialSize = initialSize; this.targetScale.set(initialSize); this.config = config; - this.rps.set(Math.round(config.rps)); + this.rps = new AtomicLong(Math.round(config.rps)); this.cooldownTimestamp = new AtomicLong(System.currentTimeMillis() + config.cooldownSeconds.getOrElse(0L) * 1000); @@ -233,7 +233,7 @@ public Observable call(Observable metrics) { .doOnNext(x -> actionCache.put(System.currentTimeMillis(), x - targetScale.get())) .doOnNext(targetScale::set) .doOnNext(__ -> cooldownTimestamp.set(System.currentTimeMillis() + config.cooldownSeconds.getOrElse(0L) * 1000)) - .map(x -> (Object) x); + .map(x -> x); } private class ClutchController implements Observable.Transformer { diff --git a/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/jobmaster/clutch/ClutchConfiguration.java b/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/jobmaster/clutch/ClutchConfiguration.java index 9065c139d..c2e453547 100644 --- a/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/jobmaster/clutch/ClutchConfiguration.java +++ b/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/jobmaster/clutch/ClutchConfiguration.java @@ -18,6 +18,7 @@ import io.mantisrx.server.worker.jobmaster.clutch.rps.ClutchRpsPIDConfig; import io.vavr.control.Option; +import java.util.Objects; public class ClutchConfiguration { @@ -119,38 +120,38 @@ public boolean equals(Object o) { if (Double.compare(this.getRps(), other.getRps()) != 0) return false; final Object this$minSamples = this.getMinSamples(); final Object other$minSamples = other.getMinSamples(); - if (this$minSamples == null ? other$minSamples != null : !this$minSamples.equals(other$minSamples)) + if (!Objects.equals(this$minSamples, other$minSamples)) return false; final Object this$cooldownSeconds = this.getCooldownSeconds(); final Object other$cooldownSeconds = other.getCooldownSeconds(); - if (this$cooldownSeconds == null ? other$cooldownSeconds != null : !this$cooldownSeconds.equals(other$cooldownSeconds)) + if (!Objects.equals(this$cooldownSeconds, other$cooldownSeconds)) return false; final Object this$panicThresholdSeconds = this.getPanicThresholdSeconds(); final Object other$panicThresholdSeconds = other.getPanicThresholdSeconds(); - if (this$panicThresholdSeconds == null ? other$panicThresholdSeconds != null : !this$panicThresholdSeconds.equals(other$panicThresholdSeconds)) + if (!Objects.equals(this$panicThresholdSeconds, other$panicThresholdSeconds)) return false; final Object this$maxAdjustment = this.getMaxAdjustment(); final Object other$maxAdjustment = other.getMaxAdjustment(); - if (this$maxAdjustment == null ? other$maxAdjustment != null : !this$maxAdjustment.equals(other$maxAdjustment)) + if (!Objects.equals(this$maxAdjustment, other$maxAdjustment)) return false; final Object this$cpu = this.getCpu(); final Object other$cpu = other.getCpu(); - if (this$cpu == null ? other$cpu != null : !this$cpu.equals(other$cpu)) return false; + if (!Objects.equals(this$cpu, other$cpu)) return false; final Object this$memory = this.getMemory(); final Object other$memory = other.getMemory(); - if (this$memory == null ? other$memory != null : !this$memory.equals(other$memory)) return false; + if (!Objects.equals(this$memory, other$memory)) return false; final Object this$network = this.getNetwork(); final Object other$network = other.getNetwork(); - if (this$network == null ? other$network != null : !this$network.equals(other$network)) return false; + if (!Objects.equals(this$network, other$network)) return false; final Object this$rpsConfig = this.getRpsConfig(); final Object other$rpsConfig = other.getRpsConfig(); - if (this$rpsConfig == null ? other$rpsConfig != null : !this$rpsConfig.equals(other$rpsConfig)) return false; + if (!Objects.equals(this$rpsConfig, other$rpsConfig)) return false; final Object this$useExperimental = this.getUseExperimental(); final Object other$useExperimental = other.getUseExperimental(); - if (this$useExperimental == null ? other$useExperimental != null : !this$useExperimental.equals(other$useExperimental)) return false; + if (!Objects.equals(this$useExperimental, other$useExperimental)) return false; final Object this$integralDecay = this.getIntegralDecay(); final Object other$integralDecay = other.getIntegralDecay(); - if (this$integralDecay == null ? other$integralDecay != null : !this$integralDecay.equals(other$integralDecay)) + if (!Objects.equals(this$integralDecay, other$integralDecay)) return false; return true; } diff --git a/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/jobmaster/clutch/ClutchPIDConfig.java b/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/jobmaster/clutch/ClutchPIDConfig.java index e3eb17d98..fc9e1c76e 100644 --- a/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/jobmaster/clutch/ClutchPIDConfig.java +++ b/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/jobmaster/clutch/ClutchPIDConfig.java @@ -17,6 +17,7 @@ package io.mantisrx.server.worker.jobmaster.clutch; import io.vavr.Tuple2; +import java.util.Objects; public class ClutchPIDConfig { @@ -59,7 +60,7 @@ public boolean equals(Object o) { if (Double.compare(this.getSetPoint(), other.getSetPoint()) != 0) return false; final Object this$rope = this.getRope(); final Object other$rope = other.getRope(); - if (this$rope == null ? other$rope != null : !this$rope.equals(other$rope)) return false; + if (!Objects.equals(this$rope, other$rope)) return false; if (Double.compare(this.getKp(), other.getKp()) != 0) return false; if (Double.compare(this.getKd(), other.getKd()) != 0) return false; return true; diff --git a/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/jobmaster/clutch/experimental/MantisClutchConfigurationSelector.java b/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/jobmaster/clutch/experimental/MantisClutchConfigurationSelector.java index 308dc8794..3879bf4f5 100644 --- a/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/jobmaster/clutch/experimental/MantisClutchConfigurationSelector.java +++ b/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/jobmaster/clutch/experimental/MantisClutchConfigurationSelector.java @@ -77,11 +77,11 @@ private double getSetpoint(Map sketches, dou // Sanity checking against mins / maxes if (setPoint < minRps) { - logger.info("Setpoint {} was less than minimum {}. Setting to {}.", minRps, minRps); + logger.info("Setpoint {} was less than minimum {}. Setting to {}.", setPoint, minRps, minRps); setPoint = minRps; } if (setPoint > maxRps) { - logger.info("Setpoint {} was greater than maximum {}. Setting to {}.", maxRps, maxRps); + logger.info("Setpoint {} was greater than maximum {}. Setting to {}.", setPoint, maxRps, maxRps); setPoint = maxRps; } @@ -101,7 +101,7 @@ public ClutchConfiguration apply(Map sketche Tuple2 rope = Tuple.of(setPoint * 0.3, 0.0); // Gain - long deltaT = stageSchedulingInfo.getScalingPolicy().getCoolDownSecs() / 30l; + long deltaT = stageSchedulingInfo.getScalingPolicy().getCoolDownSecs() / 30L; //double minMaxMidPoint = stageSchedulingInfo.getScalingPolicy().getMax() - stageSchedulingInfo.getScalingPolicy().getMin(); double dampeningFactor = 0.33; // 0.4 caused a little oscillation too. We'll try 1/3 across each. diff --git a/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/jobmaster/clutch/rps/RpsClutchConfigurationSelector.java b/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/jobmaster/clutch/rps/RpsClutchConfigurationSelector.java index 58c505eeb..72d18c6de 100644 --- a/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/jobmaster/clutch/rps/RpsClutchConfigurationSelector.java +++ b/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/jobmaster/clutch/rps/RpsClutchConfigurationSelector.java @@ -45,7 +45,7 @@ public ClutchConfiguration apply(Map sketche // Gain - number of ticks within the cooldown period. This is the minimum number of times PID output will accumulate // before an action is taken. - long deltaT = getCooldownSecs() / 30l; + long deltaT = getCooldownSecs() / 30L; double kp = 1.0 / Math.max(setPoint, 1.0) / Math.max(getCumulativeIntegralDivisor(getIntegralScaler(), deltaT), 1.0); double ki = 0.0; diff --git a/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/jobmaster/control/AdaptiveAutoscaler.java b/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/jobmaster/control/AdaptiveAutoscaler.java index f1aa19e6e..23900d8e0 100644 --- a/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/jobmaster/control/AdaptiveAutoscaler.java +++ b/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/jobmaster/control/AdaptiveAutoscaler.java @@ -33,7 +33,7 @@ */ public class AdaptiveAutoscaler implements Observable.Transformer { - private static Logger logger = LoggerFactory.getLogger(AdaptiveAutoscaler.class); + private static final Logger logger = LoggerFactory.getLogger(AdaptiveAutoscaler.class); private final AdaptiveAutoscalerConfig config; private final JobAutoScaler.StageScaler scaler; @@ -59,6 +59,6 @@ public Observable call(Observable metrics) { .lift(new MantisStageActuator(this.initialSize, scaler)) .map(Math::round) .doOnNext(targetScale::set) - .map(x -> (Object) x); // TODO: Necessary? + .map(x -> x); // TODO: Necessary? } } diff --git a/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/jobmaster/control/AdaptiveAutoscalerConfig.java b/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/jobmaster/control/AdaptiveAutoscalerConfig.java index 04b1cab53..680c1ff68 100644 --- a/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/jobmaster/control/AdaptiveAutoscalerConfig.java +++ b/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/jobmaster/control/AdaptiveAutoscalerConfig.java @@ -20,6 +20,7 @@ import io.mantisrx.shaded.com.fasterxml.jackson.core.type.TypeReference; import io.mantisrx.shaded.com.fasterxml.jackson.databind.ObjectMapper; import java.io.IOException; +import java.util.Objects; public class AdaptiveAutoscalerConfig { @@ -103,7 +104,7 @@ public boolean equals(Object o) { final AdaptiveAutoscalerConfig other = (AdaptiveAutoscalerConfig) o; final Object this$metric = this.getMetric(); final Object other$metric = other.getMetric(); - if (this$metric == null ? other$metric != null : !this$metric.equals(other$metric)) return false; + if (!Objects.equals(this$metric, other$metric)) return false; if (Double.compare(this.getSetPoint(), other.getSetPoint()) != 0) return false; if (this.isInvert() != other.isInvert()) return false; if (Double.compare(this.getRope(), other.getRope()) != 0) return false; diff --git a/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/jobmaster/control/actuators/ClutchMantisStageActuator.java b/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/jobmaster/control/actuators/ClutchMantisStageActuator.java index ab027b704..d949644b7 100644 --- a/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/jobmaster/control/actuators/ClutchMantisStageActuator.java +++ b/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/jobmaster/control/actuators/ClutchMantisStageActuator.java @@ -25,7 +25,7 @@ public class ClutchMantisStageActuator implements Observable.Transformer, Double> { - private static Logger logger = LoggerFactory.getLogger(MantisStageActuator.class); + private static final Logger logger = LoggerFactory.getLogger(MantisStageActuator.class); private final JobAutoScaler.StageScaler scaler; public ClutchMantisStageActuator(JobAutoScaler.StageScaler scaler) { @@ -41,7 +41,6 @@ protected Double processStep(Tuple3 tup) { scaler.scaleDownStage(tup._3, desiredNumWorkers, reason); } else if (desiredNumWorkers > tup._3) { scaler.scaleUpStage(tup._3, desiredNumWorkers, reason); - } else { } return desiredNumWorkers * 1.0; diff --git a/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/jobmaster/control/actuators/MantisStageActuator.java b/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/jobmaster/control/actuators/MantisStageActuator.java index 62968c439..67785423b 100644 --- a/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/jobmaster/control/actuators/MantisStageActuator.java +++ b/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/jobmaster/control/actuators/MantisStageActuator.java @@ -29,7 +29,7 @@ */ public class MantisStageActuator extends IActuator { - private static Logger logger = LoggerFactory.getLogger(MantisStageActuator.class); + private static final Logger logger = LoggerFactory.getLogger(MantisStageActuator.class); private final JobAutoScaler.StageScaler scaler; private Long lastValue; @@ -48,7 +48,6 @@ protected Double processStep(Double input) { } else if (desiredNumWorkers > this.lastValue) { scaler.scaleUpStage(lastValue.intValue(), desiredNumWorkers.intValue(), reason); this.lastValue = desiredNumWorkers; - } else { } return desiredNumWorkers * 1.0; diff --git a/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/jobmaster/control/utils/Integrator.java b/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/jobmaster/control/utils/Integrator.java index a9ae8b0cf..7b38af644 100644 --- a/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/jobmaster/control/utils/Integrator.java +++ b/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/jobmaster/control/utils/Integrator.java @@ -53,8 +53,8 @@ public void setSum(double val) { @Override protected Double processStep(Double input) { sum += input; - sum = (sum > max) ? max : sum; - sum = (sum < min) ? min : sum; + sum = Math.min(sum, max); + sum = Math.max(sum, min); return sum; } } diff --git a/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/mesos/MesosExecutorCallbackHandler.java b/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/mesos/MesosExecutorCallbackHandler.java index a153ca1b6..e848ba9fe 100644 --- a/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/mesos/MesosExecutorCallbackHandler.java +++ b/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/mesos/MesosExecutorCallbackHandler.java @@ -41,7 +41,7 @@ public class MesosExecutorCallbackHandler implements Executor { private static final Logger logger = LoggerFactory.getLogger(MesosExecutorCallbackHandler.class); - private Observer executeStageRequestObserver; + private final Observer executeStageRequestObserver; private final JsonSerializer serializer = new JsonSerializer(); public MesosExecutorCallbackHandler(Observer executeStageRequestObserver) { @@ -67,7 +67,7 @@ public void frameworkMessage(ExecutorDriver arg0, byte[] arg1) { @Override public void killTask(ExecutorDriver arg0, TaskID task) { - logger.info("Executor going to kill task " + task.getValue()); + logger.info("Executor going to kill task {}", task.getValue()); executeStageRequestObserver.onCompleted(); waitAndExit(); } @@ -76,13 +76,11 @@ private void waitAndExit() { // Allow some time for clean up and the completion report to be sent out before exiting. // Until we define a better way to exit than to assume that the time we wait here is // sufficient before a hard exit, we will live with it. - Thread t = new Thread() { - @Override - public void run() { - try {sleep(2000);} catch (InterruptedException ie) {} - System.exit(0); - } - }; + Thread t = new Thread(() -> { + try { + Thread.sleep(2000);} catch (InterruptedException ie) {} + System.exit(0); + }); t.setDaemon(true); t.start(); } @@ -127,8 +125,8 @@ public void onError(Throwable e) { @Override public void onNext(List booleans) { - logger.info("onNext called for request failure handler with items: " + - ((booleans == null) ? "-1" : booleans.size())); + logger.info("onNext called for request failure handler with items: {}", + (booleans == null) ? "-1" : booleans.size()); if ((booleans == null) || booleans.isEmpty()) errorHandler.call(); } @@ -138,15 +136,11 @@ public void onNext(List booleans) { @Override public void launchTask(final ExecutorDriver driver, final TaskInfo task) { WrappedExecuteStageRequest request = createExecuteStageRequest(task); - logger.info("Worker for task [" + task.getTaskId().getValue() + "] with startTimeout=" + - request.getRequest().getTimeoutToReportStart()); + logger.info("Worker for task [{}] with startTimeout={}", + task.getTaskId().getValue(), request.getRequest().getTimeoutToReportStart()); + setupRequestFailureHandler(request.getRequest().getTimeoutToReportStart(), request.getRequestSubject(), - new Action0() { - @Override - public void call() { - sendLaunchError(driver, task); - } - }); + () -> sendLaunchError(driver, task)); executeStageRequestObserver.onNext(request); } diff --git a/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/mesos/MesosMetricsCollector.java b/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/mesos/MesosMetricsCollector.java index 26ed822d4..99b67659a 100644 --- a/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/mesos/MesosMetricsCollector.java +++ b/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/mesos/MesosMetricsCollector.java @@ -52,7 +52,7 @@ public class MesosMetricsCollector implements MetricsCollector { .zipWith(Observable.range(1, 3), (Func2) (t1, integer) -> integer) .flatMap((Func1>) integer -> { long delay = 2L; - logger.info(": retrying conx after sleeping for " + delay + " secs"); + logger.info(": retrying conx after sleeping for {} secs", delay); return Observable.timer(delay, TimeUnit.SECONDS); }); @@ -86,12 +86,12 @@ private String getUsageJson() { return RxNetty .createHttpRequest(HttpClientRequest.createGet(url), new HttpClient.HttpClientConfig.Builder() .setFollowRedirect(true).followRedirect(MAX_REDIRECTS).build()) - .lift(new OperatorOnErrorResumeNextViaFunction<>(t -> Observable.error(t))) + .lift(new OperatorOnErrorResumeNextViaFunction<>(Observable::error)) .timeout(GET_TIMEOUT_SECS, TimeUnit.SECONDS) .retryWhen(retryLogic) .flatMap((Func1, Observable>) r -> r.getContent()) .map(o -> o.toString(Charset.defaultCharset())) - .doOnError(throwable -> logger.warn("Can't get resource usage from mesos slave endpoint (" + url + ") - " + throwable.getMessage(), throwable)) + .doOnError(throwable -> logger.warn("Can't get resource usage from mesos slave endpoint ({}) - {}", url, throwable.getMessage(), throwable)) .toBlocking() .firstOrDefault(""); } diff --git a/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/mesos/VirtualMachineTaskStatus.java b/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/mesos/VirtualMachineTaskStatus.java index 82c845c60..e5dac9e93 100644 --- a/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/mesos/VirtualMachineTaskStatus.java +++ b/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/mesos/VirtualMachineTaskStatus.java @@ -18,9 +18,9 @@ public class VirtualMachineTaskStatus { - private String taskId; - private TYPE type; - private String message; + private final String taskId; + private final TYPE type; + private final String message; public VirtualMachineTaskStatus(String taskId, TYPE type, String message) { this.taskId = taskId; this.type = type; @@ -40,6 +40,6 @@ public TYPE getType() { } public enum TYPE { - STARTED, COMPLETED, ERROR; + STARTED, COMPLETED, ERROR } } diff --git a/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/mesos/VirualMachineWorkerServiceMesosImpl.java b/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/mesos/VirualMachineWorkerServiceMesosImpl.java index e1622729b..adf726933 100644 --- a/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/mesos/VirualMachineWorkerServiceMesosImpl.java +++ b/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/mesos/VirualMachineWorkerServiceMesosImpl.java @@ -22,7 +22,6 @@ import io.mantisrx.server.worker.mesos.VirtualMachineTaskStatus.TYPE; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.ThreadFactory; import org.apache.mesos.MesosExecutorDriver; import org.apache.mesos.Protos; import org.apache.mesos.Protos.TaskID; @@ -32,28 +31,24 @@ import org.slf4j.LoggerFactory; import rx.Observable; import rx.Observer; -import rx.functions.Action1; public class VirualMachineWorkerServiceMesosImpl extends BaseService implements VirtualMachineWorkerService { private static final Logger logger = LoggerFactory.getLogger(VirualMachineWorkerServiceMesosImpl.class); private MesosExecutorDriver mesosDriver; - private ExecutorService executor; - private Observer executeStageRequestObserver; - private Observable vmTaskStatusObservable; + private final ExecutorService executor; + private final Observer executeStageRequestObserver; + private final Observable vmTaskStatusObservable; public VirualMachineWorkerServiceMesosImpl(Observer executeStageRequestObserver, Observable vmTaskStatusObservable) { this.executeStageRequestObserver = executeStageRequestObserver; this.vmTaskStatusObservable = vmTaskStatusObservable; - executor = Executors.newSingleThreadExecutor(new ThreadFactory() { - @Override - public Thread newThread(Runnable r) { - Thread t = new Thread(r, "vm_worker_mesos_executor_thread"); - t.setDaemon(true); - return t; - } + executor = Executors.newSingleThreadExecutor(r -> { + Thread t = new Thread(r, "vm_worker_mesos_executor_thread"); + t.setDaemon(true); + return t; }); } @@ -63,33 +58,31 @@ public void start() { mesosDriver = new MesosExecutorDriver(new MesosExecutorCallbackHandler(executeStageRequestObserver)); // launch driver on background thread logger.info("launch driver on background thread"); - executor.execute(new Runnable() { - @Override - public void run() { - try { - mesosDriver.run(); - } catch (Exception e) { - logger.error("Failed to register Mantis Worker with Mesos executor callbacks", e); - } + executor.execute(() -> { + try { + mesosDriver.run(); + } catch (Exception e) { + logger.error("Failed to register Mantis Worker with Mesos executor callbacks", e); } }); // subscribe to vm task updates on current thread logger.info("subscribe to vm task updates on current thread"); - vmTaskStatusObservable.subscribe(new Action1() { - @Override - public void call(VirtualMachineTaskStatus vmTaskStatus) { - TYPE type = vmTaskStatus.getType(); - if (type == TYPE.COMPLETED) { - Protos.Status status = mesosDriver.sendStatusUpdate(TaskStatus.newBuilder() - .setTaskId(TaskID.newBuilder().setValue(vmTaskStatus.getTaskId()).build()) - .setState(TaskState.TASK_FINISHED).build()); - logger.info("Sent COMPLETED state to mesos, driver status=" + status); - } else if (type == TYPE.STARTED) { - Protos.Status status = mesosDriver.sendStatusUpdate(TaskStatus.newBuilder() - .setTaskId(TaskID.newBuilder().setValue(vmTaskStatus.getTaskId()).build()) - .setState(TaskState.TASK_RUNNING).build()); - logger.info("Sent RUNNING state to mesos, driver status=" + status); - } + vmTaskStatusObservable.subscribe(vmTaskStatus -> { + Protos.Status status; + TYPE type = vmTaskStatus.getType(); + switch (type) { + case COMPLETED: + status = mesosDriver.sendStatusUpdate(TaskStatus.newBuilder() + .setTaskId(TaskID.newBuilder().setValue(vmTaskStatus.getTaskId()).build()) + .setState(TaskState.TASK_FINISHED).build()); + logger.info("Sent COMPLETED state to mesos, driver status={}", status); + break; + case STARTED: + status = mesosDriver.sendStatusUpdate(TaskStatus.newBuilder() + .setTaskId(TaskID.newBuilder().setValue(vmTaskStatus.getTaskId()).build()) + .setState(TaskState.TASK_RUNNING).build()); + logger.info("Sent RUNNING state to mesos, driver status={}", status); + break; } }); } diff --git a/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/scheduling/JobSchedulingTracker.java b/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/scheduling/JobSchedulingTracker.java index f3f9c8483..66801ee78 100644 --- a/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/scheduling/JobSchedulingTracker.java +++ b/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/scheduling/JobSchedulingTracker.java @@ -20,7 +20,6 @@ import io.mantisrx.server.core.JobSchedulingInfo; import io.mantisrx.server.core.WorkerAssignments; import io.mantisrx.server.core.WorkerHost; -import java.util.List; import java.util.Map; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,7 +31,7 @@ public class JobSchedulingTracker { private static final Logger logger = LoggerFactory.getLogger(JobSchedulingTracker.class); - private Observable schedulingChangesForJobId; + private final Observable schedulingChangesForJobId; public JobSchedulingTracker(Observable schedulingChangesForJobId) { this.schedulingChangesForJobId = schedulingChangesForJobId; @@ -41,63 +40,38 @@ public JobSchedulingTracker(Observable schedulingChangesForJo public Observable startedWorkersPerIndex(int stageNumber) { Observable workerIndexChanges = workerIndexChanges(stageNumber); return workerIndexChanges - .filter(new Func1() { - @Override - public Boolean call(WorkerIndexChange newWorkerChange) { - return (newWorkerChange.getNewState().getState() - == MantisJobState.Started); - } - }); + .filter(newWorkerChange -> (newWorkerChange.getNewState().getState() + == MantisJobState.Started)); } public Observable workerIndexChanges(int stageNumber) { return workerChangesForStage(stageNumber, schedulingChangesForJobId) // flatmap over all numbered workers - .flatMap(new Func1>() { - @Override - public Observable call(WorkerAssignments assignments) { - logger.info("Received scheduling update from master: " + assignments); - return Observable.from(assignments.getHosts().values()); - } + .flatMap((Func1>) assignments -> { + logger.info("Received scheduling update from master: {}", assignments); + return Observable.from(assignments.getHosts().values()); }) // group by index - .groupBy(new Func1() { - @Override - public Integer call(WorkerHost workerHost) { - return workerHost.getWorkerIndex(); - } - }) + .groupBy(WorkerHost::getWorkerIndex) // - .flatMap(new Func1, Observable>() { - @Override - public Observable call( - final GroupedObservable workerIndexGroup) { - // seed sequence, to support buffer by 2 - return - workerIndexGroup.startWith(new WorkerHost(null, -1, null, null, -1, -1, -1)) - .buffer(2, 1) // create pair to compare prev and curr - .filter(new Func1, Boolean>() { - @Override - public Boolean call(List currentAndPrevious) { - if (currentAndPrevious.size() < 2) { - return false; // not a pair, last element - // has already been evaluated on last iteration - // for example: 1,2,3,4,5 = (1,2),(2,3),(3,4),(4,5),(5) - } - WorkerHost previous = currentAndPrevious.get(0); - WorkerHost current = currentAndPrevious.get(1); - return (previous.getWorkerNumber() != current.getWorkerNumber()); - } - }) - .map(new Func1, WorkerIndexChange>() { - @Override - public WorkerIndexChange call(List list) { - return new WorkerIndexChange(workerIndexGroup.getKey(), - list.get(1), list.get(0)); - } - }); - } + .flatMap((Func1, Observable>) workerIndexGroup -> { + // seed sequence, to support buffer by 2 + return + workerIndexGroup.startWith(new WorkerHost(null, -1, null, null, -1, -1, -1)) + .buffer(2, 1) // create pair to compare prev and curr + .filter(currentAndPrevious -> { + if (currentAndPrevious.size() < 2) { + return false; // not a pair, last element + // has already been evaluated on last iteration + // for example: 1,2,3,4,5 = (1,2),(2,3),(3,4),(4,5),(5) + } + WorkerHost previous = currentAndPrevious.get(0); + WorkerHost current = currentAndPrevious.get(1); + return (previous.getWorkerNumber() != current.getWorkerNumber()); + }) + .map(list -> new WorkerIndexChange(workerIndexGroup.getKey(), + list.get(1), list.get(0))); }); } @@ -105,23 +79,15 @@ private Observable workerChangesForStage(final int stageNumbe Observable schedulingUpdates) { return schedulingUpdates // pull out worker assignments from jobSchedulingInfo - .flatMap(new Func1>() { - @Override - public Observable call(JobSchedulingInfo schedulingChange) { - Map assignments = schedulingChange.getWorkerAssignments(); - if (assignments != null && !assignments.isEmpty()) { - return Observable.from(assignments.values()); - } else { - return Observable.empty(); - } + .flatMap((Func1>) schedulingChange -> { + Map assignments = schedulingChange.getWorkerAssignments(); + if (assignments != null && !assignments.isEmpty()) { + return Observable.from(assignments.values()); + } else { + return Observable.empty(); } }) // return only changes from previous stage - .filter(new Func1() { - @Override - public Boolean call(WorkerAssignments assignments) { - return (assignments.getStage() == stageNumber); - } - }); + .filter(assignments -> (assignments.getStage() == stageNumber)); } } diff --git a/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/scheduling/WorkerIndexChange.java b/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/scheduling/WorkerIndexChange.java index 9ea56d3ff..c115feb27 100644 --- a/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/scheduling/WorkerIndexChange.java +++ b/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/scheduling/WorkerIndexChange.java @@ -21,9 +21,9 @@ public class WorkerIndexChange { - private int workerIndex; - private WorkerHost newState; - private WorkerHost oldState; + private final int workerIndex; + private final WorkerHost newState; + private final WorkerHost oldState; public WorkerIndexChange(int workerIndex, WorkerHost newState, WorkerHost oldState) { diff --git a/mantis-server/mantis-server-worker/src/test/java/io/mantisrx/server/worker/DataDroppedPayloadSetterTest.java b/mantis-server/mantis-server-worker/src/test/java/io/mantisrx/server/worker/DataDroppedPayloadSetterTest.java index 532629093..b3f8e9e01 100644 --- a/mantis-server/mantis-server-worker/src/test/java/io/mantisrx/server/worker/DataDroppedPayloadSetterTest.java +++ b/mantis-server/mantis-server-worker/src/test/java/io/mantisrx/server/worker/DataDroppedPayloadSetterTest.java @@ -37,7 +37,7 @@ public class DataDroppedPayloadSetterTest { private static final Logger logger = LoggerFactory.getLogger(DataDroppedPayloadSetterTest.class); @Test - public void testAggregateDropOperatorMetrics() throws Exception { + public void testAggregateDropOperatorMetrics() { SpectatorRegistryFactory.setRegistry(new DefaultRegistry()); Heartbeat heartbeat = new Heartbeat("job-1", 1, 1, 1); DataDroppedPayloadSetter payloadSetter = new DataDroppedPayloadSetter(heartbeat); diff --git a/mantis-server/mantis-server-worker/src/test/java/io/mantisrx/server/worker/HeartbeatTest.java b/mantis-server/mantis-server-worker/src/test/java/io/mantisrx/server/worker/HeartbeatTest.java index eeb770374..db23414cc 100644 --- a/mantis-server/mantis-server-worker/src/test/java/io/mantisrx/server/worker/HeartbeatTest.java +++ b/mantis-server/mantis-server-worker/src/test/java/io/mantisrx/server/worker/HeartbeatTest.java @@ -16,33 +16,40 @@ package io.mantisrx.server.worker; +import static org.junit.Assert.assertEquals; + import io.mantisrx.server.core.Status; import io.mantisrx.server.core.StatusPayloads; import java.util.List; -import junit.framework.Assert; import org.junit.Test; +import org.slf4j.Logger; public class HeartbeatTest { + private static final Logger logger = org.slf4j.LoggerFactory.getLogger(HeartbeatTest.class); @Test - public void testSingleUsePayloads() throws Exception { + public void testSingleUsePayloads() { Heartbeat heartbeat = new Heartbeat("Jobcluster-123", 1, 0, 0); heartbeat.setPayload("" + StatusPayloads.Type.SubscriptionState, "true"); int val1 = 10; int val2 = 12; heartbeat.addSingleUsePayload("" + StatusPayloads.Type.IncomingDataDrop, "" + val1); heartbeat.addSingleUsePayload("" + StatusPayloads.Type.IncomingDataDrop, "" + val2); - final Status currentHeartbeatStatus = heartbeat.getCurrentHeartbeatStatus(); - List payloads = currentHeartbeatStatus.getPayloads(); - Assert.assertEquals(2, payloads.size()); + List payloads = heartbeat.getCurrentHeartbeatStatus().getPayloads(); + + logger.debug("Current Payloads: {}", payloads); + assertEquals(2, payloads.size()); + int value = 0; for (Status.Payload p : payloads) { if (StatusPayloads.Type.valueOf(p.getType()) == StatusPayloads.Type.IncomingDataDrop) value = Integer.parseInt(p.getData()); } - Assert.assertEquals(val2, value); + assertEquals(val2, value); + payloads = heartbeat.getCurrentHeartbeatStatus().getPayloads(); - Assert.assertEquals(1, payloads.size()); + logger.debug("Payloads after draining single-use payloads: {}", payloads); + assertEquals(1, payloads.size()); } } diff --git a/mantis-server/mantis-server-worker/src/test/java/io/mantisrx/server/worker/WorkerExecutionOperationsNetworkStageTest.java b/mantis-server/mantis-server-worker/src/test/java/io/mantisrx/server/worker/WorkerExecutionOperationsNetworkStageTest.java index 81b29cb02..0f49c7eb9 100644 --- a/mantis-server/mantis-server-worker/src/test/java/io/mantisrx/server/worker/WorkerExecutionOperationsNetworkStageTest.java +++ b/mantis-server/mantis-server-worker/src/test/java/io/mantisrx/server/worker/WorkerExecutionOperationsNetworkStageTest.java @@ -16,7 +16,8 @@ package io.mantisrx.server.worker; -import static junit.framework.Assert.assertEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import io.mantisrx.runtime.MantisJobDurationType; @@ -61,7 +62,7 @@ public void convertJobSchedulingInfoToWorkerMapTest() { List workersForStage1 = workerMap.getWorkersForStage(1); - assertTrue(workersForStage1 != null); + assertNotNull(workersForStage1); assertEquals(2, workersForStage1.size()); for (int i = 0; i < workersForStage1.size(); i++) { @@ -75,7 +76,7 @@ public void convertJobSchedulingInfoToWorkerMapTest() { List workersForStage2 = workerMap.getWorkersForStage(2); - assertTrue(workersForStage2 != null); + assertNotNull(workersForStage2); assertEquals(4, workersForStage2.size()); for (int i = 0; i < workersForStage2.size(); i++) { @@ -156,16 +157,11 @@ WorkerAssignments createWorkerAssignments(int stageNo, int noWorkers) { @Test public void deferTest() throws InterruptedException { - Subscription subscribe1 = getObs4().subscribeOn(Schedulers.io()).subscribe((t) -> { - System.out.println("In 1 -> " + t); - }); + Subscription subscribe1 = getObs4().subscribeOn(Schedulers.io()).subscribe((t) -> System.out.println("In 1 -> " + t)); Thread.sleep(5000); - Subscription subscribe2 = getObs4().subscribeOn(Schedulers.io()).subscribe((t) -> { - System.out.println("In 2 -> " + t); - }); - + Subscription subscribe2 = getObs4().subscribeOn(Schedulers.io()).subscribe((t) -> System.out.println("In 2 -> " + t)); Thread.sleep(5000); subscribe1.unsubscribe(); @@ -174,69 +170,49 @@ public void deferTest() throws InterruptedException { subscribe2.unsubscribe(); Thread.sleep(5000); - Subscription subscribe3 = getObs4().subscribeOn(Schedulers.io()).subscribe((t) -> { - System.out.println("In 3 -> " + t); - }); + Subscription subscribe3 = getObs4().subscribeOn(Schedulers.io()).subscribe((t) -> System.out.println("In 3 -> " + t)); Thread.sleep(5000); subscribe3.unsubscribe(); Thread.sleep(10000); } Observable getObs() { - Observable oLong = Observable.defer(() -> { - return Observable.interval(1, TimeUnit.SECONDS).doOnNext((e) -> { - System.out.println("Minted " + e); - }).share(); - }).doOnSubscribe(() -> { - System.out.println("Subscribed111" + System.currentTimeMillis()); - }).doOnUnsubscribe(() -> { - System.out.println("UnSubscribed111" + System.currentTimeMillis()); - }); + Observable oLong = + Observable.defer(() -> Observable.interval(1, TimeUnit.SECONDS) + .doOnNext((e) -> System.out.println("Minted " + e)).share()) + .doOnSubscribe(() -> System.out.println("Subscribed111" + System.currentTimeMillis())) + .doOnUnsubscribe(() -> System.out.println("UnSubscribed111" + System.currentTimeMillis())); return oLong; } Observable getObs2() { return Observable.interval(1, TimeUnit.SECONDS) - .doOnNext((e) -> { - System.out.println("Minted " + e); - }) + .doOnNext((e) -> System.out.println("Minted " + e)) .share() - .doOnSubscribe(() -> { - System.out.println("Subscribed111" + System.currentTimeMillis()); - }).doOnUnsubscribe(() -> { - System.out.println("UnSubscribed111" + System.currentTimeMillis()); - }) - - ; + .doOnSubscribe(() -> System.out.println("Subscribed111" + System.currentTimeMillis())) + .doOnUnsubscribe(() -> System.out.println("UnSubscribed111" + System.currentTimeMillis())); } Observable getObs3() { - return Observable.range(1, 100).doOnNext((e) -> { - System.out.println("Minted " + e); - }).map((i) -> { - return new Long(i); - }).share() - .doOnSubscribe(() -> { - System.out.println("Subscribed111" + System.currentTimeMillis()); - }).doOnUnsubscribe(() -> { - System.out.println("UnSubscribed111" + System.currentTimeMillis()); - }); + return Observable.range(1, 100) + .doOnNext((e) -> System.out.println("Minted " + e)) + .map(Long::new) + .share() + .doOnSubscribe(() -> System.out.println("Subscribed111" + System.currentTimeMillis())) + .doOnUnsubscribe(() -> System.out.println("UnSubscribed111" + System.currentTimeMillis())); } Observable getObs4() { BehaviorSubject o = BehaviorSubject.create(); - Observable.interval(1, TimeUnit.SECONDS).doOnNext((e) -> { - System.out.println("Minted " + e); - }).doOnSubscribe(() -> { - System.out.println("Subscribed111" + System.currentTimeMillis()); - }).doOnUnsubscribe(() -> { - System.out.println("UnSubscribed111" + System.currentTimeMillis()); - }) - .subscribe(o); + Observable.interval(1, TimeUnit.SECONDS) + .doOnNext((e) -> System.out.println("Minted " + e)) + .doOnSubscribe(() -> System.out.println("Subscribed111" + System.currentTimeMillis())) + .doOnUnsubscribe(() -> System.out.println("UnSubscribed111" + System.currentTimeMillis())) + .subscribe(o); return o; diff --git a/mantis-server/mantis-server-worker/src/test/java/io/mantisrx/server/worker/jobmaster/AutoScaleMetricsConfigTest.java b/mantis-server/mantis-server-worker/src/test/java/io/mantisrx/server/worker/jobmaster/AutoScaleMetricsConfigTest.java index 8fa20f94c..841007482 100644 --- a/mantis-server/mantis-server-worker/src/test/java/io/mantisrx/server/worker/jobmaster/AutoScaleMetricsConfigTest.java +++ b/mantis-server/mantis-server-worker/src/test/java/io/mantisrx/server/worker/jobmaster/AutoScaleMetricsConfigTest.java @@ -36,7 +36,7 @@ public void testGenerateSourceJobMetricGroups() { } @Test - public void testGetAggregationAlgoForSourceJobMetrics() throws Exception { + public void testGetAggregationAlgoForSourceJobMetrics() { AutoScaleMetricsConfig config = new AutoScaleMetricsConfig(); AutoScaleMetricsConfig.AggregationAlgo aglo = config.getAggregationAlgo( diff --git a/mantis-server/mantis-server-worker/src/test/java/io/mantisrx/server/worker/jobmaster/JobAutoScalerTest.java b/mantis-server/mantis-server-worker/src/test/java/io/mantisrx/server/worker/jobmaster/JobAutoScalerTest.java index 72757e283..aa981b7a3 100644 --- a/mantis-server/mantis-server-worker/src/test/java/io/mantisrx/server/worker/jobmaster/JobAutoScalerTest.java +++ b/mantis-server/mantis-server-worker/src/test/java/io/mantisrx/server/worker/jobmaster/JobAutoScalerTest.java @@ -44,13 +44,11 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import org.junit.Test; -import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import rx.Observable; import rx.Observer; -import rx.functions.Func1; public class JobAutoScalerTest { @@ -107,12 +105,13 @@ public void testScaleUp() throws InterruptedException { // retry sending auto scale event till scaleJobStage request sent to master, as there is possible a race between the sleep for coolDownSecs in the Test and the event being processed before coolDownSecs final CountDownLatch retryLatch = new CountDownLatch(1); - when(mockMasterClientApi.scaleJobStage(eq(jobId), eq(scalingStageNum), eq(numStage1Workers + 2 * increment), anyString())).thenAnswer(new Answer>() { - @Override - public Observable answer(InvocationOnMock invocation) throws Throwable { - retryLatch.countDown(); - return Observable.just(null); - } + when(mockMasterClientApi.scaleJobStage( + eq(jobId), eq(scalingStageNum), + eq(numStage1Workers + 2 * increment), + anyString())) + .thenAnswer((Answer>) invocation -> { + retryLatch.countDown(); + return Observable.just(null); }); do { logger.info("sending Job auto scale Event"); @@ -152,15 +151,12 @@ public void testScalingResiliency() throws InterruptedException { final CountDownLatch scaleJobStageSuccessLatch = new CountDownLatch(1); final AtomicInteger count = new AtomicInteger(0); - final Observable simulateScaleJobStageFailureResp = Observable.just(1).map(new Func1() { - @Override - public Boolean call(Integer integer) { - if (count.incrementAndGet() < 3) { - throw new IllegalStateException("fake connection exception"); - } else { - scaleJobStageSuccessLatch.countDown(); - return true; - } + final Observable simulateScaleJobStageFailureResp = Observable.just(1).map(integer -> { + if (count.incrementAndGet() < 3) { + throw new IllegalStateException("fake connection exception"); + } else { + scaleJobStageSuccessLatch.countDown(); + return true; } }); when(mockMasterClientApi.scaleJobStage(eq(jobId), eq(scalingStageNum), eq(numStage1Workers + increment), anyString())).thenReturn(simulateScaleJobStageFailureResp); @@ -235,7 +231,7 @@ public void testScaleDown() throws InterruptedException { } @Test - public void testScaleDownNotLessThanMin() throws InterruptedException { + public void testScaleDownNotLessThanMin() { final String jobId = "test-job-1"; final int coolDownSec = 2; final int scalingStageNum = 1; @@ -329,13 +325,15 @@ public void testScaleUpOnDifferentScalingReasons() throws InterruptedException { // retry sending auto scale event till scaleJobStage request sent to master, as there is possible a race between the sleep for coolDownSecs in the Test and the event being processed before coolDownSecs final CountDownLatch retryLatch = new CountDownLatch(1); - when(mockMasterClientApi.scaleJobStage(eq(jobId), eq(scalingStageNum), eq(numStage1Workers + 2 * increment), anyString())).thenAnswer(new Answer>() { - @Override - public Observable answer(InvocationOnMock invocation) throws Throwable { + when(mockMasterClientApi.scaleJobStage( + eq(jobId), + eq(scalingStageNum), + eq(numStage1Workers + 2 * increment), + anyString())) + .thenAnswer((Answer>) invocation -> { retryLatch.countDown(); return Observable.just(null); - } - }); + }); do { logger.info("sending Job auto scale Event"); @@ -348,7 +346,7 @@ public Observable answer(InvocationOnMock invocation) throws Throwable { } @Test - public void testGetClutchConfigurationFromJson() throws Exception { + public void testGetClutchConfigurationFromJson() { String json = "{" + " \"cooldownSeconds\": 100," + " \"integralDecay\": 0.7," + diff --git a/mantis-server/mantis-server-worker/src/test/java/io/mantisrx/server/worker/jobmaster/WorkerMetricHandlerTest.java b/mantis-server/mantis-server-worker/src/test/java/io/mantisrx/server/worker/jobmaster/WorkerMetricHandlerTest.java index 6c945cad3..14fd61cb1 100644 --- a/mantis-server/mantis-server-worker/src/test/java/io/mantisrx/server/worker/jobmaster/WorkerMetricHandlerTest.java +++ b/mantis-server/mantis-server-worker/src/test/java/io/mantisrx/server/worker/jobmaster/WorkerMetricHandlerTest.java @@ -44,13 +44,11 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import org.junit.Test; -import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import rx.Observable; import rx.Observer; -import rx.functions.Func1; public class WorkerMetricHandlerTest { @@ -220,30 +218,24 @@ public void testOutlierResubmitWorks() throws InterruptedException { final CountDownLatch autoScaleLatch = new CountDownLatch(1); when(mockMasterClientApi.schedulingChanges(jobId)).thenReturn(Observable.just(new JobSchedulingInfo(jobId, assignmentsMap))); - when(mockMasterClientApi.resubmitJobWorker(anyString(), anyString(), anyInt(), anyString())).thenAnswer(new Answer>() { - @Override - public Observable answer(InvocationOnMock invocation) throws Throwable { - - final Object[] arguments = invocation.getArguments(); - final String jobIdRecv = (String) arguments[0]; - final String user = (String) arguments[1]; - final int resubmittedWorkerNum = (Integer) arguments[2]; - // final String reason = (String)arguments[3]; - - final Observable result = Observable.just(1).map(new Func1() { - @Override - public Boolean call(Integer integer) { - logger.info("resubmitting worker {} of jobId {}", resubmittedWorkerNum, jobId); - assertEquals(workerNum, resubmittedWorkerNum); - assertEquals(user, "JobMaster"); - assertEquals(jobId, jobIdRecv); - - resubmitLatch.countDown(); - return true; - } - }); - return result; - } + when(mockMasterClientApi.resubmitJobWorker(anyString(), anyString(), anyInt(), anyString())).thenAnswer((Answer>) invocation -> { + + final Object[] arguments = invocation.getArguments(); + final String jobIdRecv = (String) arguments[0]; + final String user = (String) arguments[1]; + final int resubmittedWorkerNum = (Integer) arguments[2]; + // final String reason = (String)arguments[3]; + + final Observable result = Observable.just(1).map(integer -> { + logger.info("resubmitting worker {} of jobId {}", resubmittedWorkerNum, jobId); + assertEquals(workerNum, resubmittedWorkerNum); + assertEquals(user, "JobMaster"); + assertEquals(jobId, jobIdRecv); + + resubmitLatch.countDown(); + return true; + }); + return result; });