diff --git a/src/main/java/com/conveyal/analysis/components/BackendComponents.java b/src/main/java/com/conveyal/analysis/components/BackendComponents.java index 9a7270a1b..dafb1b57e 100644 --- a/src/main/java/com/conveyal/analysis/components/BackendComponents.java +++ b/src/main/java/com/conveyal/analysis/components/BackendComponents.java @@ -86,7 +86,7 @@ public List standardHttpControllers () { new GtfsController(gtfsCache), new BundleController(this), new OpportunityDatasetController(fileStorage, taskScheduler, censusExtractor, database), - new RegionalAnalysisController(broker, fileStorage), + new RegionalAnalysisController(broker, fileStorage, taskScheduler), new AggregationAreaController(fileStorage, database, taskScheduler), // This broker controller registers at least one handler at URL paths beginning with /internal, which // is exempted from authentication and authorization, but should be hidden from the world diff --git a/src/main/java/com/conveyal/analysis/controllers/RegionalAnalysisController.java b/src/main/java/com/conveyal/analysis/controllers/RegionalAnalysisController.java index 5cde0f9bc..82806ad66 100644 --- a/src/main/java/com/conveyal/analysis/controllers/RegionalAnalysisController.java +++ b/src/main/java/com/conveyal/analysis/controllers/RegionalAnalysisController.java @@ -3,6 +3,7 @@ import com.conveyal.analysis.AnalysisServerException; import com.conveyal.analysis.SelectingGridReducer; import com.conveyal.analysis.UserPermissions; +import com.conveyal.analysis.components.TaskScheduler; import com.conveyal.analysis.components.broker.Broker; import com.conveyal.analysis.components.broker.JobStatus; import com.conveyal.analysis.models.AnalysisRequest; @@ -11,6 +12,7 @@ import com.conveyal.analysis.models.RegionalAnalysis; import com.conveyal.analysis.persistence.Persistence; import com.conveyal.analysis.results.CsvResultType; +import com.conveyal.analysis.util.HttpStatus; import com.conveyal.analysis.util.JsonUtil; import com.conveyal.file.FileStorage; import com.conveyal.file.FileStorageFormat; @@ -22,6 +24,7 @@ import com.conveyal.r5.analyst.PointSet; import com.conveyal.r5.analyst.PointSetCache; import com.conveyal.r5.analyst.cluster.RegionalTask; +import com.conveyal.r5.analyst.progress.Task; import com.google.common.primitives.Ints; import com.mongodb.QueryBuilder; import gnu.trove.list.array.TIntArrayList; @@ -36,6 +39,7 @@ import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; +import java.io.OutputStream; import java.net.URI; import java.nio.file.FileSystem; import java.nio.file.FileSystems; @@ -45,9 +49,12 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.Set; import java.util.zip.GZIPOutputStream; import static com.conveyal.analysis.util.JsonUtil.toJson; @@ -60,6 +67,7 @@ import static com.google.common.base.Preconditions.checkState; import static org.eclipse.jetty.http.MimeTypes.Type.APPLICATION_JSON; import static org.eclipse.jetty.http.MimeTypes.Type.TEXT_HTML; +import static org.eclipse.jetty.http.MimeTypes.Type.TEXT_PLAIN; /** * Spark HTTP handler methods that allow launching new regional analyses, as well as deleting them and fetching @@ -80,10 +88,12 @@ public class RegionalAnalysisController implements HttpController { private final Broker broker; private final FileStorage fileStorage; + private final TaskScheduler taskScheduler; - public RegionalAnalysisController (Broker broker, FileStorage fileStorage) { + public RegionalAnalysisController (Broker broker, FileStorage fileStorage, TaskScheduler taskScheduler) { this.broker = broker; this.fileStorage = fileStorage; + this.taskScheduler = taskScheduler; } private Collection getRegionalAnalysesForRegion(String regionId, UserPermissions userPermissions) { @@ -254,8 +264,9 @@ private HumanKey getSingleCutoffGrid ( grid.writeGeotiff(fos); break; } - + LOG.debug("Finished deriving single-cutoff grid {}. Transferring to storage.", singleCutoffKey); fileStorage.moveIntoStorage(singleCutoffFileStorageKey, localFile); + LOG.debug("Finished transferring single-cutoff grid {} to storage.", singleCutoffKey); } String analysisHumanName = humanNameForEntity(analysis); String destinationHumanName = humanNameForEntity(destinations); @@ -266,6 +277,10 @@ private HumanKey getSingleCutoffGrid ( return new HumanKey(singleCutoffFileStorageKey, resultHumanFilename); } + // Prevent multiple requests from creating the same files in parallel. + // This could potentially be integrated into FileStorage with enum return values or an additional boolean method. + private Set filesBeingPrepared = Collections.synchronizedSet(new HashSet<>()); + private Object getAllRegionalResults (Request req, Response res) throws IOException { final String regionalAnalysisId = req.params("_id"); final UserPermissions userPermissions = UserPermissions.from(req); @@ -277,39 +292,61 @@ private Object getAllRegionalResults (Request req, Response res) throws IOExcept throw AnalysisServerException.badRequest("Batch result download only available for gridded origins."); } FileStorageKey zippedResultsKey = new FileStorageKey(RESULTS, analysis._id + "_ALL.zip"); - if (!fileStorage.exists(zippedResultsKey)) { - // Iterate over all dest, cutoff, percentile combinations and generate one geotiff grid output for each one. - List humanKeys = new ArrayList<>(); - for (String destinationPointSetId : analysis.destinationPointSetIds) { - OpportunityDataset destinations = getDestinations(destinationPointSetId, userPermissions); - for (int cutoffMinutes : analysis.cutoffsMinutes) { - for (int percentile : analysis.travelTimePercentiles) { - HumanKey gridKey = getSingleCutoffGrid( - analysis, destinations, cutoffMinutes, percentile, FileStorageFormat.GEOTIFF - ); - humanKeys.add(gridKey); + if (fileStorage.exists(zippedResultsKey)) { + res.type(APPLICATION_JSON.asString()); + String analysisHumanName = humanNameForEntity(analysis); + return fileStorage.getJsonUrl(zippedResultsKey, analysisHumanName, "zip"); + } + if (filesBeingPrepared.contains(zippedResultsKey.path)) { + res.type(TEXT_PLAIN.asString()); + res.status(HttpStatus.ACCEPTED_202); + return "Geotiff zip is already being prepared in the background."; + } + // File did not exist. Create it in the background and ask caller to request it later. + filesBeingPrepared.add(zippedResultsKey.path); + Task task = Task.create("Zip all geotiffs for regional analysis " + analysis.name) + .forUser(userPermissions) + .withAction(progressListener -> { + int nSteps = analysis.destinationPointSetIds.length * analysis.cutoffsMinutes.length * + analysis.travelTimePercentiles.length * 2 + 1; + progressListener.beginTask("Creating and archiving geotiffs...", nSteps); + // Iterate over all dest, cutoff, percentile combinations and generate one geotiff for each combination. + List humanKeys = new ArrayList<>(); + for (String destinationPointSetId : analysis.destinationPointSetIds) { + OpportunityDataset destinations = getDestinations(destinationPointSetId, userPermissions); + for (int cutoffMinutes : analysis.cutoffsMinutes) { + for (int percentile : analysis.travelTimePercentiles) { + HumanKey gridKey = getSingleCutoffGrid( + analysis, destinations, cutoffMinutes, percentile, FileStorageFormat.GEOTIFF + ); + humanKeys.add(gridKey); + progressListener.increment(); + } } } - } - File tempZipFile = File.createTempFile("regional", ".zip"); - // Zipfs can't open existing empty files, the file has to not exist. FIXME: Non-dangerous race condition - // Examining ZipFileSystemProvider reveals a "useTempFile" env parameter, but this is for the individual entries. - // May be better to just use zipOutputStream which would also allow gzip - zip CSV conversion. - tempZipFile.delete(); - Map env = Map.of("create", "true"); - URI uri = URI.create("jar:file:" + tempZipFile.getAbsolutePath()); - try (FileSystem zipFilesystem = FileSystems.newFileSystem(uri, env)) { - for (HumanKey key : humanKeys) { - Path storagePath = fileStorage.getFile(key.storageKey).toPath(); - Path zipPath = zipFilesystem.getPath(key.humanName); - Files.copy(storagePath, zipPath, StandardCopyOption.REPLACE_EXISTING); + File tempZipFile = File.createTempFile("regional", ".zip"); + // Zipfs can't open existing empty files, the file has to not exist. FIXME: Non-dangerous race condition + // Examining ZipFileSystemProvider reveals a "useTempFile" env parameter, but this is for the individual + // entries. May be better to just use zipOutputStream which would also allow gzip - zip CSV conversion. + tempZipFile.delete(); + Map env = Map.of("create", "true"); + URI uri = URI.create("jar:file:" + tempZipFile.getAbsolutePath()); + try (FileSystem zipFilesystem = FileSystems.newFileSystem(uri, env)) { + for (HumanKey key : humanKeys) { + Path storagePath = fileStorage.getFile(key.storageKey).toPath(); + Path zipPath = zipFilesystem.getPath(key.humanName); + Files.copy(storagePath, zipPath, StandardCopyOption.REPLACE_EXISTING); + progressListener.increment(); + } } - } - fileStorage.moveIntoStorage(zippedResultsKey, tempZipFile); - } - res.type(APPLICATION_JSON.asString()); - String analysisHumanName = humanNameForEntity(analysis); - return fileStorage.getJsonUrl(zippedResultsKey, analysisHumanName, "zip"); + fileStorage.moveIntoStorage(zippedResultsKey, tempZipFile); + progressListener.increment(); + filesBeingPrepared.remove(zippedResultsKey.path); + }); + taskScheduler.enqueue(task); + res.type(TEXT_PLAIN.asString()); + res.status(HttpStatus.ACCEPTED_202); + return "Building geotiff zip in background."; } /** diff --git a/src/main/java/com/conveyal/r5/analyst/progress/Task.java b/src/main/java/com/conveyal/r5/analyst/progress/Task.java index 49990af7a..a072d234d 100644 --- a/src/main/java/com/conveyal/r5/analyst/progress/Task.java +++ b/src/main/java/com/conveyal/r5/analyst/progress/Task.java @@ -162,7 +162,7 @@ protected void bubbleUpProgress() { } /** - * Check that all necesary fields have been set before enqueueing for execution, and check any invariants. + * Check that all necessary fields have been set before enqueueing for execution, and check any invariants. */ public void validate () { if (this.user == null) {