Skip to content

Commit

Permalink
[TH2-5267] Split ErrorCollector to interface and implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
Nikita-Smirnov-Exactpro committed Feb 18, 2025
1 parent 5c1dec5 commit bce9bad
Show file tree
Hide file tree
Showing 4 changed files with 194 additions and 154 deletions.
153 changes: 4 additions & 149 deletions src/main/java/com/exactpro/th2/mstore/ErrorCollector.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2023 Exactpro (Exactpro Systems Limited)
* Copyright 2023-2025 Exactpro (Exactpro Systems Limited)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -15,163 +15,18 @@
*/
package com.exactpro.th2.mstore;

import com.exactpro.th2.common.event.Event;
import com.exactpro.th2.common.event.Event.Status;
import com.exactpro.th2.common.event.IBodyData;
import com.exactpro.th2.common.grpc.EventBatch;
import com.exactpro.th2.common.grpc.EventID;
import com.exactpro.th2.common.schema.message.MessageRouter;
import com.fasterxml.jackson.annotation.JsonCreator;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.time.Instant;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import static java.util.Objects.requireNonNull;

@SuppressWarnings("unused")
public class ErrorCollector implements AutoCloseable {
private static final Logger LOGGER = LoggerFactory.getLogger(ErrorCollector.class);
private final ScheduledFuture<?> drainFuture;
private final MessageRouter<EventBatch> eventRouter;
private final EventID rootEvent;
private final Lock lock = new ReentrantLock();
private Map<String, ErrorMetadata> errors = new HashMap<>();

public ErrorCollector(@NotNull ScheduledExecutorService executor,
@NotNull MessageRouter<EventBatch> eventRouter,
@NotNull EventID rootEvent,
long period,
@NotNull TimeUnit unit) {
this.eventRouter = requireNonNull(eventRouter, "Event router can't be null");
this.rootEvent = requireNonNull(rootEvent, "Root event can't be null");
requireNonNull(unit, "Unit can't be null");
this.drainFuture = requireNonNull(executor, "Executor can't be null")
.scheduleAtFixedRate(this::drain, period, period, unit);
}

public ErrorCollector(@NotNull ScheduledExecutorService executor,
@NotNull MessageRouter<EventBatch> eventRouter,
@NotNull EventID rootEvent) {
this(executor, eventRouter, rootEvent, 1, TimeUnit.MINUTES);
}

public interface ErrorCollector extends AutoCloseable {
/**
* Log error and call the {@link #collect(String)}} method
* @param error is used as key identifier. Avoid put a lot of unique values
*/
public void collect(Logger logger, String error, Throwable cause) {
logger.error(error, cause);
collect(error);
}
void collect(Logger logger, String error, Throwable cause);

/**
* @param error is used as key identifier. Avoid put a lot of unique values
*/
public void collect(String error) {
lock.lock();
try {
errors.compute(error, (key, metadata) -> {
if (metadata == null) {
return new ErrorMetadata();
}
metadata.inc();
return metadata;
});
} finally {
lock.unlock();
}
}

@Override
public void close() throws Exception {
drainFuture.cancel(true);
drain();
}

private void drain() {
try {
Map<String, ErrorMetadata> map = clear();
if (map.isEmpty()) { return; }

eventRouter.sendAll(Event.start()
.name("mstore internal problem(s): " + calculateTotalQty(map.values()))
.type("InternalError")
.status(Status.FAILED)
.bodyData(new BodyData(map))
.toBatchProto(rootEvent));

} catch (IOException | RuntimeException e) {
LOGGER.error("Drain events task failure", e);
}
}

private Map<String, ErrorMetadata> clear() {
lock.lock();
try {
Map<String, ErrorMetadata> result = errors;
errors = new HashMap<>();
return result;
} finally {
lock.unlock();
}
}

private static int calculateTotalQty(Collection<ErrorMetadata> errors) {
return errors.stream()
.map(ErrorMetadata::getQuantity)
.reduce(0, Integer::sum);
}

private static class BodyData implements IBodyData {
private final Map<String, ErrorMetadata> errors;
@JsonCreator
private BodyData(Map<String, ErrorMetadata> errors) {
this.errors = errors;
}
public Map<String, ErrorMetadata> getErrors() {
return errors;
}
}

private static class ErrorMetadata {
private final Instant firstDate = Instant.now();
private Instant lastDate;
private int quantity = 1;

public void inc() {
quantity += 1;
lastDate = Instant.now();
}

public Instant getFirstDate() {
return firstDate;
}

public Instant getLastDate() {
return lastDate;
}

public void setLastDate(Instant lastDate) {
this.lastDate = lastDate;
}

public int getQuantity() {
return quantity;
}

public void setQuantity(int quantity) {
this.quantity = quantity;
}
}
void collect(String error);
}
185 changes: 185 additions & 0 deletions src/main/java/com/exactpro/th2/mstore/EventErrorCollector.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
/*
* Copyright 2023-2025 Exactpro (Exactpro Systems Limited)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.exactpro.th2.mstore;

import com.exactpro.th2.common.event.Event;
import com.exactpro.th2.common.event.IBodyData;
import com.exactpro.th2.common.grpc.EventBatch;
import com.exactpro.th2.common.grpc.EventID;
import com.exactpro.th2.common.schema.message.MessageRouter;
import com.fasterxml.jackson.annotation.JsonCreator;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.time.Instant;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import static java.util.Objects.requireNonNull;

public class EventErrorCollector implements ErrorCollector {
private static final Logger LOGGER = LoggerFactory.getLogger(EventErrorCollector.class);
private final ScheduledFuture<?> drainFuture;
private final MessageRouter<EventBatch> eventRouter;
private final EventID rootEvent;
private final Lock lock = new ReentrantLock();
private Map<String, EventErrorCollector.ErrorMetadata> errors = new HashMap<>();

private EventErrorCollector(@NotNull ScheduledExecutorService executor,
@NotNull MessageRouter<EventBatch> eventRouter,
@NotNull EventID rootEvent,
long period,
@NotNull TimeUnit unit) {
this.eventRouter = requireNonNull(eventRouter, "Event router can't be null");
this.rootEvent = requireNonNull(rootEvent, "Root event can't be null");
requireNonNull(unit, "Unit can't be null");
this.drainFuture = requireNonNull(executor, "Executor can't be null")
.scheduleAtFixedRate(this::drain, period, period, unit);
}

public static ErrorCollector create(@NotNull ScheduledExecutorService executor,
@NotNull MessageRouter<EventBatch> eventRouter,
@NotNull EventID rootEvent,
long period,
@NotNull TimeUnit unit) {
return new EventErrorCollector(executor, eventRouter, rootEvent, period, unit);
}

public static ErrorCollector create(@NotNull ScheduledExecutorService executor,
@NotNull MessageRouter<EventBatch> eventRouter,
@NotNull EventID rootEvent) {
return create(executor, eventRouter, rootEvent, 1, TimeUnit.MINUTES);
}

/**
* Log error and call the {@link #collect(String)}} method
* @param error is used as key identifier. Avoid put a lot of unique values
*/
public void collect(Logger logger, String error, Throwable cause) {
logger.error(error, cause);
collect(error);
}

/**
* @param error is used as key identifier. Avoid put a lot of unique values
*/
public void collect(String error) {
lock.lock();
try {
errors.compute(error, (key, metadata) -> {
if (metadata == null) {
return new EventErrorCollector.ErrorMetadata();
}
metadata.inc();
return metadata;
});
} finally {
lock.unlock();
}
}

@Override
public void close() throws Exception {
drainFuture.cancel(true);
drain();
}

private void drain() {
try {
Map<String, EventErrorCollector.ErrorMetadata> map = clear();
if (map.isEmpty()) { return; }

eventRouter.sendAll(Event.start()
.name("mstore internal problem(s): " + calculateTotalQty(map.values()))
.type("InternalError")
.status(Event.Status.FAILED)
.bodyData(new EventErrorCollector.BodyData(map))
.toBatchProto(rootEvent));

} catch (IOException | RuntimeException e) {
LOGGER.error("Drain events task failure", e);
}
}

private Map<String, EventErrorCollector.ErrorMetadata> clear() {
lock.lock();
try {
Map<String, EventErrorCollector.ErrorMetadata> result = errors;
errors = new HashMap<>();
return result;
} finally {
lock.unlock();
}
}

private static int calculateTotalQty(Collection<EventErrorCollector.ErrorMetadata> errors) {
return errors.stream()
.map(EventErrorCollector.ErrorMetadata::getQuantity)
.reduce(0, Integer::sum);
}

@SuppressWarnings("unused")
private static class BodyData implements IBodyData {
private final Map<String, EventErrorCollector.ErrorMetadata> errors;
@JsonCreator
private BodyData(Map<String, EventErrorCollector.ErrorMetadata> errors) {
this.errors = errors;
}
public Map<String, EventErrorCollector.ErrorMetadata> getErrors() {
return errors;
}
}

@SuppressWarnings("unused")
private static class ErrorMetadata {
private final Instant firstDate = Instant.now();
private Instant lastDate;
private int quantity = 1;

public void inc() {
quantity += 1;
lastDate = Instant.now();
}

public Instant getFirstDate() {
return firstDate;
}

public Instant getLastDate() {
return lastDate;
}

public void setLastDate(Instant lastDate) {
this.lastDate = lastDate;
}

public int getQuantity() {
return quantity;
}

public void setQuantity(int quantity) {
this.quantity = quantity;
}
}
}
2 changes: 1 addition & 1 deletion src/main/java/com/exactpro/th2/mstore/MessageStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public static void main(String[] args) {
ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(THREAD_FACTORY);
shutdownManager.registerResource(() -> shutdownGracefully(executor, 5, TimeUnit.SECONDS));

ErrorCollector errorCollector = new ErrorCollector(executor, factory.getEventBatchRouter(), factory.getRootEventId());
ErrorCollector errorCollector = EventErrorCollector.create(executor, factory.getEventBatchRouter(), factory.getRootEventId());
shutdownManager.registerResource(errorCollector);

// Initialize persistor
Expand Down
8 changes: 4 additions & 4 deletions src/test/java/com/exactpro/th2/mstore/TestErrorCollector.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2023 Exactpro (Exactpro Systems Limited)
* Copyright 2023-2025 Exactpro (Exactpro Systems Limited)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -51,8 +51,8 @@
@SuppressWarnings("DynamicRegexReplaceableByCompiledPattern")
class TestErrorCollector {

private static final long PERIOD = RandomUtils.nextLong(0, Long.MAX_VALUE);
private static final TimeUnit TIME_UNIT = TimeUnit.values()[RandomUtils.nextInt(0, TimeUnit.values().length)] ;
private static final long PERIOD = RandomUtils.secure().randomLong(0, Long.MAX_VALUE);
private static final TimeUnit TIME_UNIT = TimeUnit.values()[RandomUtils.secure().randomInt(0, TimeUnit.values().length)] ;

@Mock
private Logger logger;
Expand All @@ -75,7 +75,7 @@ class TestErrorCollector {
@BeforeEach
void beforeEach() {
doReturn(future).when(executor).scheduleAtFixedRate(any(Runnable.class), anyLong(), anyLong(), any(TimeUnit.class));
errorCollector = new ErrorCollector(executor, eventRouter, rootEvent, PERIOD, TIME_UNIT);
errorCollector = EventErrorCollector.create(executor, eventRouter, rootEvent, PERIOD, TIME_UNIT);
verify(executor).scheduleAtFixedRate(taskCaptor.capture(), eq(PERIOD), eq(PERIOD), eq(TIME_UNIT));
verifyNoMoreInteractions(executor);
}
Expand Down

0 comments on commit bce9bad

Please sign in to comment.