From 74d004a9a3e0155ba94670499cc8f94448b322a6 Mon Sep 17 00:00:00 2001 From: Nick Molcanov <32801560+nck-mlcnv@users.noreply.github.com> Date: Thu, 13 Jun 2024 15:05:10 +0200 Subject: [PATCH] Add ResponseBodyProcessor timeout (#250) * Add ResponseBodyProcessor timeout * Update documentation Co-authored-by: Alexander Bigerl --- docs/configuration/overview.md | 2 +- docs/configuration/response_body_processor.md | 9 +++---- example-suite.yml | 1 + schema/iguana-schema.json | 6 +++-- .../cc/worker/ResponseBodyProcessor.java | 24 +++++++++++++------ src/main/resources/iguana-schema.json | 6 +++-- 6 files changed, 32 insertions(+), 16 deletions(-) diff --git a/docs/configuration/overview.md b/docs/configuration/overview.md index 0b848c7eb..652b3e23d 100644 --- a/docs/configuration/overview.md +++ b/docs/configuration/overview.md @@ -104,7 +104,7 @@ metrics: ## Durations Durations are used to define time spans in the configuration. -They can be used for the `timeout`-property of the workers or for the `completionTarget`-property of the tasks. +They can be used for the `timeout`-property of the workers or the response body processors or for the `completionTarget`-property of the tasks. Duration values can be defined as a XSD duration string or as a string with a number and a unit. The following units are supported: - `s` or `sec`or `secs` for seconds diff --git a/docs/configuration/response_body_processor.md b/docs/configuration/response_body_processor.md index 650ef3546..5b4cc7259 100644 --- a/docs/configuration/response_body_processor.md +++ b/docs/configuration/response_body_processor.md @@ -18,7 +18,8 @@ To use a response body processor, it needs to be defined in the configuration fi in the `responseBodyProcessors` list. ## Properties -| property | required | description | example | -|-------------|----------|------------------------------------------------------------------------------------|-------------------------------------| -| contentType | yes | The content type of the response body. | `"application/sparql-results+json"` | -| threads | no | The number of threads that are used to process the response bodies. (default is 1) | `2` | +| property | required | description | example | +|-------------|----------|--------------------------------------------------------------------------------------------------------------------|-------------------------------------| +| contentType | yes | The content type of the response body. | `"application/sparql-results+json"` | +| threads | no | The number of threads that are used to process the response bodies. (default is 1) | `2` | +| timeout | no | The maximum duration that the response body processor can take to process a response body. (default is 10 minutes) | `10m` | \ No newline at end of file diff --git a/example-suite.yml b/example-suite.yml index 66e8ba6c0..a2928fed7 100644 --- a/example-suite.yml +++ b/example-suite.yml @@ -105,3 +105,4 @@ storages: responseBodyProcessors: - contentType: "application/sparql-results+json" threads: 1 + timeout: 1 min diff --git a/schema/iguana-schema.json b/schema/iguana-schema.json index d2e12eff2..e0a821c39 100644 --- a/schema/iguana-schema.json +++ b/schema/iguana-schema.json @@ -151,11 +151,13 @@ "threads": { "type": "integer", "minimum": 1 + }, + "timeout" : { + "type": "string" } }, "required": [ - "contentType", - "threads" + "contentType" ], "title": "ResponseBodyProcessor" }, diff --git a/src/main/java/org/aksw/iguana/cc/worker/ResponseBodyProcessor.java b/src/main/java/org/aksw/iguana/cc/worker/ResponseBodyProcessor.java index 3cd6e56a2..6f44574c8 100644 --- a/src/main/java/org/aksw/iguana/cc/worker/ResponseBodyProcessor.java +++ b/src/main/java/org/aksw/iguana/cc/worker/ResponseBodyProcessor.java @@ -14,10 +14,11 @@ import java.util.concurrent.*; public class ResponseBodyProcessor { - public record Config(String contentType, Integer threads) { - public Config(String contentType, Integer threads) { + public record Config(String contentType, Integer threads, Duration timeout) { + public Config(String contentType, Integer threads, Duration timeout) { this.contentType = contentType; this.threads = threads == null ? 1 : threads; + this.timeout = timeout == null ? Duration.ofMinutes(10) : timeout; } } @@ -26,20 +27,24 @@ public record Key(long contentLength, long xxh64) {} public ResponseBodyProcessor(Config config) { this.executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(config.threads == null ? 1 : config.threads); this.languageProcessor = LanguageProcessor.getInstance(config.contentType); + this.timeout = config.timeout; } public ResponseBodyProcessor(String contentType) { - this(new Config(contentType, null)); + this(new Config(contentType, null, null)); } private static final Logger LOGGER = LoggerFactory.getLogger(ResponseBodyProcessor.class); + private final Duration timeout; + private final ConcurrentHashMap.KeySetView seenResponseBodies = ConcurrentHashMap.newKeySet(); private final List responseDataMetrics = Collections.synchronizedList(new ArrayList<>()); private final LanguageProcessor languageProcessor; private final ThreadPoolExecutor executor; + private final ScheduledExecutorService executorHandler = Executors.newScheduledThreadPool(1); public boolean add(long contentLength, long xxh64, BigByteArrayOutputStream bbaos) { final var key = new Key(contentLength, xxh64); @@ -51,10 +56,16 @@ public boolean add(long contentLength, long xxh64, BigByteArrayOutputStream bbao } private void submit(Key key, BigByteArrayOutputStream bigByteArrayOutputStream) { - executor.execute(() -> { + final var future = executor.submit(() -> { var processingResult = languageProcessor.process(new BigByteArrayInputStream(bigByteArrayOutputStream), key.xxh64); responseDataMetrics.add(processingResult); }); + executorHandler.schedule(() -> { + if (!future.isDone()) { + future.cancel(true); + LOGGER.warn("ResponseBodyProcessor timed out for key: {}", key); + } + }, timeout.toSeconds(), TimeUnit.SECONDS); } public List getResponseDataMetrics() { @@ -62,12 +73,11 @@ public List getResponseDataMetrics() { return responseDataMetrics; } - final var timeout = Duration.ofMinutes(10); - LOGGER.info(MessageFormat.format("Shutting down ResponseBodyProcessor with {0}min timeout to finish processing. {1} tasks remaining.", timeout.toMinutes(), executor.getQueue().size())); + LOGGER.info(MessageFormat.format("Shutting down ResponseBodyProcessor with {0} min timeout to finish processing. {1} tasks remaining.", timeout.toMinutes() + "." + (timeout.toSecondsPart() / (double) 60), executor.getQueue().size())); boolean noTimeout; try { executor.shutdown(); - noTimeout = executor.awaitTermination(10, TimeUnit.MINUTES); + noTimeout = executor.awaitTermination(timeout.toSeconds(), TimeUnit.SECONDS); } catch (InterruptedException e) { throw new RuntimeException(e); } diff --git a/src/main/resources/iguana-schema.json b/src/main/resources/iguana-schema.json index 375dd5679..4750e59df 100644 --- a/src/main/resources/iguana-schema.json +++ b/src/main/resources/iguana-schema.json @@ -151,11 +151,13 @@ "threads": { "type": "integer", "minimum": 1 + }, + "timeout" : { + "type": "string" } }, "required": [ - "contentType", - "threads" + "contentType" ], "title": "ResponseBodyProcessor" },