Skip to content

Commit

Permalink
replace loop-wait with future tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
emanueldima authored and andmor- committed Jun 4, 2020
1 parent 024472b commit 9afdce1
Show file tree
Hide file tree
Showing 6 changed files with 118 additions and 131 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,14 @@ public Duration getCleanupPeriod() {
@Override
public void acceptFile(File file) throws StoragePolicyException {
if (file.length() > dataStoreConfig.getMaxSize()) {
throw new StoragePolicyException(
"The resource is too large. The maximum allowed data size is " +
humanSize(dataStoreConfig.getMaxSize()) + ".",
StoragePolicyException.Kind.TOO_BIG);
throw tooBig();
}
}

@Override
public void acceptSize(long fileSize) throws StoragePolicyException {
if (fileSize > dataStoreConfig.getMaxSize()) {
throw tooBig();
}
}

Expand All @@ -57,6 +61,13 @@ public void acceptProfile(Profile profile) throws StoragePolicyException {
}
}

private StoragePolicyException tooBig() {
return new StoragePolicyException(
"The resource is too large. The maximum allowed data size is " +
humanSize(getMaxAllowedDataSize()) + ".",
StoragePolicyException.Kind.TOO_BIG);
}

public static String humanSize(long maxSize) {
final double K = 1024;
if (maxSize < K) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,12 @@
import eu.clarin.switchboard.profiler.api.Profile;

import java.nio.file.Path;
import java.time.Instant;
import java.util.Date;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;

public class FileInfo {
private final UUID id;
private final Instant creation;

private final String filename; // original filename, on disk we use a sanitized form
private final Path path; // actual path on disk
Expand All @@ -29,18 +26,13 @@ public FileInfo(UUID id, String filename, Path path) {
this.filename = filename;
this.path = path;

this.creation = new Date().toInstant();
this.fileLength = path == null ? -1 : path.toFile().length();
}

public UUID getId() {
return id;
}

public Instant getCreation() {
return creation;
}

public String getFilename() {
return filename;
}
Expand Down Expand Up @@ -88,7 +80,6 @@ public void setProfiles(Profile profile, List<Profile> secondaryProfiles) {
public String toString() {
return "FileInfo: " +
"\nid=" + id +
"\ncreation=" + creation.toString() +
"\nfilename='" + filename + '\'' +
"\npath=" + path +
"\nfileLength=" + fileLength +
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package eu.clarin.switchboard.core;

import java.time.Instant;
import java.util.UUID;
import java.util.concurrent.Future;

public class FileInfoFuture {
private final UUID id;
private final Instant creation;
private final Future<FileInfo> future;

public FileInfoFuture(UUID id, Future<FileInfo> future) {
this.id = id;
this.future = future;
this.creation = Instant.now();
}

public UUID getId() {
return id;
}

public Instant getCreation() {
return creation;
}

public Future<FileInfo> getFileInfoFuture() {
return future;
}
}
159 changes: 64 additions & 95 deletions backend/src/main/java/eu/clarin/switchboard/core/MediaLibrary.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,7 @@
import java.time.Duration;
import java.time.Instant;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;

/**
* MediaLibrary keeps records about datafiles, identified by uuids.
Expand All @@ -41,26 +38,7 @@ public class MediaLibrary {
private final CloseableHttpClient cachingClient;
private final ExecutorService executorService;

Map<UUID, FileInfo> fileInfoMap = Collections.synchronizedMap(new HashMap<>());

public static class FileError {
Instant creation;
Exception exception;

public FileError(Exception exception) {
this.creation = Instant.now();
this.exception = exception;
}

public Instant getCreation() {
return creation;
}

public Exception getException() {
return exception;
}
}
Map<UUID, FileError> fileInfoAsyncErrorMap = Collections.synchronizedMap(new HashMap<>());
Map<UUID, FileInfoFuture> fileInfoFutureMap = Collections.synchronizedMap(new HashMap<>());

public MediaLibrary(DataStore dataStore, Profiler profiler, StoragePolicy storagePolicy,
UrlResolverConfig urlResolverConfig, DataStoreConfig dataStoreConfig) {
Expand Down Expand Up @@ -92,64 +70,63 @@ public MediaLibrary(DataStore dataStore, Profiler profiler, StoragePolicy storag

public FileInfo addMedia(String originalUrlOrDoiOrHandle) throws CommonException, ProfilingException {
UUID id = UUID.randomUUID();
return addMedia(id, originalUrlOrDoiOrHandle);
FileInfo fileInfo = addMedia(cachingClient, dataStore, profiler, storagePolicy, id, originalUrlOrDoiOrHandle);
fileInfoFutureMap.put(id, new FileInfoFuture(id, wrap(fileInfo)));
return fileInfo;
}

public FileInfo addMedia(String filename, InputStream inputStream) throws
StoragePolicyException, StorageException, ProfilingException {
UUID id = UUID.randomUUID();
FileInfo fileInfo = addMedia(dataStore, profiler, storagePolicy, id, filename, inputStream);
fileInfoFutureMap.put(id, new FileInfoFuture(id, wrap(fileInfo)));
return fileInfo;
}

public UUID addMediaAsync(String originalUrlOrDoiOrHandle) {
UUID id = UUID.randomUUID();
fileInfoMap.put(id, new FileInfo(id, null, null));
executorService.submit(() -> {
try {
addMedia(id, originalUrlOrDoiOrHandle);
} catch (Exception exception) {
LOGGER.debug("async error: {}", exception.getMessage());
fileInfoAsyncErrorMap.put(id, new FileError(exception));
fileInfoMap.remove(id);
}
});
Future<FileInfo> future = executorService.submit(() ->
addMedia(cachingClient, dataStore, profiler, storagePolicy, id, originalUrlOrDoiOrHandle));
fileInfoFutureMap.put(id, new FileInfoFuture(id, future));
return id;
}

public FileInfo addMedia(String filename, InputStream inputStream) throws
StoragePolicyException, StorageException, ProfilingException {
UUID id = UUID.randomUUID();
return addMedia(id, filename, inputStream);
private Future<FileInfo> wrap(FileInfo fileInfo) {
FutureTask<FileInfo> future = new FutureTask<>(() -> fileInfo);
future.run();
return future;
}

public UUID addMediaAsync(String filename, InputStream inputStream) {
UUID id = UUID.randomUUID();
fileInfoMap.put(id, new FileInfo(id, null, null));
executorService.submit(() -> {
try {
addMedia(id, filename, inputStream);
} catch (Exception exception) {
LOGGER.debug("async error: {}", exception.getMessage());
fileInfoAsyncErrorMap.put(id, new FileError(exception));
fileInfoMap.remove(id);
}
});
Future<FileInfo> future = executorService.submit(() ->
addMedia(dataStore, profiler, storagePolicy, id, filename, inputStream));
fileInfoFutureMap.put(id, new FileInfoFuture(id, future));
return id;
}

public FileInfo getFileInfo(UUID id) {
return fileInfoMap.get(id);
}

public Exception getFileInfoAsyncError(UUID id) {
FileError fileError = fileInfoAsyncErrorMap.get(id);
return fileError == null ? null : fileError.getException();
public FileInfo waitForFileInfo(UUID id) throws Throwable {
FileInfoFuture fif = fileInfoFutureMap.get(id);
Future<FileInfo> future = fif == null ? null : fif.getFileInfoFuture();
if (future == null) {
return null;
}
try {
return future.get();
} catch (ExecutionException xc) {
LOGGER.debug("rethrow previous async error: {}", xc.getCause().getMessage());
throw xc.getCause();
}
}

private FileInfo addMedia(UUID id, String originalUrlOrDoiOrHandle) throws CommonException, ProfilingException {
private static FileInfo addMedia(CloseableHttpClient cachingClient,
DataStore dataStore, Profiler profiler, StoragePolicy storagePolicy,
UUID id, String originalUrlOrDoiOrHandle) throws CommonException, ProfilingException {
LinkMetadata.LinkInfo linkInfo = LinkMetadata.getLinkData(cachingClient, originalUrlOrDoiOrHandle);
try {
if (linkInfo.response.getEntity().getContentLength() > storagePolicy.getMaxAllowedDataSize()) {
throw new StoragePolicyException(
"The resource is too large. The maximum allowed data size is " +
DefaultStoragePolicy.humanSize(storagePolicy.getMaxAllowedDataSize()) + ".",
StoragePolicyException.Kind.TOO_BIG);
}
FileInfo fileInfo = addMedia(id, linkInfo.filename, linkInfo.response.getEntity().getContent());
storagePolicy.acceptSize(linkInfo.response.getEntity().getContentLength());
FileInfo fileInfo = addMedia(dataStore, profiler, storagePolicy,
id, linkInfo.filename, linkInfo.response.getEntity().getContent());
fileInfo.setLinksInfo(originalUrlOrDoiOrHandle, linkInfo.downloadLink, linkInfo.redirects);
return fileInfo;
} catch (IOException xc) {
Expand All @@ -163,7 +140,8 @@ private FileInfo addMedia(UUID id, String originalUrlOrDoiOrHandle) throws Commo
}
}

private FileInfo addMedia(UUID id, String filename, InputStream inputStream) throws
private static FileInfo addMedia(DataStore dataStore, Profiler profiler, StoragePolicy storagePolicy,
UUID id, String filename, InputStream inputStream) throws
StoragePolicyException, StorageException, ProfilingException {
Path path;
try {
Expand All @@ -173,9 +151,9 @@ private FileInfo addMedia(UUID id, String filename, InputStream inputStream) thr
}

FileInfo fileInfo = new FileInfo(id, filename, path);
File file = path.toFile();

try {
File file = path.toFile();

List<Profile> profileList = profiler.profile(file);
if (profileList == null || profileList.isEmpty()) {
throw new ProfilingException("null profiling result");
Expand All @@ -184,46 +162,37 @@ private FileInfo addMedia(UUID id, String filename, InputStream inputStream) thr
profileList.get(0),
profileList.subList(1, profileList.size())
);

storagePolicy.acceptProfile(fileInfo.getProfile().toProfile());

return fileInfo;
} catch (IOException xc) {
dataStore.delete(id, path);
throw new StorageException(xc);
} catch (ProfilingException xc) {
dataStore.delete(id, path);
throw xc;
}

fileInfoMap.put(id, fileInfo);

try {
storagePolicy.acceptProfile(fileInfo.getProfile().toProfile());
} catch (StoragePolicyException xc) {
LOGGER.debug("profile not accepted: " + fileInfo);
} catch (ProfilingException | StoragePolicyException xc) {
dataStore.delete(id, path);
fileInfoMap.remove(id);
throw xc;
}

return fileInfo;
}

private void periodicCleanup() {
// this runs on its own thread
LOGGER.info("start periodic cleanup now");
for (Iterator<FileInfo> iterator = fileInfoMap.values().iterator(); iterator.hasNext(); ) {
FileInfo fi = iterator.next();
Duration lifetime = Duration.between(fi.getCreation(), Instant.now());
if (lifetime.compareTo(storagePolicy.getMaxAllowedLifetime()) > 0) {
LOGGER.debug("removing entry: " + fi.getId());
dataStore.delete(fi.getId(), fi.getPath());
iterator.remove();
}
}

LOGGER.info("start periodic error cleanup now");
for (Iterator<FileError> iterator = fileInfoAsyncErrorMap.values().iterator(); iterator.hasNext(); ) {
FileError fe = iterator.next();
Duration lifetime = Duration.between(fe.getCreation(), Instant.now());
for (Iterator<FileInfoFuture> iterator = fileInfoFutureMap.values().iterator(); iterator.hasNext(); ) {
FileInfoFuture fileInfoFuture = iterator.next();
Duration lifetime = Duration.between(fileInfoFuture.getCreation(), Instant.now());
if (lifetime.compareTo(storagePolicy.getMaxAllowedLifetime()) > 0) {
LOGGER.debug("removing entry: " + fileInfoFuture.getId());
try {
Future<FileInfo> future = fileInfoFuture.getFileInfoFuture();
if (!future.isDone()) {
future.cancel(true);
}
FileInfo fileInfo = future.get(1, TimeUnit.SECONDS);
dataStore.delete(fileInfoFuture.getId(), fileInfo.getPath());
} catch (ExecutionException | InterruptedException | TimeoutException e) {
// ignore 'future' errors, we're only interested in deleting data on disk
}
iterator.remove();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ public interface StoragePolicy {

void acceptFile(File file) throws StoragePolicyException;

void acceptSize(long fileSize) throws StoragePolicyException;

void acceptProfile(Profile profile) throws StoragePolicyException;

Duration getMaxAllowedLifetime();
Expand Down
Loading

0 comments on commit 9afdce1

Please sign in to comment.