From a86542f62ce6624b1dd45500b853ef92b27997bc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C3=A5le=20Pedersen?= Date: Wed, 30 Aug 2023 10:18:08 +0200 Subject: [PATCH 1/2] Refactored MessageBus channel name to use enum --- .../tools/horreum/api/alerting/Change.java | 1 - .../tools/horreum/api/alerting/DataPoint.java | 3 -- .../api/alerting/MissingDataRuleResult.java | 2 +- .../tools/horreum/api/alerting/Variable.java | 2 +- .../api/services/ExperimentService.java | 1 - .../tools/horreum/bus/MessageBus.java | 36 +++++++++--------- .../tools/horreum/bus/MessageBusChannels.java | 24 ++++++++++++ .../tools/horreum/entity/BannerDAO.java | 1 - .../horreum/entity/alerting/ChangeDAO.java | 25 ------------- .../horreum/entity/alerting/DataPointDAO.java | 37 ------------------- .../tools/horreum/entity/data/ActionDAO.java | 19 ---------- .../tools/horreum/entity/data/DataSetDAO.java | 22 ----------- .../tools/horreum/entity/data/RunDAO.java | 3 -- .../tools/horreum/entity/data/SchemaDAO.java | 12 ------ .../tools/horreum/entity/data/TestDAO.java | 3 -- .../tools/horreum/svc/ActionServiceImpl.java | 33 +++++++++-------- .../horreum/svc/AlertingServiceImpl.java | 26 ++++++------- .../tools/horreum/svc/DatasetServiceImpl.java | 5 ++- .../tools/horreum/svc/EventAggregator.java | 5 ++- .../horreum/svc/ExperimentServiceImpl.java | 7 ++-- .../tools/horreum/svc/LogServiceImpl.java | 3 +- .../horreum/svc/NotificationServiceImpl.java | 5 ++- .../tools/horreum/svc/ReportServiceImpl.java | 6 +-- .../tools/horreum/svc/RunServiceImpl.java | 18 ++++----- .../tools/horreum/svc/SchemaServiceImpl.java | 7 ++-- .../horreum/svc/SubscriptionServiceImpl.java | 3 +- .../tools/horreum/svc/TestServiceImpl.java | 7 ++-- .../tools/horreum/bus/MessageBusTest.java | 8 ++-- .../tools/horreum/svc/ActionServiceTest.java | 7 ++-- .../horreum/svc/AlertingServiceTest.java | 35 +++++++++--------- .../tools/horreum/svc/BaseServiceTest.java | 20 +++++----- .../tools/horreum/svc/DatasetServiceTest.java | 13 ++++--- .../tools/horreum/svc/ReportServiceTest.java | 6 +-- .../tools/horreum/svc/RunServiceTest.java | 29 ++++++++------- .../tools/horreum/svc/SchemaServiceTest.java | 7 ++-- .../tools/horreum/svc/TestServiceTest.java | 21 +++++------ 36 files changed, 178 insertions(+), 284 deletions(-) create mode 100644 horreum-backend/src/main/java/io/hyperfoil/tools/horreum/bus/MessageBusChannels.java diff --git a/horreum-api/src/main/java/io/hyperfoil/tools/horreum/api/alerting/Change.java b/horreum-api/src/main/java/io/hyperfoil/tools/horreum/api/alerting/Change.java index 5574beef5..de550c16a 100644 --- a/horreum-api/src/main/java/io/hyperfoil/tools/horreum/api/alerting/Change.java +++ b/horreum-api/src/main/java/io/hyperfoil/tools/horreum/api/alerting/Change.java @@ -7,7 +7,6 @@ import java.time.Instant; public class Change { - public static final String EVENT_NEW = "change/new"; @JsonProperty( required = true ) public int id; @JsonProperty( required = true ) diff --git a/horreum-api/src/main/java/io/hyperfoil/tools/horreum/api/alerting/DataPoint.java b/horreum-api/src/main/java/io/hyperfoil/tools/horreum/api/alerting/DataPoint.java index ef630ea34..f82bd613c 100644 --- a/horreum-api/src/main/java/io/hyperfoil/tools/horreum/api/alerting/DataPoint.java +++ b/horreum-api/src/main/java/io/hyperfoil/tools/horreum/api/alerting/DataPoint.java @@ -6,9 +6,6 @@ import java.time.Instant; public class DataPoint { - public static final String EVENT_NEW = "datapoint/new"; - public static final String EVENT_DELETED = "datapoint/deleted"; - public static final String EVENT_DATASET_PROCESSED = "datapoint/dataset_processed"; public Integer id; public Instant timestamp; public double value; diff --git a/horreum-api/src/main/java/io/hyperfoil/tools/horreum/api/alerting/MissingDataRuleResult.java b/horreum-api/src/main/java/io/hyperfoil/tools/horreum/api/alerting/MissingDataRuleResult.java index 3cdab0197..9587df16b 100644 --- a/horreum-api/src/main/java/io/hyperfoil/tools/horreum/api/alerting/MissingDataRuleResult.java +++ b/horreum-api/src/main/java/io/hyperfoil/tools/horreum/api/alerting/MissingDataRuleResult.java @@ -5,7 +5,7 @@ import java.util.Objects; public class MissingDataRuleResult { - private Pk pk; + private Pk pk; public Instant timestamp; public MissingDataRule rule; diff --git a/horreum-api/src/main/java/io/hyperfoil/tools/horreum/api/alerting/Variable.java b/horreum-api/src/main/java/io/hyperfoil/tools/horreum/api/alerting/Variable.java index c550d2d4e..fe75b2f5b 100644 --- a/horreum-api/src/main/java/io/hyperfoil/tools/horreum/api/alerting/Variable.java +++ b/horreum-api/src/main/java/io/hyperfoil/tools/horreum/api/alerting/Variable.java @@ -42,7 +42,7 @@ public Variable(Integer id, int testId, String name, String group, int order, Js } public String toString() { - return "VariableDTO{id=" + this.id + ", testId=" + this.testId + ", name='" + this.name + '\'' + ", group='" + this.group + '\'' + ", order=" + this.order + ", labels=" + this.labels + ", calculation='" + this.calculation + '\'' + ", changeDetection=" + this.changeDetection + '}'; + return "Variable{id=" + this.id + ", testId=" + this.testId + ", name='" + this.name + '\'' + ", group='" + this.group + '\'' + ", order=" + this.order + ", labels=" + this.labels + ", calculation='" + this.calculation + '\'' + ", changeDetection=" + this.changeDetection + '}'; } } diff --git a/horreum-api/src/main/java/io/hyperfoil/tools/horreum/api/services/ExperimentService.java b/horreum-api/src/main/java/io/hyperfoil/tools/horreum/api/services/ExperimentService.java index a992aa171..dfc56c599 100644 --- a/horreum-api/src/main/java/io/hyperfoil/tools/horreum/api/services/ExperimentService.java +++ b/horreum-api/src/main/java/io/hyperfoil/tools/horreum/api/services/ExperimentService.java @@ -63,7 +63,6 @@ enum BetterOrWorse { @Schema(name = "ExperimentResult") class ExperimentResult { - public static final String NEW_RESULT = "experiment_result/new"; public ExperimentProfile profile; public List logs; diff --git a/horreum-backend/src/main/java/io/hyperfoil/tools/horreum/bus/MessageBus.java b/horreum-backend/src/main/java/io/hyperfoil/tools/horreum/bus/MessageBus.java index f932332fd..69f0ceeff 100644 --- a/horreum-backend/src/main/java/io/hyperfoil/tools/horreum/bus/MessageBus.java +++ b/horreum-backend/src/main/java/io/hyperfoil/tools/horreum/bus/MessageBus.java @@ -97,15 +97,15 @@ void destroy() { } @Transactional(Transactional.TxType.MANDATORY) - public void publish(String channel, int testId, Object payload) { + public void publish(MessageBusChannels channel, int testId, Object payload) { JsonNode json = Util.OBJECT_MAPPER.valueToTree(payload); - Integer componentFlag = flags.get(channel); + Integer componentFlag = flags.get(channel.name()); BigInteger id; if (componentFlag != null && componentFlag != 0) { try (CloseMe ignored = roleManager.withRoles(Collections.singleton(Roles.HORREUM_MESSAGEBUS))) { id = (BigInteger) em.createNativeQuery("INSERT INTO messagebus (id, \"timestamp\", channel, testid, message, flags) VALUES (nextval('messagebus_seq'), NOW(), ?1, ?2, ?3, ?4) RETURNING id", BigInteger.class) .unwrap(NativeQuery.class) - .setParameter(1, channel) + .setParameter(1, channel.name()) .setParameter(2, testId) .setParameter(3, json, JsonBinaryType.INSTANCE) .setParameter(4, componentFlag) @@ -120,7 +120,7 @@ public void publish(String channel, int testId, Object payload) { log.debugf("Publishing %d on test %d with flag %X on %s: %s", id.longValue(), testId, flag, channel, payload); Util.doAfterCommitThrowing(tm, () -> { log.debugf("Sending %d on test %d with flag %X to eventbus %s ", id.longValue(), testId, flag, channel); - eventBus.publish(channel, new Message(id.longValue(), testId, flag, payload)); + eventBus.publish(channel.name(), new Message(id.longValue(), testId, flag, payload)); }); } catch (RollbackException e) { log.debug("Not publishing the event as the transaction has been marked rollback-only"); @@ -129,8 +129,8 @@ public void publish(String channel, int testId, Object payload) { } } - public AutoCloseable subscribe(String channel, String component, Class payloadClass, Handler handler) { - payloadClasses.compute(channel, (c, current) -> { + public AutoCloseable subscribe(MessageBusChannels channel, String component, Class payloadClass, Handler handler) { + payloadClasses.compute(channel.name(), (c, current) -> { if (current == null || current.isAssignableFrom(payloadClass)) { return payloadClass; } else if (payloadClass.isAssignableFrom(current)) { @@ -141,14 +141,14 @@ public AutoCloseable subscribe(String channel, String component, Class pa }); int index = registerIndex(channel, component); log.debugf("Channel %s, component %s has index %d", channel, component, index); - MessageConsumer consumer = eventBus.consumer(channel, event -> { + MessageConsumer consumer = eventBus.consumer(channel.name(), event -> { if (!(event.body() instanceof Message)) { - log.errorf("Not a message on %s: %s", channel, event.body()); + log.errorf("Not a message on %s: %s", channel.name(), event.body()); return; } Message msg = (Message) event.body(); if ((msg.componentFlags & (1 << index)) == 0) { - log.debugf("%s ignoring message %d on %s with flags %X: doesn't match index %d", component, msg.id, channel, msg.componentFlags, index); + log.debugf("%s ignoring message %d on %s with flags %X: doesn't match index %d", component, msg.id, channel.name(), msg.componentFlags, index); return; } executeForTest(msg.testId, () -> { @@ -162,9 +162,9 @@ public AutoCloseable subscribe(String channel, String component, Class pa // so we'll remove the record with a trigger int updateCount = em.createNativeQuery("UPDATE messagebus SET flags = flags & ~(1 << ?1) WHERE id = ?2") .setParameter(1, index).setParameter(2, msg.id).executeUpdate(); - log.debugf("%s consumed %d on %s - %d records updated", component, msg.id, channel, updateCount); + log.debugf("%s consumed %d on %s - %d records updated", component, msg.id, channel.name(), updateCount); } else { - log.debugf("Rolling back, %s cannot consume %d on %s", component, msg.id, channel); + log.debugf("Rolling back, %s cannot consume %d on %s", component, msg.id, channel.name()); } } catch (SystemException e) { log.error("Exception querying transaction status", e); @@ -177,16 +177,16 @@ public AutoCloseable subscribe(String channel, String component, Class pa return null; }); } catch (Throwable t) { - errorReporter.reportException(t, ERROR_SUBJECT, "Exception in handler for message bus channel %s, message %s%n%n", channel, msg.payload); + errorReporter.reportException(t, ERROR_SUBJECT, "Exception in handler for message bus channel %s, message %s%n%n", channel.name(), msg.payload); } }); }); unregisters.add(consumer::unregister); return () -> { - removeIndex(channel, index); - int newFlags = flags.compute(channel, (c, current) -> current != null ? current & ~(1 << index) : 0); + removeIndex(channel.name(), index); + int newFlags = flags.compute(channel.name(), (c, current) -> current != null ? current & ~(1 << index) : 0); consumer.unregister(); - log.debugf("Unregistered index %d on channel %s, new flags: %d", index, channel, Integer.valueOf(newFlags)); + log.debugf("Unregistered index %d on channel %s, new flags: %d", index, channel.name(), Integer.valueOf(newFlags)); }; } @@ -199,14 +199,14 @@ else if (entity instanceof TestDAO) return entity; } - private int registerIndex(String channel, String component) { + private int registerIndex(MessageBusChannels channel, String component) { Integer index; do { - index = tryRegisterIndex(channel, component); + index = tryRegisterIndex(channel.name(), component); Thread.yield(); } while (index == null); int finalIndex = index; - flags.compute(channel, (c, current) -> (current == null ? 0 : current) | (1 << finalIndex)); + flags.compute(channel.name(), (c, current) -> (current == null ? 0 : current) | (1 << finalIndex)); return index; } diff --git a/horreum-backend/src/main/java/io/hyperfoil/tools/horreum/bus/MessageBusChannels.java b/horreum-backend/src/main/java/io/hyperfoil/tools/horreum/bus/MessageBusChannels.java new file mode 100644 index 000000000..56dfaa397 --- /dev/null +++ b/horreum-backend/src/main/java/io/hyperfoil/tools/horreum/bus/MessageBusChannels.java @@ -0,0 +1,24 @@ +package io.hyperfoil.tools.horreum.bus; + +public enum MessageBusChannels { + DATAPOINT_NEW, + DATAPOINT_DELETED, + DATAPOINT_PROCESSED, + DATASET_NEW, + DATASET_UPDATED_LABELS, + DATASET_MISSING_VALUES, + DATASET_DELETED, + DATASET_VALIDATED, + DATASET_CHANGES_NEW, + TEST_NEW, + TEST_DELETED, + RUN_NEW, + RUN_TRASHED, + RUN_VALIDATED, + CHANGE_NEW, + EXPERIMENT_RESULT_NEW, + FOOBAR + + + +} diff --git a/horreum-backend/src/main/java/io/hyperfoil/tools/horreum/entity/BannerDAO.java b/horreum-backend/src/main/java/io/hyperfoil/tools/horreum/entity/BannerDAO.java index 02fbe2c86..06c83da20 100644 --- a/horreum-backend/src/main/java/io/hyperfoil/tools/horreum/entity/BannerDAO.java +++ b/horreum-backend/src/main/java/io/hyperfoil/tools/horreum/entity/BannerDAO.java @@ -2,7 +2,6 @@ import java.time.Instant; -import io.quarkus.hibernate.orm.panache.PanacheEntity; import io.quarkus.hibernate.orm.panache.PanacheEntityBase; import jakarta.persistence.Entity; import jakarta.persistence.GeneratedValue; diff --git a/horreum-backend/src/main/java/io/hyperfoil/tools/horreum/entity/alerting/ChangeDAO.java b/horreum-backend/src/main/java/io/hyperfoil/tools/horreum/entity/alerting/ChangeDAO.java index 1302f1cf5..0180b8d0a 100644 --- a/horreum-backend/src/main/java/io/hyperfoil/tools/horreum/entity/alerting/ChangeDAO.java +++ b/horreum-backend/src/main/java/io/hyperfoil/tools/horreum/entity/alerting/ChangeDAO.java @@ -24,7 +24,6 @@ @Entity(name = "Change") @Table(name = "change") public class ChangeDAO extends PanacheEntityBase { - public static final String EVENT_NEW = "change/new"; @Id @GeneratedValue @@ -75,28 +74,4 @@ public static ChangeDAO fromDatapoint(DataPointDAO dp) { return change; } - public static class Event { - public ChangeDAO change; - public String testName; - public DataSetDAO.Info dataset; - public boolean notify; - - public Event() {} - - public Event(ChangeDAO change, String testName, DataSetDAO.Info dataset, boolean notify) { - this.change = change; - this.testName = testName; - this.dataset = dataset; - this.notify = notify; - } - - @Override - public String toString() { - return "Change.Event{" + - "change=" + change + - ", dataset=" + dataset + - ", notify=" + notify + - '}'; - } - } } diff --git a/horreum-backend/src/main/java/io/hyperfoil/tools/horreum/entity/alerting/DataPointDAO.java b/horreum-backend/src/main/java/io/hyperfoil/tools/horreum/entity/alerting/DataPointDAO.java index 585f17a11..cea46a57f 100644 --- a/horreum-backend/src/main/java/io/hyperfoil/tools/horreum/entity/alerting/DataPointDAO.java +++ b/horreum-backend/src/main/java/io/hyperfoil/tools/horreum/entity/alerting/DataPointDAO.java @@ -22,9 +22,6 @@ @Entity(name = "DataPoint") @Table(name = "DataPoint") public class DataPointDAO extends PanacheEntityBase { - public static final String EVENT_NEW = "datapoint/new"; - public static final String EVENT_DELETED = "datapoint/deleted"; - public static final String EVENT_DATASET_PROCESSED = "datapoint/dataset_processed"; @Id @GeneratedValue(strategy = GenerationType.AUTO) @@ -64,38 +61,4 @@ public int getDatasetId() { return dataset.id; } - public static class Event { - public DataPointDAO dataPoint; - public int testId; - public boolean notify; - - public Event() { - } - - public Event(DataPointDAO dataPoint, int testId, boolean notify) { - this.dataPoint = dataPoint; - this.testId = testId; - this.notify = notify; - } - - @Override - public String toString() { - return "DataPoint.Event{" + - "dataPoint=" + dataPoint + - ", notify=" + notify + - '}'; - } - } - - public static class DatasetProcessedEvent { - public DataSetDAO.Info dataset; - public boolean notify; - - public DatasetProcessedEvent() {} - - public DatasetProcessedEvent(DataSetDAO.Info dataset, boolean notify) { - this.dataset = dataset; - this.notify = notify; - } - } } diff --git a/horreum-backend/src/main/java/io/hyperfoil/tools/horreum/entity/data/ActionDAO.java b/horreum-backend/src/main/java/io/hyperfoil/tools/horreum/entity/data/ActionDAO.java index 555109451..925df93bb 100644 --- a/horreum-backend/src/main/java/io/hyperfoil/tools/horreum/entity/data/ActionDAO.java +++ b/horreum-backend/src/main/java/io/hyperfoil/tools/horreum/entity/data/ActionDAO.java @@ -12,8 +12,6 @@ import org.hibernate.id.enhanced.SequenceStyleGenerator; import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.node.JsonNodeFactory; -import com.fasterxml.jackson.databind.node.ObjectNode; @Entity(name = "Action") public class ActionDAO extends PanacheEntityBase { @@ -73,21 +71,4 @@ public ActionDAO(Integer id, String event, String type, JsonNode config, JsonNod this.runAlways = runAlways; } - public void setSecrets(JsonNode secrets) { - this.secrets = secrets; - } - - // Had we called this simply `getSecrets` Quarkus would rewrite (??!!) some property - // accesses to use of that method - public JsonNode getMaskedSecrets() { - if (secrets != null && secrets.isObject()) { - ObjectNode masked = JsonNodeFactory.instance.objectNode(); - secrets.fieldNames().forEachRemaining(name -> { - masked.put(name, "********"); - }); - return masked; - } else { - return JsonNodeFactory.instance.objectNode(); - } - } } diff --git a/horreum-backend/src/main/java/io/hyperfoil/tools/horreum/entity/data/DataSetDAO.java b/horreum-backend/src/main/java/io/hyperfoil/tools/horreum/entity/data/DataSetDAO.java index 98689f0fe..e542f0bef 100644 --- a/horreum-backend/src/main/java/io/hyperfoil/tools/horreum/entity/data/DataSetDAO.java +++ b/horreum-backend/src/main/java/io/hyperfoil/tools/horreum/entity/data/DataSetDAO.java @@ -28,8 +28,6 @@ import io.hyperfoil.tools.horreum.entity.ValidationErrorDAO; import io.smallrye.common.constraint.NotNull; -import org.hibernate.type.CustomType; -import org.hibernate.type.spi.TypeConfiguration; /** * Purpose of this object is to represent derived run data. @@ -37,11 +35,6 @@ @Entity(name="dataset") @JsonIgnoreType public class DataSetDAO extends OwnedEntityBase { - public static final String EVENT_NEW = "dataset/new"; - public static final String EVENT_LABELS_UPDATED = "dataset/updatedlabels"; - public static final String EVENT_MISSING_VALUES = "dataset/missing_values"; - public static final String EVENT_DELETED = "dataset/deleted"; - public static final String EVENT_VALIDATED = "dataset/validated"; @Id @SequenceGenerator( @@ -128,21 +121,6 @@ public String toString() { } } - public static class LabelsUpdatedEvent { - public int testId; - public int datasetId; - public boolean isRecalculation; - - public LabelsUpdatedEvent() { - } - - public LabelsUpdatedEvent(int testId, int datasetId, boolean isRecalculation) { - this.testId = testId; - this.datasetId = datasetId; - this.isRecalculation = isRecalculation; - } - } - public DataSetDAO() {} public DataSetDAO(RunDAO run, int ordinal, String description, JsonNode data) { diff --git a/horreum-backend/src/main/java/io/hyperfoil/tools/horreum/entity/data/RunDAO.java b/horreum-backend/src/main/java/io/hyperfoil/tools/horreum/entity/data/RunDAO.java index 8cb4c0962..075c20b9f 100644 --- a/horreum-backend/src/main/java/io/hyperfoil/tools/horreum/entity/data/RunDAO.java +++ b/horreum-backend/src/main/java/io/hyperfoil/tools/horreum/entity/data/RunDAO.java @@ -27,9 +27,6 @@ @DynamicUpdate // We don't want to trigger schema analysis when trashing the run @JsonIgnoreType public class RunDAO extends ProtectedBaseEntity { - public static final String EVENT_NEW = "run/new"; - public static final String EVENT_TRASHED = "run/trashed"; - public static final String EVENT_VALIDATED = "run/validated"; @Id @SequenceGenerator( diff --git a/horreum-backend/src/main/java/io/hyperfoil/tools/horreum/entity/data/SchemaDAO.java b/horreum-backend/src/main/java/io/hyperfoil/tools/horreum/entity/data/SchemaDAO.java index acf8d896c..0a0eb961c 100644 --- a/horreum-backend/src/main/java/io/hyperfoil/tools/horreum/entity/data/SchemaDAO.java +++ b/horreum-backend/src/main/java/io/hyperfoil/tools/horreum/entity/data/SchemaDAO.java @@ -1,9 +1,6 @@ package io.hyperfoil.tools.horreum.entity.data; -import java.util.Collection; - import com.fasterxml.jackson.annotation.JsonIgnoreType; -import io.hyperfoil.tools.horreum.entity.ValidationErrorDAO; import io.hyperfoil.tools.horreum.hibernate.JsonBinaryType; import org.hibernate.annotations.GenericGenerator; @@ -91,13 +88,4 @@ public class SchemaDAO extends ProtectedBaseEntity { @Column(columnDefinition = "jsonb") public JsonNode schema; - public static class ValidationEvent { - public int id; // context = run/dataset depends on event name - public Collection errors; - - public ValidationEvent(int id, Collection errors) { - this.id = id; - this.errors = errors; - } - } } diff --git a/horreum-backend/src/main/java/io/hyperfoil/tools/horreum/entity/data/TestDAO.java b/horreum-backend/src/main/java/io/hyperfoil/tools/horreum/entity/data/TestDAO.java index ae05a81ed..655807c33 100644 --- a/horreum-backend/src/main/java/io/hyperfoil/tools/horreum/entity/data/TestDAO.java +++ b/horreum-backend/src/main/java/io/hyperfoil/tools/horreum/entity/data/TestDAO.java @@ -33,9 +33,6 @@ @Entity(name="test") @JsonIgnoreType public class TestDAO extends PanacheEntityBase { - public static final String EVENT_NEW = "test/new"; - public static final String EVENT_DELETED = "test/deleted"; - @Id @GenericGenerator( name = "testIdGenerator", diff --git a/horreum-backend/src/main/java/io/hyperfoil/tools/horreum/svc/ActionServiceImpl.java b/horreum-backend/src/main/java/io/hyperfoil/tools/horreum/svc/ActionServiceImpl.java index 1b301b5eb..d031ff099 100644 --- a/horreum-backend/src/main/java/io/hyperfoil/tools/horreum/svc/ActionServiceImpl.java +++ b/horreum-backend/src/main/java/io/hyperfoil/tools/horreum/svc/ActionServiceImpl.java @@ -3,6 +3,7 @@ import io.hyperfoil.tools.horreum.api.data.Action; import io.hyperfoil.tools.horreum.api.data.AllowedSite; import io.hyperfoil.tools.horreum.api.data.Run; +import io.hyperfoil.tools.horreum.bus.MessageBusChannels; import io.hyperfoil.tools.horreum.entity.alerting.ChangeDAO; import io.hyperfoil.tools.horreum.api.alerting.Change; import io.hyperfoil.tools.horreum.entity.data.*; @@ -73,17 +74,17 @@ public class ActionServiceImpl implements ActionService { @PostConstruct() public void postConstruct(){ plugins = actionPlugins.stream().collect(Collectors.toMap(ActionPlugin::type, Function.identity())); - messageBus.subscribe(TestDAO.EVENT_NEW, "ActionService", TestDAO.class, this::onNewTest); - messageBus.subscribe(TestDAO.EVENT_DELETED, "ActionService", TestDAO.class, this::onTestDelete); - messageBus.subscribe(RunDAO.EVENT_NEW, "ActionService", Run.class, this::onNewRun); - messageBus.subscribe(Change.EVENT_NEW, "ActionService", Change.Event.class, this::onNewChange); - messageBus.subscribe(ExperimentService.ExperimentResult.NEW_RESULT, "ActionService", ExperimentService.ExperimentResult.class, this::onNewExperimentResult); + messageBus.subscribe(MessageBusChannels.TEST_NEW, "ActionService", TestDAO.class, this::onNewTest); + messageBus.subscribe(MessageBusChannels.TEST_DELETED, "ActionService", TestDAO.class, this::onTestDelete); + messageBus.subscribe(MessageBusChannels.RUN_NEW, "ActionService", Run.class, this::onNewRun); + messageBus.subscribe(MessageBusChannels.CHANGE_NEW, "ActionService", Change.Event.class, this::onNewChange); + messageBus.subscribe(MessageBusChannels.EXPERIMENT_RESULT_NEW, "ActionService", ExperimentService.ExperimentResult.class, this::onNewExperimentResult); } - private void executeActions(String event, int testId, Object payload, boolean notify){ - List actions = getActions(event, testId); + private void executeActions(MessageBusChannels event, int testId, Object payload, boolean notify){ + List actions = getActions(event.name(), testId); if (actions.isEmpty()) { - new ActionLogDAO(PersistentLog.DEBUG, testId, event, null, "No actions found.").persist(); + new ActionLogDAO(PersistentLog.DEBUG, testId, event.name(), null, "No actions found.").persist(); return; } for (ActionDAO action : actions) { @@ -95,15 +96,15 @@ private void executeActions(String event, int testId, Object payload, boolean no ActionPlugin plugin = plugins.get(action.type); if (plugin == null) { log.errorf("No plugin for action type %s", action.type); - new ActionLogDAO(PersistentLog.ERROR, testId, event, action.type, "No plugin for action type " + action.type).persist(); + new ActionLogDAO(PersistentLog.ERROR, testId, event.name(), action.type, "No plugin for action type " + action.type).persist(); continue; } plugin.execute(action.config, action.secrets, payload).subscribe() - .with(item -> {}, throwable -> logActionError(testId, event, action.type, throwable)); + .with(item -> {}, throwable -> logActionError(testId, event.name(), action.type, throwable)); } catch (Exception e) { log.errorf(e, "Failed to invoke action %d", action.id); - new ActionLogDAO(PersistentLog.ERROR, testId, event, action.type, "Failed to invoke: " + e.getMessage()).persist(); - new ActionLogDAO(PersistentLog.DEBUG, testId, event, action.type, + new ActionLogDAO(PersistentLog.ERROR, testId, event.name(), action.type, "Failed to invoke: " + e.getMessage()).persist(); + new ActionLogDAO(PersistentLog.DEBUG, testId, event.name(), action.type, "Configuration:
\n" + action.config.toPrettyString() +
                   "\n
Payload:
\n" + Util.OBJECT_MAPPER.valueToTree(payload).toPrettyString() +
                   "\n
").persist(); @@ -132,7 +133,7 @@ void doLogActionError(int testId, String event, String type, Throwable throwable @WithRoles(extras = Roles.HORREUM_SYSTEM) @Transactional public void onNewTest(TestDAO test) { - executeActions(TestDAO.EVENT_NEW, -1, test, true); + executeActions(MessageBusChannels.TEST_NEW, -1, test, true); } @WithRoles(extras = Roles.HORREUM_SYSTEM) @@ -145,7 +146,7 @@ public void onTestDelete(TestDAO test) { @Transactional public void onNewRun(Run run) { Integer testId = run.testid; - executeActions(RunDAO.EVENT_NEW, testId, run, true); + executeActions(MessageBusChannels.RUN_NEW, testId, run, true); } @WithRoles(extras = Roles.HORREUM_SYSTEM) @@ -153,7 +154,7 @@ public void onNewRun(Run run) { public void onNewChange(Change.Event changeEvent) { int testId = em.createQuery("SELECT testid FROM run WHERE id = ?1", Integer.class) .setParameter(1, changeEvent.dataset.runId).getResultStream().findFirst().orElse(-1); - executeActions(Change.EVENT_NEW, testId, changeEvent, changeEvent.notify); + executeActions(MessageBusChannels.CHANGE_NEW, testId, changeEvent, changeEvent.notify); } void validate(Action action) { @@ -286,7 +287,7 @@ public void deleteSite(long id) { @WithRoles(extras = Roles.HORREUM_SYSTEM) @Transactional public void onNewExperimentResult(ExperimentService.ExperimentResult result) { - executeActions(ExperimentService.ExperimentResult.NEW_RESULT, result.profile.testId, result, result.notify); + executeActions(MessageBusChannels.EXPERIMENT_RESULT_NEW, result.profile.testId, result, result.notify); } JsonNode exportTest(int testId) { diff --git a/horreum-backend/src/main/java/io/hyperfoil/tools/horreum/svc/AlertingServiceImpl.java b/horreum-backend/src/main/java/io/hyperfoil/tools/horreum/svc/AlertingServiceImpl.java index 30c136bb8..8e8f7e6c3 100644 --- a/horreum-backend/src/main/java/io/hyperfoil/tools/horreum/svc/AlertingServiceImpl.java +++ b/horreum-backend/src/main/java/io/hyperfoil/tools/horreum/svc/AlertingServiceImpl.java @@ -1,7 +1,5 @@ package io.hyperfoil.tools.horreum.svc; -import java.math.BigInteger; -import java.sql.Timestamp; import java.time.Instant; import java.util.ArrayList; import java.util.Arrays; @@ -28,6 +26,7 @@ import io.hyperfoil.tools.horreum.api.data.DataSet; import io.hyperfoil.tools.horreum.api.data.Run; +import io.hyperfoil.tools.horreum.bus.MessageBusChannels; import io.hyperfoil.tools.horreum.hibernate.IntArrayType; import io.hyperfoil.tools.horreum.hibernate.JsonBinaryType; import jakarta.annotation.PostConstruct; @@ -58,7 +57,6 @@ import io.hyperfoil.tools.horreum.changedetection.RelativeDifferenceChangeDetectionModel; import io.hyperfoil.tools.horreum.entity.data.DataSetDAO; -import io.hyperfoil.tools.horreum.entity.data.RunDAO; import io.hyperfoil.tools.horreum.entity.data.TestDAO; import io.hyperfoil.tools.horreum.mapper.*; import org.eclipse.microprofile.config.inject.ConfigProperty; @@ -66,9 +64,7 @@ import org.hibernate.query.NativeQuery; import org.hibernate.transform.AliasToBeanResultTransformer; import org.hibernate.transform.Transformers; -import org.hibernate.type.CustomType; import org.hibernate.type.StandardBasicTypes; -import org.hibernate.type.spi.TypeConfiguration; import org.jboss.logging.Logger; import com.fasterxml.jackson.core.JsonProcessingException; @@ -273,11 +269,11 @@ private void createMissingDataRuleResult(DataSetDAO dataset, int ruleId) { @PostConstruct void init() { - messageBus.subscribe(DataSetDAO.EVENT_LABELS_UPDATED, "AlertingService", DataSet.LabelsUpdatedEvent.class, this::onLabelsUpdated); - messageBus.subscribe(DataSetDAO.EVENT_DELETED, "AlertingService", DataSet.Info.class, this::onDatasetDeleted); - messageBus.subscribe(DataPointDAO.EVENT_NEW, "AlertingService", DataPoint.Event.class, this::onNewDataPoint); - messageBus.subscribe(RunDAO.EVENT_NEW, "AlertingService", Run.class, this::removeExpected); - messageBus.subscribe(TestDAO.EVENT_DELETED, "AlertingService", TestDAO.class, this::onTestDeleted); + messageBus.subscribe(MessageBusChannels.DATASET_UPDATED_LABELS, "AlertingService", DataSet.LabelsUpdatedEvent.class, this::onLabelsUpdated); + messageBus.subscribe(MessageBusChannels.DATASET_DELETED, "AlertingService", DataSet.Info.class, this::onDatasetDeleted); + messageBus.subscribe(MessageBusChannels.DATAPOINT_NEW, "AlertingService", DataPoint.Event.class, this::onNewDataPoint); + messageBus.subscribe(MessageBusChannels.RUN_NEW, "AlertingService", Run.class, this::removeExpected); + messageBus.subscribe(MessageBusChannels.TEST_DELETED, "AlertingService", TestDAO.class, this::onTestDeleted); } private void recalculateDatapointsForDataset(DataSetDAO dataset, boolean notify, boolean debug, Recalculation recalculation) { @@ -490,9 +486,9 @@ private void emitDatapoints(DataSetDAO dataset, boolean notify, boolean debug, R output -> logCalculationMessage(dataset, PersistentLog.DEBUG, "Output while calculating variable:
%s
", output) ); if (!missingValueVariables.isEmpty()) { - messageBus.publish(DataSetDAO.EVENT_MISSING_VALUES, dataset.testid, new MissingValuesEvent(dataset.getInfo(), missingValueVariables, notify)); + messageBus.publish(MessageBusChannels.DATASET_MISSING_VALUES, dataset.testid, new MissingValuesEvent(dataset.getInfo(), missingValueVariables, notify)); } - messageBus.publish(DataPointDAO.EVENT_DATASET_PROCESSED, dataset.testid, new DataPoint.DatasetProcessedEvent( DataSetMapper.fromInfo( dataset.getInfo()), notify)); + messageBus.publish(MessageBusChannels.DATAPOINT_PROCESSED, dataset.testid, new DataPoint.DatasetProcessedEvent( DataSetMapper.fromInfo( dataset.getInfo()), notify)); } private void createDataPoint(DataSetDAO dataset, Instant timestamp, int variableId, double value, boolean notify) { @@ -502,7 +498,7 @@ private void createDataPoint(DataSetDAO dataset, Instant timestamp, int variable dataPoint.timestamp = timestamp; dataPoint.value = value; dataPoint.persist(); - messageBus.publish(DataPointDAO.EVENT_NEW, dataset.testid, + messageBus.publish(MessageBusChannels.DATAPOINT_NEW, dataset.testid, new DataPoint.Event(DataPointMapper.from( dataPoint), dataset.testid, notify)); } @@ -656,7 +652,7 @@ private void runChangeDetection(VariableDAO variable, JsonNode fingerprint, bool em.persist(change); Hibernate.initialize(change.dataset.run.id); String testName = TestDAO.findByIdOptional(variable.testId).map(test -> test.name).orElse(""); - messageBus.publish(Change.EVENT_NEW, change.dataset.testid, + messageBus.publish(MessageBusChannels.CHANGE_NEW, change.dataset.testid, new Change.Event(ChangeMapper.from(change), testName, DataSetMapper.fromInfo(info), notify)); }); } @@ -1225,7 +1221,7 @@ public void removeExpected(Run run) { void onDatasetDeleted(DataSet.Info info) { log.debugf("Removing datasets and changes for dataset %d (%d/%d, test %d)", info.id, info.runId, info.ordinal, info.testId); for (DataPointDAO dp : DataPointDAO.list("dataset.id", info.id)) { - messageBus.publish(DataPointDAO.EVENT_DELETED, info.testId, + messageBus.publish(MessageBusChannels.DATAPOINT_DELETED, info.testId, new DataPoint.Event(DataPointMapper.from(dp), info.testId, false)); dp.delete(); } diff --git a/horreum-backend/src/main/java/io/hyperfoil/tools/horreum/svc/DatasetServiceImpl.java b/horreum-backend/src/main/java/io/hyperfoil/tools/horreum/svc/DatasetServiceImpl.java index 51c4fb90d..8ca17abec 100644 --- a/horreum-backend/src/main/java/io/hyperfoil/tools/horreum/svc/DatasetServiceImpl.java +++ b/horreum-backend/src/main/java/io/hyperfoil/tools/horreum/svc/DatasetServiceImpl.java @@ -6,6 +6,7 @@ import java.util.stream.Collectors; import java.util.stream.Stream; +import io.hyperfoil.tools.horreum.bus.MessageBusChannels; import io.hyperfoil.tools.horreum.hibernate.JsonBinaryType; import jakarta.annotation.PostConstruct; import jakarta.annotation.security.PermitAll; @@ -158,7 +159,7 @@ public Object transformTuple(Object[] tuple, String[] aliases) { @PostConstruct void init() { sqlService.registerListener("calculate_labels", this::onLabelChanged); - messageBus.subscribe(DataSetDAO.EVENT_NEW, "DatasetService", DataSet.EventNew.class, this::onNewDataset); + messageBus.subscribe(MessageBusChannels.DATASET_NEW, "DatasetService", DataSet.EventNew.class, this::onNewDataset); } @PermitAll @@ -440,7 +441,7 @@ void calculateLabels(int testId, int datasetId, int queryLabelId, boolean isReca (row, e, jsCode) -> logMessage(datasetId, PersistentLog.ERROR, "Evaluation of label %s failed: '%s' Code:
%s
", row[0], e.getMessage(), jsCode), out -> logMessage(datasetId, PersistentLog.DEBUG, "Output while calculating labels:
%s
", out)); - messageBus.publish(DataSetDAO.EVENT_LABELS_UPDATED, testId, new DataSet.LabelsUpdatedEvent(testId, datasetId, isRecalculation)); + messageBus.publish(MessageBusChannels.DATASET_UPDATED_LABELS, testId, new DataSet.LabelsUpdatedEvent(testId, datasetId, isRecalculation)); } @WithRoles(extras = Roles.HORREUM_SYSTEM) diff --git a/horreum-backend/src/main/java/io/hyperfoil/tools/horreum/svc/EventAggregator.java b/horreum-backend/src/main/java/io/hyperfoil/tools/horreum/svc/EventAggregator.java index 73b29d111..d3fdf5525 100644 --- a/horreum-backend/src/main/java/io/hyperfoil/tools/horreum/svc/EventAggregator.java +++ b/horreum-backend/src/main/java/io/hyperfoil/tools/horreum/svc/EventAggregator.java @@ -3,6 +3,7 @@ import java.util.HashMap; import java.util.Map; +import io.hyperfoil.tools.horreum.bus.MessageBusChannels; import jakarta.annotation.PostConstruct; import jakarta.enterprise.context.ApplicationScoped; import jakarta.inject.Inject; @@ -32,7 +33,7 @@ public class EventAggregator { @PostConstruct void init() { - messageBus.subscribe(Change.EVENT_NEW, "EventAggregator", Change.Event.class, this::onNewChange); + messageBus.subscribe(MessageBusChannels.CHANGE_NEW, "EventAggregator", Change.Event.class, this::onNewChange); } @WithRoles(extras = Roles.HORREUM_SYSTEM) @@ -53,7 +54,7 @@ void handleDatasetChanges() { if (next == null) { return; } else if (next.emitTimestamp() <= now) { - messageBus.publish(DatasetChanges.EVENT_NEW, next.dataset.testId, next); + messageBus.publish(MessageBusChannels.DATASET_CHANGES_NEW, next.dataset.testId, next); datasetChanges.remove(next.dataset.id); } else { if (timerId >= 0) { diff --git a/horreum-backend/src/main/java/io/hyperfoil/tools/horreum/svc/ExperimentServiceImpl.java b/horreum-backend/src/main/java/io/hyperfoil/tools/horreum/svc/ExperimentServiceImpl.java index 2da3c28d2..c3fad6d77 100644 --- a/horreum-backend/src/main/java/io/hyperfoil/tools/horreum/svc/ExperimentServiceImpl.java +++ b/horreum-backend/src/main/java/io/hyperfoil/tools/horreum/svc/ExperimentServiceImpl.java @@ -11,6 +11,7 @@ import java.util.stream.Collectors; import io.hyperfoil.tools.horreum.api.alerting.DataPoint; +import io.hyperfoil.tools.horreum.bus.MessageBusChannels; import io.hyperfoil.tools.horreum.hibernate.JsonBinaryType; import jakarta.annotation.PostConstruct; import jakarta.annotation.security.PermitAll; @@ -68,8 +69,8 @@ public class ExperimentServiceImpl implements ExperimentService { @PostConstruct void init() { - messageBus.subscribe(DataPointDAO.EVENT_DATASET_PROCESSED, "ExperimentService", DataPoint.DatasetProcessedEvent.class, this::onDatapointsCreated); - messageBus.subscribe(TestDAO.EVENT_DELETED, "ExperimentService", TestDAO.class, this::onTestDeleted); + messageBus.subscribe(MessageBusChannels.DATAPOINT_PROCESSED, "ExperimentService", DataPoint.DatasetProcessedEvent.class, this::onDatapointsCreated); + messageBus.subscribe(MessageBusChannels.TEST_DELETED, "ExperimentService", TestDAO.class, this::onTestDeleted); } @WithRoles @@ -142,7 +143,7 @@ public List runExperiments(int datasetId) { @Transactional public void onDatapointsCreated(DataPoint.DatasetProcessedEvent event) { // TODO: experiments can use any datasets, including private ones, possibly leaking the information - runExperiments(event.dataset, result -> messageBus.publish(ExperimentResult.NEW_RESULT, event.dataset.testId, result), + runExperiments(event.dataset, result -> messageBus.publish(MessageBusChannels.EXPERIMENT_RESULT_NEW, event.dataset.testId, result), logs -> logs.forEach(log -> log.persist()), event.notify); } diff --git a/horreum-backend/src/main/java/io/hyperfoil/tools/horreum/svc/LogServiceImpl.java b/horreum-backend/src/main/java/io/hyperfoil/tools/horreum/svc/LogServiceImpl.java index 340e6d999..ca5f15754 100644 --- a/horreum-backend/src/main/java/io/hyperfoil/tools/horreum/svc/LogServiceImpl.java +++ b/horreum-backend/src/main/java/io/hyperfoil/tools/horreum/svc/LogServiceImpl.java @@ -5,6 +5,7 @@ import java.util.List; import java.util.stream.Collectors; +import io.hyperfoil.tools.horreum.bus.MessageBusChannels; import jakarta.annotation.PostConstruct; import jakarta.annotation.security.RolesAllowed; import jakarta.enterprise.context.ApplicationScoped; @@ -51,7 +52,7 @@ public class LogServiceImpl implements LogService { @PostConstruct void init() { - messageBus.subscribe(TestDAO.EVENT_DELETED, "LogService", TestDAO.class, this::onTestDelete); + messageBus.subscribe(MessageBusChannels.TEST_DELETED, "LogService", TestDAO.class, this::onTestDelete); } private Integer withDefault(Integer value, Integer defValue) { diff --git a/horreum-backend/src/main/java/io/hyperfoil/tools/horreum/svc/NotificationServiceImpl.java b/horreum-backend/src/main/java/io/hyperfoil/tools/horreum/svc/NotificationServiceImpl.java index a0fe94b80..537b55b9b 100644 --- a/horreum-backend/src/main/java/io/hyperfoil/tools/horreum/svc/NotificationServiceImpl.java +++ b/horreum-backend/src/main/java/io/hyperfoil/tools/horreum/svc/NotificationServiceImpl.java @@ -9,6 +9,7 @@ import java.util.function.Consumer; import java.util.stream.Collectors; +import io.hyperfoil.tools.horreum.bus.MessageBusChannels; import jakarta.annotation.PostConstruct; import jakarta.annotation.security.PermitAll; import jakarta.annotation.security.RolesAllowed; @@ -71,8 +72,8 @@ public class NotificationServiceImpl implements NotificationService { @PostConstruct public void init() { notificationPlugins.forEach(plugin -> plugins.put(plugin.method(), plugin)); - messageBus.subscribe(DatasetChanges.EVENT_NEW, "NotificationService", DatasetChanges.class, this::onNewChanges); - messageBus.subscribe(DataSetDAO.EVENT_MISSING_VALUES, "NotificationService", MissingValuesEvent.class, this::onMissingValues); + messageBus.subscribe(MessageBusChannels.DATASET_CHANGES_NEW, "NotificationService", DatasetChanges.class, this::onNewChanges); + messageBus.subscribe(MessageBusChannels.DATASET_MISSING_VALUES, "NotificationService", MissingValuesEvent.class, this::onMissingValues); } @WithRoles(extras = Roles.HORREUM_SYSTEM) diff --git a/horreum-backend/src/main/java/io/hyperfoil/tools/horreum/svc/ReportServiceImpl.java b/horreum-backend/src/main/java/io/hyperfoil/tools/horreum/svc/ReportServiceImpl.java index fd7a7a2c6..69b45f120 100644 --- a/horreum-backend/src/main/java/io/hyperfoil/tools/horreum/svc/ReportServiceImpl.java +++ b/horreum-backend/src/main/java/io/hyperfoil/tools/horreum/svc/ReportServiceImpl.java @@ -1,7 +1,6 @@ package io.hyperfoil.tools.horreum.svc; import java.io.ByteArrayOutputStream; -import java.sql.Timestamp; import java.time.Instant; import java.util.ArrayList; import java.util.Collections; @@ -14,6 +13,7 @@ import java.util.stream.Collectors; import java.util.stream.Stream; +import io.hyperfoil.tools.horreum.bus.MessageBusChannels; import io.hyperfoil.tools.horreum.hibernate.JsonBinaryType; import jakarta.annotation.PostConstruct; import jakarta.annotation.security.PermitAll; @@ -35,9 +35,7 @@ import org.graalvm.polyglot.Value; import org.hibernate.Hibernate; import org.hibernate.query.NativeQuery; -import org.hibernate.type.CustomType; import org.hibernate.type.StandardBasicTypes; -import org.hibernate.type.spi.TypeConfiguration; import org.jboss.logging.Logger; import com.fasterxml.jackson.core.JsonProcessingException; @@ -77,7 +75,7 @@ public class ReportServiceImpl implements ReportService { @PostConstruct void init() { - messageBus.subscribe(TestDAO.EVENT_DELETED, "ReportService", TestDAO.class, this::onTestDelete); + messageBus.subscribe(MessageBusChannels.TEST_DELETED, "ReportService", TestDAO.class, this::onTestDelete); } @PermitAll diff --git a/horreum-backend/src/main/java/io/hyperfoil/tools/horreum/svc/RunServiceImpl.java b/horreum-backend/src/main/java/io/hyperfoil/tools/horreum/svc/RunServiceImpl.java index def9e028c..29362f1a5 100644 --- a/horreum-backend/src/main/java/io/hyperfoil/tools/horreum/svc/RunServiceImpl.java +++ b/horreum-backend/src/main/java/io/hyperfoil/tools/horreum/svc/RunServiceImpl.java @@ -14,7 +14,6 @@ import java.util.Objects; import java.util.TreeMap; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.function.BiConsumer; import java.util.function.Consumer; @@ -23,6 +22,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import io.hyperfoil.tools.horreum.api.data.DataSet; +import io.hyperfoil.tools.horreum.bus.MessageBusChannels; import io.hyperfoil.tools.horreum.hibernate.JsonBinaryType; import io.hyperfoil.tools.horreum.mapper.DataSetMapper; import io.hypersistence.utils.hibernate.query.MapResultTransformer; @@ -75,9 +75,7 @@ import org.hibernate.ScrollMode; import org.hibernate.ScrollableResults; import org.hibernate.query.NativeQuery; -import org.hibernate.type.CustomType; import org.hibernate.type.StandardBasicTypes; -import org.hibernate.type.spi.TypeConfiguration; import org.jboss.logging.Logger; import org.jboss.resteasy.reactive.multipart.FileUpload; @@ -140,7 +138,7 @@ public class RunServiceImpl implements RunService { void init() { sqlService.registerListener("calculate_datasets", this::onCalculateDataSets); sqlService.registerListener("new_or_updated_schema", this::onNewOrUpdatedSchema); - messageBus.subscribe(TestDAO.EVENT_DELETED, "RunService", TestDAO.class, this::onTestDeleted); + messageBus.subscribe(MessageBusChannels.TEST_DELETED, "RunService", TestDAO.class, this::onTestDeleted); } @Transactional @@ -442,7 +440,7 @@ public void waitForDatasets(int runId) { // wait for at least one (1) dataset. we do not know how many datasets will be produced CountDownLatch dsAvailableLatch = new CountDownLatch(1); // create new dataset listener - messageBus.subscribe(DataSetDAO.EVENT_NEW,"DatasetService", DataSet.EventNew.class, (event) -> { + messageBus.subscribe(MessageBusChannels.DATASET_NEW,"DatasetService", DataSet.EventNew.class, (event) -> { if (event.dataset.runId == runId) { dsAvailableLatch.countDown(); } @@ -595,7 +593,7 @@ private Integer addAuthenticated(RunDAO run, TestDAO test) { throw ServiceException.serverError("Failed to persist run"); } log.debugf("Upload flushed, run ID %d", run.id); - messageBus.publish(RunDAO.EVENT_NEW, test.id, RunMapper.from(run)); + messageBus.publish(MessageBusChannels.RUN_NEW, test.id, RunMapper.from(run)); return run.id; } @@ -906,10 +904,10 @@ private void trashInternal(int id, boolean trashed) { List datasets = DataSetDAO.list("run.id", id); log.debugf("Trashing run %d (test %d, %d datasets)", (long)run.id, (long)run.testid, datasets.size()); for (var dataset : datasets) { - messageBus.publish(DataSetDAO.EVENT_DELETED, run.testid, DataSetMapper.fromInfo( dataset.getInfo())); + messageBus.publish(MessageBusChannels.DATASET_DELETED, run.testid, DataSetMapper.fromInfo( dataset.getInfo())); dataset.delete(); } - messageBus.publish(RunDAO.EVENT_TRASHED, run.testid, id); + messageBus.publish(MessageBusChannels.RUN_TRASHED, run.testid, id); } else { transform(id, true); } @@ -1049,7 +1047,7 @@ int transform(int runId, boolean isRecalculation) { // We need to make sure all old datasets are gone before creating new; otherwise we could // break the runid,ordinal uniqueness constraint for (DataSetDAO old : DataSetDAO.list("run.id", runId)) { - messageBus.publish(DataSetDAO.EVENT_DELETED, old.testid, DataSetMapper.fromInfo( old.getInfo())); + messageBus.publish(MessageBusChannels.DATASET_DELETED, old.testid, DataSetMapper.fromInfo( old.getInfo())); old.delete(); } @@ -1245,7 +1243,7 @@ private String limitLength(String str) { private void createDataset(DataSetDAO ds, boolean isRecalculation) { try { ds.persist(); - messageBus.publish(DataSetDAO.EVENT_NEW, ds.testid, new DataSet.EventNew(DataSetMapper.from(ds), isRecalculation)); + messageBus.publish(MessageBusChannels.DATASET_NEW, ds.testid, new DataSet.EventNew(DataSetMapper.from(ds), isRecalculation)); } catch (TransactionRequiredException tre) { log.error("Failed attempt to persist and send DataSet event during inactive Transaction. Likely due to prior error.", tre); } diff --git a/horreum-backend/src/main/java/io/hyperfoil/tools/horreum/svc/SchemaServiceImpl.java b/horreum-backend/src/main/java/io/hyperfoil/tools/horreum/svc/SchemaServiceImpl.java index 04fac3473..cc031c7b8 100644 --- a/horreum-backend/src/main/java/io/hyperfoil/tools/horreum/svc/SchemaServiceImpl.java +++ b/horreum-backend/src/main/java/io/hyperfoil/tools/horreum/svc/SchemaServiceImpl.java @@ -3,10 +3,10 @@ import com.networknt.schema.JsonMetaSchema; import io.hyperfoil.tools.horreum.api.data.*; import io.hyperfoil.tools.horreum.api.data.Extractor; +import io.hyperfoil.tools.horreum.bus.MessageBusChannels; import io.hyperfoil.tools.horreum.entity.data.*; import io.hyperfoil.tools.horreum.mapper.DataSetMapper; import io.hyperfoil.tools.horreum.mapper.LabelMapper; -import io.hyperfoil.tools.horreum.mapper.RunMapper; import io.hyperfoil.tools.horreum.mapper.SchemaMapper; import io.hyperfoil.tools.horreum.mapper.TransformerMapper; import io.hyperfoil.tools.horreum.api.services.SchemaService; @@ -68,7 +68,6 @@ import com.networknt.schema.uri.URIFactory; import com.networknt.schema.uri.URIFetcher; import com.networknt.schema.uri.URLFactory; -//import com.vladmihalcea.hibernate.type.json.JsonNodeBinaryType; @Startup public class SchemaServiceImpl implements SchemaService { @@ -288,7 +287,7 @@ void validateRunData(int runId, Predicate schemaFilter) { validateData(run.metadata, schemaFilter, run.validationErrors::add); } run.persist(); - messageBus.publish(RunDAO.EVENT_VALIDATED, run.testid, + messageBus.publish(MessageBusChannels.RUN_VALIDATED, run.testid, new Schema.ValidationEvent(run.id, run.validationErrors.stream().map(ValidationErrorMapper::fromValidationError).collect(Collectors.toList()) )); } @@ -321,7 +320,7 @@ void validateDatasetData(int datasetId, Predicate schemaFilter) { } } dataset.persist(); - messageBus.publish(DataSetDAO.EVENT_VALIDATED, dataset.testid, new Schema.ValidationEvent(dataset.id, DataSetMapper.from(dataset).validationErrors )); + messageBus.publish(MessageBusChannels.DATASET_VALIDATED, dataset.testid, new Schema.ValidationEvent(dataset.id, DataSetMapper.from(dataset).validationErrors )); } private void revalidateAll(String params) { diff --git a/horreum-backend/src/main/java/io/hyperfoil/tools/horreum/svc/SubscriptionServiceImpl.java b/horreum-backend/src/main/java/io/hyperfoil/tools/horreum/svc/SubscriptionServiceImpl.java index cec1caed4..bee73388c 100644 --- a/horreum-backend/src/main/java/io/hyperfoil/tools/horreum/svc/SubscriptionServiceImpl.java +++ b/horreum-backend/src/main/java/io/hyperfoil/tools/horreum/svc/SubscriptionServiceImpl.java @@ -10,6 +10,7 @@ import java.util.stream.Collectors; import java.util.stream.Stream; +import io.hyperfoil.tools.horreum.bus.MessageBusChannels; import jakarta.annotation.PostConstruct; import jakarta.annotation.security.RolesAllowed; import jakarta.enterprise.context.ApplicationScoped; @@ -48,7 +49,7 @@ public class SubscriptionServiceImpl implements SubscriptionService { @PostConstruct void init() { - messageBus.subscribe(TestDAO.EVENT_DELETED, "SubscriptionService", TestDAO.class, this::onTestDelete); + messageBus.subscribe(MessageBusChannels.TEST_DELETED, "SubscriptionService", TestDAO.class, this::onTestDelete); } private static Set merge(Set set, String item) { diff --git a/horreum-backend/src/main/java/io/hyperfoil/tools/horreum/svc/TestServiceImpl.java b/horreum-backend/src/main/java/io/hyperfoil/tools/horreum/svc/TestServiceImpl.java index 9816ec436..5acaa8025 100644 --- a/horreum-backend/src/main/java/io/hyperfoil/tools/horreum/svc/TestServiceImpl.java +++ b/horreum-backend/src/main/java/io/hyperfoil/tools/horreum/svc/TestServiceImpl.java @@ -2,6 +2,7 @@ import io.hyperfoil.tools.horreum.api.SortDirection; import io.hyperfoil.tools.horreum.api.data.*; +import io.hyperfoil.tools.horreum.bus.MessageBusChannels; import io.hyperfoil.tools.horreum.entity.data.*; import io.hyperfoil.tools.horreum.hibernate.JsonBinaryType; import io.hyperfoil.tools.horreum.mapper.ActionMapper; @@ -47,8 +48,6 @@ import org.hibernate.ScrollableResults; import org.hibernate.query.NativeQuery; import org.hibernate.transform.Transformers; -import org.hibernate.type.CustomType; -import org.hibernate.type.spi.TypeConfiguration; import org.jboss.logging.Logger; import com.fasterxml.jackson.core.JsonProcessingException; @@ -116,7 +115,7 @@ public void delete(int id){ } log.debugf("Deleting test %s (%d)", test.name, test.id); test.delete(); - messageBus.publish(TestDAO.EVENT_DELETED, test.id, test); + messageBus.publish(MessageBusChannels.TEST_DELETED, test.id, test); } @Override @@ -229,7 +228,7 @@ void addAuthenticated(TestDAO test) { throw new WebApplicationException(e, Response.serverError().build()); } } - messageBus.publish(TestDAO.EVENT_NEW, test.id, test); + messageBus.publish(MessageBusChannels.TEST_NEW, test.id, test); } } diff --git a/horreum-backend/src/test/java/io/hyperfoil/tools/horreum/bus/MessageBusTest.java b/horreum-backend/src/test/java/io/hyperfoil/tools/horreum/bus/MessageBusTest.java index 27472474a..de49cd34f 100644 --- a/horreum-backend/src/test/java/io/hyperfoil/tools/horreum/bus/MessageBusTest.java +++ b/horreum-backend/src/test/java/io/hyperfoil/tools/horreum/bus/MessageBusTest.java @@ -48,7 +48,7 @@ public void testRetry() throws InterruptedException { AtomicInteger counter = new AtomicInteger(); CountDownLatch firstLatch = new CountDownLatch(1); CountDownLatch secondLatch = new CountDownLatch(1); - messageBus.subscribe(CHANNEL, "test", String.class, str -> { + messageBus.subscribe(MessageBusChannels.FOOBAR, "test", String.class, str -> { if (counter.getAndIncrement() == 0) { firstLatch.countDown(); throw RETRY_EXCEPTION; @@ -56,7 +56,7 @@ public void testRetry() throws InterruptedException { secondLatch.countDown(); }); Util.withTx(tm, () -> { - messageBus.publish(CHANNEL, 1, "foo"); + messageBus.publish(MessageBusChannels.FOOBAR, 1, "foo"); return null; }); assertTrue(firstLatch.await(10, TimeUnit.SECONDS)); @@ -81,7 +81,7 @@ public void testRetryManyWithExceptions() throws InterruptedException, TimeoutEx AtomicInteger alive = new AtomicInteger(100); Phaser phaser = new Phaser(100); int currentPhase = phaser.getPhase(); - messageBus.subscribe(CHANNEL, "testMany", String.class, str -> { + messageBus.subscribe(MessageBusChannels.FOOBAR, "testMany", String.class, str -> { if (ThreadLocalRandom.current().nextBoolean()) { int value = alive.decrementAndGet(); log.debugf("Decreased to %d", value); @@ -93,7 +93,7 @@ public void testRetryManyWithExceptions() throws InterruptedException, TimeoutEx }); Util.withTx(tm, () -> { for (int i = 0; i < 100; ++i) { - messageBus.publish(CHANNEL, 1, "foo" + i); + messageBus.publish(MessageBusChannels.FOOBAR, 1, "foo" + i); } return null; }); diff --git a/horreum-backend/src/test/java/io/hyperfoil/tools/horreum/svc/ActionServiceTest.java b/horreum-backend/src/test/java/io/hyperfoil/tools/horreum/svc/ActionServiceTest.java index aebcda897..be9cfa84f 100644 --- a/horreum-backend/src/test/java/io/hyperfoil/tools/horreum/svc/ActionServiceTest.java +++ b/horreum-backend/src/test/java/io/hyperfoil/tools/horreum/svc/ActionServiceTest.java @@ -8,6 +8,7 @@ import java.util.Arrays; import io.hyperfoil.tools.horreum.api.data.Test; +import io.hyperfoil.tools.horreum.bus.MessageBusChannels; import io.hyperfoil.tools.horreum.test.HorreumTestProfile; import jakarta.ws.rs.core.HttpHeaders; import org.junit.jupiter.api.TestInfo; @@ -35,7 +36,7 @@ public void testFailingHttp(TestInfo testInfo) { Test test = createTest(createExampleTest(getTestName(testInfo))); addAllowedSite("http://some-non-existent-domain.com"); - addTestHttpAction(test, RunDAO.EVENT_NEW, "http://some-non-existent-domain.com"); + addTestHttpAction(test, MessageBusChannels.RUN_NEW, "http://some-non-existent-domain.com"); uploadRun(JsonNodeFactory.instance.objectNode(), test.name); @@ -49,14 +50,14 @@ public void testFailingHttp(TestInfo testInfo) { @org.junit.jupiter.api.Test public void testAddGlobalAction() { - String responseType = addGlobalAction(TestDAO.EVENT_NEW, "https://attacker.com") + String responseType = addGlobalAction(MessageBusChannels.TEST_NEW, "https://attacker.com") .then().statusCode(400).extract().header(HttpHeaders.CONTENT_TYPE); // constraint violations are mapped to 400 + JSON response, we want explicit error assertTrue(responseType.startsWith("text/plain")); // text/plain;charset=UTF-8 addAllowedSite("https://example.com"); - ActionDAO action = addGlobalAction(TestDAO.EVENT_NEW, "https://example.com/foo/bar").then().statusCode(200).extract().body().as(ActionDAO.class); + ActionDAO action = addGlobalAction(MessageBusChannels.TEST_NEW, "https://example.com/foo/bar").then().statusCode(200).extract().body().as(ActionDAO.class); assertNotNull(action.id); assertTrue(action.active); given().auth().oauth2(getAdminToken()).delete("/api/action/" + action.id); diff --git a/horreum-backend/src/test/java/io/hyperfoil/tools/horreum/svc/AlertingServiceTest.java b/horreum-backend/src/test/java/io/hyperfoil/tools/horreum/svc/AlertingServiceTest.java index f28c0af53..03cc7e38e 100644 --- a/horreum-backend/src/test/java/io/hyperfoil/tools/horreum/svc/AlertingServiceTest.java +++ b/horreum-backend/src/test/java/io/hyperfoil/tools/horreum/svc/AlertingServiceTest.java @@ -23,6 +23,7 @@ import io.hyperfoil.tools.horreum.api.alerting.DataPoint; import io.hyperfoil.tools.horreum.api.alerting.RunExpectation; import io.hyperfoil.tools.horreum.api.data.DataSet; +import io.hyperfoil.tools.horreum.bus.MessageBusChannels; import jakarta.inject.Inject; import io.hyperfoil.tools.horreum.api.alerting.Change; @@ -80,7 +81,7 @@ public void testNotifications(TestInfo info) throws InterruptedException { Schema schema = createExampleSchema(info); setTestVariables(test, "Value", "value"); - BlockingQueue dpe = eventConsumerQueue(DataPoint.Event.class, DataPointDAO.EVENT_NEW, e -> e.testId == test.id); + BlockingQueue dpe = eventConsumerQueue(DataPoint.Event.class, MessageBusChannels.DATAPOINT_NEW, e -> e.testId == test.id); uploadRun(runWithValue(42, schema).toString(), test.name); DataPoint.Event event1 = dpe.poll(10, TimeUnit.SECONDS); @@ -114,7 +115,7 @@ public void testLogging(TestInfo info) throws InterruptedException { ObjectNode runJson = JsonNodeFactory.instance.objectNode(); runJson.put("$schema", schema.uri); - BlockingQueue missingQueue = eventConsumerQueue(MissingValuesEvent.class, DataSetDAO.EVENT_MISSING_VALUES, e -> e.dataset.testId == test.id); + BlockingQueue missingQueue = eventConsumerQueue(MissingValuesEvent.class, MessageBusChannels.DATASET_MISSING_VALUES, e -> e.dataset.testId == test.id); missingQueue.drainTo(new ArrayList<>()); int runId = uploadRun(runJson, test.name); @@ -147,8 +148,8 @@ public void testChangeDetection(TestInfo info) throws InterruptedException { Schema schema = createExampleSchema(info); ChangeDetection cd = addChangeDetectionVariable(test); - BlockingQueue datapointQueue = eventConsumerQueue(DataPoint.Event.class, DataPointDAO.EVENT_NEW, e -> e.testId == test.id); - BlockingQueue changeQueue = eventConsumerQueue(Change.Event.class, Change.EVENT_NEW, e -> e.dataset.testId == test.id); + BlockingQueue datapointQueue = eventConsumerQueue(DataPoint.Event.class, MessageBusChannels.DATAPOINT_NEW, e -> e.testId == test.id); + BlockingQueue changeQueue = eventConsumerQueue(Change.Event.class, MessageBusChannels.CHANGE_NEW, e -> e.dataset.testId == test.id); long ts = System.currentTimeMillis(); uploadRun(ts, ts, runWithValue(1, schema), test.name); @@ -222,8 +223,8 @@ public void testChangeDetectionWithFingerprint(TestInfo info) throws Interrupted addChangeDetectionVariable(test); - BlockingQueue datapointQueue = eventConsumerQueue(DataPoint.Event.class, DataPointDAO.EVENT_NEW, e -> e.testId == testId); - BlockingQueue changeQueue = eventConsumerQueue(Change.Event.class, Change.EVENT_NEW, e -> e.dataset.testId == testId); + BlockingQueue datapointQueue = eventConsumerQueue(DataPoint.Event.class, MessageBusChannels.DATAPOINT_NEW, e -> e.testId == testId); + BlockingQueue changeQueue = eventConsumerQueue(Change.Event.class, MessageBusChannels.CHANGE_NEW, e -> e.dataset.testId == testId); long ts = System.currentTimeMillis(); for (int i = 0; i < 12; i += 3) { @@ -269,7 +270,7 @@ public void testFingerprintLabelsChange(TestInfo info) throws Exception { addLabel(schema, "bar", null, new Extractor("bar", "$.bar", false)); uploadRun(runWithValue(42, schema).put("foo", "aaa").put("bar", "bbb"), test.name); - BlockingQueue datapointQueue = eventConsumerQueue(DataPoint.Event.class, DataPointDAO.EVENT_NEW, e -> e.testId == testId); + BlockingQueue datapointQueue = eventConsumerQueue(DataPoint.Event.class, MessageBusChannels.DATAPOINT_NEW, e -> e.testId == testId); assertValue(datapointQueue, 42); List fingerprintsBefore = FingerprintDAO.listAll(); @@ -302,7 +303,7 @@ public void testFingerprintFilter(TestInfo info) throws Exception { addLabel(schema, "foo", null, new Extractor("foo", "$.foo", false)); addLabel(schema, "bar", null, new Extractor("bar", "$.bar", false)); - BlockingQueue datapointQueue = eventConsumerQueue(DataPoint.Event.class, DataPointDAO.EVENT_NEW, e -> e.testId == testId); + BlockingQueue datapointQueue = eventConsumerQueue(DataPoint.Event.class, MessageBusChannels.DATAPOINT_NEW, e -> e.testId == testId); uploadRun(runWithValue(1, schema).put("foo", "aaa").put("bar", "bbb"), test.name); assertValue(datapointQueue, 1); @@ -364,7 +365,7 @@ public void testMissingRules(TestInfo info) throws InterruptedException { int firstRuleId = addMissingDataRule(test, "my rule", jsonArray("value"), "value => value > 2", 10000); assertTrue(firstRuleId > 0); - BlockingQueue newDatasetQueue = eventConsumerQueue(DataSet.EventNew.class, DataSetDAO.EVENT_NEW, e -> e.dataset.testid.equals(test.id)); + BlockingQueue newDatasetQueue = eventConsumerQueue(DataSet.EventNew.class, MessageBusChannels.DATASET_NEW, e -> e.dataset.testid.equals(test.id)); long now = System.currentTimeMillis(); uploadRun(now - 20000, runWithValue(3, schema), test.name); DataSet.EventNew firstEvent = newDatasetQueue.poll(10, TimeUnit.SECONDS); @@ -549,8 +550,8 @@ public void testRecalculateDatasets(TestInfo info) throws InterruptedException { Schema schema = createExampleSchema(info); addChangeDetectionVariable(test); - BlockingQueue datapointQueue = eventConsumerQueue(DataPoint.Event.class, DataPointDAO.EVENT_NEW, e -> e.testId == test.id); - BlockingQueue datapointDeletedQueue = eventConsumerQueue(DataPoint.Event.class, DataPointDAO.EVENT_DELETED, e -> e.testId == test.id); + BlockingQueue datapointQueue = eventConsumerQueue(DataPoint.Event.class, MessageBusChannels.DATAPOINT_NEW, e -> e.testId == test.id); + BlockingQueue datapointDeletedQueue = eventConsumerQueue(DataPoint.Event.class, MessageBusChannels.DATAPOINT_DELETED, e -> e.testId == test.id); uploadRun(runWithValue(42, schema), test.name); DataPoint first = assertValue(datapointQueue, 42); @@ -606,8 +607,8 @@ public void testFixedThresholds(TestInfo info) throws InterruptedException { rd.config = config; setTestVariables(test, "Value", "value", rd); - BlockingQueue datapointQueue = eventConsumerQueue(DataPoint.Event.class, DataPointDAO.EVENT_NEW, e -> e.testId == test.id); - BlockingQueue changeQueue = eventConsumerQueue(Change.Event.class, Change.EVENT_NEW, e -> e.dataset.testId == test.id); + BlockingQueue datapointQueue = eventConsumerQueue(DataPoint.Event.class, MessageBusChannels.DATAPOINT_NEW, e -> e.testId == test.id); + BlockingQueue changeQueue = eventConsumerQueue(Change.Event.class, MessageBusChannels.CHANGE_NEW, e -> e.dataset.testId == test.id); long ts = System.currentTimeMillis(); uploadRun(ts, ts, runWithValue(4, schema), test.name); @@ -638,7 +639,7 @@ public void testCustomTimeline(TestInfo info) throws InterruptedException { addChangeDetectionVariable(test); setChangeDetectionTimeline(test, Collections.singletonList("timestamp"), null); - BlockingQueue datapointQueue = eventConsumerQueue(DataPoint.Event.class, DataPointDAO.EVENT_NEW, e -> e.testId == test.id); + BlockingQueue datapointQueue = eventConsumerQueue(DataPoint.Event.class, MessageBusChannels.DATAPOINT_NEW, e -> e.testId == test.id); long ts = System.currentTimeMillis(); uploadRun(ts, ts, runWithValue(1, schema).put("timestamp", 1662023776000L), test.name); @@ -688,7 +689,7 @@ public void testLabelsChange(TestInfo info) throws InterruptedException { LabelDAO l = LabelDAO.find("name", "value").firstResult(); Label label = LabelMapper.from(l); - BlockingQueue datapointQueue = eventConsumerQueue(DataPoint.Event.class, DataPointDAO.EVENT_NEW, e -> e.testId == test.id); + BlockingQueue datapointQueue = eventConsumerQueue(DataPoint.Event.class, MessageBusChannels.DATAPOINT_NEW, e -> e.testId == test.id); long ts = System.currentTimeMillis(); uploadRun(ts, ts, runWithValue(1, schema), test.name); @@ -713,8 +714,8 @@ public void testRandomOrder(TestInfo info) throws InterruptedException { addLabel(schema, "timestamp", null, new Extractor("ts", "$.timestamp", false)); setChangeDetectionTimeline(test, Collections.singletonList("timestamp"), null); - BlockingQueue datapointQueue = eventConsumerQueue(DataPoint.Event.class, DataPointDAO.EVENT_NEW, e -> e.testId == test.id); - BlockingQueue changeQueue = eventConsumerQueue(Change.Event.class, Change.EVENT_NEW, e -> e.dataset.testId == test.id); + BlockingQueue datapointQueue = eventConsumerQueue(DataPoint.Event.class, MessageBusChannels.DATAPOINT_NEW, e -> e.testId == test.id); + BlockingQueue changeQueue = eventConsumerQueue(Change.Event.class, MessageBusChannels.CHANGE_NEW, e -> e.dataset.testId == test.id); int[] order = new int[] { 5, 0, 1, 7, 4, 8, 2, 3, 9, 6 }; double[] values = new double[] { 1, 2, 2, 2, 1, 1, 2, 1, 1, 2}; diff --git a/horreum-backend/src/test/java/io/hyperfoil/tools/horreum/svc/BaseServiceTest.java b/horreum-backend/src/test/java/io/hyperfoil/tools/horreum/svc/BaseServiceTest.java index af7b1b540..426c87026 100644 --- a/horreum-backend/src/test/java/io/hyperfoil/tools/horreum/svc/BaseServiceTest.java +++ b/horreum-backend/src/test/java/io/hyperfoil/tools/horreum/svc/BaseServiceTest.java @@ -31,11 +31,11 @@ import io.hyperfoil.tools.horreum.api.services.AlertingService; import io.hyperfoil.tools.horreum.api.services.ExperimentService; import io.hyperfoil.tools.horreum.api.services.RunService; +import io.hyperfoil.tools.horreum.bus.MessageBusChannels; import io.hyperfoil.tools.horreum.hibernate.JsonBinaryType; import io.quarkus.arc.impl.ParameterizedTypeImpl; import jakarta.inject.Inject; import jakarta.persistence.EntityManager; -import jakarta.persistence.criteria.CriteriaBuilder; import jakarta.transaction.Status; import jakarta.transaction.TransactionManager; import jakarta.ws.rs.core.HttpHeaders; @@ -491,7 +491,7 @@ protected List variables(Integer testId) { .then().statusCode(200).extract().body().as(new ParameterizedTypeImpl(List.class, Variable.class)); } - protected BlockingQueue eventConsumerQueue(Class eventClass, String eventType, Predicate filter) { + protected BlockingQueue eventConsumerQueue(Class eventClass, MessageBusChannels eventType, Predicate filter) { BlockingQueue queue = new LinkedBlockingDeque<>(); AutoCloseable closeable = messageBus.subscribe(eventType, getClass().getName() + "_" + ThreadLocalRandom.current().nextLong(), eventClass, msg -> { if (eventClass.isInstance(msg)) { @@ -524,14 +524,14 @@ protected ArrayNode jsonArray(String... items) { } protected BlockingQueue trashRun(int runId) throws InterruptedException { - BlockingQueue trashedQueue = eventConsumerQueue(Integer.class, RunDAO.EVENT_TRASHED, r -> true); + BlockingQueue trashedQueue = eventConsumerQueue(Integer.class, MessageBusChannels.RUN_TRASHED, r -> true); jsonRequest().post("/api/run/" + runId + "/trash").then().statusCode(204); assertEquals(runId, trashedQueue.poll(10, TimeUnit.SECONDS)); return trashedQueue; } protected T withExampleDataset(Test test, JsonNode data, Function testLogic) { - BlockingQueue dataSetQueue = eventConsumerQueue(DataSet.EventNew.class, DataSetDAO.EVENT_NEW, e -> e.dataset.testid.equals(test.id)); + BlockingQueue dataSetQueue = eventConsumerQueue(DataSet.EventNew.class, MessageBusChannels.DATASET_NEW, e -> e.dataset.testid.equals(test.id)); try { RunDAO run = new RunDAO(); tm.begin(); @@ -655,18 +655,18 @@ protected void addAllowedSite(String prefix) { .body(prefix).post("/api/action/allowedSites").then().statusCode(200); } - protected Response addTestHttpAction(Test test, String event, String url) { + protected Response addTestHttpAction(Test test, MessageBusChannels event, String url) { Action action = new Action(); - action.event = event; + action.event = event.name(); action.type = HttpAction.TYPE_HTTP; action.active = true; action.config = JsonNodeFactory.instance.objectNode().put("url", url); return jsonRequest().body(action).post("/api/test/" + test.id + "/action"); } - protected Response addTestGithubIssueCommentAction(Test test, String event, String formatter, String owner, String repo, String issue, String secretToken) { + protected Response addTestGithubIssueCommentAction(Test test, MessageBusChannels event, String formatter, String owner, String repo, String issue, String secretToken) { Action action = new Action(); - action.event = event; + action.event = event.name(); action.type = GitHubIssueCommentAction.TYPE_GITHUB_ISSUE_COMMENT; action.active = true; action.config = JsonNodeFactory.instance.objectNode() @@ -678,9 +678,9 @@ protected Response addTestGithubIssueCommentAction(Test test, String event, Stri return jsonRequest().body(action).post("/api/test/" + test.id + "/action"); } - protected Response addGlobalAction(String event, String url) { + protected Response addGlobalAction(MessageBusChannels event, String url) { Action action = new Action(); - action.event = event; + action.event = event.name(); action.type = "http"; action.active = true; action.config = JsonNodeFactory.instance.objectNode().put("url", url); diff --git a/horreum-backend/src/test/java/io/hyperfoil/tools/horreum/svc/DatasetServiceTest.java b/horreum-backend/src/test/java/io/hyperfoil/tools/horreum/svc/DatasetServiceTest.java index c931f7e5b..cc1cc45c2 100644 --- a/horreum-backend/src/test/java/io/hyperfoil/tools/horreum/svc/DatasetServiceTest.java +++ b/horreum-backend/src/test/java/io/hyperfoil/tools/horreum/svc/DatasetServiceTest.java @@ -16,6 +16,7 @@ import java.util.stream.Collectors; import java.util.stream.StreamSupport; +import io.hyperfoil.tools.horreum.bus.MessageBusChannels; import jakarta.inject.Inject; import com.fasterxml.jackson.databind.JsonNode; @@ -180,7 +181,7 @@ public void testDatasetLabelChanged() { int labelB = addLabel(schemas[1], "B", "v => v + 1", new Extractor("value", "$.value", false)); int labelC = addLabel(schemas[1], "C", null, new Extractor("value", "$.value", false)); Test test = createTest(createExampleTest("dummy")); - BlockingQueue updateQueue = eventConsumerQueue(DataSet.LabelsUpdatedEvent.class, DataSetDAO.EVENT_LABELS_UPDATED, e -> checkTestId(e.datasetId, test.id)); + BlockingQueue updateQueue = eventConsumerQueue(DataSet.LabelsUpdatedEvent.class, MessageBusChannels.DATASET_UPDATED_LABELS, e -> checkTestId(e.datasetId, test.id)); withExampleDataset(test, createABData(), ds -> { waitForUpdate(updateQueue, ds); List values = LabelDAO.Value.find("datasetId", ds.id).list(); @@ -210,7 +211,7 @@ public void testDatasetLabelChanged() { private List withLabelValues(ArrayNode data) { Test test = createTest(createExampleTest("dummy")); - BlockingQueue updateQueue = eventConsumerQueue(DataSet.LabelsUpdatedEvent.class, DataSetDAO.EVENT_LABELS_UPDATED, e -> checkTestId(e.datasetId, test.id)); + BlockingQueue updateQueue = eventConsumerQueue(DataSet.LabelsUpdatedEvent.class, MessageBusChannels.DATASET_UPDATED_LABELS, e -> checkTestId(e.datasetId, test.id)); return withExampleDataset(test, data, ds -> { waitForUpdate(updateQueue, ds); return LabelDAO.Value.find("datasetId", ds.id).list().stream().map(LabelMapper::fromValue).collect(Collectors.toList()); @@ -220,8 +221,8 @@ private List withLabelValues(ArrayNode data) { @org.junit.jupiter.api.Test public void testSchemaAfterData() throws InterruptedException { Test test = createTest(createExampleTest("xxx")); - BlockingQueue dsQueue = eventConsumerQueue(DataSet.EventNew.class, DataSetDAO.EVENT_NEW, e -> e.dataset.testid.equals(test.id)); - BlockingQueue labelQueue = eventConsumerQueue(DataSet.LabelsUpdatedEvent.class, DataSetDAO.EVENT_LABELS_UPDATED, e -> checkTestId(e.datasetId, test.id)); + BlockingQueue dsQueue = eventConsumerQueue(DataSet.EventNew.class, MessageBusChannels.DATASET_NEW, e -> e.dataset.testid.equals(test.id)); + BlockingQueue labelQueue = eventConsumerQueue(DataSet.LabelsUpdatedEvent.class, MessageBusChannels.DATASET_UPDATED_LABELS, e -> checkTestId(e.datasetId, test.id)); JsonNode data = JsonNodeFactory.instance.arrayNode() .add(JsonNodeFactory.instance.objectNode().put("$schema", "urn:another")) .add(JsonNodeFactory.instance.objectNode().put("$schema", "urn:foobar").put("value", 42)); @@ -291,7 +292,7 @@ public void testDatasetView() { int labelA = addLabel(schemas[0], "a", null, valuePath); int labelB = addLabel(schemas[1], "b", null, valuePath); // view update should happen in the same transaction as labels update so we can use the event - BlockingQueue updateQueue = eventConsumerQueue(DataSet.LabelsUpdatedEvent.class, DataSetDAO.EVENT_LABELS_UPDATED, e -> checkTestId(e.datasetId, test.id)); + BlockingQueue updateQueue = eventConsumerQueue(DataSet.LabelsUpdatedEvent.class, MessageBusChannels.DATASET_UPDATED_LABELS, e -> checkTestId(e.datasetId, test.id)); withExampleDataset(test, createABData(), ds -> { waitForUpdate(updateQueue, ds); JsonNode datasets = fetchDatasetsByTest(test.id); @@ -385,7 +386,7 @@ public void testEverythingPrivate() throws InterruptedException { test = createTest(test); int testId = test.id; - BlockingQueue updateQueue = eventConsumerQueue(DataSet.LabelsUpdatedEvent.class, DataSetDAO.EVENT_LABELS_UPDATED, e -> checkTestId(e.datasetId, testId)); + BlockingQueue updateQueue = eventConsumerQueue(DataSet.LabelsUpdatedEvent.class, MessageBusChannels.DATASET_UPDATED_LABELS, e -> checkTestId(e.datasetId, testId)); long timestamp = System.currentTimeMillis(); uploadRun(timestamp, timestamp, JsonNodeFactory.instance.objectNode().put("$schema", schema.uri).put("value", 42), diff --git a/horreum-backend/src/test/java/io/hyperfoil/tools/horreum/svc/ReportServiceTest.java b/horreum-backend/src/test/java/io/hyperfoil/tools/horreum/svc/ReportServiceTest.java index 675960663..ac4f628de 100644 --- a/horreum-backend/src/test/java/io/hyperfoil/tools/horreum/svc/ReportServiceTest.java +++ b/horreum-backend/src/test/java/io/hyperfoil/tools/horreum/svc/ReportServiceTest.java @@ -24,7 +24,7 @@ import io.hyperfoil.tools.horreum.api.data.Schema; import io.hyperfoil.tools.horreum.api.data.Test; import io.hyperfoil.tools.horreum.api.report.TableReportConfig; -import io.hyperfoil.tools.horreum.entity.data.DataSetDAO; +import io.hyperfoil.tools.horreum.bus.MessageBusChannels; import io.hyperfoil.tools.horreum.test.HorreumTestProfile; import io.hyperfoil.tools.horreum.test.PostgresResource; import io.quarkus.test.common.QuarkusTestResource; @@ -145,7 +145,7 @@ private TableReportConfig newExampleTableReportConfig(Test test) { } private void uploadExampleRuns(Test test) throws InterruptedException { - BlockingQueue queue = eventConsumerQueue(DataSet.LabelsUpdatedEvent.class, DataSetDAO.EVENT_LABELS_UPDATED, e -> checkTestId(e.datasetId, test.id)); + BlockingQueue queue = eventConsumerQueue(DataSet.LabelsUpdatedEvent.class, MessageBusChannels.DATASET_UPDATED_LABELS, e -> checkTestId(e.datasetId, test.id)); long ts = System.currentTimeMillis(); uploadRun(ts - 1, createRunData("production", "windows", "jvm", 1, 0.5, 150_000_000, 123) , test.name); @@ -212,7 +212,7 @@ public void testMissingValues() throws InterruptedException { Test test = createTest(createExampleTest("missing")); createComparisonSchema(); - BlockingQueue queue = eventConsumerQueue(DataSet.LabelsUpdatedEvent.class, DataSetDAO.EVENT_LABELS_UPDATED, e -> checkTestId(e.datasetId, test.id)); + BlockingQueue queue = eventConsumerQueue(DataSet.LabelsUpdatedEvent.class, MessageBusChannels.DATASET_UPDATED_LABELS, e -> checkTestId(e.datasetId, test.id)); int runId = uploadRun(JsonNodeFactory.instance.objectNode(), test.name); assertNotNull(queue.poll(10, TimeUnit.SECONDS)); diff --git a/horreum-backend/src/test/java/io/hyperfoil/tools/horreum/svc/RunServiceTest.java b/horreum-backend/src/test/java/io/hyperfoil/tools/horreum/svc/RunServiceTest.java index 4c98ba7ce..2dead9a2a 100644 --- a/horreum-backend/src/test/java/io/hyperfoil/tools/horreum/svc/RunServiceTest.java +++ b/horreum-backend/src/test/java/io/hyperfoil/tools/horreum/svc/RunServiceTest.java @@ -29,6 +29,7 @@ import io.hyperfoil.tools.horreum.api.services.DatasetService; import io.hyperfoil.tools.horreum.api.services.ExperimentService; import io.hyperfoil.tools.horreum.api.services.RunService; +import io.hyperfoil.tools.horreum.bus.MessageBusChannels; import jakarta.ws.rs.BadRequestException; import jakarta.ws.rs.core.HttpHeaders; @@ -66,7 +67,7 @@ public void testTransformationNoSchemaInData(TestInfo info) throws InterruptedEx Test exampleTest = createExampleTest(getTestName(info)); Test test = createTest(exampleTest); - BlockingQueue dataSetQueue = eventConsumerQueue(DataSet.EventNew.class, DataSetDAO.EVENT_NEW, e -> e.dataset.testid.equals(test.id)); + BlockingQueue dataSetQueue = eventConsumerQueue(DataSet.EventNew.class, MessageBusChannels.DATASET_NEW, e -> e.dataset.testid.equals(test.id)); Extractor path = new Extractor("foo", "$.value", false); Schema schema = createExampleSchema(info); @@ -82,7 +83,7 @@ public void testTransformationNoSchemaInData(TestInfo info) throws InterruptedEx @org.junit.jupiter.api.Test public void testTransformationWithoutSchema(TestInfo info) throws InterruptedException { Test test = createTest(createExampleTest(getTestName(info))); - BlockingQueue dataSetQueue = eventConsumerQueue(DataSet.EventNew.class, DataSetDAO.EVENT_NEW, e -> e.dataset.testid.equals(test.id)); + BlockingQueue dataSetQueue = eventConsumerQueue(DataSet.EventNew.class, MessageBusChannels.DATASET_NEW, e -> e.dataset.testid.equals(test.id)); Schema schema = createExampleSchema(info); @@ -122,7 +123,7 @@ private void assertNewDataset(BlockingQueue dataSetQueue, int @org.junit.jupiter.api.Test public void testTransformationWithoutSchemaInUpload(TestInfo info) throws InterruptedException { Test test = createTest(createExampleTest(getTestName(info))); - BlockingQueue dataSetQueue = eventConsumerQueue(DataSet.EventNew.class, DataSetDAO.EVENT_NEW, e -> e.dataset.testid.equals(test.id)); + BlockingQueue dataSetQueue = eventConsumerQueue(DataSet.EventNew.class, MessageBusChannels.DATASET_NEW, e -> e.dataset.testid.equals(test.id)); setTestVariables(test, "Value", "value"); @@ -138,7 +139,7 @@ public void testTransformationWithoutExtractorsAndBlankFunction(TestInfo info) t Test exampleTest = createExampleTest(getTestName(info)); Test test = createTest(exampleTest); - BlockingQueue dataSetQueue = eventConsumerQueue(DataSet.EventNew.class, DataSetDAO.EVENT_NEW, e -> e.dataset.testid.equals(test.id)); + BlockingQueue dataSetQueue = eventConsumerQueue(DataSet.EventNew.class, MessageBusChannels.DATASET_NEW, e -> e.dataset.testid.equals(test.id)); Schema schema = createExampleSchema(info); Transformer transformer = createTransformer("acme", schema, ""); @@ -159,7 +160,7 @@ public void testTransformationWithExtractorAndBlankFunction(TestInfo info) throw Test exampleTest = createExampleTest(getTestName(info)); Test test = createTest(exampleTest); - BlockingQueue dataSetQueue = eventConsumerQueue(DataSet.EventNew.class, DataSetDAO.EVENT_NEW, e -> e.dataset.testid.equals(test.id)); + BlockingQueue dataSetQueue = eventConsumerQueue(DataSet.EventNew.class, MessageBusChannels.DATASET_NEW, e -> e.dataset.testid.equals(test.id)); Schema schema = createExampleSchema("AcneCorp", "AcneInc", "AcneRrUs", false); Extractor path = new Extractor("foo", "$.value", false); @@ -189,7 +190,7 @@ public void testTransformationWithNestedSchema(TestInfo info) throws Interrupted Test test = createTest(exampleTest); addTransformer(test, acmeTransformer, roadRunnerTransformer); - BlockingQueue dataSetQueue = eventConsumerQueue(DataSet.EventNew.class, DataSetDAO.EVENT_NEW, e -> e.dataset.testid.equals(test.id)); + BlockingQueue dataSetQueue = eventConsumerQueue(DataSet.EventNew.class, MessageBusChannels.DATASET_NEW, e -> e.dataset.testid.equals(test.id)); String data = runWithValue(42.0d, acmeSchema, roadRunnerSchema).toString(); int runId = uploadRun(data, test.name); @@ -211,7 +212,7 @@ public void testTransformationSingleSchemaTestWithoutTransformer(TestInfo info) Test exampleTest = createExampleTest(getTestName(info)); Test test = createTest(exampleTest); - BlockingQueue dataSetQueue = eventConsumerQueue(DataSet.EventNew.class, DataSetDAO.EVENT_NEW, e -> e.dataset.testid.equals(test.id)); + BlockingQueue dataSetQueue = eventConsumerQueue(DataSet.EventNew.class, MessageBusChannels.DATASET_NEW, e -> e.dataset.testid.equals(test.id)); Schema acmeSchema = createExampleSchema("AceCorp", "AceInc", "AceRrUs", false); uploadRun(runWithValue(42.0d, acmeSchema), test.name); @@ -231,7 +232,7 @@ public void testTransformationSingleSchemaTestWithoutTransformer(TestInfo info) @org.junit.jupiter.api.Test public void testTransformationNestedSchemasWithoutTransformers(TestInfo info) throws InterruptedException { Test test = createTest(createExampleTest(getTestName(info))); - BlockingQueue dataSetQueue = eventConsumerQueue(DataSet.EventNew.class, DataSetDAO.EVENT_NEW, e -> e.dataset.testid.equals(test.id)); + BlockingQueue dataSetQueue = eventConsumerQueue(DataSet.EventNew.class, MessageBusChannels.DATASET_NEW, e -> e.dataset.testid.equals(test.id)); Schema schemaA = createExampleSchema("Ada", "Ada", "Ada", false); Schema schemaB = createExampleSchema("Bdb", "Bdb", "Bdb", false); Schema schemaC = createExampleSchema("Cdc", "Cdc", "Cdc", false); @@ -265,7 +266,7 @@ public void testTransformationUsingSameSchemaInBothLevelsTestWithoutTransformer( Test exampleTest = createExampleTest(getTestName(info)); Test test = createTest(exampleTest); - BlockingQueue dataSetQueue = eventConsumerQueue(DataSet.EventNew.class, DataSetDAO.EVENT_NEW, e -> e.dataset.testid.equals(test.id)); + BlockingQueue dataSetQueue = eventConsumerQueue(DataSet.EventNew.class, MessageBusChannels.DATASET_NEW, e -> e.dataset.testid.equals(test.id)); Schema appleSchema = createExampleSchema("AppleCorp", "AppleInc", "AppleRrUs", false); @@ -307,7 +308,7 @@ public void testTransformationUsingSingleSchemaTransformersProcessScalarPlusArra Test test = createTest(exampleTest); addTransformer(test, arrayTransformer, scalarTransformer); - BlockingQueue dataSetQueue = eventConsumerQueue(DataSet.EventNew.class, DataSetDAO.EVENT_NEW, e -> e.dataset.testid.equals(test.id)); + BlockingQueue dataSetQueue = eventConsumerQueue(DataSet.EventNew.class, MessageBusChannels.DATASET_NEW, e -> e.dataset.testid.equals(test.id)); ObjectNode data = runWithValue(42.0d, schema); @@ -339,7 +340,7 @@ public void testTransformationChoosingSchema(TestInfo info) throws InterruptedEx Test test = createTest(createExampleTest(getTestName(info))); addTransformer(test, transformerA, transformerB); - BlockingQueue dataSetQueue = eventConsumerQueue(DataSet.EventNew.class, DataSetDAO.EVENT_NEW, e -> e.dataset.testid.equals(test.id)); + BlockingQueue dataSetQueue = eventConsumerQueue(DataSet.EventNew.class, MessageBusChannels.DATASET_NEW, e -> e.dataset.testid.equals(test.id)); uploadRun(runWithValue(42, schemaB), test.name); DataSet.EventNew event = dataSetQueue.poll(POLL_DURATION_SECONDS, TimeUnit.SECONDS); @@ -374,7 +375,7 @@ public void testSchemaTransformerWithExtractorProducingNullValue(TestInfo info) Test test = createTest(exampleTest); addTransformer(test, scalarTransformer); - BlockingQueue dataSetQueue = eventConsumerQueue(DataSet.EventNew.class, DataSetDAO.EVENT_NEW, e -> e.dataset.testid.equals(test.id)); + BlockingQueue dataSetQueue = eventConsumerQueue(DataSet.EventNew.class, MessageBusChannels.DATASET_NEW, e -> e.dataset.testid.equals(test.id)); ObjectNode data = runWithValue(42.0d, schema); @@ -403,7 +404,7 @@ private void testTransformationWithoutMatch(TestInfo info, Schema schema, Object Test test = createTest(createExampleTest(getTestName(info))); addTransformer(test, transformerNoFunc, transformerFunc, transformerCombined); - BlockingQueue dataSetQueue = eventConsumerQueue(DataSet.EventNew.class, DataSetDAO.EVENT_NEW, e -> e.dataset.testid.equals(test.id)); + BlockingQueue dataSetQueue = eventConsumerQueue(DataSet.EventNew.class, MessageBusChannels.DATASET_NEW, e -> e.dataset.testid.equals(test.id)); uploadRun(data, test.name); DataSet.EventNew event = dataSetQueue.poll(POLL_DURATION_SECONDS, TimeUnit.SECONDS); @@ -578,7 +579,7 @@ public void testUploadWithMetadata() throws InterruptedException { metadata.add(simpleObject("urn:bar", "bar", "yyy")); metadata.add(simpleObject("urn:goo", "goo", "zzz")); - BlockingQueue dsQueue = eventConsumerQueue(DataSet.EventNew.class, DataSetDAO.EVENT_NEW, e -> e.dataset.testid == (int) test.id); + BlockingQueue dsQueue = eventConsumerQueue(DataSet.EventNew.class, MessageBusChannels.DATASET_NEW, e -> e.dataset.testid == (int) test.id); int run1 = uploadRun(now, data, metadata, test.name); diff --git a/horreum-backend/src/test/java/io/hyperfoil/tools/horreum/svc/SchemaServiceTest.java b/horreum-backend/src/test/java/io/hyperfoil/tools/horreum/svc/SchemaServiceTest.java index ca0afe954..0c0bdff56 100644 --- a/horreum-backend/src/test/java/io/hyperfoil/tools/horreum/svc/SchemaServiceTest.java +++ b/horreum-backend/src/test/java/io/hyperfoil/tools/horreum/svc/SchemaServiceTest.java @@ -1,13 +1,11 @@ package io.hyperfoil.tools.horreum.svc; -import static org.junit.Assert.assertTrue; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import java.io.IOException; import java.io.InputStream; import java.util.ArrayList; -import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.concurrent.BlockingQueue; @@ -21,6 +19,7 @@ import io.hyperfoil.tools.horreum.api.data.Label; import io.hyperfoil.tools.horreum.api.data.Transformer; import io.hyperfoil.tools.horreum.api.services.SchemaService; +import io.hyperfoil.tools.horreum.bus.MessageBusChannels; import io.hyperfoil.tools.horreum.test.HorreumTestProfile; import io.hyperfoil.tools.horreum.api.data.Extractor; import io.hyperfoil.tools.horreum.api.data.Schema; @@ -48,8 +47,8 @@ public void testValidateRun() throws IOException, InterruptedException { Schema allowNoneSchema = createSchema("none", allowNone.path("$id").asText(), allowNone); Test test = createTest(createExampleTest("schemaTest")); - BlockingQueue runValidations = eventConsumerQueue(Schema.ValidationEvent.class, RunDAO.EVENT_VALIDATED, e -> checkRunTestId(e.id, test.id)); - BlockingQueue datasetValidations = eventConsumerQueue(Schema.ValidationEvent.class, DataSetDAO.EVENT_VALIDATED, e -> checkTestId(e.id, test.id)); + BlockingQueue runValidations = eventConsumerQueue(Schema.ValidationEvent.class, MessageBusChannels.RUN_VALIDATED, e -> checkRunTestId(e.id, test.id)); + BlockingQueue datasetValidations = eventConsumerQueue(Schema.ValidationEvent.class, MessageBusChannels.DATASET_VALIDATED, e -> checkTestId(e.id, test.id)); ArrayNode data = JsonNodeFactory.instance.arrayNode(); data.addObject().put("$schema", allowAnySchema.uri).put("foo", "bar"); diff --git a/horreum-backend/src/test/java/io/hyperfoil/tools/horreum/svc/TestServiceTest.java b/horreum-backend/src/test/java/io/hyperfoil/tools/horreum/svc/TestServiceTest.java index 23a509102..772b16d81 100644 --- a/horreum-backend/src/test/java/io/hyperfoil/tools/horreum/svc/TestServiceTest.java +++ b/horreum-backend/src/test/java/io/hyperfoil/tools/horreum/svc/TestServiceTest.java @@ -15,6 +15,7 @@ import java.util.stream.Collectors; import java.util.stream.StreamSupport; +import io.hyperfoil.tools.horreum.bus.MessageBusChannels; import io.hyperfoil.tools.horreum.hibernate.JsonBinaryType; import io.hyperfoil.tools.horreum.test.HorreumTestProfile; import io.hyperfoil.tools.horreum.api.alerting.Watch; @@ -24,16 +25,12 @@ import io.hyperfoil.tools.horreum.entity.alerting.*; import io.hyperfoil.tools.horreum.entity.data.*; import org.hibernate.query.NativeQuery; -import org.hibernate.type.CustomType; -import org.hibernate.type.StandardBasicTypes; -import org.hibernate.type.spi.TypeConfiguration; import org.junit.jupiter.api.TestInfo; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.JsonNodeFactory; import io.hyperfoil.tools.horreum.action.ExperimentResultToMarkdown; -import io.hyperfoil.tools.horreum.api.services.ExperimentService; import io.hyperfoil.tools.horreum.api.services.TestService; import io.hyperfoil.tools.horreum.entity.ExperimentProfileDAO; import io.hyperfoil.tools.horreum.server.CloseMe; @@ -61,7 +58,7 @@ public void testCreateDelete(TestInfo info) throws InterruptedException { int runId = uploadRun("{ \"foo\" : \"bar\" }", test.name); deleteTest(test); - BlockingQueue events = eventConsumerQueue(Integer.class, RunDAO.EVENT_TRASHED, id -> id == runId); + BlockingQueue events = eventConsumerQueue(Integer.class, MessageBusChannels.RUN_TRASHED, id -> id == runId); assertNotNull(events.poll(10, TimeUnit.SECONDS)); em.clear(); @@ -81,7 +78,7 @@ public void testRecalculate(TestInfo info) throws InterruptedException { Test test = createTest(createExampleTest(getTestName(info))); Schema schema = createExampleSchema(info); - BlockingQueue newDatasetQueue = eventConsumerQueue(DataSet.EventNew.class, DataSetDAO.EVENT_NEW, e -> e.dataset.testid.equals(test.id)); + BlockingQueue newDatasetQueue = eventConsumerQueue(DataSet.EventNew.class, MessageBusChannels.DATASET_NEW, e -> e.dataset.testid.equals(test.id)); final int NUM_DATASETS = 5; for (int i = 0; i < NUM_DATASETS; ++i) { uploadRun(runWithValue(i, schema), test.name); @@ -118,11 +115,11 @@ public void testRecalculate(TestInfo info) throws InterruptedException { @org.junit.jupiter.api.Test public void testAddTestAction(TestInfo info) { Test test = createTest(createExampleTest(getTestName(info))); - addTestHttpAction(test, RunDAO.EVENT_NEW, "https://attacker.com").then().statusCode(400); + addTestHttpAction(test, MessageBusChannels.RUN_NEW, "https://attacker.com").then().statusCode(400); addAllowedSite("https://example.com"); - Action action = addTestHttpAction(test, RunDAO.EVENT_NEW, "https://example.com/foo/bar").then().statusCode(200).extract().body().as(Action.class); + Action action = addTestHttpAction(test, MessageBusChannels.RUN_NEW, "https://example.com/foo/bar").then().statusCode(200).extract().body().as(Action.class); assertNotNull(action.id); assertTrue(action.active); action.active = false; @@ -136,7 +133,7 @@ public void testUpdateView(TestInfo info) throws InterruptedException { Test test = createTest(createExampleTest(getTestName(info))); Schema schema = createExampleSchema(info); - BlockingQueue newDatasetQueue = eventConsumerQueue(DataSet.EventNew.class, DataSetDAO.EVENT_NEW, e -> e.dataset.testid.equals(test.id)); + BlockingQueue newDatasetQueue = eventConsumerQueue(DataSet.EventNew.class, MessageBusChannels.DATASET_NEW, e -> e.dataset.testid.equals(test.id)); uploadRun(runWithValue(42, schema), test.name); DataSet.EventNew event = newDatasetQueue.poll(10, TimeUnit.SECONDS); assertNotNull(event); @@ -172,7 +169,7 @@ public void testLabelValues(TestInfo info) throws InterruptedException { Test test = createTest(createExampleTest(getTestName(info))); Schema schema = createExampleSchema(info); - BlockingQueue newDatasetQueue = eventConsumerQueue(DataSet.LabelsUpdatedEvent.class, DataSetDAO.EVENT_LABELS_UPDATED, e -> checkTestId(e.datasetId, test.id)); + BlockingQueue newDatasetQueue = eventConsumerQueue(DataSet.LabelsUpdatedEvent.class, MessageBusChannels.DATASET_UPDATED_LABELS, e -> checkTestId(e.datasetId, test.id)); uploadRun(runWithValue(42, schema), test.name); uploadRun(JsonNodeFactory.instance.objectNode(), test.name); assertNotNull(newDatasetQueue.poll(10, TimeUnit.SECONDS)); @@ -213,8 +210,8 @@ private void testImportExport(boolean wipe) { view.components = Collections.singletonList(vc); updateView(test.id, view); - addTestHttpAction(test, RunDAO.EVENT_NEW, "http://example.com"); - addTestGithubIssueCommentAction(test, ExperimentService.ExperimentResult.NEW_RESULT, + addTestHttpAction(test, MessageBusChannels.RUN_NEW, "http://example.com"); + addTestGithubIssueCommentAction(test, MessageBusChannels.EXPERIMENT_RESULT_NEW, ExperimentResultToMarkdown.NAME, "hyperfoil", "horreum", "123", "super-secret-github-token"); addChangeDetectionVariable(test); From a57a35f024fa2e7ae2213780ebb03e085422a209 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C3=A5le=20Pedersen?= Date: Thu, 14 Sep 2023 22:03:04 +0200 Subject: [PATCH 2/2] the path was changed to /test, need to change the param to query --- .../io/hyperfoil/tools/horreum/api/services/RunService.java | 2 +- .../java/io/hyperfoil/tools/horreum/svc/BaseServiceTest.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/horreum-api/src/main/java/io/hyperfoil/tools/horreum/api/services/RunService.java b/horreum-api/src/main/java/io/hyperfoil/tools/horreum/api/services/RunService.java index 88105a0f2..797aee41c 100644 --- a/horreum-api/src/main/java/io/hyperfoil/tools/horreum/api/services/RunService.java +++ b/horreum-api/src/main/java/io/hyperfoil/tools/horreum/api/services/RunService.java @@ -83,7 +83,7 @@ void updateAccess(@PathParam("id") int id, @POST @Path("test") @Consumes(MediaType.APPLICATION_JSON) - Response add(@PathParam("test") String testNameOrId, + Response add(@QueryParam("test") String testNameOrId, @QueryParam("owner") String owner, @QueryParam("access") Access access, @QueryParam("token") String token, diff --git a/horreum-backend/src/test/java/io/hyperfoil/tools/horreum/svc/BaseServiceTest.java b/horreum-backend/src/test/java/io/hyperfoil/tools/horreum/svc/BaseServiceTest.java index 426c87026..751f205c0 100644 --- a/horreum-backend/src/test/java/io/hyperfoil/tools/horreum/svc/BaseServiceTest.java +++ b/horreum-backend/src/test/java/io/hyperfoil/tools/horreum/svc/BaseServiceTest.java @@ -837,7 +837,7 @@ protected void populateDataFromFiles() throws IOException { .auth() .oauth2(getUploaderToken()) .body(r) - .post("/api/run/test/"); + .post("/api/run/test"); assertEquals(200, response.statusCode()); }