Skip to content

Commit

Permalink
Add 'pauseConsumer' and 'resumeConsumer' to 'JetStreamManagement'
Browse files Browse the repository at this point in the history
Signed-off-by: Maurice van Veen <[email protected]>
  • Loading branch information
MauriceVanVeen committed Mar 2, 2024
1 parent d7f005b commit d8a3b85
Show file tree
Hide file tree
Showing 9 changed files with 233 additions and 0 deletions.
24 changes: 24 additions & 0 deletions src/main/java/io/nats/client/JetStreamManagement.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import io.nats.client.api.*;

import java.io.IOException;
import java.time.ZonedDateTime;
import java.util.List;

/**
Expand Down Expand Up @@ -131,6 +132,29 @@ public interface JetStreamManagement {
*/
boolean deleteConsumer(String streamName, String consumerName) throws IOException, JetStreamApiException;

/**
* Pauses a consumer.
* @param streamName name of the stream
* @param consumerName the name of the consumer.
* @param pauseUntil consumer is paused until this time.
* @return PauseResponse the pause response
* @throws IOException covers various communication issues with the NATS
* server such as timeout or interruption
* @throws JetStreamApiException the request had an error related to the data, for instance the consumer does not exist.
*/
ConsumerPauseResponse pauseConsumer(String streamName, String consumerName, ZonedDateTime pauseUntil) throws IOException, JetStreamApiException;

/**
* Resumes a paused consumer.
* @param streamName name of the stream
* @param consumerName the name of the consumer.
* @return true if the resume succeeded
* @throws IOException covers various communication issues with the NATS
* server such as timeout or interruption
* @throws JetStreamApiException the request had an error related to the data, for instance the consumer does not exist.
*/
boolean resumeConsumer(String streamName, String consumerName) throws IOException, JetStreamApiException;

/**
* Gets the info for an existing consumer.
* @param streamName name of the stream
Expand Down
52 changes: 52 additions & 0 deletions src/main/java/io/nats/client/api/ConsumerPauseRequest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// Copyright 2024 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package io.nats.client.api;

import io.nats.client.support.JsonSerializable;
import io.nats.client.support.JsonUtils;
import java.time.ZonedDateTime;

import static io.nats.client.support.ApiConstants.CONFIG;
import static io.nats.client.support.ApiConstants.PAUSE_UNTIL;
import static io.nats.client.support.ApiConstants.STREAM_NAME;
import static io.nats.client.support.JsonUtils.addField;
import static io.nats.client.support.JsonUtils.beginJson;
import static io.nats.client.support.JsonUtils.endJson;

/**
* Object used to make a request to pause a consumer. Used Internally
*/
public class ConsumerPauseRequest implements JsonSerializable {
private final ZonedDateTime pauseUntil;

public ConsumerPauseRequest(ZonedDateTime pauseUntil) {
this.pauseUntil = pauseUntil;
}

@Override
public String toJson() {
StringBuilder sb = beginJson();

addField(sb, PAUSE_UNTIL, pauseUntil);

return endJson(sb).toString();
}

@Override
public String toString() {
return "ConsumerPauseRequest{" +
"pauseUntil='" + pauseUntil + "'" +
'}';
}
}
64 changes: 64 additions & 0 deletions src/main/java/io/nats/client/api/ConsumerPauseResponse.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Copyright 2024 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package io.nats.client.api;

import io.nats.client.Message;
import java.time.Duration;
import java.time.ZonedDateTime;

import static io.nats.client.support.ApiConstants.PAUSED;
import static io.nats.client.support.ApiConstants.PAUSE_REMAINING;
import static io.nats.client.support.ApiConstants.PAUSE_UNTIL;
import static io.nats.client.support.JsonValueUtils.readBoolean;
import static io.nats.client.support.JsonValueUtils.readDate;
import static io.nats.client.support.JsonValueUtils.readLong;
import static io.nats.client.support.JsonValueUtils.readNanos;

public class ConsumerPauseResponse extends ApiResponse<ConsumerPauseResponse> {

private final boolean paused;
private final ZonedDateTime pauseUntil;
private final Duration pauseRemaining;

public ConsumerPauseResponse(Message msg) {
super(msg);
paused = readBoolean(jv, PAUSED);
pauseUntil = readDate(jv, PAUSE_UNTIL);
pauseRemaining = readNanos(jv, PAUSE_REMAINING);
}

/**
* Returns true if the consumer was paused
* @return whether the consumer is paused
*/
public boolean isPaused() {
return paused;
}

/**
* Returns the time until the consumer is paused
* @return pause until time
*/
public ZonedDateTime getPauseUntil() {
return pauseUntil;
}

/**
* Returns how much time is remaining for this consumer to be paused
* @return remaining paused time
*/
public Duration getPauseRemaining() {
return pauseRemaining;
}
}
27 changes: 27 additions & 0 deletions src/main/java/io/nats/client/impl/NatsJetStreamManagement.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.ZonedDateTime;
import java.util.List;

import static io.nats.client.support.Validator.*;
Expand Down Expand Up @@ -142,6 +143,32 @@ public boolean deleteConsumer(String streamName, String consumerName) throws IOE
return new SuccessApiResponse(resp).throwOnHasError().getSuccess();
}

/**
* {@inheritDoc}
*/
@Override
public ConsumerPauseResponse pauseConsumer(String streamName, String consumerName, ZonedDateTime pauseUntil) throws IOException, JetStreamApiException {
validateNotNull(streamName, "Stream Name");
validateNotNull(consumerName, "Consumer Name");
String subj = String.format(JSAPI_CONSUMER_PAUSE, streamName, consumerName);
ConsumerPauseRequest pauseRequest = new ConsumerPauseRequest(pauseUntil);
Message resp = makeRequestResponseRequired(subj, pauseRequest.serialize(), jso.getRequestTimeout());
return new ConsumerPauseResponse(resp).throwOnHasError();
}

