-
Notifications
You must be signed in to change notification settings - Fork 200
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix compile warnings in mantis server worker #423
base: master
Are you sure you want to change the base?
Changes from 8 commits
fac848d
5cf7a54
ca99649
2f49762
f2ae5b6
d523f38
e2dcdc6
d2008bd
e9ad7a8
4df01c3
0b3695b
4526f92
07ad134
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -40,15 +40,13 @@ | |
import java.util.Optional; | ||
import java.util.concurrent.ExecutorService; | ||
import java.util.concurrent.Executors; | ||
import java.util.concurrent.ThreadFactory; | ||
import java.util.concurrent.TimeUnit; | ||
import org.apache.mesos.MesosExecutorDriver; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
import rx.Observable; | ||
import rx.Observer; | ||
import rx.functions.Action0; | ||
import rx.functions.Action1; | ||
import rx.schedulers.Schedulers; | ||
import rx.subjects.PublishSubject; | ||
|
||
|
@@ -59,23 +57,20 @@ public class VirtualMachineWorkerServiceLocalImpl extends BaseService implements | |
private static final Logger logger = LoggerFactory.getLogger(VirtualMachineWorkerServiceLocalImpl.class); | ||
private final WorkerTopologyInfo.Data workerInfo; | ||
private MesosExecutorDriver mesosDriver; | ||
private ExecutorService executor; | ||
private Observer<WrappedExecuteStageRequest> executeStageRequestObserver; | ||
private Observable<VirtualMachineTaskStatus> vmTaskStatusObservable; | ||
private final ExecutorService executor; | ||
private final Observer<WrappedExecuteStageRequest> executeStageRequestObserver; | ||
private final Observable<VirtualMachineTaskStatus> vmTaskStatusObservable; | ||
|
||
public VirtualMachineWorkerServiceLocalImpl(final WorkerTopologyInfo.Data workerInfo, | ||
Observer<WrappedExecuteStageRequest> executeStageRequestObserver, | ||
Observable<VirtualMachineTaskStatus> vmTaskStatusObservable) { | ||
this.workerInfo = workerInfo; | ||
this.executeStageRequestObserver = executeStageRequestObserver; | ||
this.vmTaskStatusObservable = vmTaskStatusObservable; | ||
executor = Executors.newSingleThreadExecutor(new ThreadFactory() { | ||
@Override | ||
public Thread newThread(Runnable r) { | ||
Thread t = new Thread(r, "vm_worker_mesos_executor_thread"); | ||
t.setDaemon(true); | ||
return t; | ||
} | ||
executor = Executors.newSingleThreadExecutor(r -> { | ||
Thread t = new Thread(r, "vm_worker_mesos_executor_thread"); | ||
t.setDaemon(true); | ||
return t; | ||
}); | ||
} | ||
|
||
|
@@ -113,7 +108,7 @@ private WrappedExecuteStageRequest createExecuteStageRequest() throws MalformedU | |
jobJarUrl, workerInfo.getStageNumber(), workerInfo.getNumStages(), ports, timeoutToReportStartSec, workerInfo.getMetricsPort(), params, schedInfo, MantisJobDurationType.Transient, | ||
0L, 0L, new WorkerPorts(Arrays.asList(7151, 7152, 7153, 7154, 7155)), Optional.empty()); | ||
|
||
return new WrappedExecuteStageRequest(PublishSubject.<Boolean>create(), executeStageRequest); | ||
return new WrappedExecuteStageRequest(PublishSubject.create(), executeStageRequest); | ||
} | ||
|
||
private void setupRequestFailureHandler(long waitSeconds, Observable<Boolean> requestObservable, | ||
|
@@ -145,38 +140,27 @@ public void onNext(List<Boolean> booleans) { | |
@Override | ||
public void start() { | ||
logger.info("Starting VirtualMachineWorkerServiceLocalImpl"); | ||
Schedulers.newThread().createWorker().schedule(new Action0() { | ||
@Override | ||
public void call() { | ||
try { | ||
WrappedExecuteStageRequest request = null; | ||
request = createExecuteStageRequest(); | ||
setupRequestFailureHandler(request.getRequest().getTimeoutToReportStart(), request.getRequestSubject(), | ||
new Action0() { | ||
@Override | ||
public void call() { | ||
logger.error("launch error"); | ||
} | ||
}); | ||
logger.info("onNext'ing WrappedExecuteStageRequest: {}", request.toString()); | ||
executeStageRequestObserver.onNext(request); | ||
} catch (MalformedURLException e) { | ||
e.printStackTrace(); | ||
} | ||
Schedulers.newThread().createWorker().schedule(() -> { | ||
try { | ||
WrappedExecuteStageRequest request; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. merge the two lines There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If you mean to merge them like that |
||
request = createExecuteStageRequest(); | ||
setupRequestFailureHandler(request.getRequest().getTimeoutToReportStart(), request.getRequestSubject(), | ||
() -> logger.error("launch error")); | ||
logger.info("onNext'ing WrappedExecuteStageRequest: {}", request); | ||
executeStageRequestObserver.onNext(request); | ||
} catch (MalformedURLException e) { | ||
e.printStackTrace(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's use logger.error and attach request information There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fixed in 4df01c3 |
||
} | ||
}, 2, TimeUnit.SECONDS); | ||
|
||
|
||
// subscribe to vm task updates on current thread | ||
vmTaskStatusObservable.subscribe(new Action1<VirtualMachineTaskStatus>() { | ||
@Override | ||
public void call(VirtualMachineTaskStatus vmTaskStatus) { | ||
TYPE type = vmTaskStatus.getType(); | ||
if (type == TYPE.COMPLETED) { | ||
logger.info("Got COMPLETED state for " + vmTaskStatus.getTaskId()); | ||
} else if (type == TYPE.STARTED) { | ||
logger.info("Would send RUNNING state to mesos, worker started for " + vmTaskStatus.getTaskId()); | ||
} | ||
vmTaskStatusObservable.subscribe(vmTaskStatus -> { | ||
TYPE type = vmTaskStatus.getType(); | ||
if (type == TYPE.COMPLETED) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's replace this with switch-case There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fixed in 4df01c3 |
||
logger.info("Got COMPLETED state for " + vmTaskStatus.getTaskId()); | ||
} else if (type == TYPE.STARTED) { | ||
logger.info("Would send RUNNING state to mesos, worker started for " + vmTaskStatus.getTaskId()); | ||
} | ||
}); | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we replace string concatenation with parameterized / placeholder in logging? It applies to other logging lines in this module.
See this doc for advantages: https://www.tutorialspoint.com/slf4j/slf4j_parameterized_logging.htm
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the suggestion. Fixed in e9ad7a8