diff --git a/src/main/java/com/exactpro/th2/mstore/ErrorCollector.java b/src/main/java/com/exactpro/th2/mstore/ErrorCollector.java index 4bfa3e0..0e52f09 100644 --- a/src/main/java/com/exactpro/th2/mstore/ErrorCollector.java +++ b/src/main/java/com/exactpro/th2/mstore/ErrorCollector.java @@ -1,5 +1,5 @@ /* - * Copyright 2023 Exactpro (Exactpro Systems Limited) + * Copyright 2023-2025 Exactpro (Exactpro Systems Limited) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -15,163 +15,18 @@ */ package com.exactpro.th2.mstore; -import com.exactpro.th2.common.event.Event; -import com.exactpro.th2.common.event.Event.Status; -import com.exactpro.th2.common.event.IBodyData; -import com.exactpro.th2.common.grpc.EventBatch; -import com.exactpro.th2.common.grpc.EventID; -import com.exactpro.th2.common.schema.message.MessageRouter; -import com.fasterxml.jackson.annotation.JsonCreator; -import org.jetbrains.annotations.NotNull; import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.time.Instant; -import java.util.Collection; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; - -import static java.util.Objects.requireNonNull; @SuppressWarnings("unused") -public class ErrorCollector implements AutoCloseable { - private static final Logger LOGGER = LoggerFactory.getLogger(ErrorCollector.class); - private final ScheduledFuture drainFuture; - private final MessageRouter eventRouter; - private final EventID rootEvent; - private final Lock lock = new ReentrantLock(); - private Map errors = new HashMap<>(); - - public ErrorCollector(@NotNull ScheduledExecutorService executor, - @NotNull MessageRouter eventRouter, - @NotNull EventID rootEvent, - long period, - @NotNull TimeUnit unit) { - this.eventRouter = requireNonNull(eventRouter, "Event router can't be null"); - this.rootEvent = requireNonNull(rootEvent, "Root event can't be null"); - requireNonNull(unit, "Unit can't be null"); - this.drainFuture = requireNonNull(executor, "Executor can't be null") - .scheduleAtFixedRate(this::drain, period, period, unit); - } - - public ErrorCollector(@NotNull ScheduledExecutorService executor, - @NotNull MessageRouter eventRouter, - @NotNull EventID rootEvent) { - this(executor, eventRouter, rootEvent, 1, TimeUnit.MINUTES); - } - +public interface ErrorCollector extends AutoCloseable { /** * Log error and call the {@link #collect(String)}} method * @param error is used as key identifier. Avoid put a lot of unique values */ - public void collect(Logger logger, String error, Throwable cause) { - logger.error(error, cause); - collect(error); - } + void collect(Logger logger, String error, Throwable cause); /** * @param error is used as key identifier. Avoid put a lot of unique values */ - public void collect(String error) { - lock.lock(); - try { - errors.compute(error, (key, metadata) -> { - if (metadata == null) { - return new ErrorMetadata(); - } - metadata.inc(); - return metadata; - }); - } finally { - lock.unlock(); - } - } - - @Override - public void close() throws Exception { - drainFuture.cancel(true); - drain(); - } - - private void drain() { - try { - Map map = clear(); - if (map.isEmpty()) { return; } - - eventRouter.sendAll(Event.start() - .name("mstore internal problem(s): " + calculateTotalQty(map.values())) - .type("InternalError") - .status(Status.FAILED) - .bodyData(new BodyData(map)) - .toBatchProto(rootEvent)); - - } catch (IOException | RuntimeException e) { - LOGGER.error("Drain events task failure", e); - } - } - - private Map clear() { - lock.lock(); - try { - Map result = errors; - errors = new HashMap<>(); - return result; - } finally { - lock.unlock(); - } - } - - private static int calculateTotalQty(Collection errors) { - return errors.stream() - .map(ErrorMetadata::getQuantity) - .reduce(0, Integer::sum); - } - - private static class BodyData implements IBodyData { - private final Map errors; - @JsonCreator - private BodyData(Map errors) { - this.errors = errors; - } - public Map getErrors() { - return errors; - } - } - - private static class ErrorMetadata { - private final Instant firstDate = Instant.now(); - private Instant lastDate; - private int quantity = 1; - - public void inc() { - quantity += 1; - lastDate = Instant.now(); - } - - public Instant getFirstDate() { - return firstDate; - } - - public Instant getLastDate() { - return lastDate; - } - - public void setLastDate(Instant lastDate) { - this.lastDate = lastDate; - } - - public int getQuantity() { - return quantity; - } - - public void setQuantity(int quantity) { - this.quantity = quantity; - } - } + void collect(String error); } diff --git a/src/main/java/com/exactpro/th2/mstore/EventErrorCollector.java b/src/main/java/com/exactpro/th2/mstore/EventErrorCollector.java new file mode 100644 index 0000000..3aafc8e --- /dev/null +++ b/src/main/java/com/exactpro/th2/mstore/EventErrorCollector.java @@ -0,0 +1,185 @@ +/* + * Copyright 2023-2025 Exactpro (Exactpro Systems Limited) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.exactpro.th2.mstore; + +import com.exactpro.th2.common.event.Event; +import com.exactpro.th2.common.event.IBodyData; +import com.exactpro.th2.common.grpc.EventBatch; +import com.exactpro.th2.common.grpc.EventID; +import com.exactpro.th2.common.schema.message.MessageRouter; +import com.fasterxml.jackson.annotation.JsonCreator; +import org.jetbrains.annotations.NotNull; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.time.Instant; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import static java.util.Objects.requireNonNull; + +public class EventErrorCollector implements ErrorCollector { + private static final Logger LOGGER = LoggerFactory.getLogger(EventErrorCollector.class); + private final ScheduledFuture drainFuture; + private final MessageRouter eventRouter; + private final EventID rootEvent; + private final Lock lock = new ReentrantLock(); + private Map errors = new HashMap<>(); + + private EventErrorCollector(@NotNull ScheduledExecutorService executor, + @NotNull MessageRouter eventRouter, + @NotNull EventID rootEvent, + long period, + @NotNull TimeUnit unit) { + this.eventRouter = requireNonNull(eventRouter, "Event router can't be null"); + this.rootEvent = requireNonNull(rootEvent, "Root event can't be null"); + requireNonNull(unit, "Unit can't be null"); + this.drainFuture = requireNonNull(executor, "Executor can't be null") + .scheduleAtFixedRate(this::drain, period, period, unit); + } + + public static ErrorCollector create(@NotNull ScheduledExecutorService executor, + @NotNull MessageRouter eventRouter, + @NotNull EventID rootEvent, + long period, + @NotNull TimeUnit unit) { + return new EventErrorCollector(executor, eventRouter, rootEvent, period, unit); + } + + public static ErrorCollector create(@NotNull ScheduledExecutorService executor, + @NotNull MessageRouter eventRouter, + @NotNull EventID rootEvent) { + return create(executor, eventRouter, rootEvent, 1, TimeUnit.MINUTES); + } + + /** + * Log error and call the {@link #collect(String)}} method + * @param error is used as key identifier. Avoid put a lot of unique values + */ + public void collect(Logger logger, String error, Throwable cause) { + logger.error(error, cause); + collect(error); + } + + /** + * @param error is used as key identifier. Avoid put a lot of unique values + */ + public void collect(String error) { + lock.lock(); + try { + errors.compute(error, (key, metadata) -> { + if (metadata == null) { + return new EventErrorCollector.ErrorMetadata(); + } + metadata.inc(); + return metadata; + }); + } finally { + lock.unlock(); + } + } + + @Override + public void close() throws Exception { + drainFuture.cancel(true); + drain(); + } + + private void drain() { + try { + Map map = clear(); + if (map.isEmpty()) { return; } + + eventRouter.sendAll(Event.start() + .name("mstore internal problem(s): " + calculateTotalQty(map.values())) + .type("InternalError") + .status(Event.Status.FAILED) + .bodyData(new EventErrorCollector.BodyData(map)) + .toBatchProto(rootEvent)); + + } catch (IOException | RuntimeException e) { + LOGGER.error("Drain events task failure", e); + } + } + + private Map clear() { + lock.lock(); + try { + Map result = errors; + errors = new HashMap<>(); + return result; + } finally { + lock.unlock(); + } + } + + private static int calculateTotalQty(Collection errors) { + return errors.stream() + .map(EventErrorCollector.ErrorMetadata::getQuantity) + .reduce(0, Integer::sum); + } + + @SuppressWarnings("unused") + private static class BodyData implements IBodyData { + private final Map errors; + @JsonCreator + private BodyData(Map errors) { + this.errors = errors; + } + public Map getErrors() { + return errors; + } + } + + @SuppressWarnings("unused") + private static class ErrorMetadata { + private final Instant firstDate = Instant.now(); + private Instant lastDate; + private int quantity = 1; + + public void inc() { + quantity += 1; + lastDate = Instant.now(); + } + + public Instant getFirstDate() { + return firstDate; + } + + public Instant getLastDate() { + return lastDate; + } + + public void setLastDate(Instant lastDate) { + this.lastDate = lastDate; + } + + public int getQuantity() { + return quantity; + } + + public void setQuantity(int quantity) { + this.quantity = quantity; + } + } +} \ No newline at end of file diff --git a/src/main/java/com/exactpro/th2/mstore/MessageStore.java b/src/main/java/com/exactpro/th2/mstore/MessageStore.java index c1dd04a..1fd1352 100644 --- a/src/main/java/com/exactpro/th2/mstore/MessageStore.java +++ b/src/main/java/com/exactpro/th2/mstore/MessageStore.java @@ -59,7 +59,7 @@ public static void main(String[] args) { ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(THREAD_FACTORY); shutdownManager.registerResource(() -> shutdownGracefully(executor, 5, TimeUnit.SECONDS)); - ErrorCollector errorCollector = new ErrorCollector(executor, factory.getEventBatchRouter(), factory.getRootEventId()); + ErrorCollector errorCollector = EventErrorCollector.create(executor, factory.getEventBatchRouter(), factory.getRootEventId()); shutdownManager.registerResource(errorCollector); // Initialize persistor diff --git a/src/test/java/com/exactpro/th2/mstore/TestErrorCollector.java b/src/test/java/com/exactpro/th2/mstore/TestErrorCollector.java index f7b48fc..6d95b53 100644 --- a/src/test/java/com/exactpro/th2/mstore/TestErrorCollector.java +++ b/src/test/java/com/exactpro/th2/mstore/TestErrorCollector.java @@ -1,5 +1,5 @@ /* - * Copyright 2023 Exactpro (Exactpro Systems Limited) + * Copyright 2023-2025 Exactpro (Exactpro Systems Limited) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -51,8 +51,8 @@ @SuppressWarnings("DynamicRegexReplaceableByCompiledPattern") class TestErrorCollector { - private static final long PERIOD = RandomUtils.nextLong(0, Long.MAX_VALUE); - private static final TimeUnit TIME_UNIT = TimeUnit.values()[RandomUtils.nextInt(0, TimeUnit.values().length)] ; + private static final long PERIOD = RandomUtils.secure().randomLong(0, Long.MAX_VALUE); + private static final TimeUnit TIME_UNIT = TimeUnit.values()[RandomUtils.secure().randomInt(0, TimeUnit.values().length)] ; @Mock private Logger logger; @@ -75,7 +75,7 @@ class TestErrorCollector { @BeforeEach void beforeEach() { doReturn(future).when(executor).scheduleAtFixedRate(any(Runnable.class), anyLong(), anyLong(), any(TimeUnit.class)); - errorCollector = new ErrorCollector(executor, eventRouter, rootEvent, PERIOD, TIME_UNIT); + errorCollector = EventErrorCollector.create(executor, eventRouter, rootEvent, PERIOD, TIME_UNIT); verify(executor).scheduleAtFixedRate(taskCaptor.capture(), eq(PERIOD), eq(PERIOD), eq(TIME_UNIT)); verifyNoMoreInteractions(executor); }