/**
* {@inheritDoc}
*/
@Override
public boolean resumeConsumer(String streamName, String consumerName) throws IOException, JetStreamApiException {
validateNotNull(streamName, "Stream Name");
validateNotNull(consumerName, "Consumer Name");
String subj = String.format(JSAPI_CONSUMER_PAUSE, streamName, consumerName);
Message resp = makeRequestResponseRequired(subj, null, jso.getRequestTimeout());
ConsumerPauseResponse response = new ConsumerPauseResponse(resp).throwOnHasError();
return !response.isPaused();
}

/**
* {@inheritDoc}
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ public interface NatsJetStreamConstants {
// JSAPI_CONSUMER_DELETE is used to delete consumers.
String JSAPI_CONSUMER_DELETE = "CONSUMER.DELETE.%s.%s";

// JSAPI_CONSUMER_PAUSE is used to pause/resume consumers.
String JSAPI_CONSUMER_PAUSE = "CONSUMER.PAUSE.%s.%s";

// JSAPI_CONSUMER_NAMES is used to return a list of consumer names
String JSAPI_CONSUMER_NAMES = "CONSUMER.NAMES.%s";

Expand Down
22 changes: 22 additions & 0 deletions src/test/java/io/nats/client/api/ResponseTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
package io.nats.client.api;

import io.nats.client.impl.JetStreamTestBase;
import io.nats.client.support.DateTimeUtils;
import java.time.Duration;
import org.junit.jupiter.api.Test;

import static io.nats.client.utils.ResourceUtils.dataAsString;
Expand All @@ -31,4 +33,24 @@ public void testPurgeResponse() {
assertEquals(5, pr.getPurgedCount()); // coverage for deprecated
assertNotNull(pr.toString()); // COVERAGE
}

@Test
public void testPauseResponse() {
String json = dataAsString("ConsumerPauseResponse.json");
ConsumerPauseResponse pr = new ConsumerPauseResponse(getDataMessage(json));
assertTrue(pr.isPaused());
assertEquals(DateTimeUtils.parseDateTime("2024-03-02T13:21:45.198423724Z"), pr.getPauseUntil());
assertEquals(Duration.ofSeconds(30), pr.getPauseRemaining());
assertNotNull(pr.toString()); // COVERAGE
}

@Test
public void testPauseResumeResponse() {
String json = dataAsString("ConsumerResumeResponse.json");
ConsumerPauseResponse pr = new ConsumerPauseResponse(getDataMessage(json));
assertFalse(pr.isPaused());
assertEquals(DateTimeUtils.parseDateTime("0001-01-01T00:00:00Z"), pr.getPauseUntil());
assertNull(pr.getPauseRemaining());
assertNotNull(pr.toString()); // COVERAGE
}
}
30 changes: 30 additions & 0 deletions src/test/java/io/nats/client/impl/JetStreamManagementTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import io.nats.client.api.*;
import io.nats.client.support.DateTimeUtils;
import io.nats.client.utils.TestBase;
import java.time.ZoneOffset;
import org.junit.jupiter.api.Test;

import java.io.IOException;
Expand All @@ -28,6 +29,7 @@
import java.util.List;
import java.util.Map;

import static io.nats.client.support.DateTimeUtils.ZONE_ID_GMT;
import static io.nats.client.support.NatsJetStreamConstants.*;
import static io.nats.client.utils.ResourceUtils.dataAsString;
import static org.junit.jupiter.api.Assertions.*;
Expand Down Expand Up @@ -751,6 +753,34 @@ public void testAddDeleteConsumer() throws Exception {
});
}

@Test
public void testPauseResumeConsumer() throws Exception {
runInJsServer(nc -> {
JetStreamManagement jsm = nc.jetStreamManagement();

createMemoryStream(jsm, STREAM, subjectDot(">"));

List<ConsumerInfo> list = jsm.getConsumers(STREAM);
assertEquals(0, list.size());

ConsumerConfiguration cc = ConsumerConfiguration.builder().build();

// durable and name can both be null
ConsumerInfo ci = jsm.addOrUpdateConsumer(STREAM, cc);
assertNotNull(ci.getName());

// pause consumer
ZonedDateTime pauseUntil = ZonedDateTime.now(ZONE_ID_GMT).plusSeconds(30);
ConsumerPauseResponse pauseResponse = jsm.pauseConsumer(STREAM, ci.getName(), pauseUntil);
assertTrue(pauseResponse.isPaused());
assertEquals(pauseUntil, pauseResponse.getPauseUntil());

// resume consumer
boolean isResumed = jsm.resumeConsumer(STREAM, ci.getName());
assertTrue(isResumed);
});
}

private static void addConsumer(JetStreamManagement jsm, boolean atLeast2dot9, int id, boolean deliver, String fs, ConsumerConfiguration cc) throws IOException, JetStreamApiException {
ConsumerInfo ci = jsm.addOrUpdateConsumer(STREAM, cc);
assertEquals(durable(id), ci.getName());
Expand Down
6 changes: 6 additions & 0 deletions src/test/resources/data/ConsumerPauseResponse.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"type": "io.nats.jetstream.api.v1.consumer_pause_response",
"paused": true,
"pause_until": "2024-03-02T13:21:45.198423724Z",
"pause_remaining": 30000000000
}
5 changes: 5 additions & 0 deletions src/test/resources/data/ConsumerResumeResponse.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{
"type": "io.nats.jetstream.api.v1.consumer_pause_response",
"paused": false,
"pause_until": "0001-01-01T00:00:00Z"
}

0 comments on commit d8a3b85

Please sign in to comment.