Skip to content

Commit

Permalink
Update to executeBlocking API change
Browse files Browse the repository at this point in the history
  • Loading branch information
vietj committed Jul 19, 2023
1 parent ff2144d commit 2845be4
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 73 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.impl.VertxInternal;
import io.vertx.core.impl.future.PromiseInternal;
import io.vertx.core.impl.logging.Logger;
import io.vertx.core.impl.logging.LoggerFactory;
import io.vertx.core.shareddata.AsyncMap;
Expand Down Expand Up @@ -129,10 +130,10 @@ public BasicCacheContainer getCacheContainer() {

@Override
public <K, V> void getAsyncMap(String name, Promise<AsyncMap<K, V>> promise) {
vertx.<AsyncMap<K, V>>executeBlocking(prom -> {
vertx.<AsyncMap<K, V>>executeBlocking(() -> {
EmbeddedCacheManagerAdmin administration = cacheManager.administration().withFlags(CacheContainerAdmin.AdminFlag.VOLATILE);
Cache<byte[], byte[]> cache = administration.getOrCreateCache(name, "__vertx.distributed.cache.configuration");
prom.complete(new InfinispanAsyncMapImpl<>(vertx, cache));
return new InfinispanAsyncMapImpl<>(vertx, cache);
}, false).onComplete(promise);
}

Expand All @@ -143,7 +144,8 @@ public <K, V> Map<K, V> getSyncMap(String name) {

@Override
public void getLockWithTimeout(String name, long timeout, Promise<Lock> prom) {
vertx.<Lock>executeBlocking(promise -> {
vertx.executeBlocking(() -> {
PromiseInternal<Lock> promise = vertx.promise();
if (!lockManager.isDefined(name)) {
lockManager.defineLock(name);
}
Expand All @@ -159,16 +161,17 @@ public void getLockWithTimeout(String name, long timeout, Promise<Lock> prom) {
promise.fail(throwable);
}
});
}, false).onComplete(prom);
return promise.future();
}, false).compose(f -> f).onComplete(prom);
}

@Override
public void getCounter(String name, Promise<Counter> promise) {
vertx.<Counter>executeBlocking(prom -> {
vertx.<Counter>executeBlocking(() -> {
if (!counterManager.isDefined(name)) {
counterManager.defineCounter(name, CounterConfiguration.builder(CounterType.UNBOUNDED_STRONG).build());
}
prom.complete(new InfinispanCounter(vertx, counterManager.getStrongCounter(name).sync()));
return new InfinispanCounter(vertx, counterManager.getStrongCounter(name).sync());
}, false).onComplete(promise);
}

Expand Down Expand Up @@ -218,57 +221,47 @@ public void getNodeInfo(String nodeId, Promise<NodeInfo> promise) {

@Override
public void join(Promise<Void> promise) {
vertx.<Void>executeBlocking(prom -> {
vertx.<Void>executeBlocking(() -> {
if (active) {
prom.complete();
return;
return null;
}
active = true;
if (!userProvidedCacheManager) {
try {
FileLookup fileLookup = FileLookupFactory.newInstance();
FileLookup fileLookup = FileLookupFactory.newInstance();

URL ispnConfig = fileLookup.lookupFileLocation(ispnConfigPath, getCTCCL());
if (ispnConfig == null) {
log.warn("Cannot find Infinispan config '" + ispnConfigPath + "', using default");
ispnConfig = fileLookup.lookupFileLocation(DEFAULT_INFINISPAN_XML, getCTCCL());
}
ConfigurationBuilderHolder builderHolder = new ParserRegistry().parse(ispnConfig);
// Workaround Launcher in fatjar issue (context classloader may be null)
ClassLoader classLoader = getCTCCL();
if (classLoader == null) {
classLoader = getClass().getClassLoader();
}
builderHolder.getGlobalConfigurationBuilder().classLoader(classLoader);

if (fileLookup.lookupFileLocation(jgroupsConfigPath, getCTCCL()) != null) {
log.warn("Forcing JGroups config to '" + jgroupsConfigPath + "'");
builderHolder.getGlobalConfigurationBuilder().transport().defaultTransport()
.removeProperty(JGroupsTransport.CHANNEL_CONFIGURATOR)
.addProperty(JGroupsTransport.CONFIGURATION_FILE, jgroupsConfigPath);
}
URL ispnConfig = fileLookup.lookupFileLocation(ispnConfigPath, getCTCCL());
if (ispnConfig == null) {
log.warn("Cannot find Infinispan config '" + ispnConfigPath + "', using default");
ispnConfig = fileLookup.lookupFileLocation(DEFAULT_INFINISPAN_XML, getCTCCL());
}
ConfigurationBuilderHolder builderHolder = new ParserRegistry().parse(ispnConfig);
// Workaround Launcher in fatjar issue (context classloader may be null)
ClassLoader classLoader = getCTCCL();
if (classLoader == null) {
classLoader = getClass().getClassLoader();
}
builderHolder.getGlobalConfigurationBuilder().classLoader(classLoader);

cacheManager = new DefaultCacheManager(builderHolder, true);
} catch (IOException e) {
prom.fail(e);
return;
if (fileLookup.lookupFileLocation(jgroupsConfigPath, getCTCCL()) != null) {
log.warn("Forcing JGroups config to '" + jgroupsConfigPath + "'");
builderHolder.getGlobalConfigurationBuilder().transport().defaultTransport()
.removeProperty(JGroupsTransport.CHANNEL_CONFIGURATOR)
.addProperty(JGroupsTransport.CONFIGURATION_FILE, jgroupsConfigPath);
}

cacheManager = new DefaultCacheManager(builderHolder, true);
}
viewListener = new ClusterViewListener();
cacheManager.addListener(viewListener);
try {

subsCacheHelper = new SubsCacheHelper(vertx, cacheManager, nodeSelector);
subsCacheHelper = new SubsCacheHelper(vertx, cacheManager, nodeSelector);

nodeInfoCache = cacheManager.<String, byte[]>getCache("__vertx.nodeInfo").getAdvancedCache();
nodeInfoCache = cacheManager.<String, byte[]>getCache("__vertx.nodeInfo").getAdvancedCache();

lockManager = (EmbeddedClusteredLockManager) EmbeddedClusteredLockManagerFactory.from(cacheManager);
counterManager = EmbeddedCounterManagerFactory.asCounterManager(cacheManager);
lockManager = (EmbeddedClusteredLockManager) EmbeddedClusteredLockManagerFactory.from(cacheManager);
counterManager = EmbeddedCounterManagerFactory.asCounterManager(cacheManager);

prom.complete();
} catch (Exception e) {
prom.fail(e);
}
return null;
}, false).onComplete(promise);
}

Expand All @@ -278,18 +271,17 @@ private ClassLoader getCTCCL() {

@Override
public void leave(Promise<Void> promise) {
vertx.<Void>executeBlocking(prom -> {
vertx.<Void>executeBlocking(() -> {
if (!active) {
prom.complete();
return;
return null;
}
active = false;
subsCacheHelper.close();
cacheManager.removeListener(viewListener);
if (!userProvidedCacheManager) {
cacheManager.stop();
}
prom.complete();
return null;
}, false).onComplete(promise);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public synchronized CloseableIteratorCollectionStream<I, O> handler(Handler<O> h
close();
} else {
dataHandler = handler;
context.<CloseableIteratorCollection<I>>executeBlocking(fut -> fut.complete(iterableSupplier.get()), false).onComplete(ar -> {
context.<CloseableIteratorCollection<I>>executeBlocking(iterableSupplier::get, false).onComplete(ar -> {
synchronized (this) {
if (ar.succeeded()) {
iterable = ar.result();
Expand Down Expand Up @@ -131,7 +131,7 @@ private synchronized void doRead() {
}
readInProgress = true;
if (iterator == null) {
context.<CloseableIterator<I>>executeBlocking(fut -> fut.complete(iterable.iterator()), false).onComplete(ar -> {
context.<CloseableIterator<I>>executeBlocking(() -> iterable.iterator(), false).onComplete(ar -> {
synchronized (this) {
readInProgress = false;
if (ar.succeeded()) {
Expand All @@ -154,12 +154,12 @@ private synchronized void doRead() {
context.runOnContext(v -> emitQueued());
return;
}
context.<List<I>>executeBlocking(fut -> {
context.<List<I>>executeBlocking(() -> {
List<I> batch = new ArrayList<>(BATCH_SIZE);
for (int i = 0; i < BATCH_SIZE && iterator.hasNext(); i++) {
batch.add(iterator.next());
}
fut.complete(batch);
return batch;
}, false).onComplete(ar -> {
synchronized (this) {
if (ar.succeeded()) {
Expand Down Expand Up @@ -208,15 +208,15 @@ public synchronized CloseableIteratorCollectionStream<I, O> endHandler(Handler<V
private void close() {
closed = true;
AtomicReference<CloseableIterator<I>> iteratorRef = new AtomicReference<>();
context.executeBlocking(fut -> {
context.executeBlocking(() -> {
synchronized (this) {
iteratorRef.set(iterator);
}
CloseableIterator<I> iter = iteratorRef.get();
if (iter != null) {
iter.close();
}
fut.complete();
return null;
}, false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -140,28 +140,28 @@ public Future<Void> clear() {

@Override
public Future<Integer> size() {
return vertx.executeBlocking(future -> future.complete(cache.size()), false);
return vertx.executeBlocking(cache::size, false);
}

@Override
public Future<Set<K>> keys() {
return vertx.executeBlocking(promise -> {
return vertx.executeBlocking(() -> {
Set<byte[]> cacheKeys = cache.keySet().stream().collect(CacheCollectors.serializableCollector(Collectors::toSet));
promise.complete(cacheKeys.stream().<K>map(DataConverter::fromCachedObject).collect(toSet()));
return cacheKeys.stream().<K>map(DataConverter::fromCachedObject).collect(toSet());
}, false);
}

@Override
public Future<List<V>> values() {
return vertx.executeBlocking(promise -> {
return vertx.executeBlocking(() -> {
List<byte[]> cacheValues = cache.values().stream().collect(CacheCollectors.serializableCollector(Collectors::toList));
promise.complete(cacheValues.stream().<V>map(DataConverter::fromCachedObject).collect(toList()));
return cacheValues.stream().<V>map(DataConverter::fromCachedObject).collect(toList());
}, false);
}

@Override
public Future<Map<K, V>> entries() {
return vertx.executeBlocking(promise -> {
return vertx.executeBlocking(() -> {
Map<byte[], byte[]> cacheEntries = cache.entrySet().stream()
.collect(CacheCollectors.serializableCollector(() -> toMap(Entry::getKey, Entry::getValue)));
Map<K, V> result = new HashMap<>();
Expand All @@ -170,7 +170,7 @@ public Future<Map<K, V>> entries() {
V v = DataConverter.fromCachedObject(entry.getValue());
result.put(k, v);
}
promise.complete(result);
return result;
}, false);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,7 @@ public InfinispanCounter(Vertx vertx, SyncStrongCounter strongCounter) {

@Override
public Future<Long> get() {
return vertx.executeBlocking(future -> {
future.complete(strongCounter.getValue());
}, false);
return vertx.executeBlocking(strongCounter::getValue, false);
}

@Override
Expand All @@ -62,22 +60,16 @@ public Future<Long> decrementAndGet() {

@Override
public Future<Long> addAndGet(long value) {
return vertx.executeBlocking(future -> {
future.complete(strongCounter.addAndGet(value));
}, false);
return vertx.executeBlocking(() -> strongCounter.addAndGet(value), false);
}

@Override
public Future<Long> getAndAdd(long value) {
return vertx.executeBlocking(future -> {
future.complete(strongCounter.addAndGet(value) - value);
}, false);
return vertx.executeBlocking(() -> strongCounter.addAndGet(value) - value, false);
}

@Override
public Future<Boolean> compareAndSet(long expected, long value) {
return vertx.executeBlocking(future -> {
future.complete(strongCounter.compareAndSet(expected, value));
}, false);
return vertx.executeBlocking(() -> strongCounter.compareAndSet(expected, value), false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.vertx.core.*;
import io.vertx.core.json.JsonObject;
import io.vertx.core.spi.cluster.ClusterManager;
import io.vertx.ext.healthchecks.Status;
import io.vertx.test.core.VertxTestBase;
import org.junit.Rule;
import org.junit.Test;
Expand Down Expand Up @@ -80,7 +81,11 @@ public void testDetailedHealthCheck() {
startNodes(2);
ClusterHealthCheck healthCheck = ClusterHealthCheck.createProcedure(vertices[1], true);
vertices[0].sharedData().getAsyncMap("foo").onComplete(onSuccess(asyncMap -> {
vertices[1].executeBlocking(healthCheck).onComplete(onSuccess(status -> {
vertices[1].executeBlocking(() -> {
Promise<Status> promise = Promise.promise();
healthCheck.handle(promise);
return promise.future().toCompletionStage().toCompletableFuture().get();
}).onComplete(onSuccess(status -> {
JsonObject json = new JsonObject(status.toJson().encode()); // test serialization+deserialization
assertTrue(json.getBoolean("ok"));
assertEquals(Integer.valueOf(2), json.getJsonObject("data").getJsonObject("clusterHealth").getInteger("numberOfNodes"));
Expand Down

0 comments on commit 2845be4

Please sign in to comment.