Skip to content

Commit

Permalink
fix: revert SelfCleaningRegistry change, due to conceptual issues
Browse files Browse the repository at this point in the history
This reverts commit 5f98132.
  • Loading branch information
kristian committed Jun 29, 2023
1 parent 138b3fb commit e22979a
Show file tree
Hide file tree
Showing 31 changed files with 855 additions and 1,299 deletions.
37 changes: 15 additions & 22 deletions src/main/java/io/neonbee/NeonBee.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,13 @@
import io.neonbee.hook.HookRegistry;
import io.neonbee.hook.HookType;
import io.neonbee.hook.internal.DefaultHookRegistry;
import io.neonbee.internal.Registry;
import io.neonbee.internal.ReplyInboundInterceptor;
import io.neonbee.internal.SharedDataAccessor;
import io.neonbee.internal.WriteSafeRegistry;
import io.neonbee.internal.buffer.ImmutableBuffer;
import io.neonbee.internal.cluster.ClusterHelper;
import io.neonbee.internal.cluster.entity.ClusterEntityRegistry;
import io.neonbee.internal.codec.DataExceptionMessageCodec;
import io.neonbee.internal.codec.DataQueryMessageCodec;
import io.neonbee.internal.codec.EntityWrapperMessageCodec;
Expand All @@ -71,9 +74,6 @@
import io.neonbee.internal.json.ConfigurableJsonFactory.ConfigurableJsonCodec;
import io.neonbee.internal.json.ImmutableJsonArray;
import io.neonbee.internal.json.ImmutableJsonObject;
import io.neonbee.internal.registry.Registry;
import io.neonbee.internal.registry.SelfCleaningRegistry;
import io.neonbee.internal.registry.SelfCleaningRegistryHook;
import io.neonbee.internal.scanner.HookScanner;
import io.neonbee.internal.tracking.MessageDirection;
import io.neonbee.internal.tracking.TrackingDataHandlingStrategy;
Expand Down Expand Up @@ -292,15 +292,10 @@ static Future<NeonBee> create(Function<VertxOptions, Future<Vertx>> vertxFactory
configFuture = succeededFuture(config);
}

Future<HealthCheckRegistry> healthCheckRegistryFuture = HealthCheckRegistry.create(vertx);
Future<SelfCleaningRegistry<String>> entityRegistryFuture =
SelfCleaningRegistry.create(vertx, EntityVerticle.REGISTRY_NAME);

