diff --git a/src/main/java/io/neonbee/NeonBee.java b/src/main/java/io/neonbee/NeonBee.java index 316557f2..11fa490b 100644 --- a/src/main/java/io/neonbee/NeonBee.java +++ b/src/main/java/io/neonbee/NeonBee.java @@ -54,10 +54,13 @@ import io.neonbee.hook.HookRegistry; import io.neonbee.hook.HookType; import io.neonbee.hook.internal.DefaultHookRegistry; +import io.neonbee.internal.Registry; import io.neonbee.internal.ReplyInboundInterceptor; import io.neonbee.internal.SharedDataAccessor; +import io.neonbee.internal.WriteSafeRegistry; import io.neonbee.internal.buffer.ImmutableBuffer; import io.neonbee.internal.cluster.ClusterHelper; +import io.neonbee.internal.cluster.entity.ClusterEntityRegistry; import io.neonbee.internal.codec.DataExceptionMessageCodec; import io.neonbee.internal.codec.DataQueryMessageCodec; import io.neonbee.internal.codec.EntityWrapperMessageCodec; @@ -71,9 +74,6 @@ import io.neonbee.internal.json.ConfigurableJsonFactory.ConfigurableJsonCodec; import io.neonbee.internal.json.ImmutableJsonArray; import io.neonbee.internal.json.ImmutableJsonObject; -import io.neonbee.internal.registry.Registry; -import io.neonbee.internal.registry.SelfCleaningRegistry; -import io.neonbee.internal.registry.SelfCleaningRegistryHook; import io.neonbee.internal.scanner.HookScanner; import io.neonbee.internal.tracking.MessageDirection; import io.neonbee.internal.tracking.TrackingDataHandlingStrategy; @@ -292,15 +292,10 @@ static Future create(Function> vertxFactory configFuture = succeededFuture(config); } - Future healthCheckRegistryFuture = HealthCheckRegistry.create(vertx); - Future> entityRegistryFuture = - SelfCleaningRegistry.create(vertx, EntityVerticle.REGISTRY_NAME); - // create a NeonBee instance, hook registry and close handler - Future neonBeeFuture = all(configFuture, healthCheckRegistryFuture, entityRegistryFuture) - .map(loadedConfig -> new NeonBee(vertx, options, configFuture.result(), - compositeMeterRegistry, healthCheckRegistryFuture.result(), - entityRegistryFuture.result())); + Future neonBeeFuture = configFuture.map(loadedConfig -> { + return new NeonBee(vertx, options, loadedConfig, compositeMeterRegistry); + }); // boot NeonBee, on failure close Vert.x return neonBeeFuture.compose(NeonBee::boot).recover(closeVertx).compose(unused -> neonBeeFuture); } catch (Throwable t) { @@ -387,10 +382,6 @@ Future registerHealthChecks() { private Future registerHooks() { if (options.shouldIgnoreClassPath()) { - if (vertx.isClustered()) { - // We still need the cleanup Hooks for SelfCleaningRegistry - return getHookRegistry().registerHooks(SelfCleaningRegistryHook.class, CORRELATION_ID).mapEmpty(); - } return succeededFuture(); } @@ -612,16 +603,18 @@ private Future deployModules() { } @VisibleForTesting - NeonBee(Vertx vertx, NeonBeeOptions options, NeonBeeConfig config, - CompositeMeterRegistry compositeMeterRegistry, HealthCheckRegistry healthCheckRegistry, - Registry entityRegistry) { + NeonBee(Vertx vertx, NeonBeeOptions options, NeonBeeConfig config, CompositeMeterRegistry compositeMeterRegistry) { this.vertx = vertx; this.options = options; this.config = config; - this.healthRegistry = healthCheckRegistry; + this.healthRegistry = new HealthCheckRegistry(vertx); this.modelManager = new EntityModelManager(this); - this.entityRegistry = entityRegistry; + if (vertx.isClustered()) { + this.entityRegistry = new ClusterEntityRegistry(vertx, EntityVerticle.REGISTRY_NAME); + } else { + this.entityRegistry = new WriteSafeRegistry<>(vertx, EntityVerticle.REGISTRY_NAME); + } this.compositeMeterRegistry = compositeMeterRegistry; @@ -781,9 +774,9 @@ public EntityModelManager getModelManager() { } /** - * The entity registry is used to store all available entity verticle addresses for a given entity type. + * Get the {@link WriteSafeRegistry} for {@link EntityVerticle}. * - * @return the entity registry + * @return the entity verticle {@link WriteSafeRegistry} */ public Registry getEntityRegistry() { return entityRegistry; diff --git a/src/main/java/io/neonbee/data/DataVerticle.java b/src/main/java/io/neonbee/data/DataVerticle.java index 3c115a03..79304db4 100644 --- a/src/main/java/io/neonbee/data/DataVerticle.java +++ b/src/main/java/io/neonbee/data/DataVerticle.java @@ -458,18 +458,12 @@ public void start(Promise promise) { } @Override - public void stop(Promise stopPromise) throws Exception { + public void stop() throws Exception { NeonBee neonBee = NeonBee.get(vertx); if (neonBee != null) { // NeonBee can be null, when the close hook has removed NeonBee - Vert.x mapping before neonBee.unregisterLocalConsumer(getAddress()); } - Future.future(stopPromiseFromSuper -> { - try { - super.stop(stopPromiseFromSuper); - } catch (Exception e) { - stopPromiseFromSuper.fail(e); - } - }).onComplete(stopPromise); + super.stop(); } /** diff --git a/src/main/java/io/neonbee/entity/EntityVerticle.java b/src/main/java/io/neonbee/entity/EntityVerticle.java index 4d53db08..100e17f8 100644 --- a/src/main/java/io/neonbee/entity/EntityVerticle.java +++ b/src/main/java/io/neonbee/entity/EntityVerticle.java @@ -6,13 +6,13 @@ import static io.neonbee.internal.verticle.ConsolidationVerticle.ENTITY_TYPE_NAME_HEADER; import static io.vertx.core.Future.failedFuture; import static io.vertx.core.Future.succeededFuture; -import static java.util.stream.Collectors.toList; import java.util.List; +import java.util.Optional; import java.util.Set; -import java.util.function.Function; import java.util.regex.Matcher; import java.util.regex.Pattern; +import java.util.stream.Collectors; import org.apache.olingo.commons.api.edm.FullQualifiedName; import org.apache.olingo.server.api.uri.UriInfo; @@ -25,18 +25,20 @@ import io.neonbee.data.DataQuery; import io.neonbee.data.DataRequest; import io.neonbee.data.DataVerticle; +import io.neonbee.internal.Registry; +import io.neonbee.internal.WriteSafeRegistry; import io.neonbee.internal.helper.AsyncHelper; -import io.neonbee.internal.registry.Registry; import io.neonbee.internal.verticle.ConsolidationVerticle; import io.neonbee.logging.LoggingFacade; import io.vertx.core.Future; import io.vertx.core.Promise; import io.vertx.core.Vertx; +import io.vertx.core.json.JsonArray; public abstract class EntityVerticle extends DataVerticle { /** - * Name for the {@link NeonBee#getEntityRegistry()}. + * Name for the {@link WriteSafeRegistry}. */ public static final String REGISTRY_NAME = "EntityVerticleRegistry"; @@ -196,7 +198,12 @@ protected static Future parseUriInfo(NeonBee neonBee, DataQuery query) * @return A list of all (entity) verticle names as qualified names */ public static Future> getVerticlesForEntityType(Vertx vertx, FullQualifiedName entityTypeName) { - return getRegistry(vertx).get(sharedEntityMapName(entityTypeName)); + return Future + .future(asyncGet -> getRegistry(vertx).get(sharedEntityMapName(entityTypeName)) + .onSuccess(asyncGet::complete).onFailure(asyncGet::fail)) + .map(qualifiedNames -> ((List) Optional.ofNullable((JsonArray) qualifiedNames) + .orElseGet(JsonArray::new).getList()).stream().map(Object::toString).distinct() + .collect(Collectors.toList())); } /** @@ -246,35 +253,15 @@ public void start(Promise promise) { * adding the EntityTypes to a shared map in a secure and cluster-wide thread safe manner. */ private Future announceEntityVerticle(Vertx vertx) { - return announceOrConcealEntities(entityType -> getRegistry(vertx).register(entityType, getQualifiedName())); - } - - /** - * Conceals that this EntityVerticle is handling certain {@link #entityTypeNames()} to the rest of the cluster by - * removing the EntityTypes from a shared map in a secure and cluster-wide thread safe manner. - */ - private Future concealEntityVerticle() { - return announceOrConcealEntities(entityType -> getRegistry(vertx).unregister(entityType, getQualifiedName())); - } - - @Override - public void stop(Promise stopPromise) throws Exception { - concealEntityVerticle().compose(v -> Future.future(stopPromiseFromSuper -> { - try { - super.stop(stopPromiseFromSuper); - } catch (Exception e) { - stopPromiseFromSuper.fail(e); - } - })).onComplete(stopPromise); - } - - private Future announceOrConcealEntities(Function> announceOrConceal) { + // in case this entity verticle does not listen to any entityTypeNames, do not add it to the shared map return entityTypeNames() .map(entityTypeNames -> entityTypeNames != null ? entityTypeNames : Set.of()) .compose(entityTypeNames -> { List> announceFutures = - entityTypeNames.stream().map(EntityVerticle::sharedEntityMapName) - .map(name -> announceOrConceal.apply(name)).collect(toList()); + entityTypeNames.stream().map(EntityVerticle::sharedEntityMapName).map(name -> { + String qualifiedName = getQualifiedName(); + return getRegistry(vertx).register(name, qualifiedName); + }).collect(Collectors.toList()); return Future.all(announceFutures).mapEmpty(); }); } diff --git a/src/main/java/io/neonbee/health/HealthCheckRegistry.java b/src/main/java/io/neonbee/health/HealthCheckRegistry.java index 82ad161d..a5cfe910 100644 --- a/src/main/java/io/neonbee/health/HealthCheckRegistry.java +++ b/src/main/java/io/neonbee/health/HealthCheckRegistry.java @@ -23,9 +23,7 @@ import io.neonbee.data.DataVerticle; import io.neonbee.data.internal.DataContextImpl; import io.neonbee.health.internal.HealthCheck; -import io.neonbee.internal.registry.Registry; -import io.neonbee.internal.registry.SelfCleaningRegistry; -import io.neonbee.internal.verticle.HealthCheckVerticle; +import io.neonbee.internal.WriteSafeRegistry; import io.neonbee.logging.LoggingFacade; import io.vertx.core.Future; import io.vertx.core.Handler; @@ -38,11 +36,8 @@ import io.vertx.ext.healthchecks.Status; public class HealthCheckRegistry { - - /** - * Name for the internally used registry. - */ - public static final String REGISTRY_NAME = HealthCheckRegistry.class.getSimpleName(); + @VisibleForTesting + static final String REGISTRY_NAME = HealthCheckRegistry.class.getSimpleName(); private static final String UP = "UP"; @@ -61,31 +56,20 @@ public class HealthCheckRegistry { @VisibleForTesting final Map checks; - @VisibleForTesting - final Registry healthVerticleRegistry; - @VisibleForTesting HealthChecks healthChecks; private final Vertx vertx; /** - * Creates a new {@link HealthCheckRegistry}. + * Constructs a new instance of {@link HealthCheckRegistry}. * - * @param vertx the related {@link Vertx} instance - * @return a succeeded or failed future depending on the success of the HealthCheckRegistry creation. + * @param vertx the current Vert.x instance */ - public static Future create(Vertx vertx) { - return SelfCleaningRegistry.create(vertx, REGISTRY_NAME) - .map(registry -> new HealthCheckRegistry(vertx, registry)); - } - - @VisibleForTesting - HealthCheckRegistry(Vertx vertx, Registry healthVerticleRegistry) { + public HealthCheckRegistry(Vertx vertx) { this.vertx = vertx; checks = new HashMap<>(); healthChecks = HealthChecks.create(vertx); - this.healthVerticleRegistry = healthVerticleRegistry; } /** @@ -113,16 +97,6 @@ public HealthCheck registerGlobalCheck(String id, long retentionTime, return register(id, retentionTime, true, procedure, config); } - /** - * Registers a HealthCheckVerticle to collect data from. - * - * @param verticle The related HealthCheckVerticle - * @return a succeeded or failed Future, depends on the success of the registration - */ - public Future registerVerticle(HealthCheckVerticle verticle) { - return healthVerticleRegistry.register(SHARED_MAP_KEY, verticle.getQualifiedName()); - } - /** * Registers a node health check, which checks node specific parameters like CPU or RAM. * @@ -228,7 +202,9 @@ Future> getLocalHealthCheckResults() { } private Future> getClusteredHealthCheckResults(DataContext dataContext) { - return healthVerticleRegistry.get(SHARED_MAP_KEY) + WriteSafeRegistry registry = new WriteSafeRegistry<>(vertx, REGISTRY_NAME); + return registry.get(SHARED_MAP_KEY) + .map(qualifiedNames -> qualifiedNames == null ? new JsonArray() : qualifiedNames) .compose(qualifiedNames -> { List> asyncCheckResults = sendDataRequests(qualifiedNames, dataContext); @@ -242,8 +218,8 @@ private Future> getClusteredHealthCheckResults(DataContext data }); } - private List> sendDataRequests(List qualifiedNames, DataContext dataContext) { - return qualifiedNames.stream().map(DataRequest::new) + private List> sendDataRequests(JsonArray qualifiedNames, DataContext dataContext) { + return qualifiedNames.stream().map(Object::toString).map(DataRequest::new) .map(dr -> DataVerticle.requestData(vertx, dr, dataContext).onSuccess(data -> { if (LOGGER.isDebugEnabled()) { LOGGER.correlateWith(dataContext).debug("Retrieved health check of verticle {}", diff --git a/src/main/java/io/neonbee/internal/Registry.java b/src/main/java/io/neonbee/internal/Registry.java new file mode 100644 index 00000000..09899fa6 --- /dev/null +++ b/src/main/java/io/neonbee/internal/Registry.java @@ -0,0 +1,36 @@ +package io.neonbee.internal; + +import io.vertx.core.Future; +import io.vertx.core.json.JsonArray; + +/** + * Interface for an asynchronous registry implementation. + */ +public interface Registry { + + /** + * Register a value in the registry. + * + * @param key a key + * @param value the value to register + * @return the future + */ + Future register(String key, T value); + + /** + * Unregister a value. + * + * @param key a key + * @param value the value to unregister + * @return the future + */ + Future unregister(String key, T value); + + /** + * Get the registered values for the key. + * + * @param key a key + * @return future with a JsonArray of the registered values + */ + Future get(String key); +} diff --git a/src/main/java/io/neonbee/internal/WriteSafeRegistry.java b/src/main/java/io/neonbee/internal/WriteSafeRegistry.java new file mode 100644 index 00000000..73637dd3 --- /dev/null +++ b/src/main/java/io/neonbee/internal/WriteSafeRegistry.java @@ -0,0 +1,132 @@ +package io.neonbee.internal; + +import static io.vertx.core.Future.succeededFuture; + +import java.util.function.Supplier; + +import io.neonbee.NeonBee; +import io.neonbee.logging.LoggingFacade; +import io.vertx.core.Future; +import io.vertx.core.Vertx; +import io.vertx.core.json.JsonArray; +import io.vertx.core.shareddata.AsyncMap; + +/** + * A registry to manage values in the {@link SharedDataAccessor} shared map. + *

