Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CLENAUP: Remove TranscoderService from reactiveAsyncGet() #700

Open
wants to merge 1 commit into
base: reactive
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/main/java/net/spy/memcached/ArcusClientPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

Expand Down Expand Up @@ -239,6 +240,11 @@ public ReactiveOperationFuture<Object> reactiveAsyncGet(String key) {
return this.getClient().reactiveAsyncGet(key);
}

@Override
public <T> CompletableFuture<T> reactiveAsyncGet(String key, Transcoder<T> tc) {
return this.getClient().reactiveAsyncGet(key, tc);
}

@Override
public <T> OperationFuture<CASValue<T>> asyncGets(String key, Transcoder<T> tc) {
return this.getClient().asyncGets(key, tc);
Expand Down
54 changes: 3 additions & 51 deletions src/main/java/net/spy/memcached/MemcachedClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
Expand All @@ -44,7 +43,6 @@
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

Expand Down Expand Up @@ -951,65 +949,19 @@ public <T> ReactiveOperationFuture<T> reactiveAsyncGet(final String key, final T

Operation op = opFact.get(key, new GetOperation.Callback() {
private volatile T val = null;
private volatile CompletableFuture<T> transcodeFuture = null;

private volatile boolean completed = false;
private final AtomicInteger transcoding = new AtomicInteger(0);

public void receivedStatus(OperationStatus status) {
rv.set(transcodeFuture, status);
rv.setStatus(status);
}

public void gotData(String k, int flags, byte[] data) {
assert key.equals(k) : "Wrong key returned";
transcoding.incrementAndGet();
transcodeFuture = tcService.reactiveDecode(tc, new CachedData(flags, data, tc.getMaxSize()))
.whenComplete(this::whenComplete);
val = tc.decode(new CachedData(flags, data, tc.getMaxSize()));
}

public void complete() {
completed = true;
latch.countDown();

transcodeComplete(val);
}

private void whenComplete(T v, Throwable t) {
rv.transcodeCompleted();

if (t != null) {
rv.completeExceptionally(t);
return;
}

val = v;
transcoding.decrementAndGet();
transcodeComplete(v);
}

private void transcodeComplete(T val) {
if (transcoding.intValue() == 0 && completed) {
completeFuture(val);
}
}

private void completeFuture(T val) {
Exception exception = null;
if (latch.getCount() > 0) {
exception = rv.createException(operationTimeout);
} else {
exception = rv.createExecutionException();
}

if (exception != null) {
rv.completeExceptionally(exception);
return;
}

if (localCacheManager != null) {
localCacheManager.put(key, val);
}
rv.complete(val);
latch.countDown();
}
});
rv.setOperation(op);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
*/
package net.spy.memcached.reactive;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
Expand All @@ -41,25 +40,18 @@
public class ReactiveOperationFuture<T> extends SpyCompletableFuture<T> {

protected final CountDownLatch latch;
protected final AtomicReference<CompletableFuture<T>> objRef;
protected final AtomicReference<T> objRef;
protected OperationStatus status;
protected final long timeout;
protected Operation op;

private final long createdAt = System.currentTimeMillis();
private long transcodeCompletedAt = 0;
private Long operationSetAt = null;
private Long completedAt = null;

public ReactiveOperationFuture(CountDownLatch l, long opTimeout) {
this(l, new AtomicReference<>(null), opTimeout);
}

public ReactiveOperationFuture(CountDownLatch l,
AtomicReference<CompletableFuture<T>> oref,
long opTimeout) {
super();
latch = l;
objRef = oref;
timeout = opTimeout;
this.latch = l;
this.timeout = opTimeout;
this.objRef = new AtomicReference<>(null);
}

public boolean cancel(boolean ign) {
Expand All @@ -80,6 +72,7 @@ public T get() throws InterruptedException, ExecutionException {

public T get(long duration, TimeUnit units)
throws InterruptedException, TimeoutException, ExecutionException {

Exception exception = createException(
!latch.await(duration, units), TimeUnit.MILLISECONDS.convert(duration, units));

Expand All @@ -90,7 +83,7 @@ public T get(long duration, TimeUnit units)
throw (ExecutionException) exception;
}

return objRef.get().get();
return objRef.get();
}

public OperationStatus getStatus() {
Expand All @@ -106,17 +99,33 @@ public OperationStatus getStatus() {
return status;
}

public void set(CompletableFuture<T> value, OperationStatus s) {
objRef.set(value);
public void setStatus(OperationStatus s) {
status = s;
}

public void setOperation(Operation to) {
if (operationSetAt == null) {
operationSetAt = System.currentTimeMillis();
}

op = to;
}

public Exception createException(long timeoutMillis) {
return createException(false, timeoutMillis);
@Override
public boolean complete(T value) {
if (completedAt == null) {
completedAt = System.currentTimeMillis();
}

objRef.set(value);
Exception exception = createException(false, timeout);

if (exception != null) {
this.completeExceptionally(exception);
return false;
}

return super.complete(value);
}

private Exception createException(boolean forceTimeout, long timeoutMillis) {
Expand All @@ -128,7 +137,7 @@ private Exception createException(boolean forceTimeout, long timeoutMillis) {
}

private TimeoutException createTimeoutException(boolean forceTimeout, long timeoutMillis) {
if (forceTimeout || transcodeCompletedAt - createdAt > timeoutMillis) {
if (forceTimeout || completedAt - operationSetAt > timeoutMillis) {
// whenever timeout occurs, continuous timeout counter will increase by 1.
MemcachedConnection.opTimedOut(op);
return new CheckedOperationTimeoutException(timeoutMillis, TimeUnit.MILLISECONDS, op);
Expand All @@ -139,7 +148,7 @@ private TimeoutException createTimeoutException(boolean forceTimeout, long timeo
return null;
}

public ExecutionException createExecutionException() {
private ExecutionException createExecutionException() {
if (op != null && op.hasErrored()) {
return new ExecutionException(op.getException());
}
Expand All @@ -149,10 +158,6 @@ public ExecutionException createExecutionException() {
return null;
}

public void transcodeCompleted() {
transcodeCompletedAt = System.currentTimeMillis();
}

public boolean isCancelled() {
assert op != null : "No operation";
return op.isCancelled();
Expand Down