Skip to content

Commit

Permalink
Implement Consumers Pause (#1093)
Browse files Browse the repository at this point in the history
  • Loading branch information
MauriceVanVeen authored Mar 7, 2024
1 parent 1cbd2e8 commit 6c5ae3f
Show file tree
Hide file tree
Showing 19 changed files with 364 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,14 @@
import io.nats.client.Nats;
import io.nats.client.api.ConsumerConfiguration;
import io.nats.client.api.ConsumerInfo;
import io.nats.client.api.ConsumerPauseResponse;
import io.nats.examples.ExampleArgs;
import io.nats.examples.ExampleUtils;

import java.time.ZonedDateTime;
import java.util.List;

import static io.nats.client.support.DateTimeUtils.ZONE_ID_GMT;
import static io.nats.examples.jetstream.NatsJsUtils.*;

/**
Expand Down Expand Up @@ -82,16 +85,30 @@ public static void main(String[] args) {
List<String> consumerNames = jsm.getConsumerNames(exArgs.stream);
printObject(consumerNames);

// 4. Delete a consumer, then list them again
// 4. Pause a consumer
System.out.println("\n----------\n4. pauseConsumer");
ZonedDateTime pauseUntil = ZonedDateTime.now(ZONE_ID_GMT).plusSeconds(30);
ConsumerPauseResponse pauseResponse = jsm.pauseConsumer(exArgs.stream, durable1, pauseUntil);
printObject(pauseResponse);
ci = jsm.getConsumerInfo(exArgs.stream, durable1);
printObject(ci);

// 5. Resume a (paused) consumer
System.out.println("\n----------\n5. resumeConsumer");
jsm.resumeConsumer(exArgs.stream, durable1);
ci = jsm.getConsumerInfo(exArgs.stream, durable1);
printObject(ci);

// 6. Delete a consumer, then list them again
// Subsequent calls to deleteStream will throw a
// JetStreamApiException [10014]
System.out.println("\n----------\n3. Delete consumers");
System.out.println("\n----------\n6. Delete consumers");
jsm.deleteConsumer(exArgs.stream, durable1);
consumerNames = jsm.getConsumerNames(exArgs.stream);
printObject(consumerNames);

// 5. Try to delete the consumer again and get the exception
System.out.println("\n----------\n5. Delete consumer again");
// 7. Try to delete the consumer again and get the exception
System.out.println("\n----------\n7. Delete consumer again");
try
{
jsm.deleteConsumer(exArgs.stream, durable1);
Expand All @@ -101,6 +118,17 @@ public static void main(String[] args) {
System.out.println("Exception was: '" + e.getMessage() + "'");
}

// 8. Try to pause a consumer that does not exist, and you will get an exception
System.out.println("\n----------\n8. Pause non-existent consumer .");
try
{
jsm.pauseConsumer(exArgs.stream, durable1, ZonedDateTime.now());
}
catch (JetStreamApiException e)
{
System.out.println("Exception was: '" + e.getMessage() + "'");
}

System.out.println("\n----------");

// delete the stream since we are done with it.
Expand Down
24 changes: 24 additions & 0 deletions src/main/java/io/nats/client/JetStreamManagement.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import io.nats.client.api.*;

import java.io.IOException;
import java.time.ZonedDateTime;
import java.util.List;

/**
Expand Down Expand Up @@ -131,6 +132,29 @@ public interface JetStreamManagement {
*/
boolean deleteConsumer(String streamName, String consumerName) throws IOException, JetStreamApiException;

/**
* Pauses a consumer.
* @param streamName name of the stream
* @param consumerName the name of the consumer.
* @param pauseUntil consumer is paused until this time.
* @return ConsumerPauseResponse the pause response
* @throws IOException covers various communication issues with the NATS
* server such as timeout or interruption
* @throws JetStreamApiException the request had an error related to the data, for instance the consumer does not exist.
*/
ConsumerPauseResponse pauseConsumer(String streamName, String consumerName, ZonedDateTime pauseUntil) throws IOException, JetStreamApiException;

/**
* Resumes a paused consumer.
* @param streamName name of the stream
* @param consumerName the name of the consumer.
* @return true if the resume succeeded
* @throws IOException covers various communication issues with the NATS
* server such as timeout or interruption
* @throws JetStreamApiException the request had an error related to the data, for instance the consumer does not exist.
*/
boolean resumeConsumer(String streamName, String consumerName) throws IOException, JetStreamApiException;

/**
* Gets the info for an existing consumer.
* @param streamName name of the stream
Expand Down
24 changes: 24 additions & 0 deletions src/main/java/io/nats/client/api/ConsumerConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ public class ConsumerConfiguration implements JsonSerializable {
protected final Integer maxBatch;
protected final Integer maxBytes;
protected final Integer numReplicas;
protected final ZonedDateTime pauseUntil;
protected final Boolean flowControl;
protected final Boolean headersOnly;
protected final Boolean memStorage;
Expand Down Expand Up @@ -110,6 +111,7 @@ protected ConsumerConfiguration(ConsumerConfiguration cc) {
this.maxBatch = cc.maxBatch;
this.maxBytes = cc.maxBytes;
this.numReplicas = cc.numReplicas;
this.pauseUntil = cc.pauseUntil;
this.flowControl = cc.flowControl;
this.headersOnly = cc.headersOnly;
this.memStorage = cc.memStorage;
Expand Down Expand Up @@ -143,6 +145,7 @@ protected ConsumerConfiguration(ConsumerConfiguration cc) {
maxBatch = readInteger(v, MAX_BATCH);
maxBytes = readInteger(v, MAX_BYTES);
numReplicas = readInteger(v, NUM_REPLICAS);
pauseUntil = readDate(v, PAUSE_UNTIL);

flowControl = readBoolean(v, FLOW_CONTROL, null);
headersOnly = readBoolean(v, HEADERS_ONLY, null);
Expand Down Expand Up @@ -187,6 +190,7 @@ protected ConsumerConfiguration(Builder b)
this.maxBatch = b.maxBatch;
this.maxBytes = b.maxBytes;
this.numReplicas = b.numReplicas;
this.pauseUntil = b.pauseUntil;

this.flowControl = b.flowControl;
this.headersOnly = b.headersOnly;
Expand Down Expand Up @@ -229,6 +233,7 @@ public String toJson() {
JsonUtils.addFieldAsNanos(sb, INACTIVE_THRESHOLD, inactiveThreshold);
JsonUtils.addDurations(sb, BACKOFF, backoff);
JsonUtils.addField(sb, NUM_REPLICAS, numReplicas);
JsonUtils.addField(sb, PAUSE_UNTIL, pauseUntil);
JsonUtils.addField(sb, MEM_STORAGE, memStorage);
JsonUtils.addField(sb, METADATA, metadata);
if (filterSubjects != null) {
Expand Down Expand Up @@ -485,6 +490,14 @@ public Map<String, String> getMetadata() {
*/
public int getNumReplicas() { return getOrUnset(numReplicas); }

/**
* Get the time until the consumer is paused.
* @return paused until time
*/
public ZonedDateTime getPauseUntil() {
return pauseUntil;
}

/**
* Gets whether deliver policy of this consumer configuration was set or left unset
* @return true if the policy was set, false if the policy was not set
Expand Down Expand Up @@ -663,6 +676,7 @@ public static class Builder {
private Integer maxBatch;
private Integer maxBytes;
private Integer numReplicas;
private ZonedDateTime pauseUntil;

private Boolean flowControl;
private Boolean headersOnly;
Expand Down Expand Up @@ -1142,6 +1156,16 @@ public Builder numReplicas(Integer numReplicas) {
return this;
}

/**
* Sets the time to pause the consumer until.
* @param pauseUntil the time to pause
* @return Builder
*/
public Builder pauseUntil(ZonedDateTime pauseUntil) {
this.pauseUntil = pauseUntil;
return this;
}

/**
* set the headers only flag saying to deliver only the headers of
* messages in the stream and not the bodies
Expand Down
13 changes: 13 additions & 0 deletions src/main/java/io/nats/client/api/ConsumerInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import io.nats.client.Message;
import io.nats.client.support.JsonValue;

import java.time.Duration;
import java.time.ZonedDateTime;

import static io.nats.client.support.ApiConstants.*;
Expand All @@ -36,6 +37,8 @@ public class ConsumerInfo extends ApiResponse<ConsumerInfo> {
private final long numWaiting;
private final long numAckPending;
private final long numRedelivered;
private final boolean paused;
private final Duration pauseRemaining;
private final ClusterInfo clusterInfo;
private final boolean pushBound;
private final ZonedDateTime timestamp;
Expand All @@ -59,6 +62,8 @@ public ConsumerInfo(JsonValue vConsumerInfo) {
numRedelivered = readLong(jv, NUM_REDELIVERED, 0);
numPending = readLong(jv, NUM_PENDING, 0);
numWaiting = readLong(jv, NUM_WAITING, 0);
paused = readBoolean(jv, PAUSED, false);
pauseRemaining = readNanos(jv, PAUSE_REMAINING);

clusterInfo = ClusterInfo.optionalInstance(readValue(jv, CLUSTER));
pushBound = readBoolean(jv, PUSH_BOUND);
Expand Down Expand Up @@ -110,6 +115,14 @@ public long getRedelivered() {
return numRedelivered;
}

public boolean getPaused() {
return paused;
}

public Duration getPauseRemaining() {
return pauseRemaining;
}

public ClusterInfo getClusterInfo() {
return clusterInfo;
}
Expand Down
45 changes: 45 additions & 0 deletions src/main/java/io/nats/client/api/ConsumerPauseRequest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Copyright 2024 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package io.nats.client.api;

import io.nats.client.support.JsonSerializable;
import io.nats.client.support.JsonUtils;
import java.time.ZonedDateTime;

import static io.nats.client.support.ApiConstants.CONFIG;
import static io.nats.client.support.ApiConstants.PAUSE_UNTIL;
import static io.nats.client.support.ApiConstants.STREAM_NAME;
import static io.nats.client.support.JsonUtils.addField;
import static io.nats.client.support.JsonUtils.beginJson;
import static io.nats.client.support.JsonUtils.endJson;

/**
* Object used to make a request to pause a consumer. Used Internally
*/
public class ConsumerPauseRequest implements JsonSerializable {
private final ZonedDateTime pauseUntil;

public ConsumerPauseRequest(ZonedDateTime pauseUntil) {
this.pauseUntil = pauseUntil;
}

@Override
public String toJson() {
StringBuilder sb = beginJson();

addField(sb, PAUSE_UNTIL, pauseUntil);

return endJson(sb).toString();
}
}
64 changes: 64 additions & 0 deletions src/main/java/io/nats/client/api/ConsumerPauseResponse.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Copyright 2024 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package io.nats.client.api;

import io.nats.client.Message;
import java.time.Duration;
import java.time.ZonedDateTime;

import static io.nats.client.support.ApiConstants.PAUSED;
import static io.nats.client.support.ApiConstants.PAUSE_REMAINING;
import static io.nats.client.support.ApiConstants.PAUSE_UNTIL;
import static io.nats.client.support.JsonValueUtils.readBoolean;
import static io.nats.client.support.JsonValueUtils.readDate;
import static io.nats.client.support.JsonValueUtils.readLong;
import static io.nats.client.support.JsonValueUtils.readNanos;

public class ConsumerPauseResponse extends ApiResponse<ConsumerPauseResponse> {

private final boolean paused;
private final ZonedDateTime pauseUntil;
private final Duration pauseRemaining;

public ConsumerPauseResponse(Message msg) {
super(msg);
paused = readBoolean(jv, PAUSED);
pauseUntil = readDate(jv, PAUSE_UNTIL);
pauseRemaining = readNanos(jv, PAUSE_REMAINING);
}

/**
* Returns true if the consumer was paused
* @return whether the consumer is paused
*/
public boolean isPaused() {
return paused;
}

/**
* Returns the time until the consumer is paused
* @return pause until time
*/
public ZonedDateTime getPauseUntil() {
return pauseUntil;
}

/**
* Returns how much time is remaining for this consumer to be paused
* @return remaining paused time
*/
public Duration getPauseRemaining() {
return pauseRemaining;
}
}
1 change: 1 addition & 0 deletions src/main/java/io/nats/client/impl/NatsJetStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -516,6 +516,7 @@ public List<String> getChanges(ConsumerConfiguration serverCc) {
if (maxBatch != null && maxBatch != serverCcc.getMaxBatch()) { changes.add("maxBatch"); }
if (maxBytes != null && maxBytes != serverCcc.getMaxBytes()) { changes.add("maxBytes"); }
if (numReplicas != null && !numReplicas.equals(serverCcc.numReplicas)) { changes.add("numReplicas"); }
if (pauseUntil != null && !pauseUntil.equals(serverCcc.pauseUntil)) { changes.add("pauseUntil"); }

if (ackWait != null && !ackWait.equals(getOrUnset(serverCcc.ackWait))) { changes.add("ackWait"); }
if (idleHeartbeat != null && !idleHeartbeat.equals(getOrUnset(serverCcc.idleHeartbeat))) { changes.add("idleHeartbeat"); }
Expand Down
27 changes: 27 additions & 0 deletions src/main/java/io/nats/client/impl/NatsJetStreamManagement.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.ZonedDateTime;
import java.util.List;

import static io.nats.client.support.Validator.*;
Expand Down Expand Up @@ -142,6 +143,32 @@ public boolean deleteConsumer(String streamName, String consumerName) throws IOE
return new SuccessApiResponse(resp).throwOnHasError().getSuccess();
}

/**
* {@inheritDoc}
*/
@Override
public ConsumerPauseResponse pauseConsumer(String streamName, String consumerName, ZonedDateTime pauseUntil) throws IOException, JetStreamApiException {
validateNotNull(streamName, "Stream Name");
validateNotNull(consumerName, "Consumer Name");
String subj = String.format(JSAPI_CONSUMER_PAUSE, streamName, consumerName);
ConsumerPauseRequest pauseRequest = new ConsumerPauseRequest(pauseUntil);
Message resp = makeRequestResponseRequired(subj, pauseRequest.serialize(), jso.getRequestTimeout());
return new ConsumerPauseResponse(resp).throwOnHasError();
}

/**
* {@inheritDoc}
*/
@Override
public boolean resumeConsumer(String streamName, String consumerName) throws IOException, JetStreamApiException {
validateNotNull(streamName, "Stream Name");
validateNotNull(consumerName, "Consumer Name");
String subj = String.format(JSAPI_CONSUMER_PAUSE, streamName, consumerName);
Message resp = makeRequestResponseRequired(subj, null, jso.getRequestTimeout());
ConsumerPauseResponse response = new ConsumerPauseResponse(resp).throwOnHasError();
return !response.isPaused();
}

/**
* {@inheritDoc}
*/
Expand Down
3 changes: 3 additions & 0 deletions src/main/java/io/nats/client/support/ApiConstants.java
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,9 @@ public interface ApiConstants {
String OPT_START_SEQ = "opt_start_seq";
String OPT_START_TIME = "opt_start_time";
String OPTIONS = "options";
String PAUSED = "paused";
String PAUSE_REMAINING = "pause_remaining";
String PAUSE_UNTIL = "pause_until";
String PLACEMENT = "placement";
String PORT = "port";
String PROCESSING_TIME = "processing_time";
Expand Down
Loading

0 comments on commit 6c5ae3f

Please sign in to comment.