Bulk unindexing for IndexerInterface (#598)
* [sdk] Add IndexerInterface::unindex(Collection<URI>)

- for unindexing in bulk
- clarify that both unindex methods are synchronous,
  unlike the indexing ones

* [sdk] Revamp IndexerInterface documentation

- adjust the content to align with good practices
  (see also https://bioinformatics-ua.github.io/dicoogle-learning-pack/docs/query_index/)

* [sdk] Rethink bulk unindexing to be more informative

- add `UnindexReport` class and nested classes
   - for containing errors which may occur in bulk unindexing
- change `IndexerInterface#unindex(Collection<URI>)`
   - returns `UnindexReport`
   - can throw `IOException`

* [sdk] Reiterate on IndexerInterface batch unindex

- make it asynchronous: returns a `Task` like in `index`
- add second parameter for keeping track of progress

* [sdk] format UnindexReport

* [sdk] Improve bulk IndexerInterface#unindex

- clarify that it returns a task
- remove unused import

* [sdk] Add UnindexReport#errorCount

* Add bulk unindexing to plugin controller

- can only handle one indexer at a time,
  but other than that it works

* Tweak PluginController

- remove deprecated method call #handles,
  check scheme instead

* Update unindex servlet to use bulk unindexing where appropriate

* [core] Dispatch batch-unindex tasks

* [sdk] Reiterate on the UnindexReport API

- record a collection of URIs in each unindex failure

* [sdk] Tweak UnindexReport interface and fix error file count

- provide clearer methods to collect the counts
  of files which were not unindexed successfully
Enet4 committed Aug 27, 2024
1 parent 2ae9401 commit 7af44ec
Showing 4 changed files with 338 additions and 31 deletions.
@@ -28,6 +28,7 @@
import pt.ua.dicoogle.plugins.webui.WebUIPluginManager;
import pt.ua.dicoogle.sdk.*;
import pt.ua.dicoogle.sdk.datastructs.Report;
import pt.ua.dicoogle.sdk.datastructs.UnindexReport;
import pt.ua.dicoogle.sdk.datastructs.SearchResult;
import pt.ua.dicoogle.sdk.datastructs.dim.DimLevel;
import pt.ua.dicoogle.sdk.settings.ConfigurationHolder;
@@ -45,6 +46,7 @@
import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.zip.ZipFile;

@@ -755,7 +757,43 @@ public void unindex(URI path, Collection<String> indexProviders) {
}
}

/** Issue an unindexation procedure to the given indexers.
/** Issue the removal of indexed entries in bulk.
*
* @param indexProvider the name of the indexer
* @param items a collection of item identifiers to unindex
* @param progressCallback an optional function (may be {@code null}),
* called for every batch of items successfully unindexed
* to indicate early progress
* and inform consumers that
* it is safe to remove or exclude the unindexed items
* @return an asynchronous task object returning
* a report containing which files were not unindexed,
* and whether some of them were not found in the database
* @throws IOException if an error occurred
* before the unindexing operation could start,
* such as when failing to access or open the database
*/
public Task<UnindexReport> unindex(String indexProvider, Collection<URI> items, Consumer<Collection<URI>> progressCallback) throws IOException {
logger.info("Starting unindexing procedure for {} items", items.size());

IndexerInterface indexer = null;
if (indexProvider != null) {
indexer = this.getIndexerByName(indexProvider, true);
}
if (indexer == null) {
indexer = this.getIndexingPlugins(true).iterator().next();
}
Task<UnindexReport> task = indexer.unindex(items, progressCallback);
if (task != null) {
final String taskUniqueID = UUID.randomUUID().toString();
task.setName(String.format("[%s]unindex", indexer.getName()));
task.onCompletion(() -> {
logger.info("Unindexing task [{}] complete", taskUniqueID);
});
taskManager.dispatch(task);
}
return task;
}
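For illustration, a caller could dispatch a bulk unindexing task through this method and then wait on the report. This is a minimal sketch under assumptions: the provider name "lucene" and the file URIs are hypothetical, and the report accessors are the ones introduced in this commit.

    // Hypothetical caller; assumes the same imports already used by PluginController above
    // (java.net.URI, java.util.*, java.util.concurrent.ExecutionException, the SDK types).
    Collection<URI> uris = Arrays.asList(
            URI.create("file:/data/0001.dcm"),
            URI.create("file:/data/0002.dcm"));
    try {
        Task<UnindexReport> task = PluginController.getInstance()
                .unindex("lucene", uris, batch ->
                        logger.info("{} more items unindexed", batch.size()));
        // the method has already dispatched the task, so just wait for the report
        UnindexReport report = task.get();
        logger.info("{} items could not be unindexed", report.notUnindexedFileCount());
    } catch (IOException | InterruptedException | ExecutionException ex) {
        logger.error("Bulk unindexing failed", ex);
    }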

/** Issue an unindexing procedure to the given indexers.
*
* @param path the URI of the directory or file to unindex
* @param indexers a collection of providers
@@ -776,7 +814,7 @@ public void remove(URI uri) {
}

public void doRemove(URI uri, StorageInterface si) {
if (si.handles(uri)) {
if (Objects.equals(uri.getScheme(), si.getScheme())) {
si.remove(uri);
} else {
logger.warn("Storage Plugin does not handle URI: {},{}", uri, si);
@@ -21,12 +21,13 @@

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

import javax.servlet.ServletException;
@@ -42,6 +43,7 @@
import pt.ua.dicoogle.plugins.PluginController;
import pt.ua.dicoogle.sdk.QueryInterface;
import pt.ua.dicoogle.sdk.datastructs.SearchResult;
import pt.ua.dicoogle.sdk.datastructs.UnindexReport;
import pt.ua.dicoogle.sdk.task.JointQueryTask;
import pt.ua.dicoogle.sdk.task.Task;

@@ -81,26 +83,50 @@ protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws S
"No arguments provided; must include either one of `uri`, `SOPInstanceUID`, `SeriesInstanceUID` or `StudyInstanceUID`");
return;
}

PluginController pc = PluginController.getInstance();

long indexed = 0;
long failed = 0;
long notfound = 0;

Collection<String> uris = resolveURIs(paramUri, paramSop, paramSeries, paramStudy);
Collection<URI> uris = resolveURIs(paramUri, paramSop, paramSeries, paramStudy);

// unindex
for (String strUri : uris) {
try {
URI uri = new URI(strUri);
// if only one entry, do it inline
if (uris.size() <= 1) {
for (URI uri : uris) {
try {
PluginController.getInstance().unindex(uri, providers);
pc.unindex(uri, providers);
indexed += 1;
} catch (RuntimeException ex) {
logger.error("Failed to unindex {}", uri, ex);
failed += 1;
}
} catch (URISyntaxException ex) {
logger.warn("Received bad URI", ex);
failed += 1;
}

} else {
// if many, use bulk unindexing
List<Task<UnindexReport>> tasks = new ArrayList<>();

if (providers == null) {
providers = pc.getIndexingPlugins(true).stream()
.map(p -> p.getName())
.collect(Collectors.toList());
}
for (String indexProvider: providers) {
tasks.add(pc.unindex(indexProvider, uris, null));
}

int i = 0;
for (Task<UnindexReport> task: tasks) {
try {
UnindexReport report = task.get();
indexed = uris.size() - report.notUnindexedFileCount();
failed = report.failedFileCount();
notfound = report.getNotFound().size();
} catch (Exception ex) {
logger.error("Task to unindex items in {} failed", providers.get(i), ex);
}
i += 1;
}
}

@@ -109,15 +135,18 @@ protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws S
JSONObject obj = new JSONObject();
obj.put("indexed", indexed);
obj.put("failed", failed);
obj.put("notFound", notfound);
resp.setStatus(200);
resp.getWriter().write(obj.toString());
}
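As an illustration, a request that unindexes two items while a third is not found in the database might yield a response body like the following (hypothetical values):

    {"indexed": 2, "failed": 0, "notFound": 1}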

/// Convert the given parameters into a list of URIs
private static Collection<String> resolveURIs(String[] paramUri, String[] paramSop, String[] paramSeries,
private static Collection<URI> resolveURIs(String[] paramUri, String[] paramSop, String[] paramSeries,
String[] paramStudy) {
if (paramUri != null) {
return Arrays.asList(paramUri);
return Stream.of(paramUri)
.map(URI::create)
.collect(Collectors.toList());
}
String attribute = null;
if (paramSop != null) {
@@ -142,19 +171,19 @@ public void onCompletion() {}
};
try {
return StreamSupport.stream(PluginController.getInstance()
.queryAll(holder, dcmAttribute + ":" + uid).get().spliterator(), false);
.queryAll(holder, dcmAttribute + ":\"" + uid + '"').get().spliterator(), false);
} catch (InterruptedException | ExecutionException ex) {
throw new RuntimeException(ex);
}
}).map(r -> r.getURI().toString()).collect(Collectors.toList());
}).map(r -> r.getURI()).collect(Collectors.toList());

}
String dicomProvider = dicomProviders.iterator().next();
return Arrays.stream(paramSop).flatMap(uid -> {
// translate to URIs
QueryInterface dicom = PluginController.getInstance().getQueryProviderByName(dicomProvider, false);

return StreamSupport.stream(dicom.query(dcmAttribute + ":" + uid).spliterator(), false);
}).map(r -> r.getURI().toString()).collect(Collectors.toList());
return StreamSupport.stream(dicom.query(dcmAttribute + ":\"" + uid + '"').spliterator(), false);
}).map(r -> r.getURI()).collect(Collectors.toList());
}
}
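A note on the query-string change above: the UID is now wrapped in double quotes, presumably so that the whole dotted identifier is matched as a single exact term instead of being split up by the query parser. For example, with an arbitrary, hypothetical UID, the generated query becomes:

    SOPInstanceUID:"1.2.3.4.5.6789"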
107 changes: 94 additions & 13 deletions sdk/src/main/java/pt/ua/dicoogle/sdk/IndexerInterface.java
@@ -18,47 +18,73 @@
*/
package pt.ua.dicoogle.sdk;

import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.function.Consumer;

import pt.ua.dicoogle.sdk.datastructs.Report;
import pt.ua.dicoogle.sdk.datastructs.UnindexReport;
import pt.ua.dicoogle.sdk.datastructs.UnindexReport.FailedUnindex;
import pt.ua.dicoogle.sdk.task.Task;

/**
* Index Interface Plugin. Indexers analyze documents for performing queries. They may index
* documents by DICOM metadata for instance, but other document processing procedures may be involved.
* Indexing plugin interface.
*
* Indexers analyze and record documents for future retrieval.
* They are primarily designed to index DICOM meta-data,
* in which case they are accompanied by a query plugin,
* and together the two plugins are called <em>DIM providers</em>.
* However, indexers are not restricted to processing DICOM files,
* or to retrieving and indexing meta-data.
*
* @author Luís A. Bastião Silva <bastiao@ua.pt>
* @author Luís A. Bastião Silva <bastiao@bmd-software.com>
* @author Frederico Valente <[email protected]>
*/
public interface IndexerInterface extends DicooglePlugin {

/**
* Indexes the file path to the database. Indexation procedures are asynchronous, and will return
* Indexes the file path to the database. Indexing procedures are asynchronous, and will return
* immediately after the call. The outcome is a report that can be retrieved from the given task
* as a future.
*
* @param file directory or file to index
* @return a representation of the asynchronous indexation task
* @return a representation of the asynchronous indexing task
*/
public Task<Report> index(StorageInputStream file, Object... parameters);

/**
* Indexes multiple file paths to the database. Indexation procedures are asynchronous, and will return
* Indexes multiple file paths to the database. Indexing procedures are asynchronous, and will return
* immediately after the call. The outcomes are aggregated into a single report and can be retrieved from
* the given task as a future.
*
* @param files a collection of directories and/or files to index
* @return a representation of the asynchronous indexation task
* @return a representation of the asynchronous indexing task
*/
public Task<Report> index(Iterable<StorageInputStream> files, Object... parameters);


/**
* Checks whether the file in the given path can be indexed by this indexer. The indexer should verify if
* the file holds compatible content (e.g. a DICOM file). If this method returns false, the file will not
* be indexed.
*
* Checks whether the file in the given path can be indexed by this indexer.
*
* The method should return <code>false</code> <em>if and only if</em>
* it is certain, from observation of the URI alone,
* that the file cannot be indexed.
* This method exists to filter out files
* that are obviously incompatible with the indexer.
* However, this check is not always reliable,
* since each storage is free to establish its own file naming rules,
* which can affect the file extension.
* In case of doubt, it is recommended to keep the default implementation,
* which returns <code>true</code> unconditionally.
* Attempts to read invalid files can instead
* be handled gracefully by the indexer by catching exceptions.
*
* @param path a URI to the file to check
* @return whether the indexer can handle the file at the given path
* @return whether the item at the given URI path can be fed to this indexer
*/
public default boolean handles(URI path) {
return true;
@@ -67,8 +93,63 @@ public default boolean handles(URI path) {
/**
* Removes the indexed file at the given path from the database.
*
* Unlike the indexing operations,
* this operation is synchronous
* and will only return when the removal is done.
*
* @param path the URI of the document
* @return whether it was successfully deleted from the database
*/
public boolean unindex(URI path);

/**
* Removes indexed files from the database in bulk.
*
* The default implementation unindexes each item one by one
* in an unspecified order via {@linkplain #unindex(URI)},
* but indexers may override this method
* to perform the removal as one or more batch operations,
* which can be faster than unindexing each item individually.
*
* Like {@linkplain #index(StorageInputStream, Object...) index},
* this operation is asynchronous.
* One can keep track of the unindexing task's progress
* by passing a callback function as the second parameter.
*
* @param uris the URIs of the items to unindex
* @param progressCallback an optional function (may be {@code null}),
* called for every batch of items successfully unindexed
* to indicate early progress
* and inform consumers that
* it is safe to remove or exclude the unindexed items
* @return an asynchronous task object returning
* a report containing which files were not unindexed,
* and whether some of them were not found in the database
* @throws IOException if an error occurred
* before the unindexing operation could start,
* such as when failing to access or open the database
*/
public default Task<UnindexReport> unindex(Collection<URI> uris, Consumer<Collection<URI>> progressCallback)
throws IOException {
Objects.requireNonNull(uris);
return new Task<>(() -> {
List<FailedUnindex> failures = new ArrayList<>();
for (URI uri : uris) {
try {
if (unindex(uri)) {
// unindexed successfully
if (progressCallback != null) {
progressCallback.accept(Collections.singleton(uri));
}
} else {
// failed to unindex, reason unknown
failures.add(new FailedUnindex(Collections.singleton(uri), null));
}
} catch (Exception ex) {
failures.add(new FailedUnindex(Collections.singleton(uri), ex));
}
}
return UnindexReport.withFailures(failures);
});
}
}
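To make the batch contract concrete, here is a sketch of how an indexer might override the bulk method to delete entries in a single backend operation instead of one at a time. This is hypothetical: `database` and its `deleteAll` method are stand-ins for whatever store the plugin actually uses, not part of the SDK.

    @Override
    public Task<UnindexReport> unindex(Collection<URI> uris, Consumer<Collection<URI>> progressCallback)
            throws IOException {
        Objects.requireNonNull(uris);
        return new Task<>(() -> {
            // hypothetical single batch deletion against the plugin's own store;
            // returns the URIs which were actually removed
            Collection<URI> removed = database.deleteAll(uris);
            if (progressCallback != null && !removed.isEmpty()) {
                progressCallback.accept(removed);
            }
            // everything not removed is reported as a failure with unknown cause
            List<FailedUnindex> failures = new ArrayList<>();
            for (URI uri : uris) {
                if (!removed.contains(uri)) {
                    failures.add(new FailedUnindex(Collections.singleton(uri), null));
                }
            }
            return UnindexReport.withFailures(failures);
        });
    }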