Skip to content

Commit

Permalink
INTERNAL: Refactor api using broadcast operation.
Browse files Browse the repository at this point in the history
  • Loading branch information
brido4125 committed Aug 1, 2023
1 parent 6998801 commit d1b7be0
Show file tree
Hide file tree
Showing 5 changed files with 189 additions and 306 deletions.
95 changes: 13 additions & 82 deletions src/main/java/net/spy/memcached/ArcusClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import java.util.jar.JarFile;
import java.util.jar.Manifest;
Expand Down Expand Up @@ -127,6 +126,7 @@
import net.spy.memcached.internal.OperationFuture;
import net.spy.memcached.internal.SMGetFuture;
import net.spy.memcached.internal.PipedCollectionFuture;
import net.spy.memcached.internal.BroadcastFuture;
import net.spy.memcached.ops.BTreeFindPositionOperation;
import net.spy.memcached.ops.BTreeFindPositionWithGetOperation;
import net.spy.memcached.ops.BTreeGetBulkOperation;
Expand Down Expand Up @@ -1914,97 +1914,28 @@ public OperationFuture<Boolean> flush(final String prefix) {

@Override
public OperationFuture<Boolean> flush(final String prefix, final int delay) {
final AtomicReference<Boolean> flushResult = new AtomicReference<Boolean>(null);
final ConcurrentLinkedQueue<Operation> ops = new ConcurrentLinkedQueue<Operation>();

final CountDownLatch blatch = broadcastOp(new BroadcastOpFactory() {
public Operation newOp(final MemcachedNode n,
final CountDownLatch latch) {
Collection<MemcachedNode> nodes = getAllNodes();
final BroadcastFuture<Boolean> rv
= new BroadcastFuture<Boolean>(operationTimeout, !nodes.isEmpty(), nodes.size());
broadcastOp(new BroadcastOpFactory() {
public Operation newOp(final MemcachedNode n) {
Operation op = opFact.flush(prefix, delay, false,
new OperationCallback() {
public void receivedStatus(OperationStatus s) {
flushResult.set(s.isSuccess());
if (!s.isSuccess()) {
rv.setResult(s.isSuccess());
}
}

public void complete() {
latch.countDown();
rv.complete();
}
});
ops.add(op);
rv.addOp(op);
return op;
}
});

return new OperationFuture<Boolean>(blatch, flushResult,
operationTimeout) {
@Override
public boolean cancel(boolean ign) {
boolean rv = false;
for (Operation op : ops) {
if (op.getState() != OperationState.COMPLETE) {
rv = true;
op.cancel("by application.");
}
}
return rv;
}

@Override
public boolean isCancelled() {
for (Operation op : ops) {
if (op.isCancelled()) {
return true;
}
}
return false;
}

@Override
public Boolean get(long duration, TimeUnit units)
throws InterruptedException, TimeoutException, ExecutionException {

if (!blatch.await(duration, units)) {
// whenever timeout occurs, continuous timeout counter will increase by 1.
Collection<Operation> timedoutOps = new HashSet<Operation>();
for (Operation op : ops) {
if (op.getState() != OperationState.COMPLETE) {
MemcachedConnection.opTimedOut(op);
timedoutOps.add(op);
} else {
MemcachedConnection.opSucceeded(op);
}
}
if (timedoutOps.size() > 0) {
throw new CheckedOperationTimeoutException(duration, units, timedoutOps);
}
} else {
// continuous timeout counter will be reset
MemcachedConnection.opsSucceeded(ops);
}

for (Operation op : ops) {
if (op != null && op.hasErrored()) {
throw new ExecutionException(op.getException());
}

if (op != null && op.isCancelled()) {
throw new ExecutionException(new RuntimeException(op.getCancelCause()));
}
}

return flushResult.get();
}

@Override
public boolean isDone() {
for (Operation op : ops) {
if (!(op.getState() == OperationState.COMPLETE || op.isCancelled())) {
return false;
}
}
return true;
}
};
}, nodes);
return rv;
}

@Override
Expand Down
4 changes: 1 addition & 3 deletions src/main/java/net/spy/memcached/BroadcastOpFactory.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package net.spy.memcached;

import java.util.concurrent.CountDownLatch;

import net.spy.memcached.ops.Operation;

/**
Expand All @@ -13,5 +11,5 @@ public interface BroadcastOpFactory {
* Construct a new operation for delivery to the given node.
* Each operation should count the given latch down upon completion.
*/
Operation newOp(MemcachedNode n, CountDownLatch latch);
Operation newOp(MemcachedNode n);
}
Loading

0 comments on commit d1b7be0

Please sign in to comment.