Skip to content

Commit

Permalink
support inputformat cache in local disk when running mutil epochs
Browse files Browse the repository at this point in the history
  • Loading branch information
jiarunying committed Jan 30, 2018
1 parent 290de39 commit 8555c75
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 1 deletion.
12 changes: 12 additions & 0 deletions src/main/java/net/qihoo/xlearning/conf/XLearningConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,18 @@ public XLearningConfiguration(Configuration conf) {

public static final Boolean DEFAULT_XLEARNING_INPUT_STREAM_SHUFFLE = false;

public static final String XLEARNING_INPUTFORMAT_CACHESIZE_LIMIT= "xlearning.inputformat.cachesize.limit";

public static final int DEFAULT_XLEARNING_INPUTFORMAT_CACHESIZE_LIMIT = 100 * 1024;

public static final String XLEARNING_INPUTFORMAT_CACHE = "xlearning.inputformat.cache";

public static final boolean DEFAULT_XLEARNING_INPUTFORMAT_CACHE = false;

public static final String XLEARNING_INPUTFORMAT_CACHEFILE_NAME = "xlearning.inputformat.cachefile.name";

public static final String DEFAULT_XLEARNING_INPUTFORMAT_CACHEFILE_NAME = "inputformatCache.gz";

public static final String XLEARNING_INTERREAULST_DIR = "xlearning.interresult.dir";

public static final String DEFAULT_XLEARNING_INTERRESULT_DIR = "/interResult_";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.util.*;
import java.util.concurrent.*;
import java.text.SimpleDateFormat;
import java.util.zip.GZIPOutputStream;

public class XLearningContainer {

Expand Down Expand Up @@ -597,6 +598,9 @@ private Boolean run() throws IOException {
public void run() {
try {
OutputStreamWriter osw = new OutputStreamWriter(xlearningProcess.getOutputStream());
File gzFile = new File(conf.get(XLearningConfiguration.XLEARNING_INPUTFORMAT_CACHEFILE_NAME, XLearningConfiguration.DEFAULT_XLEARNING_INPUTFORMAT_CACHEFILE_NAME));
GZIPOutputStream gos = new GZIPOutputStream(new FileOutputStream(gzFile));
boolean isCache = conf.getBoolean(XLearningConfiguration.XLEARNING_INPUTFORMAT_CACHE, XLearningConfiguration.DEFAULT_XLEARNING_INPUTFORMAT_CACHE);
List<InputSplit> inputs = Arrays.asList(amClient.getStreamInputSplit(containerId));
JobConf jobConf = new JobConf(conf);
RecordReader reader;
Expand All @@ -618,16 +622,35 @@ public void run() {
}
osw.write(value.toString());
osw.write("\n");
if(j == 0 && isCache) {
if(conf.getInt(XLearningConfiguration.XLEARNING_STREAM_EPOCH, XLearningConfiguration.DEFAULT_XLEARNING_STREAM_EPOCH) > 1) {
gos.write(value.toString().getBytes());
gos.write("\n".getBytes());

if((gzFile.length() / 1024 / 1024) > conf.getInt(XLearningConfiguration.XLEARNING_INPUTFORMAT_CACHESIZE_LIMIT, XLearningConfiguration.DEFAULT_XLEARNING_INPUTFORMAT_CACHESIZE_LIMIT)) {
LOG.info("Inputformat cache file size is:" + gzFile.length() / 1024 / 1024 + "M "
+ "beyond the limit size:" + conf.getInt(XLearningConfiguration.XLEARNING_INPUTFORMAT_CACHESIZE_LIMIT, XLearningConfiguration.DEFAULT_XLEARNING_INPUTFORMAT_CACHESIZE_LIMIT) + "M.");
gzFile.delete();
LOG.info("Local cache file deleted and will not use cache.");
isCache = false;
}
}
}
} catch (EOFException e) {
finished = true;
e.printStackTrace();
}
}
reader.close();
LOG.info("split " + (i + 1) + " is finished.");
}
LOG.info("Epoch " + (j + 1) + " finished.");
if(isCache) {
break;
}
}
osw.close();
gos.close();
} catch (Exception e) {
LOG.warn("Exception in thread stdinRedirectThread");
e.printStackTrace();
Expand Down Expand Up @@ -754,7 +777,7 @@ public void run() {
}
}
String boardUrl = "http://" + boardHost + ":" + boardPort;
LOG.info("Executing borad command:" + boardCommand);
LOG.info("Executing board command:" + boardCommand);
boardReservedSocket.close();
try {
final Process boardProcess = rt.exec(boardCommand, env);
Expand Down

0 comments on commit 8555c75

Please sign in to comment.