Skip to content

Commit

Permalink
Merge branch 'develop' into feature/analysis
Browse files Browse the repository at this point in the history
  • Loading branch information
jtarraga committed Jun 1, 2017
2 parents 39f7008 + e8d3720 commit 6d5333d
Show file tree
Hide file tree
Showing 26 changed files with 1,498 additions and 503 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package org.opencb.hpg.bigdata.analysis.exceptions;

/**
* Created by pfurio on 23/05/17.
*/
public class AnalysisException extends Exception {

public AnalysisException(String message) {
super(message);
}

public AnalysisException(String message, Throwable cause) {
super(message, cause);
}

public AnalysisException(Throwable cause) {
super(cause);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package org.opencb.hpg.bigdata.analysis.exceptions;

/**
* Created by pfurio on 23/05/17.
*/
public class AnalysisToolException extends AnalysisException {

public AnalysisToolException(String message) {
super(message);
}

public AnalysisToolException(String message, Throwable cause) {
super(message, cause);
}

public AnalysisToolException(Throwable cause) {
super(cause);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package org.opencb.hpg.bigdata.analysis.tools;

import org.codehaus.jackson.map.ObjectMapper;
import org.opencb.commons.exec.Command;
import org.opencb.commons.exec.RunnableProcess;
import org.opencb.hpg.bigdata.analysis.exceptions.AnalysisToolException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.DataOutputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.file.Path;

/**
* Created by pfurio on 24/05/17.
*/
public class Executor {

private static Logger logger = LoggerFactory.getLogger(Executor.class);
private static ObjectMapper objectMapper = new ObjectMapper();
private static int threadInitNumber;
private static volatile String status;

protected static void execute(String commandLine, Path outdir) throws AnalysisToolException {
if (!outdir.toFile().isDirectory()) {
throw new AnalysisToolException("Output directory " + outdir + " is not an actual directory");
}
if (!outdir.toFile().canWrite()) {
throw new AnalysisToolException("Cannot write on output directory " + outdir);
}

try {
status = Status.RUNNING;
Command com = new Command(commandLine);

DataOutputStream dataOutputStream = new DataOutputStream(new FileOutputStream(outdir.resolve("stdout.txt").toFile()));
com.setOutputOutputStream(dataOutputStream);

dataOutputStream = new DataOutputStream(new FileOutputStream(outdir.resolve("stderr.txt").toFile()));
com.setErrorOutputStream(dataOutputStream);

ExecutorMonitor monitor = new ExecutorMonitor();
// Thread thread = new Thread(statusProcess, "StatusThread-" + nextThreadNum());

Thread hook = new Thread(() -> {
// status = Status.ERROR;
monitor.stop(new Status(Status.ERROR));
logger.info("Running ShutdownHook. Tool execution has being aborted.");
com.setStatus(RunnableProcess.Status.KILLED);
com.setExitValue(-2);
closeOutputStreams(com);
});

logger.info("==========================================");
logger.info("Executing tool");
logger.debug("Executing commandLine {}", commandLine);
logger.info("==========================================");
System.err.println();

Runtime.getRuntime().addShutdownHook(hook);
monitor.start(outdir);
com.run();
Runtime.getRuntime().removeShutdownHook(hook);
monitor.stop(new Status(Status.DONE));
// status = Status.DONE;

System.err.println();
logger.info("==========================================");
logger.info("Finished tool execution");
logger.info("==========================================");

closeOutputStreams(com);
} catch (FileNotFoundException e) {
logger.error("Could not create the output/error files", e);
}
}

private static void closeOutputStreams(Command com) {
/** Close output streams **/
if (com.getOutputOutputStream() != null) {
try {
com.getOutputOutputStream().close();
} catch (IOException e) {
logger.warn("Error closing OutputStream", e);
}
com.setOutputOutputStream(null);
com.setOutput(null);
}
if (com.getErrorOutputStream() != null) {
try {
com.getErrorOutputStream().close();
} catch (IOException e) {
logger.warn("Error closing OutputStream", e);
}
com.setErrorOutputStream(null);
com.setError(null);
}
}

private static synchronized int nextThreadNum() {
return threadInitNumber++;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package org.opencb.hpg.bigdata.analysis.tools;

import org.codehaus.jackson.map.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.file.Path;

/**
* Created by pfurio on 31/05/17.
*/
public class ExecutorMonitor {

private static Logger logger = LoggerFactory.getLogger(Executor.class);
private static ObjectMapper objectMapper = new ObjectMapper();
private static int threadInitNumber;

private Path directory;
private final int TIME = 5000;
private Thread myThread;

public void start(Path directory) {
if (!directory.toFile().isDirectory()) {
logger.error("Output directory {} is not an actual directory", directory);
return;
}
if (!directory.toFile().canWrite()) {
logger.error("Cannot write on output directory {}", directory);
return;
}

this.directory = directory;

Runnable statusProcess = () -> {
// Create hook to write status with ERROR
Thread hook = new Thread(() -> {
Status statusObject = new Status(Status.ERROR);
try {
objectMapper.writer().writeValue(this.directory.resolve("status.json").toFile(), statusObject);
} catch (IOException e) {
logger.error("Could not write status {} to file", Status.ERROR, e);
}
});
Runtime.getRuntime().addShutdownHook(hook);

// Keep writing in the status file while running to update current time
Status statusObject = new Status(Status.RUNNING);
while (!Thread.currentThread().isInterrupted()) {
try {
statusObject.setCurrentDate();
objectMapper.writer().writeValue(this.directory.resolve("status.json").toFile(), statusObject);
} catch (IOException e) {
logger.error("Could not write status {} to file", Status.RUNNING, e);
}

try {
Thread.sleep(this.TIME);
} catch (InterruptedException e) {
logger.error("{}", e.getMessage(), e);
Thread.currentThread().interrupt();
break;
}
}

Runtime.getRuntime().removeShutdownHook(hook);
};

myThread = new Thread(statusProcess, "StatusThread-" + nextThreadNum());
myThread.start();
}

public void stop(Status status) {
if (this.directory == null) {
return;
}

// Interrupt and wait for the thread
myThread.interrupt();
try {
myThread.join();
} catch (InterruptedException e) {
logger.error("Thread is alive? {}", myThread.isAlive() ? "true" : "false");
}

// Write the status
try {
objectMapper.writer().writeValue(this.directory.resolve("status.json").toFile(), status);
} catch (IOException e) {
logger.error("Could not write status {} to file", status.getName(), e);
}
}

private static synchronized int nextThreadNum() {
return threadInitNumber++;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package org.opencb.hpg.bigdata.analysis.tools;

import java.text.SimpleDateFormat;
import java.util.Date;

/**
* Created by pfurio on 24/05/17.
*/

public class Status {

public static final String RUNNING = "RUNNING";
/**
* DONE status means that the job has finished the execution, but the output is still not ready.
*/
public static final String DONE = "DONE";
/**
* ERROR status means that the job finished with an error.
*/
public static final String ERROR = "ERROR";

private String name;
private String date;
private String message;

public Status() {
this(RUNNING, "");
}

public Status(String name) {
this(name, "");
}

public Status(String name, String message) {
this.name = name;
this.date = getCurrentTime();
this.message = message;
}

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}

public String getDate() {
return date;
}

public void setDate(String date) {
this.date = date;
}

public void setCurrentDate() {
this.date = getCurrentTime();
}

public String getMessage() {
return message;
}

public void setMessage(String message) {
this.message = message;
}

@Override
public String toString() {
final StringBuilder sb = new StringBuilder("Status{");
sb.append("name='").append(name).append('\'');
sb.append(", date='").append(date).append('\'');
sb.append(", message='").append(message).append('\'');
sb.append('}');
return sb.toString();
}

private String getCurrentTime() {
SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmss");
return sdf.format(new Date());
}

}
Loading

0 comments on commit 6d5333d

Please sign in to comment.