Skip to content

Commit

Permalink
Improve indexer queue worker thread handling (#664)
Browse files Browse the repository at this point in the history
* Improve DICOM storage indexer thread handling

- define indexer thread with a Runnable
- name indexer thread to indexer-queue-worker
- add logic to take down old indexer worker thread
   - if DICOM storage service is stopped,
     flip `workerShouldExit` flag and interrupt thread

* Add exception handler to indexer queue worker

- for damage control,
  DICOM storage service is disabled automatically,
  which prevents the archive from accepting files
  that will not be indexed

* Tune logging in indexer worker thread

- reduce Interrupted log line to warning level
- tweak messages
  • Loading branch information
Enet4 committed Aug 25, 2023
1 parent c594989 commit 74e3b9f
Showing 1 changed file with 17 additions and 5 deletions.
22 changes: 17 additions & 5 deletions dicoogle/src/main/java/pt/ua/dicoogle/server/DicomStorage.java
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ public class DicomStorage extends StorageService {
private BlockingQueue<ImageElement> queue = new PriorityBlockingQueue<>();
private NetworkApplicationEntity[] naeArr = null;
private AtomicLong seqNum = new AtomicLong(0L);
private volatile boolean workerShouldExit = false;

private static boolean ASYNC_INDEX = Boolean.valueOf(System.getProperty("dicoogle.index.async", "true"));

/**
Expand Down Expand Up @@ -366,11 +368,12 @@ static int compareElementsImpl(Set<String> priorityAETs, String thisAETitle, lon
return Long.compare(thisSeqNumber, otherSeqNumber);
}

class Indexer extends Thread {
class IndexerQueueWorker implements Runnable {
public Collection<IndexerInterface> plugins;

@Override
public void run() {
while (true) {
while (!workerShouldExit) {
try {
// Fetch an element by the queue taking into account the priorities.
ImageElement element = queue.take();
Expand All @@ -380,15 +383,16 @@ public void run() {
else
PluginController.getInstance().indexBlocking(exam);
} catch (InterruptedException ex) {
LOG.error("Could not take instance to index", ex);
LOG.warn("Indexer queue worker thread interrupted", ex);
} catch (Exception ex) {
LOG.error("Unexpected error in storage index actor", ex);
LOG.error("Unexpected error in indexer queue worker", ex);
}
}
LOG.debug("Indexer queue worker exiting by request");
}
}

private Indexer indexer = new Indexer();
private Thread indexer = new Thread(new IndexerQueueWorker(), "indexer-queue-worker");

/*
* Start the Storage Service
Expand All @@ -397,12 +401,20 @@ public void run() {
public void start() throws IOException {
device.startListening(executor);
indexer.start();

indexer.setUncaughtExceptionHandler((Thread t, Throwable e) -> {
LOG.error("Fatal error in indexer queue worker", e);
ControlServices.getInstance().stopStorage();
LOG.warn("DICOM storage service was taken down to prevent further errors");
});
}

/**
* Stop the storage service
*/
public void stop() {
device.stopListening();
workerShouldExit = true;
indexer.interrupt();
}
}

0 comments on commit 74e3b9f

Please sign in to comment.