Skip to content

Commit

Permalink
ISPN-16750 [RESP] BLPOP should not block on empty list within multi/e…
Browse files Browse the repository at this point in the history
…xec block

* Set a flag on Netty's channel context to identify if executing from a
  transaction context.
  • Loading branch information
jabolina authored and tristantarrant committed Oct 16, 2024
1 parent 40d00b1 commit 56bf1bd
Show file tree
Hide file tree
Showing 4 changed files with 114 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,16 @@
import org.infinispan.server.resp.filter.EventListenerConverter;
import org.infinispan.server.resp.filter.EventListenerKeysFilter;
import org.infinispan.server.resp.logging.Log;
import org.infinispan.server.resp.tx.TransactionContext;

import io.netty.channel.ChannelHandlerContext;

/**
* @link https://redis.io/commands/blpop/
* Derogating to the above documentation, when multiple client are blocked
* on a BLPOP, the order in which they will be served is unspecified.
* Derogating to the command documentation, when multiple client are blocked
* on a BLPOP, the order in which they will be served is unspecified.
*
* @since 15.0
* @see <a href="https://redis.io/commands/blpop/">Redis documentation</a>
*/
public abstract class AbstractBlockingPop extends RespCommand implements Resp3Command {
private static final Log log = LogFactory.getLog(MethodHandles.lookup().lookupClass(), Log.class);
Expand All @@ -60,6 +62,13 @@ public CompletionStage<RespRequestHandler> perform(Resp3Handler handler,
// If all the keys are empty or null, create a listener
// otherwise return the left value of the first non empty list
var pollStage = pollAllKeys(listMultimap, configuration);

// Running blocking pop from EXEC should not block.
// In this case, we just return whatever the polling has returned and do not install the listener.
if (TransactionContext.isInTransactionContext(ctx)) {
return handler.stageToReturn(pollStage, ctx, Consumers.COLLECTION_BULK_BICONSUMER);
}

// If no value returned, we need subscribers
return handler.stageToReturn(pollStage.thenCompose(v -> {
// addSubscriber call can rise exception that needs to be reported
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import org.infinispan.AdvancedCache;
import org.infinispan.commons.util.concurrent.CompletableFutures;
import org.infinispan.commons.util.concurrent.CompletionStages;
import org.infinispan.server.resp.Consumers;
import org.infinispan.server.resp.Resp3Handler;
import org.infinispan.server.resp.RespCommand;
Expand All @@ -16,7 +17,7 @@
import org.infinispan.server.resp.commands.TransactionResp3Command;
import org.infinispan.server.resp.tx.RespTransactionHandler;
import org.infinispan.server.resp.tx.TransactionCommand;
import org.infinispan.commons.util.concurrent.CompletionStages;
import org.infinispan.server.resp.tx.TransactionContext;

import io.netty.channel.ChannelHandlerContext;

Expand Down Expand Up @@ -85,11 +86,16 @@ private CompletionStage<?> perform(List<TransactionCommand> commands, RespTransa
cache.startBatch();
}
return CompletableFuture.supplyAsync(() -> {
// Mark the commands are executing from within a transaction context.
TransactionContext.startTransactionContext(ctx);

Resp3Handler.writeArrayPrefix(commands.size(), curr.allocator());
return orderlyExecution(next, ctx, commands, 0, CompletableFutures.completedNull())
.whenComplete((ignore, t) -> {
if (batchEnabled)
cache.endBatch(true);

TransactionContext.endTransactionContext(ctx);
});
}, ctx.executor()).thenCompose(Function.identity());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package org.infinispan.server.resp.tx;

import io.netty.channel.ChannelHandlerContext;
import io.netty.util.AttributeKey;

/**
* Delimit the context of commands run by an {@link org.infinispan.server.resp.commands.tx.EXEC} command.
* <p>
* In a transaction execution, the commands are queued and executed (in order) after receiving an {@link org.infinispan.server.resp.commands.tx.EXEC}
* command. This context provides delimitation of the transaction execution so commands are aware whether they are running
* from inside a transaction.
* </p>
*
* @since 15.1
*/
public final class TransactionContext {

private static final AttributeKey<Boolean> TRANSACTIONAL_CONTEXT = AttributeKey.newInstance("multi-exec");

private TransactionContext() { }

/**
* Start the transaction context.
*
* @param ctx The client context executing the transaction.
* @throws IllegalStateException in case another context is in place.
*/
public static void startTransactionContext(ChannelHandlerContext ctx) {
Boolean existing = ctx.channel().attr(TRANSACTIONAL_CONTEXT).setIfAbsent(Boolean.TRUE);
if (existing != null)
throw new IllegalStateException("Nested transaction context");
}

/**
* Finish the transaction context.
*
* @param ctx The client context executing the transaction.
* @throws IllegalStateException in case no transaction context is in place.
*/
public static void endTransactionContext(ChannelHandlerContext ctx) {
Boolean existing = ctx.channel().attr(TRANSACTIONAL_CONTEXT).getAndSet(null);
if (existing == null)
throw new IllegalStateException("Not transaction context to remove");
}

/**
* Verify whether the current client is in a transactional context.
*
* @param ctx The client context to verify.
* @return <code>true</code> if running from a transaction, <code>false</code>, otherwise.
*/
public static boolean isInTransactionContext(ChannelHandlerContext ctx) {
return Boolean.TRUE.equals(ctx.channel().attr(TRANSACTIONAL_CONTEXT).get());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.testng.annotations.Test;

import io.lettuce.core.KeyValue;
import io.lettuce.core.RedisCommandExecutionException;
import io.lettuce.core.TransactionResult;
import io.lettuce.core.api.StatefulRedisConnection;
Expand Down Expand Up @@ -258,4 +259,43 @@ public void testDiscardRemoveListeners() {
assertThat(redisConnection.isMulti()).isFalse();
assertThat(redis.get("tx-discard-k2")).isEqualTo("value-inside");
}

public void testBlpopNotBlocking() {
RedisCommands<String, String> redis = redisConnection.sync();

String key = k();
String v0 = v();
String v1 = v(1);

// Add two entries to the list before starting the TX.
assertThat(redis.lpush(key, v0, v1)).isEqualTo(2);

assertThat(redis.multi()).isEqualTo(OK);
assertThat(redisConnection.isMulti()).isTrue();

// Pop 3 values from the list without any timeout.
redis.blpop(0, key);
redis.blpop(0, key);
redis.blpop(0, key);

// Execute transaction, the command should not block.
TransactionResult result = redis.exec();
assertThat(result.wasDiscarded()).isFalse();
assertThat(result).hasSize(3);

assertThat((Object) result.get(0))
.isInstanceOfSatisfying(KeyValue.class, kv -> {
assertThat(kv.getKey()).isEqualTo(key);
assertThat(kv.getValue()).isEqualTo(v1);
});

assertThat((Object) result.get(1))
.isInstanceOfSatisfying(KeyValue.class, kv -> {
assertThat(kv.getKey()).isEqualTo(key);
assertThat(kv.getValue()).isEqualTo(v0);
});

// Third pop returns null as there are no more values.
assertThat((Object) result.get(2)).isNull();
}
}

0 comments on commit 56bf1bd

Please sign in to comment.