diff --git a/server/resp/src/main/java/org/infinispan/server/resp/commands/list/blocking/AbstractBlockingPop.java b/server/resp/src/main/java/org/infinispan/server/resp/commands/list/blocking/AbstractBlockingPop.java index 572f0ef0bf43..cc5f36b63ed3 100644 --- a/server/resp/src/main/java/org/infinispan/server/resp/commands/list/blocking/AbstractBlockingPop.java +++ b/server/resp/src/main/java/org/infinispan/server/resp/commands/list/blocking/AbstractBlockingPop.java @@ -29,14 +29,16 @@ import org.infinispan.server.resp.filter.EventListenerConverter; import org.infinispan.server.resp.filter.EventListenerKeysFilter; import org.infinispan.server.resp.logging.Log; +import org.infinispan.server.resp.tx.TransactionContext; import io.netty.channel.ChannelHandlerContext; /** - * @link https://redis.io/commands/blpop/ - * Derogating to the above documentation, when multiple client are blocked - * on a BLPOP, the order in which they will be served is unspecified. + * Derogating to the command documentation, when multiple client are blocked + * on a BLPOP, the order in which they will be served is unspecified. + * * @since 15.0 + * @see Redis documentation */ public abstract class AbstractBlockingPop extends RespCommand implements Resp3Command { private static final Log log = LogFactory.getLog(MethodHandles.lookup().lookupClass(), Log.class); @@ -60,6 +62,13 @@ public CompletionStage perform(Resp3Handler handler, // If all the keys are empty or null, create a listener // otherwise return the left value of the first non empty list var pollStage = pollAllKeys(listMultimap, configuration); + + // Running blocking pop from EXEC should not block. + // In this case, we just return whatever the polling has returned and do not install the listener. + if (TransactionContext.isInTransactionContext(ctx)) { + return handler.stageToReturn(pollStage, ctx, Consumers.COLLECTION_BULK_BICONSUMER); + } + // If no value returned, we need subscribers return handler.stageToReturn(pollStage.thenCompose(v -> { // addSubscriber call can rise exception that needs to be reported diff --git a/server/resp/src/main/java/org/infinispan/server/resp/commands/tx/EXEC.java b/server/resp/src/main/java/org/infinispan/server/resp/commands/tx/EXEC.java index fa171295f0c1..d5b4ae8fd102 100644 --- a/server/resp/src/main/java/org/infinispan/server/resp/commands/tx/EXEC.java +++ b/server/resp/src/main/java/org/infinispan/server/resp/commands/tx/EXEC.java @@ -7,6 +7,7 @@ import org.infinispan.AdvancedCache; import org.infinispan.commons.util.concurrent.CompletableFutures; +import org.infinispan.commons.util.concurrent.CompletionStages; import org.infinispan.server.resp.Consumers; import org.infinispan.server.resp.Resp3Handler; import org.infinispan.server.resp.RespCommand; @@ -16,7 +17,7 @@ import org.infinispan.server.resp.commands.TransactionResp3Command; import org.infinispan.server.resp.tx.RespTransactionHandler; import org.infinispan.server.resp.tx.TransactionCommand; -import org.infinispan.commons.util.concurrent.CompletionStages; +import org.infinispan.server.resp.tx.TransactionContext; import io.netty.channel.ChannelHandlerContext; @@ -85,11 +86,16 @@ private CompletionStage perform(List commands, RespTransa cache.startBatch(); } return CompletableFuture.supplyAsync(() -> { + // Mark the commands are executing from within a transaction context. + TransactionContext.startTransactionContext(ctx); + Resp3Handler.writeArrayPrefix(commands.size(), curr.allocator()); return orderlyExecution(next, ctx, commands, 0, CompletableFutures.completedNull()) .whenComplete((ignore, t) -> { if (batchEnabled) cache.endBatch(true); + + TransactionContext.endTransactionContext(ctx); }); }, ctx.executor()).thenCompose(Function.identity()); } diff --git a/server/resp/src/main/java/org/infinispan/server/resp/tx/TransactionContext.java b/server/resp/src/main/java/org/infinispan/server/resp/tx/TransactionContext.java new file mode 100644 index 000000000000..b230c57c8d3d --- /dev/null +++ b/server/resp/src/main/java/org/infinispan/server/resp/tx/TransactionContext.java @@ -0,0 +1,55 @@ +package org.infinispan.server.resp.tx; + +import io.netty.channel.ChannelHandlerContext; +import io.netty.util.AttributeKey; + +/** + * Delimit the context of commands run by an {@link org.infinispan.server.resp.commands.tx.EXEC} command. + *

+ * In a transaction execution, the commands are queued and executed (in order) after receiving an {@link org.infinispan.server.resp.commands.tx.EXEC} + * command. This context provides delimitation of the transaction execution so commands are aware whether they are running + * from inside a transaction. + *

+ * + * @since 15.1 + */ +public final class TransactionContext { + + private static final AttributeKey TRANSACTIONAL_CONTEXT = AttributeKey.newInstance("multi-exec"); + + private TransactionContext() { } + + /** + * Start the transaction context. + * + * @param ctx The client context executing the transaction. + * @throws IllegalStateException in case another context is in place. + */ + public static void startTransactionContext(ChannelHandlerContext ctx) { + Boolean existing = ctx.channel().attr(TRANSACTIONAL_CONTEXT).setIfAbsent(Boolean.TRUE); + if (existing != null) + throw new IllegalStateException("Nested transaction context"); + } + + /** + * Finish the transaction context. + * + * @param ctx The client context executing the transaction. + * @throws IllegalStateException in case no transaction context is in place. + */ + public static void endTransactionContext(ChannelHandlerContext ctx) { + Boolean existing = ctx.channel().attr(TRANSACTIONAL_CONTEXT).getAndSet(null); + if (existing == null) + throw new IllegalStateException("Not transaction context to remove"); + } + + /** + * Verify whether the current client is in a transactional context. + * + * @param ctx The client context to verify. + * @return true if running from a transaction, false, otherwise. + */ + public static boolean isInTransactionContext(ChannelHandlerContext ctx) { + return Boolean.TRUE.equals(ctx.channel().attr(TRANSACTIONAL_CONTEXT).get()); + } +} diff --git a/server/resp/src/test/java/org/infinispan/server/resp/TransactionOperationsTest.java b/server/resp/src/test/java/org/infinispan/server/resp/TransactionOperationsTest.java index 8589ec6a2646..c458b71efeb0 100644 --- a/server/resp/src/test/java/org/infinispan/server/resp/TransactionOperationsTest.java +++ b/server/resp/src/test/java/org/infinispan/server/resp/TransactionOperationsTest.java @@ -11,6 +11,7 @@ import org.infinispan.configuration.cache.ConfigurationBuilder; import org.testng.annotations.Test; +import io.lettuce.core.KeyValue; import io.lettuce.core.RedisCommandExecutionException; import io.lettuce.core.TransactionResult; import io.lettuce.core.api.StatefulRedisConnection; @@ -258,4 +259,43 @@ public void testDiscardRemoveListeners() { assertThat(redisConnection.isMulti()).isFalse(); assertThat(redis.get("tx-discard-k2")).isEqualTo("value-inside"); } + + public void testBlpopNotBlocking() { + RedisCommands redis = redisConnection.sync(); + + String key = k(); + String v0 = v(); + String v1 = v(1); + + // Add two entries to the list before starting the TX. + assertThat(redis.lpush(key, v0, v1)).isEqualTo(2); + + assertThat(redis.multi()).isEqualTo(OK); + assertThat(redisConnection.isMulti()).isTrue(); + + // Pop 3 values from the list without any timeout. + redis.blpop(0, key); + redis.blpop(0, key); + redis.blpop(0, key); + + // Execute transaction, the command should not block. + TransactionResult result = redis.exec(); + assertThat(result.wasDiscarded()).isFalse(); + assertThat(result).hasSize(3); + + assertThat((Object) result.get(0)) + .isInstanceOfSatisfying(KeyValue.class, kv -> { + assertThat(kv.getKey()).isEqualTo(key); + assertThat(kv.getValue()).isEqualTo(v1); + }); + + assertThat((Object) result.get(1)) + .isInstanceOfSatisfying(KeyValue.class, kv -> { + assertThat(kv.getKey()).isEqualTo(key); + assertThat(kv.getValue()).isEqualTo(v0); + }); + + // Third pop returns null as there are no more values. + assertThat((Object) result.get(2)).isNull(); + } }