Skip to content

Commit

Permalink
Merge pull request #5828 from mtbc/file-upload-in-parallel
Browse files Browse the repository at this point in the history
for CLI import add --parallel-{upload,fileset} options
  • Loading branch information
joshmoore authored Aug 31, 2018
2 parents 668fa1d + d39eaf6 commit 040837b
Show file tree
Hide file tree
Showing 6 changed files with 236 additions and 58 deletions.
6 changes: 5 additions & 1 deletion components/blitz/src/ome/formats/importer/ImportConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,8 @@ public class ImportConfig {
public final BoolValue sendReport;
public final BoolValue sendFiles;
public final BoolValue sendLogFile;
public final IntValue parallelUpload;
public final IntValue parallelFileset;
public final StrValue qaBaseURL;
public final BoolValue checkUpgrade;

Expand Down Expand Up @@ -275,13 +277,15 @@ public synchronized void load() {
sendReport = new BoolValue("sendReport", this, false);
sendFiles = new BoolValue("sendFiles", this, true);
sendLogFile = new BoolValue("sendLogFile", this, true);
parallelUpload = new IntValue("parallelUpload", this, 1);
parallelFileset = new IntValue("parallelFileset", this, 1);

useFullPath = new BoolValue("useFullPath", this, true);
useCustomImageNaming = new BoolValue("overrideImageName", this, true);
numOfDirectories = new IntValue("numOfDirectories", this, 0);
savedDirectory = new FileValue("savedDirectory", this);

encryptedConnection = new BoolValue("ecryptedConnection", this, true);
encryptedConnection = new BoolValue("encryptedConnection", this, true);
autoClose = new BoolValue("autoClose", this, false);

annotations = new AnnotationListValue(
Expand Down
180 changes: 132 additions & 48 deletions components/blitz/src/ome/formats/importer/ImportLibrary.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,18 @@
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import loci.common.Location;
import loci.formats.FormatException;
Expand Down Expand Up @@ -83,6 +90,7 @@
import org.slf4j.LoggerFactory;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;

import Ice.Current;

Expand Down Expand Up @@ -257,52 +265,90 @@ public void notifyObservers(ImportEvent event)
* @param candidates Hosts information about the files to import.
* @return if the import did not exit because of an error
*/
public boolean importCandidates(ImportConfig config, ImportCandidates candidates)
public boolean importCandidates(final ImportConfig config, ImportCandidates candidates)
{
List<ImportContainer> containers = candidates.getContainers();
if (containers != null) {
int numDone = 0;
for (int index = 0; index < containers.size(); index++) {
ImportContainer ic = containers.get(index);
ImportTarget target = config.getTarget();
if (target != null) {
try {
IObject obj = target.load(store, ic);
if (!(obj instanceof Annotation)) {
ic.setTarget(obj);
} else {
// This is likely a "post-processing" annotation
// so that we don't have to resolve the target
// until later.
ic.getCustomAnnotationList().add((Annotation) obj);
}
} catch (Exception e) {
log.error("Could not load target: {}", target);
throw new RuntimeException("Failed to load target", e);
final int count = containers.size();
ExecutorService filesetThreadPool, uploadThreadPool;
filesetThreadPool = Executors.newFixedThreadPool(Math.min(count, config.parallelFileset.get()));
uploadThreadPool = Executors.newFixedThreadPool(config.parallelUpload.get());
try {
final List<Callable<Boolean>> threads = new ArrayList<>(count);
for (int index = 0; index < count; index++) {
final ImportContainer ic = containers.get(index);
final ImportTarget target = config.getTarget();
if (config.checksumAlgorithm.get() != null) {
ic.setChecksumAlgorithm(config.checksumAlgorithm.get());
}
final ExecutorService uploadThreadPoolFinal = uploadThreadPool;
final int indexFinal = index;
threads.add(new Callable<Boolean>() {
@Override
public Boolean call() {
try {
if (target != null) {
try {
IObject obj = target.load(store, ic);
if (!(obj instanceof Annotation)) {
ic.setTarget(obj);
} else {
// This is likely a "post-processing" annotation
// so that we don't have to resolve the target
// until later.
ic.getCustomAnnotationList().add((Annotation) obj);
}
} catch (Exception e) {
log.error("Could not load target: {}", target);
throw new RuntimeException("Failed to load target", e);
}
}
importImage(ic, uploadThreadPoolFinal, indexFinal);
return true;
} catch (Throwable t) {
String message = "Error on import";
if (t instanceof ServerError) {
final ServerError se = (ServerError) t;
if (StringUtils.isNotBlank(se.message)) {
message += ": " + se.message;
}
}
log.error(message, t);
if (!config.contOnError.get()) {
log.info("Exiting on error");
return false;
} else {
log.info("Continuing after error");
return true;
}
}
}
});
}
if (config.checksumAlgorithm.get() != null) {
ic.setChecksumAlgorithm(config.checksumAlgorithm.get());
final ExecutorCompletionService<Boolean> threadQueue = new ExecutorCompletionService<>(filesetThreadPool);
final List<Future<Boolean>> outcomes = new ArrayList<>(count);
for (final Callable<Boolean> thread : threads) {
outcomes.add(threadQueue.submit(thread));
}

try {
importImage(ic,index,numDone,containers.size());
numDone++;
} catch (Throwable t) {
String message = "Error on import";
if (t instanceof ServerError) {
final ServerError se = (ServerError) t;
if (StringUtils.isNotBlank(se.message)) {
message += ": " + se.message;
for (int index = 0; index < count; index++) {
if (!threadQueue.take().get()) {
return false;
}
}
log.error(message, t);
if (!config.contOnError.get()) {
log.info("Exiting on error");
return false;
} else {
log.info("Continuing after error");
}
} catch (InterruptedException ie) {
log.error("import interrupted", ie);
return false;
} catch (ExecutionException ee) {
log.error("exception should be handled in other thread", ee);
return false;
}
} finally {
if (filesetThreadPool != null) {
filesetThreadPool.shutdownNow();
}
if (uploadThreadPool != null) {
uploadThreadPool.shutdownNow();
}
}
}
Expand Down Expand Up @@ -461,17 +507,29 @@ public String uploadFile(final ImportProcessPrx proc,

}

/**
 * Image import with only one file upload thread.
 * <p>
 * A single-thread executor is created for the duration of this call, the work is
 * delegated to {@link #importImage(ImportContainer, ExecutorService, int)}, and the
 * executor is shut down again whether the delegate returns normally or throws.
 *
 * @param container the import container handed on to the delegate
 * @param index the index handed on to the delegate
 * @param numDone not read by this method
 * @param total not read by this method
 * @return the list returned by the delegate
 * @throws Throwable anything thrown by the delegate
 * @see #importImage(ImportContainer, ExecutorService, int)
 * @deprecated now used by tests only
 */
@SuppressWarnings("javadoc")
@Deprecated
public List<Pixels> importImage(final ImportContainer container, int index, int numDone, int total) throws Throwable {
    final ExecutorService singleUploader = Executors.newSingleThreadExecutor();
    final List<Pixels> imported;
    try {
        imported = importImage(container, singleUploader, index);
    } finally {
        // Release the worker thread on both the success and the failure path.
        singleUploader.shutdown();
    }
    return imported;
}

/**
* Perform an image import uploading files if necessary.
* @param container The import container which houses all the configuration
* values and target for the import.
* @param threadPool The pool of threads to use in file upload.
* @param index Index of the import in a set. <code>0</code> is safe if
* this is a singular import.
* @param numDone Number of imports completed in a set. <code>0</code> is
* safe if this is a singular import.
* @param total Total number of imports in a set. <code>1</code> is safe
* if this is a singular import.
* @return List of Pixels that have been imported.
* @throws FormatException If there is a Bio-Formats image file format
* error during import.
Expand All @@ -481,8 +539,7 @@ public String uploadFile(final ImportProcessPrx proc,
* @throws Throwable If there is some other kind of error during import.
* @since OMERO Beta 4.2.1.
*/
public List<Pixels> importImage(final ImportContainer container, int index,
int numDone, int total)
public List<Pixels> importImage(final ImportContainer container, ExecutorService threadPool, int index)
throws FormatException, IOException, Throwable
{
HandlePrx handle;
Expand All @@ -505,20 +562,47 @@ public List<Pixels> importImage(final ImportContainer container, int index,
}
final ImportProcessPrx proc = createImport(container);
final String[] srcFiles = container.getUsedFiles();
final List<String> checksums = new ArrayList<String>();
final byte[] buf = new byte[store.getDefaultBlockSize()];
final ThreadLocal<byte[]> buf = new ThreadLocal<byte[]>() {
@Override
protected byte[] initialValue() {
return new byte[store.getDefaultBlockSize()];
}
};
final TimeEstimator estimator = new ProportionalTimeEstimatorImpl(
container.getUsedFilesTotalSize());
Map<Integer, String> failingChecksums = new HashMap<Integer, String>();

notifyObservers(new ImportEvent.FILESET_UPLOAD_START(
null, index, srcFiles.length, null, null, null));

final List<Callable<Map.Entry<Integer, String>>> threads = new ArrayList<>(srcFiles.length);
for (int i = 0; i < srcFiles.length; i++) {
checksums.add(uploadFile(proc, srcFiles, i, checksumProviderFactory,
estimator, buf));
final int fileIndex = i;
threads.add(new Callable<Map.Entry<Integer, String>>() {
@Override
public Map.Entry<Integer, String> call() throws Exception {
final String checksum = uploadFile(proc, srcFiles, fileIndex, checksumProviderFactory, estimator, buf.get());
return Maps.immutableEntry(fileIndex, checksum);
}});
}

final ExecutorCompletionService<Map.Entry<Integer, String>> threadQueue = new ExecutorCompletionService<>(threadPool);
final List<Future<Map.Entry<Integer, String>>> outcomes = new ArrayList<>(srcFiles.length);
for (final Callable<Map.Entry<Integer, String>> thread : threads) {
outcomes.add(threadQueue.submit(thread));
}
final String[] checksumArray = new String[srcFiles.length];
for (index = 0; index < srcFiles.length; index++) {
try {
final Map.Entry<Integer, String> outcome = threadQueue.take().get();
checksumArray[outcome.getKey()] = outcome.getValue();
} catch (InterruptedException | ExecutionException e) {
for (final Future<Map.Entry<Integer, String>> outcome : outcomes) {
outcome.cancel(true);
}
throw e instanceof ExecutionException ? e.getCause() : e;
}
}
final List<String> checksums = Arrays.asList(checksumArray);
try {
handle = proc.verifyUpload(checksums);
} catch (ChecksumValidationException cve) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
/*
* Copyright (C) 2009-2016 University of Dundee & Open Microscopy Environment.
* Copyright (C) 2009-2018 University of Dundee & Open Microscopy Environment.
* All rights reserved.
*
* Use is subject to license terms supplied in LICENSE.txt
*/

package ome.formats.importer.cli;

import gnu.getopt.Getopt;
Expand Down Expand Up @@ -547,6 +548,8 @@ public static void main(String[] args) throws Exception {
config.sendLogFile.set(false);
config.sendReport.set(false);
config.contOnError.set(false);
config.parallelUpload.set(1);
config.parallelFileset.set(1);
config.debug.set(false);
config.encryptedConnection.set(false);

Expand Down Expand Up @@ -611,6 +614,12 @@ public static void main(String[] args) throws Exception {
LongOpt encryptedConnection =
new LongOpt("encrypted", LongOpt.REQUIRED_ARGUMENT, null, 26);

LongOpt parallelUpload =
new LongOpt("parallel-upload", LongOpt.REQUIRED_ARGUMENT, null, 27);

LongOpt parallelFileset =
new LongOpt("parallel-fileset", LongOpt.REQUIRED_ARGUMENT, null, 28);

// DEPRECATED OPTIONS
LongOpt minutesWaitDeprecated =
new LongOpt("minutes_wait", LongOpt.REQUIRED_ARGUMENT, null, 86);
Expand Down Expand Up @@ -649,6 +658,7 @@ public static void main(String[] args) throws Exception {
exclude, target, noStatsInfo,
noUpgradeCheck, qaBaseURL,
outputFormat, encryptedConnection,
parallelUpload, parallelFileset,
plateName, plateName2,
plateDescription, plateDescription2,
noThumbnailsDeprecated,
Expand Down Expand Up @@ -813,6 +823,18 @@ public static void main(String[] args) throws Exception {
config.encryptedConnection.set(Boolean.valueOf(encryptedArg));
break;
}
case 27: {
String parallelFArg = g.getOptarg();
log.info("Setting parallel upload: {}", parallelFArg);
config.parallelUpload.set(Integer.valueOf(parallelFArg));
break;
}
case 28: {
String parallelUArg = g.getOptarg();
log.info("Setting parallel fileset: {}", parallelUArg);
config.parallelFileset.set(Integer.valueOf(parallelUArg));
break;
}
// ADVANCED END ---------------------------------------------------
// DEPRECATED OPTIONS
case 90:
Expand Down
Loading

0 comments on commit 040837b

Please sign in to comment.