Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve indexer queue worker thread handling #664

Merged
merged 3 commits into from
Aug 25, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 17 additions & 5 deletions dicoogle/src/main/java/pt/ua/dicoogle/server/DicomStorage.java
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ public class DicomStorage extends StorageService {
private BlockingQueue<ImageElement> queue = new PriorityBlockingQueue<>();
private NetworkApplicationEntity[] naeArr = null;
private AtomicLong seqNum = new AtomicLong(0L);
private volatile boolean workerShouldExit = false;

private static boolean ASYNC_INDEX = Boolean.valueOf(System.getProperty("dicoogle.index.async", "true"));

/**
Expand Down Expand Up @@ -366,11 +368,12 @@ static int compareElementsImpl(Set<String> priorityAETs, String thisAETitle, lon
return Long.compare(thisSeqNumber, otherSeqNumber);
}

class Indexer extends Thread {
class IndexerQueueWorker implements Runnable {
public Collection<IndexerInterface> plugins;

@Override
public void run() {
while (true) {
while (!workerShouldExit) {
try {
// Fetch an element by the queue taking into account the priorities.
ImageElement element = queue.take();
Expand All @@ -380,15 +383,16 @@ public void run() {
else
PluginController.getInstance().indexBlocking(exam);
} catch (InterruptedException ex) {
LOG.error("Could not take instance to index", ex);
LOG.warn("Indexer queue worker thread interrupted", ex);
} catch (Exception ex) {
LOG.error("Unexpected error in storage index actor", ex);
LOG.error("Unexpected error in indexer queue worker", ex);
}
}
LOG.debug("Indexer queue worker exiting by request");
}
}

private Indexer indexer = new Indexer();
private Thread indexer = new Thread(new IndexerQueueWorker(), "indexer-queue-worker");

/*
* Start the Storage Service
Expand All @@ -397,12 +401,20 @@ public void run() {
public void start() throws IOException {
device.startListening(executor);
indexer.start();

indexer.setUncaughtExceptionHandler((Thread t, Throwable e) -> {
LOG.error("Fatal error in indexer queue worker", e);
ControlServices.getInstance().stopStorage();
LOG.warn("DICOM storage service was taken down to prevent further errors");
});
}

/**
* Stop the storage service
*/
public void stop() {
device.stopListening();
workerShouldExit = true;
indexer.interrupt();
}
}