Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix compile warnings in mantis server worker #423

Open
wants to merge 13 commits into
base: master
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@
public class DownloadJob {

private static final Logger logger = LoggerFactory.getLogger(DownloadJob.class);
private URL jobArtifactUrl;
private String jobName;
private String locationToStore;
private final URL jobArtifactUrl;
private final String jobName;
private final String locationToStore;

public DownloadJob(
URL jobArtifactUrl, String jobName,
Expand Down Expand Up @@ -69,7 +69,7 @@ public void execute() {
Files.createDirectories(path);
try (OutputStream os = Files.newOutputStream(Paths.get(path.toString(), jarName))) {
byte[] bytes = new byte[2048];
int read = 0;
int read;
while ((read = is.read(bytes)) >= 0) {
os.write(bytes, 0, read);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,14 +70,10 @@ public ExecuteStageRequestService(
public void start() {
subscription = executeStageRequestObservable
// map to request with status observer
.map(new Func1<WrappedExecuteStageRequest, TrackedExecuteStageRequest>() {
@Override
public TrackedExecuteStageRequest call(
WrappedExecuteStageRequest executeRequest) {
PublishSubject<Status> statusSubject = PublishSubject.create();
tasksStatusObserver.onNext(statusSubject);
return new TrackedExecuteStageRequest(executeRequest, statusSubject);
}
.map(executeRequest -> {
PublishSubject<Status> statusSubject = PublishSubject.create();
tasksStatusObserver.onNext(statusSubject);
return new TrackedExecuteStageRequest(executeRequest, statusSubject);
})
// get provider from jar, return tracked MantisJob
.flatMap(new Func1<TrackedExecuteStageRequest, Observable<ExecutionDetails>>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,11 @@
@SuppressWarnings("rawtypes") // suppressed due to unknown mantis job typ
public class ExecutionDetails {

private ClassLoader classLoader;
private WrappedExecuteStageRequest executeStageRequest;
private Observer<Status> status;
private Job mantisJob;
private List<Parameter> parameters;
private final ClassLoader classLoader;
private final WrappedExecuteStageRequest executeStageRequest;
private final Observer<Status> status;
private final Job mantisJob;
private final List<Parameter> parameters;

public ExecutionDetails(WrappedExecuteStageRequest executeStageRequest, Observer<Status> status,
Job mantisJob, ClassLoader classLoader, List<Parameter> parameters) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,12 @@ public class MantisWorker extends BaseService {
private static final Logger logger = LoggerFactory.getLogger(MantisWorker.class);
@Argument(alias = "p", description = "Specify a configuration file", required = false)
private static String propFile = "worker.properties";
private CountDownLatch blockUntilShutdown = new CountDownLatch(1);
private final CountDownLatch blockUntilShutdown = new CountDownLatch(1);
private final List<Service> mantisServices = new LinkedList<>();

// static {
// RxNetty.useNativeTransportIfApplicable();
// }
private List<Service> mantisServices = new LinkedList<Service>();

public MantisWorker(ConfigurationFactory configFactory, io.mantisrx.server.master.client.config.ConfigurationFactory coreConfigFactory) {
this(configFactory, Optional.empty());
Expand Down Expand Up @@ -102,12 +102,7 @@ public String toString() {
final MantisMasterGateway gateway =
highAvailabilityServices.getMasterClientApi();
// shutdown hook
Thread t = new Thread() {
@Override
public void run() {
shutdown();
}
};
Thread t = new Thread(this::shutdown);
t.setDaemon(true);
Runtime.getRuntime().addShutdownHook(t);

Expand Down Expand Up @@ -209,7 +204,7 @@ private static Properties loadProperties(String propFile) {
*
* @param resourceName the name of the resource. It can either be a file name, or a path.
* @return An {@link java.io.InputStream} instance that represents the found resource. Null otherwise.
* @throws java.io.FileNotFoundException
* @throws java.io.FileNotFoundException if the resource is not found.
*/
private static InputStream findResourceAsStream(String resourceName) throws FileNotFoundException {
File resource = new File(resourceName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,15 @@ public class RunningWorker {

private static final Logger logger = LoggerFactory.getLogger(RunningWorker.class);
private final int totalStagesNet;
private Action0 onTerminateCallback;
private Action0 onCompleteCallback;
private Action1<Throwable> onErrorCallback;
private CountDownLatch blockUntilTerminate = new CountDownLatch(1);
private Job job;
private SchedulingInfo schedulingInfo;
private StageConfig stage;
private Observer<Status> jobStatus;
private String jobId;
private final Action0 onTerminateCallback;
private final Action0 onCompleteCallback;
private final Action1<Throwable> onErrorCallback;
private final CountDownLatch blockUntilTerminate = new CountDownLatch(1);
private final Job job;
private final SchedulingInfo schedulingInfo;
private final StageConfig stage;
private final Observer<Status> jobStatus;
private final String jobId;
private final int stageNum;
private final int workerNum;
private final int workerIndex;
Expand Down Expand Up @@ -88,37 +88,21 @@ public RunningWorker(Builder builder) {
this.stageTotalWorkersObservable = builder.stageTotalWorkersObservable;
this.jobSchedulingInfoObservable = builder.jobSchedulingInfoObservable;

this.onTerminateCallback = new Action0() {
@Override
public void call() {
blockUntilTerminate.countDown();
}
};
this.onCompleteCallback = new Action0() {
@Override
public void call() {
logger.info("JobId: " + jobId + " stage: " + stageNum + ", completed");
// setup a timeout to call forced exit as sure way to exit
new Thread() {
@Override
public void run() {
try {
sleep(3000);
System.exit(1);
} catch (Exception e) {
logger.error("Ignoring exception during exit: " + e.getMessage(), e);
}
}
}.start();
signalCompleted();
}
};
this.onErrorCallback = new Action1<Throwable>() {
@Override
public void call(Throwable t) {
signalFailed(t);
}
this.onTerminateCallback = blockUntilTerminate::countDown;
this.onCompleteCallback = () -> {
logger.info("JobId: " + jobId + " stage: " + stageNum + ", completed");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we replace string concatenation with parameterized / placeholder in logging? It applies to other logging lines in this module.

logger.info("JobId: {} stage: {} completed", jobId, stageNum);

See this doc for advantages: https://www.tutorialspoint.com/slf4j/slf4j_parameterized_logging.htm

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the suggestion. Fixed in e9ad7a8

// setup a timeout to call forced exit as sure way to exit
new Thread(() -> {
try {
Thread.sleep(3000);
System.exit(1);
} catch (Exception e) {
logger.error("Ignoring exception during exit: " + e.getMessage(), e);
}
}).start();
signalCompleted();
};
this.onErrorCallback = this::signalFailed;
}

private String getWorkerStringPrefix(int stageNum, int index, int number) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@

public class TrackedExecuteStageRequest {

private WrappedExecuteStageRequest executeRequest;
private Observer<Status> status;
private final WrappedExecuteStageRequest executeRequest;
private final Observer<Status> status;

public TrackedExecuteStageRequest(WrappedExecuteStageRequest executeRequest,
Observer<Status> status) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,13 @@
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.apache.mesos.MesosExecutorDriver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Observer;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.schedulers.Schedulers;
import rx.subjects.PublishSubject;

Expand All @@ -59,23 +57,20 @@ public class VirtualMachineWorkerServiceLocalImpl extends BaseService implements
private static final Logger logger = LoggerFactory.getLogger(VirtualMachineWorkerServiceLocalImpl.class);
private final WorkerTopologyInfo.Data workerInfo;
private MesosExecutorDriver mesosDriver;
private ExecutorService executor;
private Observer<WrappedExecuteStageRequest> executeStageRequestObserver;
private Observable<VirtualMachineTaskStatus> vmTaskStatusObservable;
private final ExecutorService executor;
private final Observer<WrappedExecuteStageRequest> executeStageRequestObserver;
private final Observable<VirtualMachineTaskStatus> vmTaskStatusObservable;

public VirtualMachineWorkerServiceLocalImpl(final WorkerTopologyInfo.Data workerInfo,
Observer<WrappedExecuteStageRequest> executeStageRequestObserver,
Observable<VirtualMachineTaskStatus> vmTaskStatusObservable) {
this.workerInfo = workerInfo;
this.executeStageRequestObserver = executeStageRequestObserver;
this.vmTaskStatusObservable = vmTaskStatusObservable;
executor = Executors.newSingleThreadExecutor(new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, "vm_worker_mesos_executor_thread");
t.setDaemon(true);
return t;
}
executor = Executors.newSingleThreadExecutor(r -> {
Thread t = new Thread(r, "vm_worker_mesos_executor_thread");
t.setDaemon(true);
return t;
});
}

Expand Down Expand Up @@ -113,7 +108,7 @@ private WrappedExecuteStageRequest createExecuteStageRequest() throws MalformedU
jobJarUrl, workerInfo.getStageNumber(), workerInfo.getNumStages(), ports, timeoutToReportStartSec, workerInfo.getMetricsPort(), params, schedInfo, MantisJobDurationType.Transient,
0L, 0L, new WorkerPorts(Arrays.asList(7151, 7152, 7153, 7154, 7155)), Optional.empty());

return new WrappedExecuteStageRequest(PublishSubject.<Boolean>create(), executeStageRequest);
return new WrappedExecuteStageRequest(PublishSubject.create(), executeStageRequest);
}

private void setupRequestFailureHandler(long waitSeconds, Observable<Boolean> requestObservable,
Expand Down Expand Up @@ -145,38 +140,27 @@ public void onNext(List<Boolean> booleans) {
@Override
public void start() {
logger.info("Starting VirtualMachineWorkerServiceLocalImpl");
Schedulers.newThread().createWorker().schedule(new Action0() {
@Override
public void call() {
try {
WrappedExecuteStageRequest request = null;
request = createExecuteStageRequest();
setupRequestFailureHandler(request.getRequest().getTimeoutToReportStart(), request.getRequestSubject(),
new Action0() {
@Override
public void call() {
logger.error("launch error");
}
});
logger.info("onNext'ing WrappedExecuteStageRequest: {}", request.toString());
executeStageRequestObserver.onNext(request);
} catch (MalformedURLException e) {
e.printStackTrace();
}
Schedulers.newThread().createWorker().schedule(() -> {
try {
WrappedExecuteStageRequest request;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

merge the two lines

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you mean to merge them like that
Schedulers.newThread().createWorker().schedule(() -> { try {
It looks more well-formated to me with the line break.

request = createExecuteStageRequest();
setupRequestFailureHandler(request.getRequest().getTimeoutToReportStart(), request.getRequestSubject(),
() -> logger.error("launch error"));
logger.info("onNext'ing WrappedExecuteStageRequest: {}", request);
executeStageRequestObserver.onNext(request);
} catch (MalformedURLException e) {
e.printStackTrace();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's use logger.error and attach request information

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in 4df01c3
But I think the request would be null in this case. Because the exception would be thrown during the initialization of the request.

}
}, 2, TimeUnit.SECONDS);


// subscribe to vm task updates on current thread
vmTaskStatusObservable.subscribe(new Action1<VirtualMachineTaskStatus>() {
@Override
public void call(VirtualMachineTaskStatus vmTaskStatus) {
TYPE type = vmTaskStatus.getType();
if (type == TYPE.COMPLETED) {
logger.info("Got COMPLETED state for " + vmTaskStatus.getTaskId());
} else if (type == TYPE.STARTED) {
logger.info("Would send RUNNING state to mesos, worker started for " + vmTaskStatus.getTaskId());
}
vmTaskStatusObservable.subscribe(vmTaskStatus -> {
TYPE type = vmTaskStatus.getType();
if (type == TYPE.COMPLETED) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's replace this with switch-case

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in 4df01c3

logger.info("Got COMPLETED state for " + vmTaskStatus.getTaskId());
} else if (type == TYPE.STARTED) {
logger.info("Would send RUNNING state to mesos, worker started for " + vmTaskStatus.getTaskId());
}
});
}
Expand Down
Loading