Skip to content

Commit

Permalink
chore: add keepalive time and keepalive timeout for grpc external pro…
Browse files Browse the repository at this point in the history
…cessor (#23)

* chore: add keepalive and keepalive timeout for grpc external processor

* chore: add keepalive and keepalive timeout for grpc external processor

* fix: grpc channel shutdown and reformation

* fix: rename grpcArgKeepaliveTimeOutMs to grpcArgKeepaliveTimeoutMs

* fix: added default value for keepalive and keepalivetimeout

* fix:  checkstyle fix

* fix: bump up version
  • Loading branch information
mayankrai09 authored Apr 15, 2024
1 parent 7a3595e commit b19b5a4
Show file tree
Hide file tree
Showing 7 changed files with 153 additions and 44 deletions.
2 changes: 1 addition & 1 deletion config/checkstyle/checkstyle.xml
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@
<!-- See http://checkstyle.sf.net/config_sizes.html -->
<module name="MethodLength"/>
<module name="ParameterNumber">
<property name="max" value="17"/>
<property name="max" value="19"/>
</module>

<!-- Checks for whitespace -->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ public class GrpcSourceConfig implements Serializable, SourceConfig {
private String grpcMethodUrl;
private String requestPattern;
private String requestVariables;
private String grpcArgKeepaliveTimeMs;
private String grpcArgKeepaliveTimeoutMs;
private String streamTimeout;
private String connectTimeout;
private boolean failOnErrors;
Expand Down Expand Up @@ -63,30 +65,34 @@ public GrpcSourceConfig(String endpoint, int servicePort, String grpcRequestProt
/**
* Instantiates a new Grpc source config with specified grpc stencil url.
*
* @param endpoint the endpoint
* @param servicePort the service port
* @param grpcRequestProtoSchema the grpc request proto schema
* @param grpcResponseProtoSchema the grpc response proto schema
* @param grpcMethodUrl the grpc method url
* @param requestPattern the request pattern
* @param requestVariables the request variables
* @param streamTimeout the stream timeout
* @param connectTimeout the connect timeout
* @param failOnErrors the fail on errors
* @param grpcStencilUrl the grpc stencil url
* @param type the type
* @param retainResponseType the retain response type
* @param headers the headers
* @param outputMapping the output mapping
* @param metricId the metric id
* @param capacity the capacity
* @param endpoint the endpoint
* @param servicePort the service port
* @param grpcRequestProtoSchema the grpc request proto schema
* @param grpcResponseProtoSchema the grpc response proto schema
* @param grpcMethodUrl the grpc method url
* @param requestPattern the request pattern
* @param grpcArgKeepaliveTimeMs the grpc Keepalive Time ms
* @param grpcArgKeepaliveTimeoutMs the grpc Keepalive Timeout ms
* @param requestVariables the request variables
* @param streamTimeout the stream timeout
* @param connectTimeout the connect timeout
* @param failOnErrors the fail on errors
* @param grpcStencilUrl the grpc stencil url
* @param type the type
* @param retainResponseType the retain response type
* @param headers the headers
* @param outputMapping the output mapping
* @param metricId the metric id
* @param capacity the capacity
*/
public GrpcSourceConfig(String endpoint, int servicePort, String grpcRequestProtoSchema, String grpcResponseProtoSchema, String grpcMethodUrl, String requestPattern, String requestVariables, String streamTimeout, String connectTimeout, boolean failOnErrors, String grpcStencilUrl, String type, boolean retainResponseType, Map<String, String> headers, Map<String, OutputMapping> outputMapping, String metricId, int capacity) {
public GrpcSourceConfig(String endpoint, int servicePort, String grpcRequestProtoSchema, String grpcResponseProtoSchema, String grpcMethodUrl, String grpcArgKeepaliveTimeMs, String grpcArgKeepaliveTimeoutMs, String requestPattern, String requestVariables, String streamTimeout, String connectTimeout, boolean failOnErrors, String grpcStencilUrl, String type, boolean retainResponseType, Map<String, String> headers, Map<String, OutputMapping> outputMapping, String metricId, int capacity) {
this.endpoint = endpoint;
this.servicePort = servicePort;
this.grpcRequestProtoSchema = grpcRequestProtoSchema;
this.grpcResponseProtoSchema = grpcResponseProtoSchema;
this.grpcMethodUrl = grpcMethodUrl;
this.grpcArgKeepaliveTimeMs = grpcArgKeepaliveTimeMs;
this.grpcArgKeepaliveTimeoutMs = grpcArgKeepaliveTimeoutMs;
this.requestPattern = requestPattern;
this.requestVariables = requestVariables;
this.streamTimeout = streamTimeout;
Expand Down Expand Up @@ -209,6 +215,42 @@ public String getGrpcMethodUrl() {
return grpcMethodUrl;
}

/**
* Gets grpc arg keepalive time ms.
*
* @return grpc arg keepalive time ms
*/
public String getGrpcArgKeepaliveTimeMs() {
return grpcArgKeepaliveTimeMs;
}

/**
* Gets grpc arg keepalive timeout ms.
*
* @return grpc arg keepalive timeout ms
*/
public String getGrpcArgKeepaliveTimeoutMs() {
return grpcArgKeepaliveTimeoutMs;
}

/**
* Sets grpc arg keepalive time ms.
*
* @param grpcArgKeepaliveTimeMs the grpc arg keepalive time ms
*/
public void setGrpcArgKeepaliveTimeMs(String grpcArgKeepaliveTimeMs) {
this.grpcArgKeepaliveTimeMs = grpcArgKeepaliveTimeMs;
}

/**
* Sets grpc arg keepalive timeout ms.
*
* @param grpcArgKeepaliveTimeoutMs the grpc arg keepalive timeout ms
*/
public void setGrpcArgKeepaliveTimeoutMs(String grpcArgKeepaliveTimeoutMs) {
this.grpcArgKeepaliveTimeoutMs = grpcArgKeepaliveTimeoutMs;
}

/**
* Gets service port.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ public class GrpcSourceConfigBuilder {
private String grpcRequestProtoSchema;
private String grpcResponseProtoSchema;
private String grpcMethodUrl;
private String grpcArgKeepaliveTimeMs;
private String grpcArgKeepaliveTimeoutMs;
private String requestPattern;
private String requestVariables;
private Map<String, OutputMapping> outputMapping;
Expand Down Expand Up @@ -108,8 +110,18 @@ public GrpcSourceConfigBuilder setCapacity(int capacity) {
return this;
}

public GrpcSourceConfigBuilder setGrpcArgKeepaliveTimeMs(String grpcArgKeepaliveTimeMs) {
this.grpcArgKeepaliveTimeMs = grpcArgKeepaliveTimeMs;
return this;
}

public GrpcSourceConfigBuilder setGrpcArgKeepaliveTimeoutMs(String grpcArgKeepaliveTimeoutMs) {
this.grpcArgKeepaliveTimeoutMs = grpcArgKeepaliveTimeoutMs;
return this;
}

public GrpcSourceConfig createGrpcSourceConfig() {
return new GrpcSourceConfig(endpoint, servicePort, grpcRequestProtoSchema, grpcResponseProtoSchema, grpcMethodUrl, requestPattern, requestVariables,
return new GrpcSourceConfig(endpoint, servicePort, grpcRequestProtoSchema, grpcResponseProtoSchema, grpcMethodUrl, grpcArgKeepaliveTimeMs, grpcArgKeepaliveTimeoutMs, requestPattern, requestVariables,
streamTimeout, connectTimeout, failOnErrors, grpcStencilUrl, type, retainResponseType, headers, outputMapping, metricId, capacity);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,26 +5,32 @@
import com.google.protobuf.Descriptors.Descriptor;
import com.google.protobuf.DynamicMessage;

import io.grpc.Channel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.ClientInterceptors;
import io.grpc.Metadata;
import io.grpc.CallOptions;
import io.grpc.ClientCall;
import io.grpc.Channel;
import io.grpc.MethodDescriptor;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Metadata;
import io.grpc.stub.ClientCalls;
import io.grpc.stub.MetadataUtils;
import io.grpc.stub.StreamObserver;
import org.apache.commons.lang3.StringUtils;

import java.util.Map;
import java.util.concurrent.TimeUnit;

/**
* The Grpc client.
*/
public class GrpcClient {
private final GrpcSourceConfig grpcConfig;

private Channel decoratedChannel;
private ManagedChannel decoratedChannel;

private final long defaultKeepAliveTimeout = 20000L;

private final long defaultKeepAliveInterval = Long.MAX_VALUE;

/**
* Instantiates a new Grpc client.
Expand All @@ -39,19 +45,27 @@ public GrpcClient(GrpcSourceConfig grpcConfig) {
* Add channel.
*/
public void addChannel() {
Channel channel = ManagedChannelBuilder.forAddress(grpcConfig.getEndpoint(), grpcConfig.getServicePort()).usePlaintext().build();
ManagedChannelBuilder<?> channelBuilder = ManagedChannelBuilder.forAddress(grpcConfig.getEndpoint(), grpcConfig.getServicePort()).usePlaintext();
channelBuilder = decorateManagedChannelBuilder(channelBuilder);
decoratedChannel = channelBuilder.build();
}

protected ManagedChannelBuilder<?> decorateManagedChannelBuilder(ManagedChannelBuilder<?> channelBuilder) {

Metadata metadata = new Metadata();
long keepAliveInterval = StringUtils.isNotEmpty(grpcConfig.getGrpcArgKeepaliveTimeMs()) ? Long.parseLong(grpcConfig.getGrpcArgKeepaliveTimeMs()) : defaultKeepAliveInterval;
long keepAliveTimeout = StringUtils.isNotEmpty(grpcConfig.getGrpcArgKeepaliveTimeoutMs()) ? Long.parseLong(grpcConfig.getGrpcArgKeepaliveTimeoutMs()) : defaultKeepAliveTimeout;

channelBuilder = channelBuilder.keepAliveTime(keepAliveInterval, TimeUnit.MILLISECONDS).keepAliveTimeout(keepAliveTimeout, TimeUnit.MILLISECONDS);

if (grpcConfig.getHeaders() != null && !grpcConfig.getHeaders().isEmpty()) {
Metadata metadata = new Metadata();
for (Map.Entry<String, String> header : grpcConfig.getHeaders().entrySet()) {
metadata.put(Metadata.Key.of(header.getKey(), Metadata.ASCII_STRING_MARSHALLER), header.getValue());
}
channelBuilder.intercept(MetadataUtils.newAttachHeadersInterceptor(metadata));
}
decoratedChannel = ClientInterceptors.intercept(channel,
MetadataUtils.newAttachHeadersInterceptor(metadata));


return channelBuilder;
}

/**
Expand Down Expand Up @@ -89,6 +103,9 @@ private ClientCall<DynamicMessage, DynamicMessage> createCall(CallOptions callOp
* Close channel.
*/
public void close() {
if (decoratedChannel != null && !decoratedChannel.isShutdown()) {
decoratedChannel.shutdown();
}
this.decoratedChannel = null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,39 @@

import io.grpc.Channel;
import com.gotocompany.dagger.core.processors.external.grpc.GrpcSourceConfig;
import io.grpc.ManagedChannelBuilder;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;

import java.util.concurrent.TimeUnit;

import static org.junit.Assert.*;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.*;
import static org.mockito.Mockito.times;
import static org.mockito.MockitoAnnotations.initMocks;

public class GrpcClientTest {

@Mock
private GrpcSourceConfig grpcSourceConfig;

@Mock
private ManagedChannelBuilder channelBuilder;

@Before
public void setUp() {
initMocks(this);
when(grpcSourceConfig.getEndpoint()).thenReturn("localhost");
when(grpcSourceConfig.getServicePort()).thenReturn(8080);
}

@Test
public void channelShouldBeAddedForAHostAndPort() {

grpcSourceConfig = mock(GrpcSourceConfig.class);

GrpcClient grpcClient = new GrpcClient(grpcSourceConfig);

when(grpcSourceConfig.getEndpoint()).thenReturn("localhost");
when(grpcSourceConfig.getServicePort()).thenReturn(8080);

grpcClient.addChannel();

Channel decoratedChannel = grpcClient.getDecoratedChannel();
Expand All @@ -30,22 +43,29 @@ public void channelShouldBeAddedForAHostAndPort() {
}

@Test
public void grpcClientCloseShouldWork() {

grpcSourceConfig = mock(GrpcSourceConfig.class);
public void channelBuilderShouldBeDecoratedWithKeepaliveAndTimeOutMS() {
when(grpcSourceConfig.getGrpcArgKeepaliveTimeMs()).thenReturn("1000");
when(grpcSourceConfig.getGrpcArgKeepaliveTimeoutMs()).thenReturn("100");
when(channelBuilder.keepAliveTime(anyLong(), any())).thenReturn(channelBuilder);

GrpcClient grpcClient = new GrpcClient(grpcSourceConfig);
grpcClient.decorateManagedChannelBuilder(channelBuilder);
verify(channelBuilder, times(1)).keepAliveTimeout(Long.parseLong("100"), TimeUnit.MILLISECONDS);
verify(channelBuilder, times(1)).keepAliveTime(Long.parseLong("1000"), TimeUnit.MILLISECONDS);
}

when(grpcSourceConfig.getEndpoint()).thenReturn("localhost");
when(grpcSourceConfig.getServicePort()).thenReturn(8080);
@Test
public void grpcClientCloseShouldWork() {

GrpcClient grpcClient = new GrpcClient(grpcSourceConfig);

grpcClient.addChannel();

Channel decoratedChannel = grpcClient.getDecoratedChannel();
assertNotNull(decoratedChannel);

grpcClient.close();
decoratedChannel = grpcClient.getDecoratedChannel();
decoratedChannel = grpcClient.getDecoratedChannel();
assertNull(decoratedChannel);

}
Expand Down
18 changes: 18 additions & 0 deletions docs/docs/advance/post_processor.md
Original file line number Diff line number Diff line change
Expand Up @@ -666,6 +666,24 @@ The timeout value for gRPC client in ms.
- Example value: `5000`
- Type: `required`
##### `grpc_arg_keepalive_time_ms`
The keepalive ping is a way to check if a channel is currently working by sending HTTP2 pings over the transport. It is sent periodically, and if the ping is not acknowledged by the peer within a certain timeout period, the transport is disconnected. Other keepalive configurations are described [here](https://github.com/grpc/grpc/blob/master/doc/keepalive.md).
This channel argument controls the period (in milliseconds) after which a keepalive ping is sent on the transport. If smaller than 10000, 10000 will be used instead.
- Example value: `60000`
- Type: `optional`
- Default value: `infinite`
##### `grpc_arg_keepalive_timeout_ms`
This channel argument controls the amount of time (in milliseconds) the sender of the keepalive ping waits for an acknowledgement. If it does not receive an acknowledgment within this time, it will close the connection.
- Example value: `5000`
- Type: `optional`
- Default value: `20000`
##### `fail_on_errors`
A flag for deciding whether the job should fail on encountering errors or not. If set false the job won’t fail and enrich with empty fields otherwise the job will fail.
Expand Down
2 changes: 1 addition & 1 deletion version.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.10.1
0.10.2

0 comments on commit b19b5a4

Please sign in to comment.