diff --git a/clients/command-pubsub/src/main/java/org/eclipse/hono/client/command/pubsub/PubSubBasedInternalCommandConsumer.java b/clients/command-pubsub/src/main/java/org/eclipse/hono/client/command/pubsub/PubSubBasedInternalCommandConsumer.java index 8eea206c9a..966c3372ff 100644 --- a/clients/command-pubsub/src/main/java/org/eclipse/hono/client/command/pubsub/PubSubBasedInternalCommandConsumer.java +++ b/clients/command-pubsub/src/main/java/org/eclipse/hono/client/command/pubsub/PubSubBasedInternalCommandConsumer.java @@ -204,10 +204,11 @@ public Future start() { } private void createReceiver() { - receiver = (PubsubMessage message, AckReplyConsumer consumer) -> handleCommandMessage(message); + receiver = this::handleCommandMessage; } - Future handleCommandMessage(final PubsubMessage message) { + Future handleCommandMessage(final PubsubMessage message, final AckReplyConsumer consumer) { + consumer.ack(); final PubSubBasedCommand command; try { command = PubSubBasedCommand.fromRoutedCommandMessage(message); diff --git a/clients/command-pubsub/src/test/java/org/eclipse/hono/client/command/pubsub/PubSubBasedInternalCommandConsumerTest.java b/clients/command-pubsub/src/test/java/org/eclipse/hono/client/command/pubsub/PubSubBasedInternalCommandConsumerTest.java index 03db73bcd9..98b4ff625a 100644 --- a/clients/command-pubsub/src/test/java/org/eclipse/hono/client/command/pubsub/PubSubBasedInternalCommandConsumerTest.java +++ b/clients/command-pubsub/src/test/java/org/eclipse/hono/client/command/pubsub/PubSubBasedInternalCommandConsumerTest.java @@ -17,6 +17,7 @@ import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; @@ -51,6 +52,7 @@ import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; +import com.google.cloud.pubsub.v1.AckReplyConsumer; import com.google.cloud.pubsub.v1.MessageReceiver; import com.google.pubsub.v1.PubsubMessage; @@ -81,6 +83,8 @@ public class PubSubBasedInternalCommandConsumerTest { private TenantClient tenantClient; + private AckReplyConsumer ackReplyConsumer; + private PubSubBasedInternalCommandConsumer internalCommandConsumer; @BeforeEach @@ -91,6 +95,9 @@ void setUp() { final Tracer tracer = TracingMockSupport.mockTracer(TracingMockSupport.mockSpan()); commandHandlers = new CommandHandlers(); tenantClient = mock(TenantClient.class); + ackReplyConsumer = mock(AckReplyConsumer.class); + + doNothing().when(ackReplyConsumer).ack(); doAnswer(invocation -> { final String tenantId = invocation.getArgument(0); @@ -109,7 +116,7 @@ void setUp() { .thenReturn(Future.succeededFuture(topicAndSubscription)); when(adminClientManager .getOrCreateSubscription(CommandConstants.INTERNAL_COMMAND_ENDPOINT, adapterInstanceId)) - .thenReturn(Future.succeededFuture(topicAndSubscription)); + .thenReturn(Future.succeededFuture(topicAndSubscription)); subscriber = mock(PubSubSubscriberClient.class); when(subscriber.subscribe(true)).thenReturn(Future.succeededFuture()); @@ -156,7 +163,8 @@ public void testHandleCommandMessageSendErrorResponse() { Future.failedFuture( StatusCodeMapper.from(HttpURLConnection.HTTP_UNAVAILABLE, "failed to retrieve tenant"))); - internalCommandConsumer.handleCommandMessage(message); + internalCommandConsumer.handleCommandMessage(message, ackReplyConsumer); + verify(ackReplyConsumer).ack(); verify(commandHandler, never()).apply(any(PubSubBasedCommandContext.class)); verify(commandResponseSender).sendCommandResponse( argThat(t -> t.getTenantId().equals(tenantId)), @@ -166,7 +174,8 @@ public void testHandleCommandMessageSendErrorResponse() { } /** - * Verifies that the consumer handles an invalid command message with missing subject by invoking the matching handler. + * Verifies that the consumer handles an invalid command message with missing subject by invoking the matching + * handler. */ @Test public void testHandleCommandMessageWithInvalidAttribute() { @@ -175,10 +184,11 @@ public void testHandleCommandMessageWithInvalidAttribute() { final Context context = VertxMockSupport.mockContext(mock(Vertx.class)); commandHandlers.putCommandHandler(tenantId, deviceId, null, commandHandler, context); - internalCommandConsumer.handleCommandMessage(message); + internalCommandConsumer.handleCommandMessage(message, ackReplyConsumer); final ArgumentCaptor commandContextArgumentCaptor = ArgumentCaptor .forClass(CommandContext.class); + verify(ackReplyConsumer).ack(); verify(commandHandler).apply(commandContextArgumentCaptor.capture()); assertThat(commandContextArgumentCaptor.getValue()).isNotNull(); assertThat(commandContextArgumentCaptor.getValue().getCommand().isValid()).isFalse(); @@ -196,10 +206,11 @@ public void testHandleCommandMessageWithHandlerForDevice() { final Context context = VertxMockSupport.mockContext(mock(Vertx.class)); commandHandlers.putCommandHandler(tenantId, deviceId, null, commandHandler, context); - internalCommandConsumer.handleCommandMessage(message); + internalCommandConsumer.handleCommandMessage(message, ackReplyConsumer); final ArgumentCaptor commandContextArgumentCaptor = ArgumentCaptor .forClass(CommandContext.class); + verify(ackReplyConsumer).ack(); verify(commandHandler).apply(commandContextArgumentCaptor.capture()); assertThat(commandContextArgumentCaptor.getValue()).isNotNull(); assertThat(commandContextArgumentCaptor.getValue().getCommand().isValid()).isTrue(); @@ -213,7 +224,8 @@ public void testHandleCommandMessageWithNoHandlerForDevice() { final PubsubMessage message = getPubSubMessage(subject, "true"); final Function> commandHandler = mock(Function.class); - internalCommandConsumer.handleCommandMessage(message); + internalCommandConsumer.handleCommandMessage(message, ackReplyConsumer); + verify(ackReplyConsumer).ack(); verify(commandHandler, never()).apply(any(PubSubBasedCommandContext.class)); verify(commandResponseSender).sendCommandResponse( argThat(t -> t.getTenantId().equals(tenantId)), @@ -223,8 +235,8 @@ public void testHandleCommandMessageWithNoHandlerForDevice() { } /** - * Verifies that a failed future is returned if an IllegalArgumentException is thrown when no command can be - * created by the PubSubMessage. + * Verifies that a failed future is returned if an IllegalArgumentException is thrown when no command can be created + * by the PubSubMessage. */ @Test public void testHandleCommandMessageFailedWhenMessageContainsNoAttributes() { @@ -233,8 +245,9 @@ public void testHandleCommandMessageFailedWhenMessageContainsNoAttributes() { final Context context = VertxMockSupport.mockContext(mock(Vertx.class)); commandHandlers.putCommandHandler(tenantId, deviceId, null, commandHandler, context); - final Future result = internalCommandConsumer.handleCommandMessage(message); + final Future result = internalCommandConsumer.handleCommandMessage(message, ackReplyConsumer); assertThat(result.failed()).isTrue(); + verify(ackReplyConsumer).ack(); } private PubsubMessage getPubSubMessage(final String subject, final String responseRequired) { @@ -246,7 +259,8 @@ private PubsubMessage getPubSubMessage(final String subject, final String respon Optional.ofNullable(subject) .ifPresent(ok -> attributes.put(MessageHelper.SYS_PROPERTY_SUBJECT, subject)); Optional.ofNullable(responseRequired) - .ifPresent(ok -> attributes.put(PubSubMessageHelper.PUBSUB_PROPERTY_RESPONSE_REQUIRED, responseRequired)); + .ifPresent( + ok -> attributes.put(PubSubMessageHelper.PUBSUB_PROPERTY_RESPONSE_REQUIRED, responseRequired)); return PubsubMessage.newBuilder().putAllAttributes(attributes).build(); }