+ * The values under the key are stored in a JsonArray. + */ +public class WriteSafeRegistry implements Registry { + + private final LoggingFacade logger = LoggingFacade.create(); + + private final SharedDataAccessor sharedDataAccessor; + + private final String registryName; + + /** + * Create a new {@link WriteSafeRegistry}. + * + * @param vertx the {@link Vertx} instance + * @param registryName the name of the map registry + */ + public WriteSafeRegistry(Vertx vertx, String registryName) { + this.registryName = registryName; + this.sharedDataAccessor = new SharedDataAccessor(vertx, this.getClass()); + } + + /** + * Register a value in the async shared map of {@link NeonBee} by key. + * + * @param sharedMapKey the shared map key + * @param value the value to register + * @return the future + */ + @Override + public Future register(String sharedMapKey, T value) { + logger.info("register value: \"{}\" in shared map: \"{}\"", sharedMapKey, value); + + return lock(sharedMapKey, () -> addValue(sharedMapKey, value)); + } + + @Override + public Future get(String sharedMapKey) { + return getSharedMap().compose(map -> map.get(sharedMapKey)).map(o -> (JsonArray) o); + } + + /** + * Method that acquires a lock for the sharedMapKey and released the lock after the futureSupplier is executed. + * + * @param sharedMapKey the shared map key + * @param futureSupplier supplier for the future to be secured by the lock + * @return the futureSupplier + */ + private Future lock(String sharedMapKey, Supplier> futureSupplier) { + logger.debug("Get lock for {}", sharedMapKey); + return sharedDataAccessor.getLock(sharedMapKey).onFailure(throwable -> { + logger.error("Error acquiring lock for {}", sharedMapKey, throwable); + }).compose(lock -> futureSupplier.get().onComplete(anyResult -> { + logger.debug("Releasing lock for {}", sharedMapKey); + lock.release(); + })); + } + + private Future addValue(String sharedMapKey, Object value) { + Future> sharedMap = getSharedMap(); + + return sharedMap.compose(map -> map.get(sharedMapKey)) + .map(valueOrNull -> valueOrNull != null ? (JsonArray) valueOrNull : new JsonArray()) + .compose(valueArray -> { + if (!valueArray.contains(value)) { + valueArray.add(value); + } + + if (logger.isInfoEnabled()) { + logger.info("Registered verticle {} in shared map.", value); + } + + return sharedMap.compose(map -> map.put(sharedMapKey, valueArray)); + }); + } + + /** + * Unregister the value in the {@link NeonBee} async shared map from the sharedMapKey. + * + * @param sharedMapKey the shared map key + * @param value the value to unregister + * @return the future + */ + @Override + public Future unregister(String sharedMapKey, T value) { + logger.debug("unregister value: \"{}\" from shared map: \"{}\"", sharedMapKey, value); + + return lock(sharedMapKey, () -> removeValue(sharedMapKey, value)); + } + + private Future removeValue(String sharedMapKey, Object value) { + Future> sharedMap = getSharedMap(); + + return sharedMap.compose(map -> map.get(sharedMapKey)).map(jsonArray -> (JsonArray) jsonArray) + .compose(values -> { + if (values == null) { + return succeededFuture(); + } + + if (logger.isInfoEnabled()) { + logger.info("Unregistered verticle {} in shared map.", value); + } + + values.remove(value); + return sharedMap.compose(map -> map.put(sharedMapKey, values)); + }); + } + + /** + * Shared map that is used as registry. + * + * @return Future to the shared map + */ + public Future> getSharedMap() { + return sharedDataAccessor.getAsyncMap(registryName); + } +} diff --git a/src/main/java/io/neonbee/internal/cluster/entity/ClusterEntityRegistry.java b/src/main/java/io/neonbee/internal/cluster/entity/ClusterEntityRegistry.java new file mode 100644 index 00000000..da1507b1 --- /dev/null +++ b/src/main/java/io/neonbee/internal/cluster/entity/ClusterEntityRegistry.java @@ -0,0 +1,144 @@ +package io.neonbee.internal.cluster.entity; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import com.google.common.annotations.VisibleForTesting; + +import io.neonbee.entity.EntityVerticle; +import io.neonbee.internal.Registry; +import io.neonbee.internal.WriteSafeRegistry; +import io.neonbee.internal.cluster.ClusterHelper; +import io.vertx.core.Future; +import io.vertx.core.Vertx; +import io.vertx.core.json.JsonArray; +import io.vertx.core.json.JsonObject; +import io.vertx.core.shareddata.AsyncMap; + +/** + * A special registry implementation that stores cluster information's. + *

+ * This implementation stores note specific entries for the registered {@link EntityVerticle}. The cluster information + * is stored in a JsonObject: + * + *

+ * {
+ *     "qualifiedName": "value",
+ *     "entityName": "key"
+ * }
+ * 
+ */ +public class ClusterEntityRegistry implements Registry { + + private static final String QUALIFIED_NAME = "qualifiedName"; + + private static final String ENTITY_NAME = "entityName"; + + @VisibleForTesting + final WriteSafeRegistry clusteringInformation; + + private final Vertx vertx; + + private final WriteSafeRegistry entityRegistry; + + /** + * Create a new instance of {@link ClusterEntityRegistry}. + * + * @param vertx the {@link Vertx} instance + * @param registryName the name of the map registry + */ + public ClusterEntityRegistry(Vertx vertx, String registryName) { + this.entityRegistry = new WriteSafeRegistry<>(vertx, registryName); + this.clusteringInformation = new WriteSafeRegistry<>(vertx, registryName + "#ClusteringInformation"); + this.vertx = vertx; + } + + @VisibleForTesting + static JsonObject clusterRegistrationInformation(String sharedMapKey, String value) { + return JsonObject.of(QUALIFIED_NAME, value, ENTITY_NAME, sharedMapKey); + } + + @Override + public Future register(String sharedMapKey, String value) { + return Future + .all(entityRegistry.register(sharedMapKey, value), + clusteringInformation.register(getClusterNodeId(), + clusterRegistrationInformation(sharedMapKey, value))) + .mapEmpty(); + } + + @Override + public Future unregister(String sharedMapKey, String value) { + return Future.all(entityRegistry.unregister(sharedMapKey, value), clusteringInformation + .unregister(getClusterNodeId(), clusterRegistrationInformation(sharedMapKey, value))) + .mapEmpty(); + } + + /** + * Get the cluster node ID. + * + * @return the ID of the cluster node + */ + @VisibleForTesting + String getClusterNodeId() { + return ClusterHelper.getClusterNodeId(vertx); + } + + @Override + public Future get(String sharedMapKey) { + return entityRegistry.get(sharedMapKey); + } + + /** + * Get the clustering information for the provided cluster ID from the registry. + * + * @param clusterNodeId the ID of the cluster node + * @return the future + */ + Future getClusteringInformation(String clusterNodeId) { + return clusteringInformation.get(clusterNodeId); + } + + /** + * Remove the entry for a node ID. + * + * @param clusterNodeId the ID of the cluster node + * @return the future + */ + Future removeClusteringInformation(String clusterNodeId) { + return clusteringInformation.getSharedMap().compose(map -> map.remove(clusterNodeId)).mapEmpty(); + } + + /** + * Unregister all registered entities for a node by ID. + * + * @param clusterNodeId the ID of the cluster node + * @return the future + */ + public Future unregisterNode(String clusterNodeId) { + return clusteringInformation.getSharedMap().compose(AsyncMap::entries).compose(map -> { + JsonArray registeredEntities = ((JsonArray) map.remove(clusterNodeId)).copy(); + List> futureList = new ArrayList<>(registeredEntities.size()); + for (Object o : registeredEntities) { + if (remove(map, o)) { + JsonObject jo = (JsonObject) o; + String entityName = jo.getString(ENTITY_NAME); + String qualifiedName = jo.getString(QUALIFIED_NAME); + futureList.add(unregister(entityName, qualifiedName)); + } + } + return Future.join(futureList).mapEmpty(); + }).compose(cf -> removeClusteringInformation(clusterNodeId)); + } + + private boolean remove(Map map, Object o) { + for (Map.Entry node : map.entrySet()) { + JsonArray ja = (JsonArray) node.getValue(); + if (ja.contains(o)) { + return false; + } + } + return true; + } +} diff --git a/src/main/java/io/neonbee/internal/cluster/entity/UnregisterEntityVerticlesHook.java b/src/main/java/io/neonbee/internal/cluster/entity/UnregisterEntityVerticlesHook.java new file mode 100644 index 00000000..6cd129ca --- /dev/null +++ b/src/main/java/io/neonbee/internal/cluster/entity/UnregisterEntityVerticlesHook.java @@ -0,0 +1,92 @@ +package io.neonbee.internal.cluster.entity; + +import static io.neonbee.hook.HookType.CLUSTER_NODE_ID; +import static io.vertx.core.Future.succeededFuture; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.neonbee.NeonBee; +import io.neonbee.hook.Hook; +import io.neonbee.hook.HookContext; +import io.neonbee.hook.HookType; +import io.neonbee.internal.Registry; +import io.neonbee.internal.cluster.ClusterHelper; +import io.vertx.core.Future; +import io.vertx.core.Promise; +import io.vertx.core.Vertx; + +/** + * Hooks for unregistering verticle models. + */ +public class UnregisterEntityVerticlesHook { + + private static final Logger LOGGER = LoggerFactory.getLogger(UnregisterEntityVerticlesHook.class); + + /** + * This method is called when a NeonBee instance shutdown gracefully. + * + * @param neonBee the {@link NeonBee} instance + * @param hookContext the {@link HookContext} + * @param promise {@link Promise} to complete the function. + */ + @Hook(HookType.BEFORE_SHUTDOWN) + public void unregisterOnShutdown(NeonBee neonBee, HookContext hookContext, Promise promise) { + LOGGER.info("Unregistering models on shutdown"); + Vertx vertx = neonBee.getVertx(); + String clusterNodeId = ClusterHelper.getClusterNodeId(vertx); + unregister(neonBee, clusterNodeId).onComplete(promise) + .onSuccess(unused -> LOGGER.info("Models unregistered successfully")) + .onFailure(ignoredCause -> LOGGER.error("Failed to unregister models on shutdown")); + } + + /** + * Unregister the entity qualified names for the node by ID. + * + * @param neonBee the {@link NeonBee} instance + * @param clusterNodeId the ID of the cluster node + * @return Future + */ + public static Future unregister(NeonBee neonBee, String clusterNodeId) { + if (!neonBee.getVertx().isClustered()) { + return succeededFuture(); + } + + Registry registry = neonBee.getEntityRegistry(); + if (!(registry instanceof ClusterEntityRegistry)) { + LOGGER.warn("Running in clustered mode but not using the ClusterEntityRegistry."); + return succeededFuture(); + } + + ClusterEntityRegistry clusterEntityRegistry = (ClusterEntityRegistry) registry; + + LOGGER.info("Unregistering entity verticle models for node ID {} ...", clusterNodeId); + Future unregisterFuture = clusterEntityRegistry.unregisterNode(clusterNodeId); + + return unregisterFuture + .onSuccess( + unused -> LOGGER.info("Unregistered entity verticle models for node ID {} ...", clusterNodeId)) + .onFailure(cause -> LOGGER.error("Failed to unregistered entity verticle models for node ID {} ...", + clusterNodeId, cause)); + } + + /** + * This method is called when a NeonBee node has left the cluster. + * + * @param neonBee the {@link NeonBee} instance + * @param hookContext the {@link HookContext} + * @param promise {@link Promise} to completed the function. + */ + @Hook(HookType.NODE_LEFT) + public void cleanup(NeonBee neonBee, HookContext hookContext, Promise promise) { + String clusterNodeId = hookContext.get(CLUSTER_NODE_ID); + LOGGER.info("Cleanup qualified names for node {}", clusterNodeId); + if (ClusterHelper.isLeader(neonBee.getVertx())) { + LOGGER.info("Cleaning registered qualified names ..."); + unregister(neonBee, clusterNodeId).onComplete(promise) + .onSuccess(unused -> LOGGER.info("Qualified names successfully cleaned up")) + .onFailure(ignoredCause -> LOGGER.error("Failed to cleanup qualified names")); + } + } + +} diff --git a/src/main/java/io/neonbee/internal/helper/SharedDataHelper.java b/src/main/java/io/neonbee/internal/helper/SharedDataHelper.java deleted file mode 100644 index ae638801..00000000 --- a/src/main/java/io/neonbee/internal/helper/SharedDataHelper.java +++ /dev/null @@ -1,43 +0,0 @@ -package io.neonbee.internal.helper; - -import java.util.function.Supplier; - -import io.neonbee.internal.SharedDataAccessor; -import io.neonbee.logging.LoggingFacade; -import io.vertx.core.Future; -import io.vertx.core.Vertx; - -public final class SharedDataHelper { - private static final LoggingFacade LOGGER = LoggingFacade.create(); - - private static SharedDataAccessor getSharedDataAccessor(Vertx vertx) { - return new SharedDataAccessor(vertx, SharedDataHelper.class); - } - - /** - * Method that acquires a lock for the key and released the lock after the futureSupplier is executed. - * - * @param vertx the related Vert.x instance - * @param key the shared map key - * @param futureSupplier supplier for the future to be secured by the lock - * @return the futureSupplier - */ - public static Future lock(Vertx vertx, String key, Supplier> futureSupplier) { - LOGGER.debug("Get lock for key \"{}\"", key); - return getSharedDataAccessor(vertx) - .getLock(key) - .onFailure(throwable -> LOGGER.error("Error acquiring lock for key \"{}\"", key, throwable)) - .compose(lock -> { - LOGGER.debug("Received lock for key \"{}\"", key); - return futureSupplier.get().onComplete(anyResult -> { - LOGGER.debug("Releasing lock for key \"{}\"", key); - lock.release(); - }); - }); - } - - /** - * This helper class cannot be instantiated. - */ - private SharedDataHelper() {} -} diff --git a/src/main/java/io/neonbee/internal/registry/NonLockingRegistry.java b/src/main/java/io/neonbee/internal/registry/NonLockingRegistry.java deleted file mode 100644 index 87f33cda..00000000 --- a/src/main/java/io/neonbee/internal/registry/NonLockingRegistry.java +++ /dev/null @@ -1,142 +0,0 @@ -package io.neonbee.internal.registry; - -import static io.vertx.core.Future.succeededFuture; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.Set; - -import com.google.common.base.Joiner; - -import io.neonbee.internal.SharedDataAccessor; -import io.neonbee.logging.LoggingFacade; -import io.vertx.core.Future; -import io.vertx.core.Vertx; -import io.vertx.core.json.JsonArray; -import io.vertx.core.shareddata.AsyncMap; - -/** - * A registry that is using a plain {@link SharedDataAccessor#getAsyncMap(String)} shared map. - *

- * Warning: This {@link Registry} implementation is not waiting for locks, or has other overhead. If the - * application logic itself take care about this (e.g. only using unique keys) this could be a super-fast registry - * implementation. If not have a look into {@link WriteSafeRegistry} or {@link SelfCleaningRegistry}. - *

- * The values under the key are stored in a JsonArray, which means only types that can be serialized and deserialized by - * jackson are allowed. - */ -public class NonLockingRegistry implements Registry { - private static final LoggingFacade LOGGER = LoggingFacade.create(); - - protected final String registryName; - - protected final Vertx vertx; - - private final SharedDataAccessor sharedDataAccessor; - - /** - * Create a new {@link NonLockingRegistry}. - * - * @param vertx the {@link Vertx} instance - * @param registryName the name of the map registry - */ - public NonLockingRegistry(Vertx vertx, String registryName) { - this.vertx = vertx; - this.registryName = registryName; - this.sharedDataAccessor = new SharedDataAccessor(vertx, this.getClass()); - } - - @Override - public Future register(String key, Collection values) { - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Begin to register value \"{}\" for key \"{}\" in registry \"{}\"", toString(values), key, - registryName); - } - Future> sharedMap = getSharedMap(); - - return sharedMap.compose(map -> map.get(key)) - .map(valueOrNull -> valueOrNull != null ? valueOrNull : new JsonArray()) - .compose(valueArray -> { - for (T value : values) { - if (!valueArray.contains(value)) { - valueArray.add(value); - } - } - - return sharedMap.compose(map -> map.put(key, valueArray)) - .onSuccess(v -> { - if (LOGGER.isDebugEnabled()) { - LOGGER.debug( - "Register value \"{}\" for key \"{}\" in registry \"{}\" has been finished", - toString(values), - key, registryName); - } - }); - }); - } - - @Override - public Future unregister(String key, Collection values) { - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Begin to unregister value \"{}\" for key \"{}\" in registry \"{}\"", toString(values), key, - registryName); - } - Future> sharedMap = getSharedMap(); - - return sharedMap.compose(map -> map.get(key)).compose(registeredValues -> { - if (registeredValues == null) { - return succeededFuture(); - } - - for (T value : values) { - registeredValues.remove(value); - } - - return sharedMap.compose(map -> { - if (registeredValues.isEmpty()) { - LOGGER.debug("No value left for key \"{}\" in registry \"{}\" -> remove key from registry", key, - registryName); - return map.remove(key).mapEmpty(); - } else { - return map.put(key, registeredValues); - } - }).onSuccess(v -> { - if (LOGGER.isDebugEnabled()) { - LOGGER.debug( - "Unregister value \"{}\" for key \"{}\" in registry \"{}\" has been finished", - toString(values), key, registryName); - } - }); - }); - } - - @Override - @SuppressWarnings("unchecked") - public Future> get(String key) { - return getSharedMap().compose(map -> map.get(key)).map(values -> { - if (values == null) { - return List.of(); - } else { - List transformedValues = new ArrayList<>(); - // It is important to call forEach here, because the iterator of JsonArray wraps the JSON values into a - // Java type. - values.forEach(value -> transformedValues.add((T) value)); - return transformedValues; - } - }); - } - - @Override - public Future> getKeys() { - return getSharedMap().compose(AsyncMap::keys); - } - - private Future> getSharedMap() { - return sharedDataAccessor.getAsyncMap(registryName); - } - - private String toString(Collection values) { - return Joiner.on(", ").join(values); - } -} diff --git a/src/main/java/io/neonbee/internal/registry/Registry.java b/src/main/java/io/neonbee/internal/registry/Registry.java deleted file mode 100644 index e06b8392..00000000 --- a/src/main/java/io/neonbee/internal/registry/Registry.java +++ /dev/null @@ -1,80 +0,0 @@ -package io.neonbee.internal.registry; - -import java.util.Collection; -import java.util.List; -import java.util.Optional; -import java.util.Set; - -import io.vertx.core.Future; - -/** - * Interface for an asynchronous registry implementation. - */ -public interface Registry { - - /** - * Register a value in the registry. - * - * @param key a key - * @param value the value to register - * @return the future - */ - default Future register(String key, T value) { - return register(key, List.of(value)); - } - - /** - * Register multiple unique values in the registry. - * - * @param key a key - * @param values the values to register - * @return the future - */ - Future register(String key, Collection values); - - /** - * Unregister a value. - * - * @param key a key - * @param value the value to unregister - * @return the future - */ - default Future unregister(String key, T value) { - return unregister(key, List.of(value)); - } - - /** - * Unregister multiple values. - * - * @param key a key - * @param values the values to unregister - * @return the future - */ - Future unregister(String key, Collection values); - - /** - * Get all registered values for the key. - * - * @param key a key - * @return future with the List of the registered values. If no value is registered an empty List will be returned. - */ - Future> get(String key); - - /** - * Get any registered value for the key. This is a convenient method in case that the registry has a 1:1 - * relationship. - * - * @param key a key - * @return an Optional with any value for the registered key, or an empty one if no value is registered. - */ - default Future> getAny(String key) { - return get(key).map(keys -> keys.stream().findAny()); - } - - /** - * Get all registered keys. - * - * @return future with the Set of the registered keys. If no key is registered an empty Set will be returned. - */ - Future> getKeys(); -} diff --git a/src/main/java/io/neonbee/internal/registry/SelfCleaningRegistry.java b/src/main/java/io/neonbee/internal/registry/SelfCleaningRegistry.java deleted file mode 100644 index 3ce77943..00000000 --- a/src/main/java/io/neonbee/internal/registry/SelfCleaningRegistry.java +++ /dev/null @@ -1,65 +0,0 @@ -package io.neonbee.internal.registry; - -import java.util.Collection; -import java.util.List; -import java.util.Set; - -import io.vertx.core.Future; -import io.vertx.core.Vertx; - -public final class SelfCleaningRegistry extends WriteSafeRegistry { - static final String READ_ONLY_MAP_SUFFIX = "#readOnlyMap"; - - static final String NODE_ID_SEPARATOR = "[###]"; - - private final Registry readOnlyRegistry; - - private final String nodeSuffix; - - private final SelfCleaningRegistryController controller; - - /** - * Creates a new {@link SelfCleaningRegistry} that stores the registered values by cluster node and removes them, - * when the node leaves the cluster. - * - * @param vertx the related {@link Vertx} instance - * @param registryName the name of the map registry - * @param The type for the registry values. Only JSON data types fully supported. - * @return a succeeded or failed future depending on the success of the registry creation. - */ - public static Future> create(Vertx vertx, String registryName) { - SelfCleaningRegistryController controller = new SelfCleaningRegistryController(vertx); - return controller.addRegistry(registryName).map(new SelfCleaningRegistry<>(vertx, registryName, controller)); - } - - private SelfCleaningRegistry(Vertx vertx, String registryName, SelfCleaningRegistryController controller) { - super(vertx, registryName); - this.readOnlyRegistry = new WriteSafeRegistry<>(vertx, registryName + READ_ONLY_MAP_SUFFIX); - this.controller = controller; - this.nodeSuffix = NODE_ID_SEPARATOR + controller.getNodeId(); - } - - @Override - public Future register(String key, Collection values) { - return super.register(addNodeSuffix(key), values, () -> controller.refreshReadOnlyMap(registryName)); - } - - @Override - public Future unregister(String key, Collection values) { - return super.unregister(addNodeSuffix(key), values, () -> controller.refreshReadOnlyMap(registryName)); - } - - @Override - public Future> get(String key) { - return readOnlyRegistry.get(key); - } - - @Override - public Future> getKeys() { - return readOnlyRegistry.getKeys(); - } - - private String addNodeSuffix(String key) { - return key + nodeSuffix; - } -} diff --git a/src/main/java/io/neonbee/internal/registry/SelfCleaningRegistryController.java b/src/main/java/io/neonbee/internal/registry/SelfCleaningRegistryController.java deleted file mode 100644 index 2b46a798..00000000 --- a/src/main/java/io/neonbee/internal/registry/SelfCleaningRegistryController.java +++ /dev/null @@ -1,186 +0,0 @@ -package io.neonbee.internal.registry; - -import static io.neonbee.internal.registry.SelfCleaningRegistry.NODE_ID_SEPARATOR; -import static io.vertx.core.Future.all; -import static java.util.stream.Collectors.toList; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.function.Predicate; -import java.util.function.Supplier; - -import io.neonbee.internal.SharedDataAccessor; -import io.neonbee.internal.cluster.ClusterHelper; -import io.neonbee.internal.helper.SharedDataHelper; -import io.neonbee.logging.LoggingFacade; -import io.vertx.core.Future; -import io.vertx.core.Vertx; -import io.vertx.core.json.JsonArray; -import io.vertx.core.shareddata.AsyncMap; - -public class SelfCleaningRegistryController extends WriteSafeRegistry { - private static final LoggingFacade LOGGER = LoggingFacade.create(); - - private static final String REGISTRY_NAME = "SelfCleaningRegistryControllerRegistry"; - - private static final String DUMMY_NODE_ID = "dummyNodeId"; - - private final SharedDataAccessor selfCleaningRegistryAccessor; - - private final SharedDataAccessor readOnlyMapAccessor; - - private final String nodeId; - - /** - * Creates a new SelfCleaningRegistryController instance. - * - * @param vertx the related Vert.x instance - */ - public SelfCleaningRegistryController(Vertx vertx) { - super(vertx, REGISTRY_NAME); - this.selfCleaningRegistryAccessor = new SharedDataAccessor(vertx, SelfCleaningRegistry.class); - this.readOnlyMapAccessor = new SharedDataAccessor(vertx, WriteSafeRegistry.class); - if (vertx.isClustered()) { - this.nodeId = ClusterHelper.getClusterNodeId(vertx); - } else { - // Isn't clustered node id is irrelevant - this.nodeId = DUMMY_NODE_ID; - } - } - - /** - * Method to determine the cluster node id. - * - * @return The cluster node id, or a dummy id in case Vert.x isn't clustered. - */ - public String getNodeId() { - if (vertx.isClustered()) { - return ClusterHelper.getClusterNodeId(vertx); - } else { - // Isn't clustered node id is irrelevant - return DUMMY_NODE_ID; - } - } - - /** - * Refreshes the read only map of the passed registry. - * - * @param registryName The name of the registry - * @return a succeeded or failed future depending on the success of the refresh. - */ - public Future refreshReadOnlyMap(String registryName) { - LOGGER.debug("Begin to refresh read only map of registry \"{}\" at node \"{}\"", registryName, nodeId); - Future refreshedFuture = modifyReadOnlyMap(registryName, () -> getEntriesOfRegistry(registryName) - .map(this::accumulateValues) - .compose(accumulatedValues -> setReadOnlyValues(registryName, accumulatedValues))); - - return refreshedFuture - .onFailure(t -> LOGGER.debug("Failed to refresh readOnlyMap for registry \"{}\"", registryName, t)) - .onSuccess(v -> LOGGER.debug("Refresh readOnlyMap for registry \"{}\" finished.", registryName)); - } - - /** - * Loops over every registry and removes all entries related to the passed node. - * - * @param nodeId The nodeId to clean up - * @return a succeeded or failed future depending on the success of the cleanup. - */ - public Future cleanUpAllRegistriesForNode(String nodeId) { - return getKeys().compose(registryNames -> { - List> cleanedRegistries = registryNames.stream() - .map(regName -> removeAllKeysForNode(nodeId, regName) - .compose(v -> refreshReadOnlyMap(regName))) - .collect(toList()); - return all(cleanedRegistries).mapEmpty(); - }); - } - - /** - * Registers the passed registry so that it can be cleaned up. - * - * @param registryName The name of the registry to add - * @return a succeeded or failed future depending on the success of the registration. - */ - public Future addRegistry(String registryName) { - return register(registryName, nodeId); - } - - private Future removeAllKeysForNode(String nodeId, String registryName) { - LOGGER.debug("Begin to remove all values for node \"{}\" in registry \"{}\"", nodeId, registryName); - - Future> entriesToRemove = getEntriesOfRegistry(registryName).map(entries -> { - return entries.keySet().stream().filter(entry -> entry.endsWith(nodeId)).collect(toList()); - }); - - Future valuesRemovedFuture = - entriesToRemove.compose(entries -> getRegistryMap(registryName).compose(registryMap -> { - return all(entries.stream().map(registryMap::remove).collect(toList())); - })).mapEmpty(); - - return valuesRemovedFuture - .onFailure(t -> LOGGER.debug("Failed to remove all values for node \"{}\" in registry \"{}\" finished", - nodeId, registryName, t)) - .onSuccess(v -> LOGGER.debug("Remove all values for node \"{}\" in registry \"{}\" finished", nodeId, - registryName)); - } - - private Future modifyReadOnlyMap(String registryName, Supplier> action) { - return SharedDataHelper.lock(vertx, getReadOnlyMapName(registryName), action); - } - - private Future> getEntriesOfRegistry(String registryName) { - return getRegistryMap(registryName).compose(AsyncMap::entries); - } - - private Future> getRegistryMap(String registryName) { - return selfCleaningRegistryAccessor.getAsyncMap(registryName); - } - - private Future> getReadOnlyMap(String registryName) { - return readOnlyMapAccessor.getAsyncMap(getReadOnlyMapName(registryName)); - } - - private Future setReadOnlyValues(String registryName, Map accumulatedValues) { - return getReadOnlyMap(registryName).compose(readOnlyMap -> readOnlyMap.clear().compose(v -> { - List> written = new ArrayList<>(); - accumulatedValues.forEach((key, values) -> written.add(readOnlyMap.put(key, values))); - return all(written).mapEmpty(); - })); - } - - /** - * Loops over the entries of the related registry and merges the node separated values. - * - *

-     * "Users[###]Node1": ["Foo"]
-     * "Users[###]Node2": ["Bar"]
-     *
-     * becomes
-     *
-     * "Users: ["Foo", "Bar"]
-     * 
- * - * - * @param nodeSeparatedValues map with unmerged entries. - * @return a map with merged entries. - */ - private Map accumulateValues(Map nodeSeparatedValues) { - Map accumulatedValues = new HashMap<>(); - nodeSeparatedValues.forEach((currentKeyWithSuffix, valuesOfEntry) -> { - String keyWithoutSuffix = removeNodeSuffix(currentKeyWithSuffix); - JsonArray values = accumulatedValues.computeIfAbsent(keyWithoutSuffix, s -> new JsonArray()); - valuesOfEntry.stream().filter(Predicate.not(values::contains)).forEach(values::add); - }); - return accumulatedValues; - } - - private String removeNodeSuffix(String key) { - return key.substring(0, key.indexOf(NODE_ID_SEPARATOR)); - } - - private String getReadOnlyMapName(String registryName) { - return registryName + SelfCleaningRegistry.READ_ONLY_MAP_SUFFIX; - } -} diff --git a/src/main/java/io/neonbee/internal/registry/SelfCleaningRegistryHook.java b/src/main/java/io/neonbee/internal/registry/SelfCleaningRegistryHook.java deleted file mode 100644 index 23ed1768..00000000 --- a/src/main/java/io/neonbee/internal/registry/SelfCleaningRegistryHook.java +++ /dev/null @@ -1,59 +0,0 @@ -package io.neonbee.internal.registry; - -import static io.neonbee.hook.HookType.CLUSTER_NODE_ID; - -import io.neonbee.NeonBee; -import io.neonbee.hook.Hook; -import io.neonbee.hook.HookContext; -import io.neonbee.hook.HookType; -import io.neonbee.internal.cluster.ClusterHelper; -import io.neonbee.logging.LoggingFacade; -import io.vertx.core.Promise; - -public class SelfCleaningRegistryHook { - private static final LoggingFacade LOGGER = LoggingFacade.create(); - - /** - * This method is called when a NeonBee instance shutdown gracefully. - * - * @param neonBee the {@link NeonBee} instance - * @param hookContext the {@link HookContext} - * @param promise {@link Promise} to complete the function. - */ - @Hook(HookType.BEFORE_SHUTDOWN) - public void unregisterOnShutdown(NeonBee neonBee, HookContext hookContext, Promise promise) { - SelfCleaningRegistryController controller = new SelfCleaningRegistryController(neonBee.getVertx()); - - String nodeId = controller.getNodeId(); - LOGGER.debug("Execute BEFORE_SHUTDOWN hook for SelfCleaningRegistry on node \"{}\"", nodeId); - controller.cleanUpAllRegistriesForNode(nodeId).onSuccess( - v -> LOGGER.debug("Finished BEFORE_SHUTDOWN hook for SelfCleaningRegistry on node \"{}\"", nodeId)) - .onComplete(promise); - } - - /** - * This method is called when a NeonBee node has left the cluster. - * - * @param neonBee the {@link NeonBee} instance - * @param hookContext the {@link HookContext} - * @param promise {@link Promise} to completed the function. - */ - @Hook(HookType.NODE_LEFT) - public void cleanup(NeonBee neonBee, HookContext hookContext, Promise promise) { - if (ClusterHelper.isLeader(neonBee.getVertx())) { - String nodeId = hookContext.get(CLUSTER_NODE_ID); - SelfCleaningRegistryController controller = new SelfCleaningRegistryController(neonBee.getVertx()); - - String currentNodeId = controller.getNodeId(); - LOGGER.debug("Execute NODE_LEFT hook for SelfCleaningRegistry on node \"{}\" for node \"{}\"", - currentNodeId, nodeId); - controller.cleanUpAllRegistriesForNode(nodeId) - .onSuccess(v -> LOGGER.debug( - "Finished NODE_LEFT hook for SelfCleaningRegistry on node \"{}\" for node \"{}\"", - currentNodeId, nodeId)) - .onComplete(promise); - } else { - promise.complete(); - } - } -} diff --git a/src/main/java/io/neonbee/internal/registry/WriteSafeRegistry.java b/src/main/java/io/neonbee/internal/registry/WriteSafeRegistry.java deleted file mode 100644 index ab117850..00000000 --- a/src/main/java/io/neonbee/internal/registry/WriteSafeRegistry.java +++ /dev/null @@ -1,62 +0,0 @@ -package io.neonbee.internal.registry; - -import static io.neonbee.internal.helper.SharedDataHelper.lock; -import static io.vertx.core.Future.succeededFuture; - -import java.util.Collection; -import java.util.function.Supplier; - -import io.vertx.core.Future; -import io.vertx.core.Vertx; - -public class WriteSafeRegistry extends NonLockingRegistry { - /** - * Create a new {@link WriteSafeRegistry} based on a {@link NonLockingRegistry}. - * - * @param vertx the {@link Vertx} instance - * @param registryName the name of the map registry - */ - public WriteSafeRegistry(Vertx vertx, String registryName) { - super(vertx, registryName); - } - - @Override - public Future register(String key, Collection values) { - return register(key, values, () -> succeededFuture()); - } - - /** - * Register multiple unique values in the registry. - * - * @param key a key - * @param values the values to register - * @param alterAction an action that is called after the registry was modified, but before the related lock is - * released. - * @return the future - */ - public Future register(String key, Collection values, Supplier> alterAction) { - return lock(vertx, getLockKey(key), () -> super.register(key, values).compose(v -> alterAction.get())); - } - - @Override - public Future unregister(String key, Collection values) { - return unregister(key, values, () -> succeededFuture()); - } - - /** - * Unregister multiple values. - * - * @param key a key - * @param values the values to unregister - * @param alterAction an action that is called after the registry was modified, but before the related lock is - * released. - * @return the future - */ - public Future unregister(String key, Collection values, Supplier> alterAction) { - return lock(vertx, getLockKey(key), () -> super.unregister(key, values).compose(v -> alterAction.get())); - } - - private String getLockKey(String key) { - return registryName + "-" + key; - } -} diff --git a/src/main/java/io/neonbee/internal/verticle/HealthCheckVerticle.java b/src/main/java/io/neonbee/internal/verticle/HealthCheckVerticle.java index e9fa27a4..8e766123 100644 --- a/src/main/java/io/neonbee/internal/verticle/HealthCheckVerticle.java +++ b/src/main/java/io/neonbee/internal/verticle/HealthCheckVerticle.java @@ -6,13 +6,18 @@ import java.util.List; import java.util.UUID; +import com.google.common.annotations.VisibleForTesting; + import io.neonbee.NeonBee; import io.neonbee.NeonBeeDeployable; import io.neonbee.data.DataContext; import io.neonbee.data.DataQuery; import io.neonbee.data.DataVerticle; +import io.neonbee.health.HealthCheckRegistry; +import io.neonbee.internal.WriteSafeRegistry; import io.vertx.core.Future; import io.vertx.core.Promise; +import io.vertx.core.Vertx; import io.vertx.core.json.JsonArray; import io.vertx.core.json.JsonObject; import io.vertx.ext.healthchecks.CheckResult; @@ -25,6 +30,9 @@ public class HealthCheckVerticle extends DataVerticle { */ public static final String SHARED_MAP_KEY = "healthCheckVerticles"; + @VisibleForTesting + static final String REGISTRY_NAME = HealthCheckRegistry.class.getSimpleName(); + private static final String NAME = "_healthCheckVerticle-" + UUID.randomUUID(); /** @@ -36,7 +44,7 @@ public class HealthCheckVerticle extends DataVerticle { public void start(Promise promise) { Future.future(super::start).compose(v -> { if (NeonBee.get(vertx).getOptions().isClustered()) { - return NeonBee.get(vertx).getHealthCheckRegistry().registerVerticle(this); + return register(vertx); } return Future.succeededFuture(); }).onComplete(promise); @@ -54,4 +62,9 @@ public Future retrieveData(DataQuery query, DataContext context) { public String getName() { return NAME; } + + private Future register(Vertx vertx) { + WriteSafeRegistry registry = new WriteSafeRegistry<>(vertx, REGISTRY_NAME); + return registry.register(SHARED_MAP_KEY, getQualifiedName()); + } } diff --git a/src/test/java/io/neonbee/NeonBeeMockHelper.java b/src/test/java/io/neonbee/NeonBeeMockHelper.java index 9a979c17..a5152e04 100644 --- a/src/test/java/io/neonbee/NeonBeeMockHelper.java +++ b/src/test/java/io/neonbee/NeonBeeMockHelper.java @@ -1,7 +1,6 @@ package io.neonbee; import static io.neonbee.test.helper.OptionsHelper.defaultOptions; -import static io.neonbee.test.helper.ReflectionHelper.createObjectWithPrivateConstructor; import static io.vertx.core.Future.succeededFuture; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyBoolean; @@ -11,8 +10,6 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -import java.lang.reflect.Constructor; -import java.lang.reflect.InvocationTargetException; import java.util.List; import java.util.function.Supplier; @@ -21,10 +18,6 @@ import io.micrometer.core.instrument.composite.CompositeMeterRegistry; import io.neonbee.NeonBeeInstanceConfiguration.ClusterManager; import io.neonbee.config.NeonBeeConfig; -import io.neonbee.entity.EntityVerticle; -import io.neonbee.health.HealthCheckRegistry; -import io.neonbee.internal.registry.Registry; -import io.neonbee.internal.registry.WriteSafeRegistry; import io.vertx.core.AsyncResult; import io.vertx.core.Closeable; import io.vertx.core.DeploymentOptions; @@ -167,13 +160,6 @@ public static Vertx defaultVertxMock() { return null; }).when(sharedDataMock).getLocalLock(any(), any()); - // mock global locks (and always grant them) - when(sharedDataMock.getLock(any())).thenReturn(succeededFuture(mock(Lock.class))); - doAnswer(invocation -> { - invocation.>>getArgument(1).handle(succeededFuture(mock(Lock.class))); - return null; - }).when(sharedDataMock).getLock(any(), any()); - return vertxMock; } @@ -279,17 +265,6 @@ public static NeonBee registerNeonBeeMock(Vertx vertx, NeonBeeConfig config) { */ @SuppressWarnings("PMD.EmptyCatchBlock") public static NeonBee registerNeonBeeMock(Vertx vertx, NeonBeeOptions options, NeonBeeConfig config) { - try { - Constructor hcrc = - HealthCheckRegistry.class.getDeclaredConstructor(Vertx.class, Registry.class); - Registry healthRegistry = new WriteSafeRegistry<>(vertx, HealthCheckRegistry.REGISTRY_NAME); - Registry entityRegistry = new WriteSafeRegistry<>(vertx, EntityVerticle.REGISTRY_NAME); - HealthCheckRegistry hcr = createObjectWithPrivateConstructor(hcrc, vertx, healthRegistry); - return new NeonBee(vertx, options, config, new CompositeMeterRegistry(), hcr, entityRegistry); - } catch (NoSuchMethodException | InvocationTargetException | InstantiationException - | IllegalAccessException e) { - // Can't happen the constructor exists .. - throw new RuntimeException(e); - } + return new NeonBee(vertx, options, config, new CompositeMeterRegistry()); } } diff --git a/src/test/java/io/neonbee/NeonBeeTest.java b/src/test/java/io/neonbee/NeonBeeTest.java index b812a45d..d45f62f3 100644 --- a/src/test/java/io/neonbee/NeonBeeTest.java +++ b/src/test/java/io/neonbee/NeonBeeTest.java @@ -89,6 +89,7 @@ @Isolated("Some of the methods in this test class run clustered and use the FakeClusterManager for it. The FakeClusterManager uses a static state and can therefore not be run with other clustered tests.") @Tag(LONG_RUNNING_TEST) +@SuppressWarnings({ "PMD.CouplingBetweenObjects" }) class NeonBeeTest extends NeonBeeTestBase { private Vertx vertx; @@ -366,7 +367,7 @@ void checkTestCloseVertxOnError(String description, boolean ownVertx, Future NeonBee.create(any(), any(), any(), any())).thenCallRealMethod(); - Vertx failingVertxMock = defaultVertxMock(); + Vertx failingVertxMock = mock(Vertx.class); when(failingVertxMock.close()).thenReturn(result); Function> vertxFunction; diff --git a/src/test/java/io/neonbee/entity/EntityVerticleTest.java b/src/test/java/io/neonbee/entity/EntityVerticleTest.java index cb0649da..d3e626ab 100644 --- a/src/test/java/io/neonbee/entity/EntityVerticleTest.java +++ b/src/test/java/io/neonbee/entity/EntityVerticleTest.java @@ -37,6 +37,7 @@ import io.neonbee.data.DataContext; import io.neonbee.data.DataQuery; import io.neonbee.data.DataVerticle; +import io.neonbee.internal.WriteSafeRegistry; import io.neonbee.internal.verticle.ConsolidationVerticle; import io.neonbee.test.base.EntityVerticleTestBase; import io.vertx.core.CompositeFuture; @@ -72,20 +73,22 @@ void deployEntityVerticles(VertxTestContext testContext) { @Test @DisplayName("Check if entity types are registered in shared entity map") void registerEntityTypes(VertxTestContext testContext) { + WriteSafeRegistry registry = + new WriteSafeRegistry<>(getNeonBee().getVertx(), EntityVerticle.REGISTRY_NAME); Checkpoint checkpoint = testContext.checkpoint(2); - getNeonBee().getEntityRegistry().get(sharedEntityMapName(new FullQualifiedName("ERP.Customers"))) + registry.get(sharedEntityMapName(new FullQualifiedName("ERP.Customers"))) .onComplete(testContext.succeeding(result -> { testContext.verify(() -> { - assertThat(result).containsExactly(entityVerticleImpl1.getQualifiedName(), + assertThat((JsonArray) result).containsExactly(entityVerticleImpl1.getQualifiedName(), entityVerticleImpl2.getQualifiedName()); }); checkpoint.flag(); })); - getNeonBee().getEntityRegistry().get(sharedEntityMapName(new FullQualifiedName("Sales.Orders"))) + registry.get(sharedEntityMapName(new FullQualifiedName("Sales.Orders"))) .onComplete(testContext.succeeding(result -> { testContext.verify(() -> { - assertThat(result).containsExactly(entityVerticleImpl1.getQualifiedName()); + assertThat((JsonArray) result).containsExactly(entityVerticleImpl1.getQualifiedName()); }); checkpoint.flag(); })); diff --git a/src/test/java/io/neonbee/health/HealthCheckRegistryTest.java b/src/test/java/io/neonbee/health/HealthCheckRegistryTest.java index a52177fe..a507a5b9 100644 --- a/src/test/java/io/neonbee/health/HealthCheckRegistryTest.java +++ b/src/test/java/io/neonbee/health/HealthCheckRegistryTest.java @@ -2,7 +2,6 @@ import static com.google.common.truth.Truth.assertThat; import static io.neonbee.health.DummyHealthCheck.DUMMY_ID; -import static io.neonbee.health.HealthCheckRegistry.REGISTRY_NAME; import static io.neonbee.internal.verticle.HealthCheckVerticle.SHARED_MAP_KEY; import static io.neonbee.test.helper.OptionsHelper.defaultOptions; import static io.neonbee.test.helper.ReflectionHelper.setValueOfPrivateField; @@ -32,9 +31,8 @@ import io.neonbee.data.internal.DataContextImpl; import io.neonbee.health.internal.HealthCheck; import io.neonbee.internal.SharedDataAccessor; +import io.neonbee.internal.WriteSafeRegistry; import io.neonbee.internal.codec.DataQueryMessageCodec; -import io.neonbee.internal.registry.Registry; -import io.neonbee.internal.registry.WriteSafeRegistry; import io.neonbee.internal.verticle.HealthCheckVerticle; import io.neonbee.test.helper.DeploymentHelper; import io.vertx.core.Future; @@ -62,6 +60,7 @@ void setUp(Vertx vertx) { } catch (IllegalStateException ignored) { // fall through } + neonBee = NeonBeeMockHelper.registerNeonBeeMock(vertx, defaultOptions().setClustered(false), new NeonBeeConfig().setHealthConfig(new HealthConfig().setEnabled(true).setTimeout(2))); } @@ -69,7 +68,7 @@ void setUp(Vertx vertx) { @Test @DisplayName("it can list all health checks") void getHealthChecks(Vertx vertx) { - HealthCheckRegistry registry = neonBee.getHealthCheckRegistry(); + HealthCheckRegistry registry = new HealthCheckRegistry(vertx); assertThat(registry.getHealthChecks()).isEmpty(); registry.checks.put("check-1", mock(HealthCheck.class)); @@ -237,6 +236,10 @@ public Future retrieveData(DataQuery query, DataContext context) { @DisplayName("Don't collect HealthCheck results from other nodes, when clustered but cluster collection is disabled") void testConsolidateHealthCheckResultsClusteredDisabled(Vertx vertx, VertxTestContext testContext) throws Exception { + NeonBeeConfig neonBeeConfig = + new NeonBeeConfig().setHealthConfig(new HealthConfig().setCollectClusteredResults(false)); + neonBee = NeonBeeMockHelper.registerNeonBeeMock(vertx, defaultOptions().setClustered(true), neonBeeConfig); + Checkpoint receivedResultsValidated = testContext.checkpoint(); HealthCheckVerticle healthCheckVerticle = new HealthCheckVerticle() { @@ -252,10 +255,6 @@ public Future retrieveData(DataQuery query, DataContext context) { } }; - NeonBeeConfig neonBeeConfig = - new NeonBeeConfig().setHealthConfig(new HealthConfig().setCollectClusteredResults(false)); - neonBee = NeonBeeMockHelper.registerNeonBeeMock(vertx, defaultOptions().setClustered(true), neonBeeConfig); - AsyncMap sharedMap = new SharedDataAccessor(vertx, HealthCheckVerticle.class) .getAsyncMap("#sharedMap").result(); setValueOfPrivateField(neonBee, "sharedAsyncMap", sharedMap); @@ -280,7 +279,7 @@ public Future retrieveData(DataQuery query, DataContext context) { void testConsolidateHealthCheckResultsNonClustered(Vertx vertx, VertxTestContext testContext) { Checkpoint cp = testContext.checkpoint(2); - HealthCheckRegistry mock = new HealthCheckRegistry(vertx, new WriteSafeRegistry<>(vertx, REGISTRY_NAME)) { + HealthCheckRegistry mock = new HealthCheckRegistry(vertx) { @Override Future> getLocalHealthCheckResults() { cp.flag(); @@ -303,7 +302,7 @@ Future> getLocalHealthCheckResults() { @DisplayName("it requests data for a specific check") void testConsolidateResultsForSpecificCheck(Vertx vertx, VertxTestContext testContext) { String checkName = "dummy-a"; - HealthCheckRegistry registry = neonBee.getHealthCheckRegistry(); + HealthCheckRegistry registry = new HealthCheckRegistry(vertx); registry.register(new DummyHealthCheck(neonBee)) .compose(hc -> registry.register(new DummyHealthCheck(neonBee) { @Override @@ -342,19 +341,19 @@ public boolean isGlobal() { @Test @DisplayName("it does not fail if some verticle addresses are not reachable") void test(Vertx vertx, VertxTestContext testContext) { - neonBee = - NeonBeeMockHelper.registerNeonBeeMock(vertx, defaultOptions().setClustered(true), new NeonBeeConfig()); + neonBee = NeonBeeMockHelper.registerNeonBeeMock(vertx, defaultOptions().setClustered(true)); String verticleName = "not-existing-verticle-name"; - - Registry registry = neonBee.getHealthCheckRegistry().healthVerticleRegistry; + WriteSafeRegistry registry = new WriteSafeRegistry<>(vertx, HealthCheckRegistry.REGISTRY_NAME); Future setupFuture = registry.register(SHARED_MAP_KEY, verticleName); setupFuture - .compose(v -> vertx.deployVerticle(new HealthCheckVerticle())) - .compose(v -> neonBee.getHealthCheckRegistry().register(new DummyHealthCheck(neonBee))) + .compose(unused -> DeploymentHelper.undeployAllVerticlesOfClass(neonBee.getVertx(), + HealthCheckVerticle.class)) + .compose(v -> Future.all(vertx.deployVerticle(new HealthCheckVerticle()), + neonBee.getHealthCheckRegistry().register(new DummyHealthCheck(neonBee)))) .compose(v -> neonBee.getHealthCheckRegistry().collectHealthCheckResults()) .onSuccess(result -> testContext.verify(() -> assertThat(result.getString("status")).isEqualTo("UP"))) - .compose(result -> registry.get(SHARED_MAP_KEY)) + .compose(result -> registry.get(SHARED_MAP_KEY)).map(JsonArray.class::cast) .onSuccess(registeredVerticles -> testContext.verify(() -> { assertThat(registeredVerticles).hasSize(2); assertThat(registeredVerticles).contains(verticleName); diff --git a/src/test/java/io/neonbee/internal/WriteSafeRegistryTest.java b/src/test/java/io/neonbee/internal/WriteSafeRegistryTest.java new file mode 100644 index 00000000..c232e95a --- /dev/null +++ b/src/test/java/io/neonbee/internal/WriteSafeRegistryTest.java @@ -0,0 +1,61 @@ +package io.neonbee.internal; + +import static com.google.common.truth.Truth.assertThat; + +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; + +import io.vertx.core.Vertx; +import io.vertx.core.json.JsonArray; +import io.vertx.junit5.VertxExtension; +import io.vertx.junit5.VertxTestContext; + +@ExtendWith(VertxExtension.class) +class WriteSafeRegistryTest { + + public static final String REGISTRY_NAME = "TEST_REGISTRY"; + + @Test + @DisplayName("register value in registry") + void testRegister(Vertx vertx, VertxTestContext context) { + WriteSafeRegistry registry = new WriteSafeRegistry<>(vertx, REGISTRY_NAME); + String key = "key"; + String value = "value"; + registry.register(key, value).compose(unused -> registry.get(key)).onSuccess(mapValue -> context.verify(() -> { + assertThat(mapValue).isEqualTo(new JsonArray().add(value)); + context.completeNow(); + })).onFailure(context::failNow); + } + + @Test + @DisplayName("unregister value from registry") + void unregister(Vertx vertx, VertxTestContext context) { + WriteSafeRegistry registry = new WriteSafeRegistry<>(vertx, REGISTRY_NAME); + String key = "key"; + String value = "value"; + + registry.register(key, value).compose(unused -> registry.unregister(key, "value2")) + .compose(unused -> registry.get(key)).onSuccess(mapValue -> context.verify(() -> { + assertThat(mapValue).isEqualTo(new JsonArray().add(value)); + })).compose(unused -> registry.unregister(key, value)).compose(unused -> registry.get(key)) + .onSuccess(mapValue -> context.verify(() -> { + assertThat(mapValue).isEqualTo(new JsonArray()); + context.completeNow(); + })).onFailure(context::failNow); + } + + @Test + @DisplayName("get value from registry") + void get(Vertx vertx, VertxTestContext context) { + String key = "key"; + String value = "value"; + + WriteSafeRegistry registry = new WriteSafeRegistry<>(vertx, REGISTRY_NAME); + registry.register(key, value).compose(unused -> registry.get(key)).onSuccess(jsonArray -> context.verify(() -> { + assertThat(jsonArray).isNotNull(); + assertThat(jsonArray.contains(value)).isTrue(); + context.completeNow(); + })).onFailure(context::failNow); + } +} diff --git a/src/test/java/io/neonbee/internal/cluster/entity/ClusterEntityRegistryTest.java b/src/test/java/io/neonbee/internal/cluster/entity/ClusterEntityRegistryTest.java new file mode 100644 index 00000000..e91642ca --- /dev/null +++ b/src/test/java/io/neonbee/internal/cluster/entity/ClusterEntityRegistryTest.java @@ -0,0 +1,167 @@ +package io.neonbee.internal.cluster.entity; + +import static com.google.common.truth.Truth.assertThat; + +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; + +import io.vertx.core.Vertx; +import io.vertx.core.json.JsonArray; +import io.vertx.junit5.Checkpoint; +import io.vertx.junit5.VertxExtension; +import io.vertx.junit5.VertxTestContext; + +@ExtendWith(VertxExtension.class) +class ClusterEntityRegistryTest { + + private static final String REGISTRY_NAME = "CLUSTER_REGISTRY_NAME"; + + private static final String KEY = "key"; + + private static final String VALUE = "value"; + + @Test + @DisplayName("register value in registry") + void register(Vertx vertx, VertxTestContext context) { + ClusterEntityRegistry registry = new TestClusterEntityRegistry(vertx, REGISTRY_NAME); + registry.register(KEY, VALUE).compose(unused -> registry.get(KEY)).onSuccess(mapValue -> context.verify(() -> { + assertThat(mapValue).isEqualTo(new JsonArray().add(VALUE)); + context.completeNow(); + })).onFailure(context::failNow); + } + + @Test + @DisplayName("unregister value from registry") + void unregister(Vertx vertx, VertxTestContext context) { + ClusterEntityRegistry registry = new TestClusterEntityRegistry(vertx, REGISTRY_NAME); + registry.register(KEY, VALUE).compose(unused -> registry.unregister(KEY, "value2")) + .compose(unused -> registry.get(KEY)).onSuccess(mapValue -> context.verify(() -> { + assertThat(mapValue).isEqualTo(new JsonArray().add(VALUE)); + })).compose(unused -> registry.unregister(KEY, VALUE)).compose(unused -> registry.get(KEY)) + .onSuccess(mapValue -> context.verify(() -> { + assertThat(mapValue).isEqualTo(new JsonArray()); + context.completeNow(); + })).onFailure(context::failNow); + } + + @Test + @DisplayName("get value from registry") + void get(Vertx vertx, VertxTestContext context) { + ClusterEntityRegistry registry = new TestClusterEntityRegistry(vertx, REGISTRY_NAME); + registry.register(KEY, VALUE).compose(unused -> registry.get(KEY)).onSuccess(jsonArray -> context.verify(() -> { + assertThat(jsonArray).isNotNull(); + assertThat(jsonArray.contains(VALUE)).isTrue(); + context.completeNow(); + })).onFailure(context::failNow); + } + + @Test + @DisplayName("get the clustering information from the registry") + void getClusteringInformation(Vertx vertx, VertxTestContext context) { + ClusterEntityRegistry registry = new TestClusterEntityRegistry(vertx, REGISTRY_NAME); + registry.register(KEY, VALUE) + .compose(unused -> registry.getClusteringInformation(TestClusterEntityRegistry.CLUSTER_NODE_ID)) + .onSuccess(mapValue -> context.verify(() -> { + assertThat(mapValue) + .isEqualTo(JsonArray.of(ClusterEntityRegistry.clusterRegistrationInformation(KEY, VALUE))); + context.completeNow(); + })).onFailure(context::failNow); + } + + @Test + @DisplayName("unregister all key from registry") + void removeClusteringInformation(Vertx vertx, VertxTestContext context) { + Checkpoint checkpoint = context.checkpoint(2); + ClusterEntityRegistry registry = new TestClusterEntityRegistry(vertx, REGISTRY_NAME); + registry.register(KEY, VALUE) + .compose(unused -> registry.getClusteringInformation(TestClusterEntityRegistry.CLUSTER_NODE_ID)) + .onSuccess(mapValue -> context.verify(() -> { + assertThat(mapValue).isNotEmpty(); + checkpoint.flag(); + })).compose(unused -> registry.removeClusteringInformation(TestClusterEntityRegistry.CLUSTER_NODE_ID)) + .onSuccess(mapValue -> context.verify(() -> { + assertThat(mapValue).isNull(); + checkpoint.flag(); + })).onFailure(context::failNow); + } + + @Test + @DisplayName("unregister node") + void unregisterNode1(Vertx vertx, VertxTestContext context) { + ClusterEntityRegistry registry = new TestClusterEntityRegistry(vertx, REGISTRY_NAME); + registry.register(KEY, VALUE) + .compose(unused -> registry.getClusteringInformation(TestClusterEntityRegistry.CLUSTER_NODE_ID)) + .compose(unused -> registry.unregisterNode(TestClusterEntityRegistry.CLUSTER_NODE_ID)) + .compose(unused -> registry.getClusteringInformation(TestClusterEntityRegistry.CLUSTER_NODE_ID)) + .onSuccess(mapValue -> context.verify(() -> { + assertThat(mapValue).isNull(); + context.completeNow(); + })).onFailure(context::failNow); + } + + @Test + @DisplayName("unregister node with two nodes with same entities") + void unregisterNode2(Vertx vertx, VertxTestContext context) { + Checkpoint checkpoint = context.checkpoint(5); + String clusterIdNode1 = "TEST_CLUSTER_ID_0000000000000001"; + String clusterIdNode2 = "TEST_CLUSTER_ID_0000000000000002"; + ClusterEntityRegistry registry1 = new TestClusterEntityRegistry(vertx, REGISTRY_NAME, clusterIdNode1); + ClusterEntityRegistry registry2 = new TestClusterEntityRegistry(vertx, REGISTRY_NAME, clusterIdNode2); + + registry1.register(KEY, VALUE).compose(unused -> registry2.register(KEY, VALUE)) + .compose(unused -> registry1.get(KEY)).onSuccess(ja -> context.verify(() -> { + assertThat(ja).containsExactly(VALUE); + checkpoint.flag(); + })).compose(unused -> registry1.getClusteringInformation(clusterIdNode1)) + .compose(unused -> registry1.unregisterNode(clusterIdNode1)).compose(unused -> registry2.get(KEY)) + .onSuccess(ja -> context.verify(() -> { + assertThat(ja).containsExactly(VALUE); + checkpoint.flag(); + })).compose(unused -> registry2.getClusteringInformation(clusterIdNode1)) + .onSuccess(mapValue -> context.verify(() -> { + assertThat(mapValue).isNull(); + checkpoint.flag(); + })).compose(unused -> registry2.getClusteringInformation(clusterIdNode2)) + .onSuccess(mapValue -> context.verify(() -> { + assertThat(mapValue) + .containsExactly(ClusterEntityRegistry.clusterRegistrationInformation("key", VALUE)); + checkpoint.flag(); + })).compose(unused -> registry2.unregisterNode(clusterIdNode2)).compose(unused -> registry2.get(KEY)) + .onSuccess(ja -> context.verify(() -> { + assertThat(ja).isEmpty(); + checkpoint.flag(); + })).onFailure(context::failNow); + } + + @Test + @DisplayName("remove key from registry") + void remove(Vertx vertx, VertxTestContext context) { + ClusterEntityRegistry registry = new TestClusterEntityRegistry(vertx, REGISTRY_NAME); + registry.register(KEY, VALUE).compose(unused -> registry.removeClusteringInformation(KEY)) + .onSuccess(jsonArray -> context.verify(() -> { + assertThat(jsonArray).isNull(); + context.completeNow(); + })).onFailure(context::failNow); + } + + static class TestClusterEntityRegistry extends ClusterEntityRegistry { + static final String CLUSTER_NODE_ID = "TEST_CLUSTER_ID_0000000000000000"; + + final String clusterNodeId; + + TestClusterEntityRegistry(Vertx vertx, String registryName) { + this(vertx, registryName, CLUSTER_NODE_ID); + } + + TestClusterEntityRegistry(Vertx vertx, String registryName, String clusterNodeId) { + super(vertx, registryName); + this.clusterNodeId = clusterNodeId; + } + + @Override + String getClusterNodeId() { + return clusterNodeId; + } + } +} diff --git a/src/test/java/io/neonbee/internal/cluster/entity/UnregisterEntitiesTest.java b/src/test/java/io/neonbee/internal/cluster/entity/UnregisterEntitiesTest.java new file mode 100644 index 00000000..f28a365c --- /dev/null +++ b/src/test/java/io/neonbee/internal/cluster/entity/UnregisterEntitiesTest.java @@ -0,0 +1,123 @@ +package io.neonbee.internal.cluster.entity; + +import static com.google.common.truth.Truth.assertThat; +import static io.neonbee.NeonBeeInstanceConfiguration.ClusterManager.HAZELCAST; +import static io.neonbee.NeonBeeInstanceConfiguration.ClusterManager.INFINISPAN; +import static io.neonbee.NeonBeeProfile.WEB; +import static io.vertx.core.Future.succeededFuture; + +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +import org.apache.olingo.commons.api.edm.FullQualifiedName; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; + +import io.neonbee.NeonBee; +import io.neonbee.NeonBeeExtension; +import io.neonbee.NeonBeeInstanceConfiguration; +import io.neonbee.entity.EntityVerticle; +import io.neonbee.internal.cluster.ClusterHelper; +import io.vertx.core.Future; +import io.vertx.core.Vertx; +import io.vertx.core.json.JsonArray; +import io.vertx.core.json.JsonObject; +import io.vertx.junit5.VertxTestContext; + +@ExtendWith({ NeonBeeExtension.class }) +class UnregisterEntitiesTest { + + static final String SHARED_ENTITY_MAP_NAME = "entityVerticles[%s]"; + + @Test + @DisplayName("test unregistering entity models (Infinispan cluster)) ") + void testInfinispanUnregisteringEntities(@NeonBeeInstanceConfiguration(activeProfiles = WEB, clustered = true, + clusterManager = INFINISPAN) NeonBee web, VertxTestContext testContext) { + testUnregisteringEntities(web, testContext); + } + + @Test + @DisplayName("test unregistering entity models (Hazelcast cluster)") + void testHazelcastUnregisteringEntities(@NeonBeeInstanceConfiguration(activeProfiles = WEB, clustered = true, + clusterManager = HAZELCAST) NeonBee web, VertxTestContext testContext) { + testUnregisteringEntities(web, testContext); + } + + private void testUnregisteringEntities(NeonBee web, VertxTestContext testContext) { + assertThat(isClustered(web)).isTrue(); + + Vertx vertx = web.getVertx(); + String clusterNodeId = ClusterHelper.getClusterNodeId(vertx); + + ClusterEntityRegistry registry = (ClusterEntityRegistry) web.getEntityRegistry(); + EntityVerticleUnregisterImpl entityVerticle = new EntityVerticleUnregisterImpl(); + vertx.deployVerticle(entityVerticle) + .compose(unused -> registry.clusteringInformation.get(ClusterHelper.getClusterNodeId(web.getVertx()))) + .onSuccess(jsonArray -> testContext.verify(() -> { + assertThat(jsonArray).hasSize(2); + + List jsonObjectList = jsonArray.stream().map(JsonObject.class::cast).sorted( + (o1, o2) -> CharSequence.compare(o1.getString("entityName"), o2.getString("entityName"))) + .collect(Collectors.toList()); + + assertThat(jsonObjectList.get(0)) + .isEqualTo(JsonObject.of("qualifiedName", entityVerticle.getQualifiedName(), "entityName", + sharedEntityMapName(EntityVerticleUnregisterImpl.FQN_ERP_CUSTOMERS))); + + assertThat(jsonObjectList.get(1)) + .isEqualTo(JsonObject.of("qualifiedName", entityVerticle.getQualifiedName(), "entityName", + sharedEntityMapName(EntityVerticleUnregisterImpl.FQN_SALES_ORDERS))); + })) + .compose(unused -> EntityVerticle.getVerticlesForEntityType(vertx, + EntityVerticleUnregisterImpl.FQN_ERP_CUSTOMERS)) + .compose(list -> EntityVerticle + .getVerticlesForEntityType(vertx, EntityVerticleUnregisterImpl.FQN_SALES_ORDERS).map(list2 -> { + list.addAll(list2); + return list; + })) + .onSuccess(list -> testContext.verify(() -> { + assertThat(list).hasSize(2); + })).compose(unused -> UnregisterEntityVerticlesHook.unregister(web, clusterNodeId)) + .compose(unused -> registry.get(sharedEntityMapName(EntityVerticleUnregisterImpl.FQN_ERP_CUSTOMERS))) + .onSuccess(jsonArray -> testContext.verify(() -> { + assertThat(jsonArray).isEqualTo(new JsonArray()); + })).compose(unused -> registry.clusteringInformation.get(clusterNodeId)) + .onSuccess(object -> testContext.verify(() -> { + assertThat(object).isNull(); + testContext.completeNow(); + })) + .compose(unused -> EntityVerticle.getVerticlesForEntityType(vertx, + EntityVerticleUnregisterImpl.FQN_ERP_CUSTOMERS)) + .compose(list -> EntityVerticle + .getVerticlesForEntityType(vertx, EntityVerticleUnregisterImpl.FQN_SALES_ORDERS).map(list2 -> { + list.addAll(list2); + return list; + })) + .onSuccess(list -> testContext.verify(() -> { + assertThat(list).isEmpty(); + })) + + .onFailure(testContext::failNow); + } + + static String sharedEntityMapName(FullQualifiedName entityTypeName) { + return String.format(SHARED_ENTITY_MAP_NAME, entityTypeName.getFullQualifiedNameAsString()); + } + + private boolean isClustered(NeonBee neonBee) { + return ClusterHelper.getClusterManager(neonBee.getVertx()).isPresent(); + } + + public static class EntityVerticleUnregisterImpl extends EntityVerticle { + static final FullQualifiedName FQN_ERP_CUSTOMERS = new FullQualifiedName("ERP", "Customers"); + + static final FullQualifiedName FQN_SALES_ORDERS = new FullQualifiedName("Sales", "Orders"); + + @Override + public Future> entityTypeNames() { + return succeededFuture(Set.of(FQN_ERP_CUSTOMERS, FQN_SALES_ORDERS)); + } + } +} diff --git a/src/test/java/io/neonbee/internal/registry/NonLockingRegistryTest.java b/src/test/java/io/neonbee/internal/registry/NonLockingRegistryTest.java deleted file mode 100644 index 29b0cdb9..00000000 --- a/src/test/java/io/neonbee/internal/registry/NonLockingRegistryTest.java +++ /dev/null @@ -1,14 +0,0 @@ -package io.neonbee.internal.registry; - -import static io.vertx.core.Future.succeededFuture; - -import io.vertx.core.Future; -import io.vertx.core.Vertx; - -class NonLockingRegistryTest extends StringRegistryTestBase { - - @Override - protected Future> createRegistry(Vertx vertx) { - return succeededFuture(new NonLockingRegistry<>(vertx, "testRegistry")); - } -} diff --git a/src/test/java/io/neonbee/internal/registry/RegistryTest.java b/src/test/java/io/neonbee/internal/registry/RegistryTest.java deleted file mode 100644 index 378c8c34..00000000 --- a/src/test/java/io/neonbee/internal/registry/RegistryTest.java +++ /dev/null @@ -1,58 +0,0 @@ -package io.neonbee.internal.registry; - -import static com.google.common.truth.Truth8.assertThat; -import static io.vertx.core.Future.succeededFuture; -import static org.mockito.ArgumentMatchers.anyList; -import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.Mockito.doCallRealMethod; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -import java.util.List; - -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; - -import io.vertx.junit5.VertxExtension; -import io.vertx.junit5.VertxTestContext; - -@ExtendWith(VertxExtension.class) -class RegistryTest { - - @Test - void testGetAny(VertxTestContext testContext) { - Registry dummyRegistry = mock(Registry.class); - when(dummyRegistry.get("key")).thenReturn(succeededFuture(List.of("value"))); - doCallRealMethod().when(dummyRegistry).getAny("key"); - - dummyRegistry.getAny("key").onFailure(testContext::failNow).onSuccess(optional -> { - assertThat(optional).hasValue("value"); - testContext.completeNow(); - }); - } - - @Test - void testRegister(VertxTestContext testContext) { - Registry dummyRegistry = mock(Registry.class); - when(dummyRegistry.register(anyString(), anyList())).thenReturn(succeededFuture()); - doCallRealMethod().when(dummyRegistry).register(anyString(), anyString()); - - dummyRegistry.register("key", "value").onFailure(testContext::failNow).onSuccess(values -> { - verify(dummyRegistry).register(anyString(), anyList()); - testContext.completeNow(); - }); - } - - @Test - void testUnregister(VertxTestContext testContext) { - Registry dummyRegistry = mock(Registry.class); - when(dummyRegistry.unregister(anyString(), anyList())).thenReturn(succeededFuture()); - doCallRealMethod().when(dummyRegistry).unregister(anyString(), anyString()); - - dummyRegistry.unregister("key", "value").onFailure(testContext::failNow).onSuccess(values -> { - verify(dummyRegistry).unregister(anyString(), anyList()); - testContext.completeNow(); - }); - } -} diff --git a/src/test/java/io/neonbee/internal/registry/SelfCleaningRegistryTest.java b/src/test/java/io/neonbee/internal/registry/SelfCleaningRegistryTest.java deleted file mode 100644 index 76cc743b..00000000 --- a/src/test/java/io/neonbee/internal/registry/SelfCleaningRegistryTest.java +++ /dev/null @@ -1,12 +0,0 @@ -package io.neonbee.internal.registry; - -import io.vertx.core.Future; -import io.vertx.core.Vertx; - -class SelfCleaningRegistryTest extends StringRegistryTestBase { - - @Override - protected Future> createRegistry(Vertx vertx) { - return SelfCleaningRegistry.create(vertx, "registryName").map(Registry.class::cast); - } -} diff --git a/src/test/java/io/neonbee/internal/registry/StringRegistryTestBase.java b/src/test/java/io/neonbee/internal/registry/StringRegistryTestBase.java deleted file mode 100644 index ac5b5ea0..00000000 --- a/src/test/java/io/neonbee/internal/registry/StringRegistryTestBase.java +++ /dev/null @@ -1,184 +0,0 @@ -package io.neonbee.internal.registry; - -import static com.google.common.truth.Truth.assertThat; -import static io.vertx.core.Future.all; - -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.function.BiFunction; -import java.util.stream.Stream; - -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.DisplayName; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.Arguments; -import org.junit.jupiter.params.provider.MethodSource; - -import io.vertx.core.Future; -import io.vertx.core.Handler; -import io.vertx.core.Vertx; -import io.vertx.junit5.Checkpoint; -import io.vertx.junit5.VertxExtension; -import io.vertx.junit5.VertxTestContext; - -@ExtendWith(VertxExtension.class) -abstract class StringRegistryTestBase { - private Registry stringRegistry; - - private static final List LIST_ONE_VALUE = List.of("value1"); - - private static final List LIST_TWO_VALUES = List.of("value1", "value2"); - - private static final Map> MAP_WITH_ONE_KEY_AND_ONE_VALUE_EACH = Map.of("key1", LIST_ONE_VALUE); - - private static final Map> MAP_WITH_TWO_KEYS_AND_TWO_VALUES_EACH = - Map.of("key1", LIST_TWO_VALUES, "key2", LIST_TWO_VALUES); - - protected abstract Future> createRegistry(Vertx vertx); - - @BeforeEach - void setUp(Vertx vertx, VertxTestContext testContext) { - createRegistry(vertx).map(stringRegistry -> this.stringRegistry = stringRegistry) - .onComplete(testContext.succeedingThenComplete()); - } - - @AfterEach - void tearDown(Vertx vertx, VertxTestContext testContext) { - vertx.close().onComplete(testContext.succeedingThenComplete()); - } - - static Stream testRegister() { - Arguments oneKeyOneValue = Arguments.of("oneKeyOneValue", MAP_WITH_ONE_KEY_AND_ONE_VALUE_EACH, - null); - Arguments oneKeyTwoValues = - Arguments.of("oneKeyTwoValues", Map.of("key1", LIST_TWO_VALUES), null); - Arguments twoKeysTwoValues = Arguments.of("twoKeysTwoValues", MAP_WITH_TWO_KEYS_AND_TWO_VALUES_EACH, null); - Arguments oneKeyTwoSimilarValues = - Arguments.of("oneKeyTwoSimilarValues", Map.of("key1", List.of("value1", "value1")), - MAP_WITH_ONE_KEY_AND_ONE_VALUE_EACH); - - return Stream.of(oneKeyOneValue, oneKeyTwoValues, twoKeysTwoValues, oneKeyTwoSimilarValues); - } - - @MethodSource("testRegister") - @DisplayName("Test register(String, T)") - @ParameterizedTest(name = "{index}: with {0}") - void testRegisterSingle(String scenario, Map> dataToFill, Map> expected, - VertxTestContext testContext) { - Map> effectiveExpected = expected == null ? dataToFill : expected; - Checkpoint entriesVerified = testContext.checkpoint(dataToFill.size()); - - List> registerFutures = new ArrayList<>(); - dataToFill.forEach((key, values) -> registerFutures.add(stringRegistry.register(key, values))); - all(registerFutures).mapEmpty() - .onSuccess(verify(stringRegistry, effectiveExpected, entriesVerified, testContext)) - .onFailure(testContext::failNow); - } - - @MethodSource("testRegister") - @DisplayName("Test register(String, List)") - @ParameterizedTest(name = "{index}: with {0}") - void testRegisterList(String scenario, Map> dataToFill, Map> expected, - VertxTestContext testContext) { - Map> effectiveExpected = expected == null ? dataToFill : expected; - Checkpoint entriesVerified = testContext.checkpoint(dataToFill.size()); - fillRegistry(stringRegistry, dataToFill) - .onSuccess(verify(stringRegistry, effectiveExpected, entriesVerified, testContext)) - .onFailure(testContext::failNow); - } - - static Stream testUnregister() { - Arguments oneKeyOneValue = Arguments.of("oneKeyOneValue", Map.of("key2", LIST_ONE_VALUE), - Map.of("key1", LIST_TWO_VALUES, "key2", List.of("value2"))); - Arguments oneKeyTwoSimilarValues = - Arguments.of("oneKeyTwoSimilarValues", Map.of("key2", List.of("value1", "value1")), - Map.of("key1", LIST_TWO_VALUES, "key2", List.of("value2"))); - Arguments oneKeyTwoValues = Arguments.of("oneKeyTwoValues", Map.of("key2", LIST_TWO_VALUES), - Map.of("key1", LIST_TWO_VALUES, "key2", List.of())); - return Stream.of(oneKeyOneValue, oneKeyTwoSimilarValues, oneKeyTwoValues); - } - - @MethodSource("testUnregister") - @DisplayName("Test unregister(String, T)") - @ParameterizedTest(name = "{index}: with {0}") - void testUnregisterSingle(String scenario, Map> dataToUnregister, - Map> expected, - VertxTestContext testContext) { - Checkpoint entriesVerified = testContext.checkpoint(expected.size()); - fillRegistry(stringRegistry, MAP_WITH_TWO_KEYS_AND_TWO_VALUES_EACH) - .compose(ignore -> removeFromRegistry(stringRegistry, dataToUnregister)) - .onSuccess(verify(stringRegistry, expected, entriesVerified, testContext)) - .onFailure(testContext::failNow); - } - - @MethodSource("testUnregister") - @DisplayName("Test unregister(String, List)") - @ParameterizedTest(name = "{index}: with {0}") - void testUnregisterList(String scenario, Map> dataToUnregister, - Map> expected, - VertxTestContext testContext) { - Checkpoint entriesVerified = testContext.checkpoint(expected.size()); - fillRegistry(stringRegistry, MAP_WITH_TWO_KEYS_AND_TWO_VALUES_EACH) - .compose(ignore -> { - List> unregisterFutures = new ArrayList<>(); - dataToUnregister - .forEach((key, values) -> unregisterFutures.add(stringRegistry.unregister(key, values))); - return all(unregisterFutures).mapEmpty(); - }) - .onSuccess(verify(stringRegistry, expected, entriesVerified, testContext)) - .onFailure(testContext::failNow); - } - - @Test - @DisplayName("Test getKeys") - void testGetKeys(VertxTestContext testContext) { - Checkpoint checkTwoKeys = testContext.checkpoint(2); - Checkpoint checkOneKey = testContext.checkpoint(); - Checkpoint checkNoneKey = testContext.checkpoint(); - - BiFunction, Checkpoint, Future> keySetVerifier = (expectedKeys, checkpoint) -> { - return stringRegistry.getKeys().onSuccess(keys -> testContext.verify(() -> { - assertThat(keys).containsExactlyElementsIn(expectedKeys); - checkpoint.flag(); - })).mapEmpty(); - }; - - fillRegistry(stringRegistry, MAP_WITH_TWO_KEYS_AND_TWO_VALUES_EACH) - .compose(v -> keySetVerifier.apply(Set.of("key1", "key2"), checkTwoKeys)) - .compose(v -> removeFromRegistry(stringRegistry, MAP_WITH_ONE_KEY_AND_ONE_VALUE_EACH)) - .compose(v -> keySetVerifier.apply(Set.of("key1", "key2"), checkTwoKeys)) - .compose(v -> removeFromRegistry(stringRegistry, Map.of("key1", LIST_TWO_VALUES))) - .compose(v -> keySetVerifier.apply(Set.of("key2"), checkOneKey)) - .compose(v -> removeFromRegistry(stringRegistry, Map.of("key2", LIST_TWO_VALUES))) - .compose(v -> keySetVerifier.apply(Set.of(), checkNoneKey)).onFailure(testContext::failNow); - } - - private static Handler verify(Registry stringRegistry, Map> expected, - Checkpoint entriesVerified, VertxTestContext testContext) { - return ignore -> expected.forEach( - (key, valueList) -> stringRegistry.get(key) - .onSuccess(retrievedValues -> testContext.verify(() -> { - assertThat(retrievedValues).containsExactlyElementsIn(valueList); - entriesVerified.flag(); - }))); - } - - private static Future fillRegistry(Registry registry, Map> testData) { - List> registerFutures = new ArrayList<>(); - testData.forEach((key, values) -> registerFutures.add(registry.register(key, values))); - return all(registerFutures).mapEmpty(); - } - - private static Future removeFromRegistry(Registry registry, - Map> dataToUnregister) { - List> unregisterFutures = new ArrayList<>(); - dataToUnregister.forEach((key, valueList) -> valueList - .forEach(value -> unregisterFutures.add(registry.unregister(key, value)))); - return all(unregisterFutures).mapEmpty(); - } -} diff --git a/src/test/java/io/neonbee/internal/registry/WriteSafeRegistryTest.java b/src/test/java/io/neonbee/internal/registry/WriteSafeRegistryTest.java deleted file mode 100644 index 25f37fc5..00000000 --- a/src/test/java/io/neonbee/internal/registry/WriteSafeRegistryTest.java +++ /dev/null @@ -1,14 +0,0 @@ -package io.neonbee.internal.registry; - -import static io.vertx.core.Future.succeededFuture; - -import io.vertx.core.Future; -import io.vertx.core.Vertx; - -public class WriteSafeRegistryTest extends StringRegistryTestBase { - - @Override - protected Future> createRegistry(Vertx vertx) { - return succeededFuture(new WriteSafeRegistry<>(vertx, "testRegistry")); - } -} diff --git a/src/test/java/io/neonbee/internal/registry/clustered/SelfCleaningRegistryTest.java b/src/test/java/io/neonbee/internal/registry/clustered/SelfCleaningRegistryTest.java deleted file mode 100644 index 50178080..00000000 --- a/src/test/java/io/neonbee/internal/registry/clustered/SelfCleaningRegistryTest.java +++ /dev/null @@ -1,204 +0,0 @@ -package io.neonbee.internal.registry.clustered; - -import static com.google.common.truth.Truth.assertThat; -import static io.neonbee.NeonBeeInstanceConfiguration.ClusterManager.INFINISPAN; -import static io.neonbee.NeonBeeProfile.CORE; -import static io.neonbee.hook.HookType.BEFORE_SHUTDOWN; -import static io.neonbee.hook.HookType.CLUSTER_NODE_ID; -import static io.vertx.core.Future.failedFuture; -import static io.vertx.core.Future.succeededFuture; - -import java.util.ArrayList; -import java.util.List; -import java.util.Map; - -import org.junit.jupiter.api.DisplayName; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.condition.DisabledIfEnvironmentVariable; - -import com.google.common.collect.ImmutableList; - -import io.neonbee.NeonBee; -import io.neonbee.NeonBeeExtension; -import io.neonbee.NeonBeeInstanceConfiguration; -import io.neonbee.hook.HookType; -import io.neonbee.internal.cluster.ClusterHelper; -import io.neonbee.internal.registry.SelfCleaningRegistry; -import io.neonbee.internal.registry.SelfCleaningRegistryController; -import io.neonbee.test.helper.ReflectionHelper; -import io.vertx.core.Future; -import io.vertx.junit5.VertxTestContext; - -class SelfCleaningRegistryTest extends NeonBeeExtension.TestBase { - private static final String CUSTOM_VALUE_KEY_NODE_1 = "customValuesNode1"; - - private static final String CUSTOM_VALUE_KEY_NODE_2 = "customValuesNode2"; - - private static final String CUSTOM_VALUE_KEY_NODE_3 = "customValuesNode3"; - - private static final List CUSTOM_VALUES = List.of("value1", "value2"); - - private static final String SHARED_VALUE_KEY = "sharedValues"; - - private static final List SHARED_VALUES_NODE_1 = List.of("node1-value1", "node1-value2"); - - private static final List SHARED_VALUES_NODE_2 = List.of("node2-value1", "node2-value2"); - - private static final List SHARED_VALUES_NODE_3 = List.of("node3-value1", "node3-value2"); - - private static final String REGISTRY_NAME = "clusteredRegistry"; - - private SelfCleaningRegistry registryNode1; - - private SelfCleaningRegistry registryNode2; - - private SelfCleaningRegistry registryNode3; - - @Test - @DisabledIfEnvironmentVariable(named = "CI", matches = "true") - @DisplayName("test SelfCleaningRegistry with Infinispan") - void cycleTestInfinispan( - @NeonBeeInstanceConfiguration(activeProfiles = CORE, clustered = true, - clusterManager = INFINISPAN) NeonBee node1, - @NeonBeeInstanceConfiguration(activeProfiles = CORE, clustered = true, - clusterManager = INFINISPAN) NeonBee node2, - @NeonBeeInstanceConfiguration(activeProfiles = CORE, clustered = true, - clusterManager = INFINISPAN) NeonBee node3, - VertxTestContext testContext) { - // 1. register values for every node - createRegistries(node1, node2, node3).compose(v -> fillRegistries()).compose(v -> { - // 2. verify registration - return verifyAllValuesRegistered(testContext); - }).compose(v -> { - // 3. Shutdown node1 - return node1.getVertx().close(); - }).compose(v -> { - // 4. verify that all registrations from node1 are gone - return verifyRemovalOfNode1Values(testContext); - }).compose(v -> { - // 5. Execute node left hook with clusterId from node2 (because we can't simulate a crash) - String node2Id = new SelfCleaningRegistryController(node2.getVertx()).getNodeId(); - // We need to trigger this hook from the current cluster leader - // in a real world scenario node 2 is crashed, so it can't be the leader, but in our - // test node 2 still exists, so we have to check which node is the leader. - NeonBee leaderNode = ClusterHelper.isLeader(node2.getVertx()) ? node2 : node3; - return leaderNode.getHookRegistry().executeHooks(HookType.NODE_LEFT, Map.of(CLUSTER_NODE_ID, node2Id)); - }).compose(v -> { - // 6. verify that all registrations from node2 are gone - return verifyRemovalOfNode2Values(testContext); - }).onComplete(testContext.succeedingThenComplete()); - } - - @Test - @DisplayName("test SelfCleaningRegistry with Fake ClusterManager") - @SuppressWarnings("PMD.AvoidCatchingThrowable") - void cycleTestFakeClusterManager( - @NeonBeeInstanceConfiguration(activeProfiles = CORE, clustered = true) NeonBee node1, - @NeonBeeInstanceConfiguration(activeProfiles = CORE, clustered = true) NeonBee node2, - @NeonBeeInstanceConfiguration(activeProfiles = CORE, clustered = true) NeonBee node3, - VertxTestContext testContext) { - // 1. register values for every node - createRegistries(node1, node2, node3).compose(v -> fillRegistries()).compose(v -> { - // 2. verify registration - return verifyAllValuesRegistered(testContext); - }).compose(v -> { - // 3. Shutdown node1 - // With FakeClusterManager we can only simulate it because otherwise NODE_LEFT hook will be triggered on - // every node. And NODE_LEFT hook calls ClusterHelper.isLeader() which results in an exception with - // FakeClusterManager. - return node1.getHookRegistry().executeHooks(BEFORE_SHUTDOWN); - }).compose(v -> { - // 4. verify that all registrations from node1 are gone - return verifyRemovalOfNode1Values(testContext); - }).compose(v -> { - // 5. Execute node left hook with clusterId from node2 (because we can't simulate a crash) - String node2Id = new SelfCleaningRegistryController(node2.getVertx()).getNodeId(); - try { - // As described in 3. NODE_LEFT hook calls method ClusterHelper.isLeader(), which results in an error, - // because ClusterHelper isn't aware of FakeClusterManager. But fortunately it directly returns true, - // if Vert.x isn't in cluster mode, before the problematic code gets executed. - - // To trick the ClusterHelper, we set VertxImpl's "clusterManager" field to null and restore it right - // after the hook completes. - VertxTestContext.ExecutionBlock restoreClusterManager = - ReflectionHelper.setValueOfPrivateField(node3.getVertx(), "clusterManager", null); - return node3.getHookRegistry().executeHooks(HookType.NODE_LEFT, Map.of(CLUSTER_NODE_ID, node2Id)) - .compose(v1 -> { - try { - restoreClusterManager.apply(); - return succeededFuture(); - } catch (Throwable e) { - return failedFuture(e); - } - }); - } catch (NoSuchFieldException | IllegalAccessException e) { - return failedFuture(e); - } - }).compose(v -> { - // 6. verify that all registrations from node2 are gone - return verifyRemovalOfNode2Values(testContext); - }).onComplete(testContext.succeedingThenComplete()); - } - - private Future verifyAllValuesRegistered(VertxTestContext testContext) { - List allSharedValues = ImmutableList.builder().addAll(SHARED_VALUES_NODE_1) - .addAll(SHARED_VALUES_NODE_2).addAll(SHARED_VALUES_NODE_3).build(); - - Map> expected = Map.of( - CUSTOM_VALUE_KEY_NODE_1, CUSTOM_VALUES, - CUSTOM_VALUE_KEY_NODE_2, CUSTOM_VALUES, - CUSTOM_VALUE_KEY_NODE_3, CUSTOM_VALUES, - SHARED_VALUE_KEY, allSharedValues); - return verifyEntries(registryNode1, expected, testContext); - } - - private Future verifyRemovalOfNode1Values(VertxTestContext testContext) { - List allSharedValues = - ImmutableList.builder().addAll(SHARED_VALUES_NODE_2).addAll(SHARED_VALUES_NODE_3).build(); - - Map> expected = Map.of( - CUSTOM_VALUE_KEY_NODE_2, CUSTOM_VALUES, - CUSTOM_VALUE_KEY_NODE_3, CUSTOM_VALUES, - SHARED_VALUE_KEY, allSharedValues); - return verifyEntries(registryNode2, expected, testContext); - } - - private Future verifyRemovalOfNode2Values(VertxTestContext testContext) { - Map> expected = Map.of( - CUSTOM_VALUE_KEY_NODE_3, CUSTOM_VALUES, - SHARED_VALUE_KEY, SHARED_VALUES_NODE_3); - return verifyEntries(registryNode3, expected, testContext); - } - - private Future createRegistries(NeonBee node1, NeonBee node2, NeonBee node3) { - return Future.all( - SelfCleaningRegistry.create(node1.getVertx(), REGISTRY_NAME) - .onSuccess(reg -> registryNode1 = reg), - SelfCleaningRegistry.create(node2.getVertx(), REGISTRY_NAME) - .onSuccess(reg -> registryNode2 = reg), - SelfCleaningRegistry.create(node3.getVertx(), REGISTRY_NAME) - .onSuccess(reg -> registryNode3 = reg)) - .mapEmpty(); - } - - private Future fillRegistries() { - List> registrationFutures = new ArrayList<>(); - registrationFutures.add(registryNode1.register(CUSTOM_VALUE_KEY_NODE_1, CUSTOM_VALUES)); - registrationFutures.add(registryNode1.register(SHARED_VALUE_KEY, SHARED_VALUES_NODE_1)); - registrationFutures.add(registryNode2.register(CUSTOM_VALUE_KEY_NODE_2, CUSTOM_VALUES)); - registrationFutures.add(registryNode2.register(SHARED_VALUE_KEY, SHARED_VALUES_NODE_2)); - registrationFutures.add(registryNode3.register(CUSTOM_VALUE_KEY_NODE_3, CUSTOM_VALUES)); - registrationFutures.add(registryNode3.register(SHARED_VALUE_KEY, SHARED_VALUES_NODE_3)); - return Future.all(registrationFutures).mapEmpty(); - } - - private Future verifyEntries(SelfCleaningRegistry registry, Map> expected, - VertxTestContext testContext) { - List> entriesValidated = new ArrayList<>(); - expected.forEach((key, valueList) -> entriesValidated.add(registry.get(key).compose(values -> { - testContext.verify(() -> assertThat(values).containsExactlyElementsIn(valueList)); - return succeededFuture(); - }))); - return Future.all(entriesValidated).mapEmpty(); - } -} diff --git a/src/test/java/io/neonbee/internal/verticle/HealthCheckVerticleTest.java b/src/test/java/io/neonbee/internal/verticle/HealthCheckVerticleTest.java index 3653c83e..c80f3bb1 100644 --- a/src/test/java/io/neonbee/internal/verticle/HealthCheckVerticleTest.java +++ b/src/test/java/io/neonbee/internal/verticle/HealthCheckVerticleTest.java @@ -2,6 +2,10 @@ import static com.google.common.truth.Truth.assertThat; import static io.neonbee.internal.verticle.HealthCheckVerticle.SHARED_MAP_KEY; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; @@ -11,10 +15,9 @@ import io.neonbee.NeonBeeOptions; import io.neonbee.data.DataQuery; import io.neonbee.data.DataRequest; +import io.neonbee.internal.WriteSafeRegistry; import io.neonbee.internal.deploy.DeployableVerticle; -import io.neonbee.internal.registry.Registry; import io.neonbee.test.base.DataVerticleTestBase; -import io.neonbee.test.helper.ReflectionHelper; import io.vertx.core.Future; import io.vertx.core.Vertx; import io.vertx.core.json.JsonArray; @@ -46,29 +49,28 @@ void testRetrieveData(VertxTestContext testContext) { @Test @DisplayName("should register itself in shared map") - void testSharedMap(VertxTestContext testContext) throws NoSuchFieldException, IllegalAccessException { - Registry registry = ReflectionHelper.getValueOfPrivateField(getNeonBee().getHealthCheckRegistry(), - "healthVerticleRegistry"); + void testSharedMap(VertxTestContext testContext) { + WriteSafeRegistry registry = + new WriteSafeRegistry<>(getNeonBee().getVertx(), HealthCheckVerticle.REGISTRY_NAME); registry.get(SHARED_MAP_KEY) .onComplete(testContext.succeeding(qualifiedNamesOrNull -> testContext.verify(() -> { String expectedName = HealthCheckVerticle.QUALIFIED_NAME; - assertThat(qualifiedNamesOrNull).containsExactly(expectedName); + assertThat((JsonArray) qualifiedNamesOrNull).containsExactly(expectedName); testContext.completeNow(); }))); } @Test @DisplayName("should not register in shared map when non-clustered mode") - void testStartNonClustered(Vertx vertx, VertxTestContext testContext) throws NoSuchFieldException, - IllegalAccessException { - Registry registry = ReflectionHelper.getValueOfPrivateField(getNeonBee().getHealthCheckRegistry(), - "healthVerticleRegistry"); + void testStartNonClustered(Vertx vertx, VertxTestContext testContext) { + WriteSafeRegistry registry = new WriteSafeRegistry<>(vertx, HealthCheckVerticle.REGISTRY_NAME); + WriteSafeRegistry registrySpy = spy(registry); DeployableVerticle.fromVerticle(vertx, new HealthCheckVerticle(), new JsonObject()) .compose(deployable -> deployable.deploy(getNeonBee())) - .compose(v -> registry.getKeys()) - .onComplete(testContext.succeeding(keys -> testContext.verify(() -> assertThat(keys).isEmpty()))) - .onComplete(testContext.succeedingThenComplete()); + .onComplete(testContext.succeeding(r -> testContext.verify(() -> { + verify(registrySpy, times(0)).register(any(), any()); + }))).onComplete(testContext.succeedingThenComplete()); } } diff --git a/src/test/java/io/neonbee/test/helper/ReflectionHelper.java b/src/test/java/io/neonbee/test/helper/ReflectionHelper.java index 79fe12b0..124d8223 100644 --- a/src/test/java/io/neonbee/test/helper/ReflectionHelper.java +++ b/src/test/java/io/neonbee/test/helper/ReflectionHelper.java @@ -5,9 +5,7 @@ import java.lang.invoke.MethodHandles.Lookup; import java.lang.invoke.VarHandle; -import java.lang.reflect.Constructor; import java.lang.reflect.Field; -import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Modifier; import io.vertx.junit5.VertxTestContext.ExecutionBlock; @@ -136,23 +134,6 @@ public static ExecutionBlock setValueOfPrivateStaticField(Class clazz, String return () -> setValueOfPrivateField(clazz, null, fieldName, oldValue); } - /** - * Creates an object by using its private constructor. - * - * @param constructor The constructor to be used - * @param params The parameters to pass into the constructor - * @return An instance of the created object - * @param the type of the object - * @throws InvocationTargetException If an exception is thrown in the constructor itself. - * @throws InstantiationException If e.g. the class is an abstract class - * @throws IllegalAccessException If JVM doesn't grant access to the field - */ - public static T createObjectWithPrivateConstructor(Constructor constructor, Object... params) - throws InvocationTargetException, InstantiationException, IllegalAccessException { - constructor.setAccessible(true); - return constructor.newInstance(params); - } - private static Class resolveClass(Object object) { Class c = Class.class.isInstance(object) ? (Class) object : object.getClass(); return c.isAnonymousClass() ? resolveClass(c.getSuperclass()) : c;