Skip to content

Commit

Permalink
common grpc Server builder functional
Browse files Browse the repository at this point in the history
Signed-off-by: Maxwell Brown <[email protected]>
  • Loading branch information
Galactus22625 committed Jan 31, 2025
1 parent a2e36fd commit a8e41ae
Show file tree
Hide file tree
Showing 6 changed files with 35 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,6 @@
import io.grpc.ServerInterceptor;
import io.grpc.ServerInterceptors;
import io.grpc.protobuf.services.ProtoReflectionService;
import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest;
import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceResponse;
import io.opentelemetry.proto.collector.metrics.v1.MetricsServiceGrpc;
import org.opensearch.dataprepper.GrpcRequestExceptionHandler;
import org.opensearch.dataprepper.armeria.authentication.GrpcAuthenticationProvider;
import org.opensearch.dataprepper.metrics.PluginMetrics;
Expand All @@ -40,7 +37,7 @@
import java.util.function.Function;


public class CreateServerBuilder {
public class CreateServer {
private final ServerConfiguration serverConfiguration;
private final Logger LOG;
private final PluginMetrics pluginMetrics;
Expand All @@ -53,15 +50,15 @@ public class CreateServerBuilder {

private static final RetryInfoConfig DEFAULT_RETRY_INFO = new RetryInfoConfig(Duration.ofMillis(100), Duration.ofMillis(2000));

public CreateServerBuilder(final ServerConfiguration serverConfiguration, Logger LOG, PluginMetrics pluginMetrics, String sourceName, String pipelineName) {
public CreateServer(final ServerConfiguration serverConfiguration, Logger LOG, PluginMetrics pluginMetrics, String sourceName, String pipelineName) {
this.serverConfiguration = serverConfiguration;
this.LOG = LOG;
this.pluginMetrics = pluginMetrics;
this.sourceName = sourceName;
this.pipelineName = pipelineName;
}

public Server createGRPCServerBuilder(final GrpcAuthenticationProvider authenticationProvider, BindableService grpcService, CertificateProvider certificateProvider) {
public <K, V> Server createGRPCServerBuilder(final GrpcAuthenticationProvider authenticationProvider, BindableService grpcService, CertificateProvider certificateProvider, MethodDescriptor<K, V> methodDescriptor) {
final List<ServerInterceptor> serverInterceptors = getAuthenticationInterceptor(authenticationProvider);

final GrpcServiceBuilder grpcServiceBuilder = GrpcService
Expand All @@ -70,7 +67,6 @@ public Server createGRPCServerBuilder(final GrpcAuthenticationProvider authentic
.useBlockingTaskExecutor(true)
.exceptionHandler(createGrpExceptionHandler());

final MethodDescriptor<ExportMetricsServiceRequest, ExportMetricsServiceResponse> methodDescriptor = MetricsServiceGrpc.getExportMethod();
final String sourcePath = serverConfiguration.getPath();
if (sourcePath != null) {
final String transformedSourcePath = sourcePath.replace(PIPELINE_NAME_PLACEHOLDER, pipelineName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,20 @@
import com.fasterxml.jackson.annotation.JsonProperty;

public class RetryInfoConfig {
final Duration DEFAULT_MIN_DELAY = Duration.ofMillis(100);
final Duration DEFAULT_MAX_DELAY = Duration.ofMillis(2000);

@JsonProperty(value = "min_delay", defaultValue = "100ms")
private Duration minDelay;

@JsonProperty(value = "max_delay", defaultValue = "2s")
private Duration maxDelay;

public RetryInfoConfig() {
this.minDelay = DEFAULT_MIN_DELAY;
this.maxDelay = DEFAULT_MAX_DELAY;
}

public RetryInfoConfig(Duration minDelay, Duration maxDelay) {
this.minDelay = minDelay;
this.maxDelay = maxDelay;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,8 @@

package org.opensearch.dataprepper.plugins.server;

import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Setter;
import lombok.Getter;
import lombok.Setter;
import org.opensearch.dataprepper.model.configuration.PluginModel;
import org.opensearch.dataprepper.model.types.ByteCount;
import org.opensearch.dataprepper.plugins.codec.CompressionOption;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@
package org.opensearch.dataprepper.plugins.source.otellogs;

import com.linecorp.armeria.server.Server;
import com.linecorp.armeria.server.ServerBuilder;
import io.grpc.MethodDescriptor;
import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceRequest;
import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceResponse;
import io.opentelemetry.proto.collector.logs.v1.LogsServiceGrpc;
import org.opensearch.dataprepper.armeria.authentication.GrpcAuthenticationProvider;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
Expand All @@ -22,17 +25,14 @@
import org.opensearch.dataprepper.plugins.certificate.CertificateProvider;
import org.opensearch.dataprepper.plugins.otel.codec.OTelLogsDecoder;
import org.opensearch.dataprepper.plugins.otel.codec.OTelProtoCodec;
import org.opensearch.dataprepper.plugins.server.CreateServerBuilder;
import org.opensearch.dataprepper.plugins.server.CreateServer;
import org.opensearch.dataprepper.plugins.server.ServerConfiguration;
import org.opensearch.dataprepper.plugins.server.RetryInfoConfig;
import org.opensearch.dataprepper.plugins.source.otellogs.certificate.CertificateProviderFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.Collections;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;

@DataPrepperPlugin(name = "otel_logs_source", pluginType = Source.class, pluginConfigurationType = OTelLogsSourceConfig.class)
public class OTelLogsSource implements Source<Record<Object>> {
Expand Down Expand Up @@ -88,12 +88,13 @@ public void start(Buffer<Record<Object>> buffer) {
);

ServerConfiguration serverConfiguration = ConvertConfiguration.convertConfiguration(oTelLogsSourceConfig);
CreateServerBuilder createServer = new CreateServerBuilder(serverConfiguration, LOG, pluginMetrics, "otel_logs_source", pipelineName);
CreateServer createServer = new CreateServer(serverConfiguration, LOG, pluginMetrics, "otel_logs_source", pipelineName);
CertificateProvider certificateProvider = null;
if (oTelLogsSourceConfig.isSsl() || oTelLogsSourceConfig.useAcmCertForSSL()) {
certificateProvider = certificateProviderFactory.getCertificateProvider();
}
server = createServer.createGRPCServerBuilder(authenticationProvider, oTelLogsGrpcService, certificateProvider);
final MethodDescriptor<ExportLogsServiceRequest, ExportLogsServiceResponse> methodDescriptor = LogsServiceGrpc.getExportMethod();
server = createServer.createGRPCServerBuilder(authenticationProvider, oTelLogsGrpcService, certificateProvider, methodDescriptor);

pluginMetrics.gauge(SERVER_CONNECTIONS, server, Server::numConnections);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@
package org.opensearch.dataprepper.plugins.source.otelmetrics;

import com.linecorp.armeria.server.Server;
import com.linecorp.armeria.server.ServerBuilder;
import io.grpc.MethodDescriptor;
import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest;
import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceResponse;
import io.opentelemetry.proto.collector.metrics.v1.MetricsServiceGrpc;
import org.opensearch.dataprepper.armeria.authentication.GrpcAuthenticationProvider;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
Expand All @@ -23,17 +26,14 @@
import org.opensearch.dataprepper.plugins.certificate.CertificateProvider;
import org.opensearch.dataprepper.plugins.otel.codec.OTelMetricDecoder;
import org.opensearch.dataprepper.plugins.otel.codec.OTelProtoCodec;
import org.opensearch.dataprepper.plugins.server.CreateServerBuilder;
import org.opensearch.dataprepper.plugins.server.RetryInfoConfig;
import org.opensearch.dataprepper.plugins.server.CreateServer;
import org.opensearch.dataprepper.plugins.server.ServerConfiguration;
import org.opensearch.dataprepper.plugins.source.otelmetrics.certificate.CertificateProviderFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.Collections;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;

@DataPrepperPlugin(name = "otel_metrics_source", pluginType = Source.class, pluginConfigurationType = OTelMetricsSourceConfig.class)
public class OTelMetricsSource implements Source<Record<? extends Metric>> {
Expand Down Expand Up @@ -86,12 +86,13 @@ public void start(Buffer<Record<? extends Metric>> buffer) {
);

ServerConfiguration serverConfiguration = ConvertConfiguration.convertConfiguration(oTelMetricsSourceConfig);
CreateServerBuilder createServer = new CreateServerBuilder(serverConfiguration, LOG, pluginMetrics, "otel_metrics_source", pipelineName);
CreateServer createServer = new CreateServer(serverConfiguration, LOG, pluginMetrics, "otel_metrics_source", pipelineName);
CertificateProvider certificateProvider = null;
if (oTelMetricsSourceConfig.isSsl() || oTelMetricsSourceConfig.useAcmCertForSSL()) {
certificateProvider = certificateProviderFactory.getCertificateProvider();
}
server = createServer.createGRPCServerBuilder(authenticationProvider, oTelMetricsGrpcService, certificateProvider);
final MethodDescriptor<ExportMetricsServiceRequest, ExportMetricsServiceResponse> methodDescriptor = MetricsServiceGrpc.getExportMethod();
server = createServer.createGRPCServerBuilder(authenticationProvider, oTelMetricsGrpcService, certificateProvider, methodDescriptor);

pluginMetrics.gauge(SERVER_CONNECTIONS, server, Server::numConnections);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@

package org.opensearch.dataprepper.plugins.source.oteltrace;

import com.linecorp.armeria.common.util.BlockingTaskExecutor;
import com.linecorp.armeria.server.Server;
import com.linecorp.armeria.server.ServerBuilder;
import io.grpc.MethodDescriptor;
import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest;
import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceResponse;
import io.opentelemetry.proto.collector.trace.v1.TraceServiceGrpc;
import org.opensearch.dataprepper.armeria.authentication.GrpcAuthenticationProvider;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
Expand All @@ -23,7 +25,7 @@
import org.opensearch.dataprepper.plugins.certificate.CertificateProvider;
import org.opensearch.dataprepper.plugins.otel.codec.OTelProtoCodec;
import org.opensearch.dataprepper.plugins.otel.codec.OTelTraceDecoder;
import org.opensearch.dataprepper.plugins.server.CreateServerBuilder;
import org.opensearch.dataprepper.plugins.server.CreateServer;
import org.opensearch.dataprepper.plugins.server.ServerConfiguration;
import org.opensearch.dataprepper.plugins.source.oteltrace.certificate.CertificateProviderFactory;
import org.slf4j.Logger;
Expand Down Expand Up @@ -84,12 +86,13 @@ public void start(Buffer<Record<Object>> buffer) {
);

ServerConfiguration serverConfiguration = ConvertConfiguration.convertConfiguration(oTelTraceSourceConfig);
CreateServerBuilder createServer = new CreateServerBuilder(serverConfiguration, LOG, pluginMetrics, "otel_trace_source", pipelineName);
CreateServer createServer = new CreateServer(serverConfiguration, LOG, pluginMetrics, "otel_trace_source", pipelineName);
CertificateProvider certificateProvider = null;
if (oTelTraceSourceConfig.isSsl() || oTelTraceSourceConfig.useAcmCertForSSL()) {
certificateProvider = certificateProviderFactory.getCertificateProvider();
}
server = createServer.createGRPCServerBuilder(authenticationProvider, oTelTraceGrpcService, certificateProvider);
final MethodDescriptor<ExportTraceServiceRequest, ExportTraceServiceResponse> methodDescriptor = TraceServiceGrpc.getExportMethod();
server = createServer.createGRPCServerBuilder(authenticationProvider, oTelTraceGrpcService, certificateProvider, methodDescriptor);

pluginMetrics.gauge(SERVER_CONNECTIONS, server, Server::numConnections);
}
Expand Down

0 comments on commit a8e41ae

Please sign in to comment.