From 38d8257f3e676f2094c98afd9f4f2b9cae6d9d4d Mon Sep 17 00:00:00 2001 From: uhm0311 Date: Tue, 19 Dec 2023 18:10:54 +0900 Subject: [PATCH] CLENAUP: Remove TranscoderService from reactiveAsyncGet() --- .../net/spy/memcached/ArcusClientPool.java | 6 ++ .../net/spy/memcached/MemcachedClient.java | 54 +----------------- .../reactive/ReactiveOperationFuture.java | 55 ++++++++++--------- 3 files changed, 39 insertions(+), 76 deletions(-) diff --git a/src/main/java/net/spy/memcached/ArcusClientPool.java b/src/main/java/net/spy/memcached/ArcusClientPool.java index 281bac8fd..001770e88 100644 --- a/src/main/java/net/spy/memcached/ArcusClientPool.java +++ b/src/main/java/net/spy/memcached/ArcusClientPool.java @@ -24,6 +24,7 @@ import java.util.Map; import java.util.Random; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -239,6 +240,11 @@ public ReactiveOperationFuture reactiveAsyncGet(String key) { return this.getClient().reactiveAsyncGet(key); } + @Override + public CompletableFuture reactiveAsyncGet(String key, Transcoder tc) { + return this.getClient().reactiveAsyncGet(key, tc); + } + @Override public OperationFuture> asyncGets(String key, Transcoder tc) { return this.getClient().asyncGets(key, tc); diff --git a/src/main/java/net/spy/memcached/MemcachedClient.java b/src/main/java/net/spy/memcached/MemcachedClient.java index 2a84a7786..3f78975fb 100644 --- a/src/main/java/net/spy/memcached/MemcachedClient.java +++ b/src/main/java/net/spy/memcached/MemcachedClient.java @@ -35,7 +35,6 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.Callable; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentMap; @@ -44,7 +43,6 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -951,65 +949,19 @@ public ReactiveOperationFuture reactiveAsyncGet(final String key, final T Operation op = opFact.get(key, new GetOperation.Callback() { private volatile T val = null; - private volatile CompletableFuture transcodeFuture = null; - - private volatile boolean completed = false; - private final AtomicInteger transcoding = new AtomicInteger(0); public void receivedStatus(OperationStatus status) { - rv.set(transcodeFuture, status); + rv.setStatus(status); } public void gotData(String k, int flags, byte[] data) { assert key.equals(k) : "Wrong key returned"; - transcoding.incrementAndGet(); - transcodeFuture = tcService.reactiveDecode(tc, new CachedData(flags, data, tc.getMaxSize())) - .whenComplete(this::whenComplete); + val = tc.decode(new CachedData(flags, data, tc.getMaxSize())); } public void complete() { - completed = true; - latch.countDown(); - - transcodeComplete(val); - } - - private void whenComplete(T v, Throwable t) { - rv.transcodeCompleted(); - - if (t != null) { - rv.completeExceptionally(t); - return; - } - - val = v; - transcoding.decrementAndGet(); - transcodeComplete(v); - } - - private void transcodeComplete(T val) { - if (transcoding.intValue() == 0 && completed) { - completeFuture(val); - } - } - - private void completeFuture(T val) { - Exception exception = null; - if (latch.getCount() > 0) { - exception = rv.createException(operationTimeout); - } else { - exception = rv.createExecutionException(); - } - - if (exception != null) { - rv.completeExceptionally(exception); - return; - } - - if (localCacheManager != null) { - localCacheManager.put(key, val); - } rv.complete(val); + latch.countDown(); } }); rv.setOperation(op); diff --git a/src/main/java/net/spy/memcached/reactive/ReactiveOperationFuture.java b/src/main/java/net/spy/memcached/reactive/ReactiveOperationFuture.java index 42722fece..faee25b6a 100644 --- a/src/main/java/net/spy/memcached/reactive/ReactiveOperationFuture.java +++ b/src/main/java/net/spy/memcached/reactive/ReactiveOperationFuture.java @@ -16,7 +16,6 @@ */ package net.spy.memcached.reactive; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -41,25 +40,18 @@ public class ReactiveOperationFuture extends SpyCompletableFuture { protected final CountDownLatch latch; - protected final AtomicReference> objRef; + protected final AtomicReference objRef; protected OperationStatus status; protected final long timeout; protected Operation op; - private final long createdAt = System.currentTimeMillis(); - private long transcodeCompletedAt = 0; + private Long operationSetAt = null; + private Long completedAt = null; public ReactiveOperationFuture(CountDownLatch l, long opTimeout) { - this(l, new AtomicReference<>(null), opTimeout); - } - - public ReactiveOperationFuture(CountDownLatch l, - AtomicReference> oref, - long opTimeout) { - super(); - latch = l; - objRef = oref; - timeout = opTimeout; + this.latch = l; + this.timeout = opTimeout; + this.objRef = new AtomicReference<>(null); } public boolean cancel(boolean ign) { @@ -80,6 +72,7 @@ public T get() throws InterruptedException, ExecutionException { public T get(long duration, TimeUnit units) throws InterruptedException, TimeoutException, ExecutionException { + Exception exception = createException( !latch.await(duration, units), TimeUnit.MILLISECONDS.convert(duration, units)); @@ -90,7 +83,7 @@ public T get(long duration, TimeUnit units) throw (ExecutionException) exception; } - return objRef.get().get(); + return objRef.get(); } public OperationStatus getStatus() { @@ -106,17 +99,33 @@ public OperationStatus getStatus() { return status; } - public void set(CompletableFuture value, OperationStatus s) { - objRef.set(value); + public void setStatus(OperationStatus s) { status = s; } public void setOperation(Operation to) { + if (operationSetAt == null) { + operationSetAt = System.currentTimeMillis(); + } + op = to; } - public Exception createException(long timeoutMillis) { - return createException(false, timeoutMillis); + @Override + public boolean complete(T value) { + if (completedAt == null) { + completedAt = System.currentTimeMillis(); + } + + objRef.set(value); + Exception exception = createException(false, timeout); + + if (exception != null) { + this.completeExceptionally(exception); + return false; + } + + return super.complete(value); } private Exception createException(boolean forceTimeout, long timeoutMillis) { @@ -128,7 +137,7 @@ private Exception createException(boolean forceTimeout, long timeoutMillis) { } private TimeoutException createTimeoutException(boolean forceTimeout, long timeoutMillis) { - if (forceTimeout || transcodeCompletedAt - createdAt > timeoutMillis) { + if (forceTimeout || completedAt - operationSetAt > timeoutMillis) { // whenever timeout occurs, continuous timeout counter will increase by 1. MemcachedConnection.opTimedOut(op); return new CheckedOperationTimeoutException(timeoutMillis, TimeUnit.MILLISECONDS, op); @@ -139,7 +148,7 @@ private TimeoutException createTimeoutException(boolean forceTimeout, long timeo return null; } - public ExecutionException createExecutionException() { + private ExecutionException createExecutionException() { if (op != null && op.hasErrored()) { return new ExecutionException(op.getException()); } @@ -149,10 +158,6 @@ public ExecutionException createExecutionException() { return null; } - public void transcodeCompleted() { - transcodeCompletedAt = System.currentTimeMillis(); - } - public boolean isCancelled() { assert op != null : "No operation"; return op.isCancelled();