// create a NeonBee instance, hook registry and close handler
Future<NeonBee> neonBeeFuture = all(configFuture, healthCheckRegistryFuture, entityRegistryFuture)
.map(loadedConfig -> new NeonBee(vertx, options, configFuture.result(),
compositeMeterRegistry, healthCheckRegistryFuture.result(),
entityRegistryFuture.result()));
Future<NeonBee> neonBeeFuture = configFuture.map(loadedConfig -> {
return new NeonBee(vertx, options, loadedConfig, compositeMeterRegistry);
});
// boot NeonBee, on failure close Vert.x
return neonBeeFuture.compose(NeonBee::boot).recover(closeVertx).compose(unused -> neonBeeFuture);
} catch (Throwable t) {
Expand Down Expand Up @@ -387,10 +382,6 @@ Future<Void> registerHealthChecks() {

private Future<Void> registerHooks() {
if (options.shouldIgnoreClassPath()) {
if (vertx.isClustered()) {
// We still need the cleanup Hooks for SelfCleaningRegistry
return getHookRegistry().registerHooks(SelfCleaningRegistryHook.class, CORRELATION_ID).mapEmpty();
}
return succeededFuture();
}

Expand Down Expand Up @@ -612,16 +603,18 @@ private Future<Void> deployModules() {
}

@VisibleForTesting
NeonBee(Vertx vertx, NeonBeeOptions options, NeonBeeConfig config,
CompositeMeterRegistry compositeMeterRegistry, HealthCheckRegistry healthCheckRegistry,
Registry<String> entityRegistry) {
NeonBee(Vertx vertx, NeonBeeOptions options, NeonBeeConfig config, CompositeMeterRegistry compositeMeterRegistry) {
this.vertx = vertx;
this.options = options;
this.config = config;

this.healthRegistry = healthCheckRegistry;
this.healthRegistry = new HealthCheckRegistry(vertx);
this.modelManager = new EntityModelManager(this);
this.entityRegistry = entityRegistry;
if (vertx.isClustered()) {
this.entityRegistry = new ClusterEntityRegistry(vertx, EntityVerticle.REGISTRY_NAME);
} else {
this.entityRegistry = new WriteSafeRegistry<>(vertx, EntityVerticle.REGISTRY_NAME);
}

this.compositeMeterRegistry = compositeMeterRegistry;

Expand Down Expand Up @@ -781,9 +774,9 @@ public EntityModelManager getModelManager() {
}

/**
* The entity registry is used to store all available entity verticle addresses for a given entity type.
* Get the {@link WriteSafeRegistry} for {@link EntityVerticle}.
*
* @return the entity registry
* @return the entity verticle {@link WriteSafeRegistry}
*/
public Registry<String> getEntityRegistry() {
return entityRegistry;
Expand Down
10 changes: 2 additions & 8 deletions src/main/java/io/neonbee/data/DataVerticle.java
Original file line number Diff line number Diff line change
Expand Up @@ -458,18 +458,12 @@ public void start(Promise<Void> promise) {
}

@Override
public void stop(Promise<Void> stopPromise) throws Exception {
public void stop() throws Exception {
NeonBee neonBee = NeonBee.get(vertx);
if (neonBee != null) { // NeonBee can be null, when the close hook has removed NeonBee - Vert.x mapping before
neonBee.unregisterLocalConsumer(getAddress());
}
Future.<Void>future(stopPromiseFromSuper -> {
try {
super.stop(stopPromiseFromSuper);
} catch (Exception e) {
stopPromiseFromSuper.fail(e);
}
}).onComplete(stopPromise);
super.stop();
}

/**
Expand Down
47 changes: 17 additions & 30 deletions src/main/java/io/neonbee/entity/EntityVerticle.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@
import static io.neonbee.internal.verticle.ConsolidationVerticle.ENTITY_TYPE_NAME_HEADER;
import static io.vertx.core.Future.failedFuture;
import static io.vertx.core.Future.succeededFuture;
import static java.util.stream.Collectors.toList;

import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import org.apache.olingo.commons.api.edm.FullQualifiedName;
import org.apache.olingo.server.api.uri.UriInfo;
Expand All @@ -25,18 +25,20 @@
import io.neonbee.data.DataQuery;
import io.neonbee.data.DataRequest;
import io.neonbee.data.DataVerticle;
import io.neonbee.internal.Registry;
import io.neonbee.internal.WriteSafeRegistry;
import io.neonbee.internal.helper.AsyncHelper;
import io.neonbee.internal.registry.Registry;
import io.neonbee.internal.verticle.ConsolidationVerticle;
import io.neonbee.logging.LoggingFacade;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.json.JsonArray;

public abstract class EntityVerticle extends DataVerticle<EntityWrapper> {

/**
* Name for the {@link NeonBee#getEntityRegistry()}.
* Name for the {@link WriteSafeRegistry}.
*/
public static final String REGISTRY_NAME = "EntityVerticleRegistry";

Expand Down Expand Up @@ -196,7 +198,12 @@ protected static Future<UriInfo> parseUriInfo(NeonBee neonBee, DataQuery query)
* @return A list of all (entity) verticle names as qualified names
*/
public static Future<List<String>> getVerticlesForEntityType(Vertx vertx, FullQualifiedName entityTypeName) {
return getRegistry(vertx).get(sharedEntityMapName(entityTypeName));
return Future
.future(asyncGet -> getRegistry(vertx).get(sharedEntityMapName(entityTypeName))
.onSuccess(asyncGet::complete).onFailure(asyncGet::fail))
.map(qualifiedNames -> ((List<?>) Optional.ofNullable((JsonArray) qualifiedNames)
.orElseGet(JsonArray::new).getList()).stream().map(Object::toString).distinct()
.collect(Collectors.toList()));
}

/**
Expand Down Expand Up @@ -246,35 +253,15 @@ public void start(Promise<Void> promise) {
* adding the EntityTypes to a shared map in a secure and cluster-wide thread safe manner.
*/
private Future<Void> announceEntityVerticle(Vertx vertx) {
return announceOrConcealEntities(entityType -> getRegistry(vertx).register(entityType, getQualifiedName()));
}

/**
* Conceals that this EntityVerticle is handling certain {@link #entityTypeNames()} to the rest of the cluster by
* removing the EntityTypes from a shared map in a secure and cluster-wide thread safe manner.
*/
private Future<Void> concealEntityVerticle() {
return announceOrConcealEntities(entityType -> getRegistry(vertx).unregister(entityType, getQualifiedName()));
}

@Override
public void stop(Promise<Void> stopPromise) throws Exception {
concealEntityVerticle().compose(v -> Future.<Void>future(stopPromiseFromSuper -> {
try {
super.stop(stopPromiseFromSuper);
} catch (Exception e) {
stopPromiseFromSuper.fail(e);
}
})).onComplete(stopPromise);
}

private Future<Void> announceOrConcealEntities(Function<String, Future<Void>> announceOrConceal) {
// in case this entity verticle does not listen to any entityTypeNames, do not add it to the shared map
return entityTypeNames()
.map(entityTypeNames -> entityTypeNames != null ? entityTypeNames : Set.<FullQualifiedName>of())
.compose(entityTypeNames -> {
List<Future<Void>> announceFutures =
entityTypeNames.stream().map(EntityVerticle::sharedEntityMapName)
.map(name -> announceOrConceal.apply(name)).collect(toList());
entityTypeNames.stream().map(EntityVerticle::sharedEntityMapName).map(name -> {
String qualifiedName = getQualifiedName();
return getRegistry(vertx).register(name, qualifiedName);
}).collect(Collectors.toList());
return Future.all(announceFutures).mapEmpty();
});
}
Expand Down
46 changes: 11 additions & 35 deletions src/main/java/io/neonbee/health/HealthCheckRegistry.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,7 @@
import io.neonbee.data.DataVerticle;
import io.neonbee.data.internal.DataContextImpl;
import io.neonbee.health.internal.HealthCheck;
import io.neonbee.internal.registry.Registry;
import io.neonbee.internal.registry.SelfCleaningRegistry;
import io.neonbee.internal.verticle.HealthCheckVerticle;
import io.neonbee.internal.WriteSafeRegistry;
import io.neonbee.logging.LoggingFacade;
import io.vertx.core.Future;
import io.vertx.core.Handler;
Expand All @@ -38,11 +36,8 @@
import io.vertx.ext.healthchecks.Status;

public class HealthCheckRegistry {

/**
* Name for the internally used registry.
*/
public static final String REGISTRY_NAME = HealthCheckRegistry.class.getSimpleName();
@VisibleForTesting
static final String REGISTRY_NAME = HealthCheckRegistry.class.getSimpleName();

private static final String UP = "UP";

Expand All @@ -61,31 +56,20 @@ public class HealthCheckRegistry {
@VisibleForTesting
final Map<String, HealthCheck> checks;

@VisibleForTesting
final Registry<String> healthVerticleRegistry;

@VisibleForTesting
HealthChecks healthChecks;

private final Vertx vertx;

/**
* Creates a new {@link HealthCheckRegistry}.
* Constructs a new instance of {@link HealthCheckRegistry}.
*
* @param vertx the related {@link Vertx} instance
* @return a succeeded or failed future depending on the success of the HealthCheckRegistry creation.
* @param vertx the current Vert.x instance
*/
public static Future<HealthCheckRegistry> create(Vertx vertx) {
return SelfCleaningRegistry.<String>create(vertx, REGISTRY_NAME)
.map(registry -> new HealthCheckRegistry(vertx, registry));
}

@VisibleForTesting
HealthCheckRegistry(Vertx vertx, Registry<String> healthVerticleRegistry) {
public HealthCheckRegistry(Vertx vertx) {
this.vertx = vertx;
checks = new HashMap<>();
healthChecks = HealthChecks.create(vertx);
this.healthVerticleRegistry = healthVerticleRegistry;
}

/**
Expand Down Expand Up @@ -113,16 +97,6 @@ public HealthCheck registerGlobalCheck(String id, long retentionTime,
return register(id, retentionTime, true, procedure, config);
}

/**
* Registers a HealthCheckVerticle to collect data from.
*
* @param verticle The related HealthCheckVerticle
* @return a succeeded or failed Future, depends on the success of the registration
*/
public Future<Void> registerVerticle(HealthCheckVerticle verticle) {
return healthVerticleRegistry.register(SHARED_MAP_KEY, verticle.getQualifiedName());
}

/**
* Registers a node health check, which checks node specific parameters like CPU or RAM.
*
Expand Down Expand Up @@ -228,7 +202,9 @@ Future<List<JsonObject>> getLocalHealthCheckResults() {
}

private Future<List<JsonObject>> getClusteredHealthCheckResults(DataContext dataContext) {
return healthVerticleRegistry.get(SHARED_MAP_KEY)
WriteSafeRegistry<String> registry = new WriteSafeRegistry<>(vertx, REGISTRY_NAME);
return registry.get(SHARED_MAP_KEY)
.map(qualifiedNames -> qualifiedNames == null ? new JsonArray() : qualifiedNames)
.compose(qualifiedNames -> {
List<Future<JsonArray>> asyncCheckResults = sendDataRequests(qualifiedNames, dataContext);

Expand All @@ -242,8 +218,8 @@ private Future<List<JsonObject>> getClusteredHealthCheckResults(DataContext data
});
}

private List<Future<JsonArray>> sendDataRequests(List<String> qualifiedNames, DataContext dataContext) {
return qualifiedNames.stream().map(DataRequest::new)
private List<Future<JsonArray>> sendDataRequests(JsonArray qualifiedNames, DataContext dataContext) {
return qualifiedNames.stream().map(Object::toString).map(DataRequest::new)
.map(dr -> DataVerticle.<JsonArray>requestData(vertx, dr, dataContext).onSuccess(data -> {
if (LOGGER.isDebugEnabled()) {
LOGGER.correlateWith(dataContext).debug("Retrieved health check of verticle {}",
Expand Down
36 changes: 36 additions & 0 deletions src/main/java/io/neonbee/internal/Registry.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package io.neonbee.internal;

import io.vertx.core.Future;
import io.vertx.core.json.JsonArray;

/**
* Interface for an asynchronous registry implementation.
*/
public interface Registry<T> {

/**
* Register a value in the registry.
*
* @param key a key
* @param value the value to register
* @return the future
*/
Future<Void> register(String key, T value);

/**
* Unregister a value.
*
* @param key a key
* @param value the value to unregister
* @return the future
*/
Future<Void> unregister(String key, T value);

/**
* Get the registered values for the key.
*
* @param key a key
* @return future with a JsonArray of the registered values
*/
Future<JsonArray> get(String key);
}
Loading

0 comments on commit e22979a

Please sign in to comment.