Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RunService api fix #641

Merged
merged 2 commits into from
Sep 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import java.time.Instant;

public class Change {
public static final String EVENT_NEW = "change/new";
@JsonProperty( required = true )
public int id;
@JsonProperty( required = true )
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,6 @@
import java.time.Instant;

public class DataPoint {
public static final String EVENT_NEW = "datapoint/new";
public static final String EVENT_DELETED = "datapoint/deleted";
public static final String EVENT_DATASET_PROCESSED = "datapoint/dataset_processed";
public Integer id;
public Instant timestamp;
public double value;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import java.util.Objects;

public class MissingDataRuleResult {
private Pk pk;
private Pk pk;
public Instant timestamp;
public MissingDataRule rule;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public Variable(Integer id, int testId, String name, String group, int order, Js
}

public String toString() {
return "VariableDTO{id=" + this.id + ", testId=" + this.testId + ", name='" + this.name + '\'' + ", group='" + this.group + '\'' + ", order=" + this.order + ", labels=" + this.labels + ", calculation='" + this.calculation + '\'' + ", changeDetection=" + this.changeDetection + '}';
return "Variable{id=" + this.id + ", testId=" + this.testId + ", name='" + this.name + '\'' + ", group='" + this.group + '\'' + ", order=" + this.order + ", labels=" + this.labels + ", calculation='" + this.calculation + '\'' + ", changeDetection=" + this.changeDetection + '}';
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ enum BetterOrWorse {

@Schema(name = "ExperimentResult")
class ExperimentResult {
public static final String NEW_RESULT = "experiment_result/new";

public ExperimentProfile profile;
public List<DatasetLog> logs;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ void updateAccess(@PathParam("id") int id,
@POST
@Path("test")
@Consumes(MediaType.APPLICATION_JSON)
Response add(@PathParam("test") String testNameOrId,
Response add(@QueryParam("test") String testNameOrId,
@QueryParam("owner") String owner,
@QueryParam("access") Access access,
@QueryParam("token") String token,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,15 +97,15 @@ void destroy() {
}

@Transactional(Transactional.TxType.MANDATORY)
public void publish(String channel, int testId, Object payload) {
public void publish(MessageBusChannels channel, int testId, Object payload) {
JsonNode json = Util.OBJECT_MAPPER.valueToTree(payload);
Integer componentFlag = flags.get(channel);
Integer componentFlag = flags.get(channel.name());
BigInteger id;
if (componentFlag != null && componentFlag != 0) {
try (CloseMe ignored = roleManager.withRoles(Collections.singleton(Roles.HORREUM_MESSAGEBUS))) {
id = (BigInteger) em.createNativeQuery("INSERT INTO messagebus (id, \"timestamp\", channel, testid, message, flags) VALUES (nextval('messagebus_seq'), NOW(), ?1, ?2, ?3, ?4) RETURNING id", BigInteger.class)
.unwrap(NativeQuery.class)
.setParameter(1, channel)
.setParameter(1, channel.name())
.setParameter(2, testId)
.setParameter(3, json, JsonBinaryType.INSTANCE)
.setParameter(4, componentFlag)
Expand All @@ -120,7 +120,7 @@ public void publish(String channel, int testId, Object payload) {
log.debugf("Publishing %d on test %d with flag %X on %s: %s", id.longValue(), testId, flag, channel, payload);
Util.doAfterCommitThrowing(tm, () -> {
log.debugf("Sending %d on test %d with flag %X to eventbus %s ", id.longValue(), testId, flag, channel);
eventBus.publish(channel, new Message(id.longValue(), testId, flag, payload));
eventBus.publish(channel.name(), new Message(id.longValue(), testId, flag, payload));
});
} catch (RollbackException e) {
log.debug("Not publishing the event as the transaction has been marked rollback-only");
Expand All @@ -129,8 +129,8 @@ public void publish(String channel, int testId, Object payload) {
}
}

public <T> AutoCloseable subscribe(String channel, String component, Class<T> payloadClass, Handler<T> handler) {
payloadClasses.compute(channel, (c, current) -> {
public <T> AutoCloseable subscribe(MessageBusChannels channel, String component, Class<T> payloadClass, Handler<T> handler) {
payloadClasses.compute(channel.name(), (c, current) -> {
if (current == null || current.isAssignableFrom(payloadClass)) {
return payloadClass;
} else if (payloadClass.isAssignableFrom(current)) {
Expand All @@ -141,14 +141,14 @@ public <T> AutoCloseable subscribe(String channel, String component, Class<T> pa
});
int index = registerIndex(channel, component);
log.debugf("Channel %s, component %s has index %d", channel, component, index);
MessageConsumer<Object> consumer = eventBus.consumer(channel, event -> {
MessageConsumer<Object> consumer = eventBus.consumer(channel.name(), event -> {
if (!(event.body() instanceof Message)) {
log.errorf("Not a message on %s: %s", channel, event.body());
log.errorf("Not a message on %s: %s", channel.name(), event.body());
return;
}
Message msg = (Message) event.body();
if ((msg.componentFlags & (1 << index)) == 0) {
log.debugf("%s ignoring message %d on %s with flags %X: doesn't match index %d", component, msg.id, channel, msg.componentFlags, index);
log.debugf("%s ignoring message %d on %s with flags %X: doesn't match index %d", component, msg.id, channel.name(), msg.componentFlags, index);
return;
}
executeForTest(msg.testId, () -> {
Expand All @@ -162,9 +162,9 @@ public <T> AutoCloseable subscribe(String channel, String component, Class<T> pa
// so we'll remove the record with a trigger
int updateCount = em.createNativeQuery("UPDATE messagebus SET flags = flags & ~(1 << ?1) WHERE id = ?2")
.setParameter(1, index).setParameter(2, msg.id).executeUpdate();
log.debugf("%s consumed %d on %s - %d records updated", component, msg.id, channel, updateCount);
log.debugf("%s consumed %d on %s - %d records updated", component, msg.id, channel.name(), updateCount);
} else {
log.debugf("Rolling back, %s cannot consume %d on %s", component, msg.id, channel);
log.debugf("Rolling back, %s cannot consume %d on %s", component, msg.id, channel.name());
}
} catch (SystemException e) {
log.error("Exception querying transaction status", e);
Expand All @@ -177,16 +177,16 @@ public <T> AutoCloseable subscribe(String channel, String component, Class<T> pa
return null;
});
} catch (Throwable t) {
errorReporter.reportException(t, ERROR_SUBJECT, "Exception in handler for message bus channel %s, message %s%n%n", channel, msg.payload);
errorReporter.reportException(t, ERROR_SUBJECT, "Exception in handler for message bus channel %s, message %s%n%n", channel.name(), msg.payload);
}
});
});
unregisters.add(consumer::unregister);
return () -> {
removeIndex(channel, index);
int newFlags = flags.compute(channel, (c, current) -> current != null ? current & ~(1 << index) : 0);
removeIndex(channel.name(), index);
int newFlags = flags.compute(channel.name(), (c, current) -> current != null ? current & ~(1 << index) : 0);
consumer.unregister();
log.debugf("Unregistered index %d on channel %s, new flags: %d", index, channel, Integer.valueOf(newFlags));
log.debugf("Unregistered index %d on channel %s, new flags: %d", index, channel.name(), Integer.valueOf(newFlags));
};
}

Expand All @@ -199,14 +199,14 @@ else if (entity instanceof TestDAO)
return entity;
}

private int registerIndex(String channel, String component) {
private int registerIndex(MessageBusChannels channel, String component) {
Integer index;
do {
index = tryRegisterIndex(channel, component);
index = tryRegisterIndex(channel.name(), component);
Thread.yield();
} while (index == null);
int finalIndex = index;
flags.compute(channel, (c, current) -> (current == null ? 0 : current) | (1 << finalIndex));
flags.compute(channel.name(), (c, current) -> (current == null ? 0 : current) | (1 << finalIndex));
return index;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package io.hyperfoil.tools.horreum.bus;

public enum MessageBusChannels {
DATAPOINT_NEW,
DATAPOINT_DELETED,
DATAPOINT_PROCESSED,
DATASET_NEW,
DATASET_UPDATED_LABELS,
DATASET_MISSING_VALUES,
DATASET_DELETED,
DATASET_VALIDATED,
DATASET_CHANGES_NEW,
TEST_NEW,
TEST_DELETED,
RUN_NEW,
RUN_TRASHED,
RUN_VALIDATED,
CHANGE_NEW,
EXPERIMENT_RESULT_NEW,
FOOBAR



}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import java.time.Instant;

import io.quarkus.hibernate.orm.panache.PanacheEntity;
import io.quarkus.hibernate.orm.panache.PanacheEntityBase;
import jakarta.persistence.Entity;
import jakarta.persistence.GeneratedValue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
@Entity(name = "Change")
@Table(name = "change")
public class ChangeDAO extends PanacheEntityBase {
public static final String EVENT_NEW = "change/new";

@Id
@GeneratedValue
Expand Down Expand Up @@ -75,28 +74,4 @@ public static ChangeDAO fromDatapoint(DataPointDAO dp) {
return change;
}

public static class Event {
public ChangeDAO change;
public String testName;
public DataSetDAO.Info dataset;
public boolean notify;

public Event() {}

public Event(ChangeDAO change, String testName, DataSetDAO.Info dataset, boolean notify) {
this.change = change;
this.testName = testName;
this.dataset = dataset;
this.notify = notify;
}

@Override
public String toString() {
return "Change.Event{" +
"change=" + change +
", dataset=" + dataset +
", notify=" + notify +
'}';
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,6 @@
@Entity(name = "DataPoint")
@Table(name = "DataPoint")
public class DataPointDAO extends PanacheEntityBase {
public static final String EVENT_NEW = "datapoint/new";
public static final String EVENT_DELETED = "datapoint/deleted";
public static final String EVENT_DATASET_PROCESSED = "datapoint/dataset_processed";

@Id
@GeneratedValue(strategy = GenerationType.AUTO)
Expand Down Expand Up @@ -64,38 +61,4 @@ public int getDatasetId() {
return dataset.id;
}

public static class Event {
public DataPointDAO dataPoint;
public int testId;
public boolean notify;

public Event() {
}

public Event(DataPointDAO dataPoint, int testId, boolean notify) {
this.dataPoint = dataPoint;
this.testId = testId;
this.notify = notify;
}

@Override
public String toString() {
return "DataPoint.Event{" +
"dataPoint=" + dataPoint +
", notify=" + notify +
'}';
}
}

public static class DatasetProcessedEvent {
public DataSetDAO.Info dataset;
public boolean notify;

public DatasetProcessedEvent() {}

public DatasetProcessedEvent(DataSetDAO.Info dataset, boolean notify) {
this.dataset = dataset;
this.notify = notify;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@
import org.hibernate.id.enhanced.SequenceStyleGenerator;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;

@Entity(name = "Action")
public class ActionDAO extends PanacheEntityBase {
Expand Down Expand Up @@ -73,21 +71,4 @@ public ActionDAO(Integer id, String event, String type, JsonNode config, JsonNod
this.runAlways = runAlways;
}

public void setSecrets(JsonNode secrets) {
this.secrets = secrets;
}

// Had we called this simply `getSecrets` Quarkus would rewrite (??!!) some property
// accesses to use of that method
public JsonNode getMaskedSecrets() {
if (secrets != null && secrets.isObject()) {
ObjectNode masked = JsonNodeFactory.instance.objectNode();
secrets.fieldNames().forEachRemaining(name -> {
masked.put(name, "********");
});
return masked;
} else {
return JsonNodeFactory.instance.objectNode();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,20 +28,13 @@

import io.hyperfoil.tools.horreum.entity.ValidationErrorDAO;
import io.smallrye.common.constraint.NotNull;
import org.hibernate.type.CustomType;
import org.hibernate.type.spi.TypeConfiguration;

/**
* Purpose of this object is to represent derived run data.
*/
@Entity(name="dataset")
@JsonIgnoreType
public class DataSetDAO extends OwnedEntityBase {
public static final String EVENT_NEW = "dataset/new";
public static final String EVENT_LABELS_UPDATED = "dataset/updatedlabels";
public static final String EVENT_MISSING_VALUES = "dataset/missing_values";
public static final String EVENT_DELETED = "dataset/deleted";
public static final String EVENT_VALIDATED = "dataset/validated";

@Id
@SequenceGenerator(
Expand Down Expand Up @@ -128,21 +121,6 @@ public String toString() {
}
}

public static class LabelsUpdatedEvent {
public int testId;
public int datasetId;
public boolean isRecalculation;

public LabelsUpdatedEvent() {
}

public LabelsUpdatedEvent(int testId, int datasetId, boolean isRecalculation) {
this.testId = testId;
this.datasetId = datasetId;
this.isRecalculation = isRecalculation;
}
}

public DataSetDAO() {}

public DataSetDAO(RunDAO run, int ordinal, String description, JsonNode data) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,6 @@
@DynamicUpdate // We don't want to trigger schema analysis when trashing the run
@JsonIgnoreType
public class RunDAO extends ProtectedBaseEntity {
public static final String EVENT_NEW = "run/new";
public static final String EVENT_TRASHED = "run/trashed";
public static final String EVENT_VALIDATED = "run/validated";

@Id
@SequenceGenerator(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
package io.hyperfoil.tools.horreum.entity.data;

import java.util.Collection;

import com.fasterxml.jackson.annotation.JsonIgnoreType;
import io.hyperfoil.tools.horreum.entity.ValidationErrorDAO;

import io.hyperfoil.tools.horreum.hibernate.JsonBinaryType;
import org.hibernate.annotations.GenericGenerator;
Expand Down Expand Up @@ -91,13 +88,4 @@ public class SchemaDAO extends ProtectedBaseEntity {
@Column(columnDefinition = "jsonb")
public JsonNode schema;

public static class ValidationEvent {
public int id; // context = run/dataset depends on event name
public Collection<ValidationErrorDAO> errors;

public ValidationEvent(int id, Collection<ValidationErrorDAO> errors) {
this.id = id;
this.errors = errors;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,6 @@
@Entity(name="test")
@JsonIgnoreType
public class TestDAO extends PanacheEntityBase {
public static final String EVENT_NEW = "test/new";
public static final String EVENT_DELETED = "test/deleted";

@Id
@GenericGenerator(
name = "testIdGenerator",
Expand Down
Loading
Loading