From 9afdce17460152b8f73a070d9468cd744ca492b0 Mon Sep 17 00:00:00 2001 From: Emanuel Dima Date: Thu, 4 Jun 2020 17:27:17 +0200 Subject: [PATCH] replace loop-wait with future tasks --- .../core/DefaultStoragePolicy.java | 19 ++- .../eu/clarin/switchboard/core/FileInfo.java | 9 - .../switchboard/core/FileInfoFuture.java | 29 ++++ .../clarin/switchboard/core/MediaLibrary.java | 159 +++++++----------- .../switchboard/core/StoragePolicy.java | 2 + .../switchboard/resources/DataResource.java | 31 +--- 6 files changed, 118 insertions(+), 131 deletions(-) create mode 100644 backend/src/main/java/eu/clarin/switchboard/core/FileInfoFuture.java diff --git a/backend/src/main/java/eu/clarin/switchboard/core/DefaultStoragePolicy.java b/backend/src/main/java/eu/clarin/switchboard/core/DefaultStoragePolicy.java index 5e7396ec7..d372b91fe 100644 --- a/backend/src/main/java/eu/clarin/switchboard/core/DefaultStoragePolicy.java +++ b/backend/src/main/java/eu/clarin/switchboard/core/DefaultStoragePolicy.java @@ -39,10 +39,14 @@ public Duration getCleanupPeriod() { @Override public void acceptFile(File file) throws StoragePolicyException { if (file.length() > dataStoreConfig.getMaxSize()) { - throw new StoragePolicyException( - "The resource is too large. The maximum allowed data size is " + - humanSize(dataStoreConfig.getMaxSize()) + ".", - StoragePolicyException.Kind.TOO_BIG); + throw tooBig(); + } + } + + @Override + public void acceptSize(long fileSize) throws StoragePolicyException { + if (fileSize > dataStoreConfig.getMaxSize()) { + throw tooBig(); } } @@ -57,6 +61,13 @@ public void acceptProfile(Profile profile) throws StoragePolicyException { } } + private StoragePolicyException tooBig() { + return new StoragePolicyException( + "The resource is too large. The maximum allowed data size is " + + humanSize(getMaxAllowedDataSize()) + ".", + StoragePolicyException.Kind.TOO_BIG); + } + public static String humanSize(long maxSize) { final double K = 1024; if (maxSize < K) { diff --git a/backend/src/main/java/eu/clarin/switchboard/core/FileInfo.java b/backend/src/main/java/eu/clarin/switchboard/core/FileInfo.java index de6d08959..818536d8f 100644 --- a/backend/src/main/java/eu/clarin/switchboard/core/FileInfo.java +++ b/backend/src/main/java/eu/clarin/switchboard/core/FileInfo.java @@ -3,15 +3,12 @@ import eu.clarin.switchboard.profiler.api.Profile; import java.nio.file.Path; -import java.time.Instant; -import java.util.Date; import java.util.List; import java.util.UUID; import java.util.stream.Collectors; public class FileInfo { private final UUID id; - private final Instant creation; private final String filename; // original filename, on disk we use a sanitized form private final Path path; // actual path on disk @@ -29,7 +26,6 @@ public FileInfo(UUID id, String filename, Path path) { this.filename = filename; this.path = path; - this.creation = new Date().toInstant(); this.fileLength = path == null ? -1 : path.toFile().length(); } @@ -37,10 +33,6 @@ public UUID getId() { return id; } - public Instant getCreation() { - return creation; - } - public String getFilename() { return filename; } @@ -88,7 +80,6 @@ public void setProfiles(Profile profile, List secondaryProfiles) { public String toString() { return "FileInfo: " + "\nid=" + id + - "\ncreation=" + creation.toString() + "\nfilename='" + filename + '\'' + "\npath=" + path + "\nfileLength=" + fileLength + diff --git a/backend/src/main/java/eu/clarin/switchboard/core/FileInfoFuture.java b/backend/src/main/java/eu/clarin/switchboard/core/FileInfoFuture.java new file mode 100644 index 000000000..39b2a16b6 --- /dev/null +++ b/backend/src/main/java/eu/clarin/switchboard/core/FileInfoFuture.java @@ -0,0 +1,29 @@ +package eu.clarin.switchboard.core; + +import java.time.Instant; +import java.util.UUID; +import java.util.concurrent.Future; + +public class FileInfoFuture { + private final UUID id; + private final Instant creation; + private final Future future; + + public FileInfoFuture(UUID id, Future future) { + this.id = id; + this.future = future; + this.creation = Instant.now(); + } + + public UUID getId() { + return id; + } + + public Instant getCreation() { + return creation; + } + + public Future getFileInfoFuture() { + return future; + } +} diff --git a/backend/src/main/java/eu/clarin/switchboard/core/MediaLibrary.java b/backend/src/main/java/eu/clarin/switchboard/core/MediaLibrary.java index 62f15868a..d76a5f91a 100644 --- a/backend/src/main/java/eu/clarin/switchboard/core/MediaLibrary.java +++ b/backend/src/main/java/eu/clarin/switchboard/core/MediaLibrary.java @@ -21,10 +21,7 @@ import java.time.Duration; import java.time.Instant; import java.util.*; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; /** * MediaLibrary keeps records about datafiles, identified by uuids. @@ -41,26 +38,7 @@ public class MediaLibrary { private final CloseableHttpClient cachingClient; private final ExecutorService executorService; - Map fileInfoMap = Collections.synchronizedMap(new HashMap<>()); - - public static class FileError { - Instant creation; - Exception exception; - - public FileError(Exception exception) { - this.creation = Instant.now(); - this.exception = exception; - } - - public Instant getCreation() { - return creation; - } - - public Exception getException() { - return exception; - } - } - Map fileInfoAsyncErrorMap = Collections.synchronizedMap(new HashMap<>()); + Map fileInfoFutureMap = Collections.synchronizedMap(new HashMap<>()); public MediaLibrary(DataStore dataStore, Profiler profiler, StoragePolicy storagePolicy, UrlResolverConfig urlResolverConfig, DataStoreConfig dataStoreConfig) { @@ -92,64 +70,63 @@ public MediaLibrary(DataStore dataStore, Profiler profiler, StoragePolicy storag public FileInfo addMedia(String originalUrlOrDoiOrHandle) throws CommonException, ProfilingException { UUID id = UUID.randomUUID(); - return addMedia(id, originalUrlOrDoiOrHandle); + FileInfo fileInfo = addMedia(cachingClient, dataStore, profiler, storagePolicy, id, originalUrlOrDoiOrHandle); + fileInfoFutureMap.put(id, new FileInfoFuture(id, wrap(fileInfo))); + return fileInfo; + } + + public FileInfo addMedia(String filename, InputStream inputStream) throws + StoragePolicyException, StorageException, ProfilingException { + UUID id = UUID.randomUUID(); + FileInfo fileInfo = addMedia(dataStore, profiler, storagePolicy, id, filename, inputStream); + fileInfoFutureMap.put(id, new FileInfoFuture(id, wrap(fileInfo))); + return fileInfo; } public UUID addMediaAsync(String originalUrlOrDoiOrHandle) { UUID id = UUID.randomUUID(); - fileInfoMap.put(id, new FileInfo(id, null, null)); - executorService.submit(() -> { - try { - addMedia(id, originalUrlOrDoiOrHandle); - } catch (Exception exception) { - LOGGER.debug("async error: {}", exception.getMessage()); - fileInfoAsyncErrorMap.put(id, new FileError(exception)); - fileInfoMap.remove(id); - } - }); + Future future = executorService.submit(() -> + addMedia(cachingClient, dataStore, profiler, storagePolicy, id, originalUrlOrDoiOrHandle)); + fileInfoFutureMap.put(id, new FileInfoFuture(id, future)); return id; } - public FileInfo addMedia(String filename, InputStream inputStream) throws - StoragePolicyException, StorageException, ProfilingException { - UUID id = UUID.randomUUID(); - return addMedia(id, filename, inputStream); + private Future wrap(FileInfo fileInfo) { + FutureTask future = new FutureTask<>(() -> fileInfo); + future.run(); + return future; } public UUID addMediaAsync(String filename, InputStream inputStream) { UUID id = UUID.randomUUID(); - fileInfoMap.put(id, new FileInfo(id, null, null)); - executorService.submit(() -> { - try { - addMedia(id, filename, inputStream); - } catch (Exception exception) { - LOGGER.debug("async error: {}", exception.getMessage()); - fileInfoAsyncErrorMap.put(id, new FileError(exception)); - fileInfoMap.remove(id); - } - }); + Future future = executorService.submit(() -> + addMedia(dataStore, profiler, storagePolicy, id, filename, inputStream)); + fileInfoFutureMap.put(id, new FileInfoFuture(id, future)); return id; } - public FileInfo getFileInfo(UUID id) { - return fileInfoMap.get(id); - } - - public Exception getFileInfoAsyncError(UUID id) { - FileError fileError = fileInfoAsyncErrorMap.get(id); - return fileError == null ? null : fileError.getException(); + public FileInfo waitForFileInfo(UUID id) throws Throwable { + FileInfoFuture fif = fileInfoFutureMap.get(id); + Future future = fif == null ? null : fif.getFileInfoFuture(); + if (future == null) { + return null; + } + try { + return future.get(); + } catch (ExecutionException xc) { + LOGGER.debug("rethrow previous async error: {}", xc.getCause().getMessage()); + throw xc.getCause(); + } } - private FileInfo addMedia(UUID id, String originalUrlOrDoiOrHandle) throws CommonException, ProfilingException { + private static FileInfo addMedia(CloseableHttpClient cachingClient, + DataStore dataStore, Profiler profiler, StoragePolicy storagePolicy, + UUID id, String originalUrlOrDoiOrHandle) throws CommonException, ProfilingException { LinkMetadata.LinkInfo linkInfo = LinkMetadata.getLinkData(cachingClient, originalUrlOrDoiOrHandle); try { - if (linkInfo.response.getEntity().getContentLength() > storagePolicy.getMaxAllowedDataSize()) { - throw new StoragePolicyException( - "The resource is too large. The maximum allowed data size is " + - DefaultStoragePolicy.humanSize(storagePolicy.getMaxAllowedDataSize()) + ".", - StoragePolicyException.Kind.TOO_BIG); - } - FileInfo fileInfo = addMedia(id, linkInfo.filename, linkInfo.response.getEntity().getContent()); + storagePolicy.acceptSize(linkInfo.response.getEntity().getContentLength()); + FileInfo fileInfo = addMedia(dataStore, profiler, storagePolicy, + id, linkInfo.filename, linkInfo.response.getEntity().getContent()); fileInfo.setLinksInfo(originalUrlOrDoiOrHandle, linkInfo.downloadLink, linkInfo.redirects); return fileInfo; } catch (IOException xc) { @@ -163,7 +140,8 @@ private FileInfo addMedia(UUID id, String originalUrlOrDoiOrHandle) throws Commo } } - private FileInfo addMedia(UUID id, String filename, InputStream inputStream) throws + private static FileInfo addMedia(DataStore dataStore, Profiler profiler, StoragePolicy storagePolicy, + UUID id, String filename, InputStream inputStream) throws StoragePolicyException, StorageException, ProfilingException { Path path; try { @@ -173,9 +151,9 @@ private FileInfo addMedia(UUID id, String filename, InputStream inputStream) thr } FileInfo fileInfo = new FileInfo(id, filename, path); - File file = path.toFile(); - try { + File file = path.toFile(); + List profileList = profiler.profile(file); if (profileList == null || profileList.isEmpty()) { throw new ProfilingException("null profiling result"); @@ -184,46 +162,37 @@ private FileInfo addMedia(UUID id, String filename, InputStream inputStream) thr profileList.get(0), profileList.subList(1, profileList.size()) ); + + storagePolicy.acceptProfile(fileInfo.getProfile().toProfile()); + + return fileInfo; } catch (IOException xc) { dataStore.delete(id, path); throw new StorageException(xc); - } catch (ProfilingException xc) { - dataStore.delete(id, path); - throw xc; - } - - fileInfoMap.put(id, fileInfo); - - try { - storagePolicy.acceptProfile(fileInfo.getProfile().toProfile()); - } catch (StoragePolicyException xc) { - LOGGER.debug("profile not accepted: " + fileInfo); + } catch (ProfilingException | StoragePolicyException xc) { dataStore.delete(id, path); - fileInfoMap.remove(id); throw xc; } - - return fileInfo; } private void periodicCleanup() { // this runs on its own thread LOGGER.info("start periodic cleanup now"); - for (Iterator iterator = fileInfoMap.values().iterator(); iterator.hasNext(); ) { - FileInfo fi = iterator.next(); - Duration lifetime = Duration.between(fi.getCreation(), Instant.now()); - if (lifetime.compareTo(storagePolicy.getMaxAllowedLifetime()) > 0) { - LOGGER.debug("removing entry: " + fi.getId()); - dataStore.delete(fi.getId(), fi.getPath()); - iterator.remove(); - } - } - - LOGGER.info("start periodic error cleanup now"); - for (Iterator iterator = fileInfoAsyncErrorMap.values().iterator(); iterator.hasNext(); ) { - FileError fe = iterator.next(); - Duration lifetime = Duration.between(fe.getCreation(), Instant.now()); + for (Iterator iterator = fileInfoFutureMap.values().iterator(); iterator.hasNext(); ) { + FileInfoFuture fileInfoFuture = iterator.next(); + Duration lifetime = Duration.between(fileInfoFuture.getCreation(), Instant.now()); if (lifetime.compareTo(storagePolicy.getMaxAllowedLifetime()) > 0) { + LOGGER.debug("removing entry: " + fileInfoFuture.getId()); + try { + Future future = fileInfoFuture.getFileInfoFuture(); + if (!future.isDone()) { + future.cancel(true); + } + FileInfo fileInfo = future.get(1, TimeUnit.SECONDS); + dataStore.delete(fileInfoFuture.getId(), fileInfo.getPath()); + } catch (ExecutionException | InterruptedException | TimeoutException e) { + // ignore 'future' errors, we're only interested in deleting data on disk + } iterator.remove(); } } diff --git a/backend/src/main/java/eu/clarin/switchboard/core/StoragePolicy.java b/backend/src/main/java/eu/clarin/switchboard/core/StoragePolicy.java index 4a8bce2cc..15ab89c9c 100644 --- a/backend/src/main/java/eu/clarin/switchboard/core/StoragePolicy.java +++ b/backend/src/main/java/eu/clarin/switchboard/core/StoragePolicy.java @@ -11,6 +11,8 @@ public interface StoragePolicy { void acceptFile(File file) throws StoragePolicyException; + void acceptSize(long fileSize) throws StoragePolicyException; + void acceptProfile(Profile profile) throws StoragePolicyException; Duration getMaxAllowedLifetime(); diff --git a/backend/src/main/java/eu/clarin/switchboard/resources/DataResource.java b/backend/src/main/java/eu/clarin/switchboard/resources/DataResource.java index b9cf1894f..33371f740 100644 --- a/backend/src/main/java/eu/clarin/switchboard/resources/DataResource.java +++ b/backend/src/main/java/eu/clarin/switchboard/resources/DataResource.java @@ -6,7 +6,6 @@ import eu.clarin.switchboard.core.FileInfo; import eu.clarin.switchboard.core.MediaLibrary; import eu.clarin.switchboard.core.xc.CommonException; -import eu.clarin.switchboard.core.xc.SwitchboardExceptionMapper; import eu.clarin.switchboard.profiler.api.ProfilingException; import org.glassfish.jersey.media.multipart.FormDataContentDisposition; import org.glassfish.jersey.media.multipart.FormDataParam; @@ -35,14 +34,14 @@ public DataResource(MediaLibrary mediaLibrary) { @GET @Path("/{id}") - public Response getFile(@PathParam("id") String idString) { + public Response getFile(@PathParam("id") String idString) throws Throwable { UUID id; try { id = UUID.fromString(idString); } catch (IllegalArgumentException xc) { return Response.status(Response.Status.NOT_FOUND).build(); } - FileInfo fi = mediaLibrary.getFileInfo(id); + FileInfo fi = mediaLibrary.waitForFileInfo(id); if (fi == null) { return Response.status(Response.Status.NOT_FOUND).build(); } @@ -63,7 +62,7 @@ public Response getFile(@PathParam("id") String idString) { @Path("/{id}/info") @Produces(MediaType.APPLICATION_JSON + ";charset=utf-8") public Response getFileInfo(@Context HttpServletRequest request, @PathParam("id") String idString) - throws Exception { + throws Throwable { UUID id; try { id = UUID.fromString(idString); @@ -71,26 +70,12 @@ public Response getFileInfo(@Context HttpServletRequest request, @PathParam("id" return Response.status(Response.Status.NOT_FOUND).build(); } - while (true) { - Exception exception = mediaLibrary.getFileInfoAsyncError(id); - if (exception != null) { - LOGGER.debug("rethrow previous async error: {}", exception.getMessage()); - throw exception; - } - - FileInfo fi = mediaLibrary.getFileInfo(id); - if (fi == null) { - return Response.status(Response.Status.NOT_FOUND).build(); - } - - if (fi.getPath() == null) { - // async file transfer not finished, looping - continue; - } - - // async file transfer has finished - return fileInfoToResponse(request.getRequestURI(), fi); + FileInfo fi = mediaLibrary.waitForFileInfo(id); + if (fi == null) { + return Response.status(Response.Status.NOT_FOUND).build(); } + + return fileInfoToResponse(request.getRequestURI(), fi); } @POST