diff --git a/ingester-example/src/main/java/io/greptime/TestConnector.java b/ingester-example/src/main/java/io/greptime/TestConnector.java index f0bd59f..c056d5d 100644 --- a/ingester-example/src/main/java/io/greptime/TestConnector.java +++ b/ingester-example/src/main/java/io/greptime/TestConnector.java @@ -47,6 +47,8 @@ public static GreptimeDB connectToDefaultDB() { // // Sets the RPC options, in general, the default configuration is fine. .rpcOptions(RpcOptions.newDefault()) + // Enable TLS connection when remote port is secured by TLS + // .tlsOptions(null) // Optional, the default value is fine. // // In some case of failure, a retry of write can be attempted. @@ -85,8 +87,6 @@ public static GreptimeDB connectToDefaultDB() { .router(null) // Sets authentication information. If the DB is not required to authenticate, we can ignore this. .authInfo(AuthInfo.noAuthorization()) - // Enable TLS connection when remote port is secured by TLS - // .tlsOptions(new TlsOptions()) // A good start ^_^ .build(); diff --git a/ingester-grpc/src/main/java/io/greptime/rpc/GrpcClient.java b/ingester-grpc/src/main/java/io/greptime/rpc/GrpcClient.java index 94d7f44..71597d5 100644 --- a/ingester-grpc/src/main/java/io/greptime/rpc/GrpcClient.java +++ b/ingester-grpc/src/main/java/io/greptime/rpc/GrpcClient.java @@ -58,8 +58,10 @@ import io.grpc.stub.StreamObserver; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.File; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; @@ -547,22 +549,23 @@ private Channel getCheckedChannel(Endpoint endpoint, Consumer onFaile } private SslContext newSslContext(TlsOptions tlsOptions) { - try { SslContextBuilder builder = GrpcSslContexts.forClient(); - if (tlsOptions.getClientCertChain().isPresent() && tlsOptions.getPrivateKey().isPresent()) { - if (tlsOptions.getPrivateKeyPassword().isPresent()) { - builder.keyManager(tlsOptions.getClientCertChain().get(), tlsOptions.getPrivateKey().get(), - tlsOptions.getPrivateKeyPassword().get()); + Optional clientCertChain = tlsOptions.getClientCertChain(); + Optional privateKey = tlsOptions.getPrivateKey(); + Optional privateKeyPassword = tlsOptions.getPrivateKeyPassword(); + + if (clientCertChain.isPresent() && privateKey.isPresent()) { + if (privateKeyPassword.isPresent()) { + builder.keyManager(clientCertChain.get(), privateKey.get(), + privateKeyPassword.get()); } else { - builder.keyManager(tlsOptions.getClientCertChain().get(), tlsOptions.getPrivateKey().get()); + builder.keyManager(clientCertChain.get(), privateKey.get()); } } - if (tlsOptions.getRootCerts().isPresent()) { - builder.trustManager(tlsOptions.getRootCerts().get()); - } + tlsOptions.getRootCerts().ifPresent(builder::trustManager); return builder.build(); } catch (SSLException e) { @@ -574,8 +577,9 @@ private IdChannel newChannel(Endpoint endpoint) { NettyChannelBuilder innerChannelBuilder = NettyChannelBuilder.forAddress(endpoint.getAddr(), endpoint.getPort()); - if (this.opts.getTlsOptions().isPresent()) { - innerChannelBuilder.useTransportSecurity().sslContext(newSslContext(this.opts.getTlsOptions().get())); + TlsOptions tlsOptions = this.opts.getTlsOptions(); + if (tlsOptions != null) { + innerChannelBuilder.useTransportSecurity().sslContext(newSslContext(tlsOptions)); } else { innerChannelBuilder.usePlaintext(); } diff --git a/ingester-protocol/src/main/java/io/greptime/models/TableSchema.java b/ingester-protocol/src/main/java/io/greptime/models/TableSchema.java index 16ce06c..012d145 100644 --- a/ingester-protocol/src/main/java/io/greptime/models/TableSchema.java +++ b/ingester-protocol/src/main/java/io/greptime/models/TableSchema.java @@ -144,6 +144,9 @@ public Builder addColumn(String name, // "Invalid timestamp data type: %s, only support `DataType.TimestampXXX`", dataType); } + // trim leading and trailing spaces + name = name.trim(); + this.columnNames.add(name); this.semanticTypes.add(semanticType.toProtoValue()); this.dataTypes.add(dataType.toProtoValue()); diff --git a/ingester-protocol/src/main/java/io/greptime/options/GreptimeOptions.java b/ingester-protocol/src/main/java/io/greptime/options/GreptimeOptions.java index f6214b6..cc8a45c 100644 --- a/ingester-protocol/src/main/java/io/greptime/options/GreptimeOptions.java +++ b/ingester-protocol/src/main/java/io/greptime/options/GreptimeOptions.java @@ -145,6 +145,8 @@ public static final class Builder { private Executor asyncPool; // Rpc options, in general the default configuration is fine. private RpcOptions rpcOptions = RpcOptions.newDefault(); + // GreptimeDB secure connection options + private TlsOptions tlsOptions; private int writeMaxRetries = 1; // Write flow limit: maximum number of data points in-flight. private int maxInFlightWritePoints = 10 * 65536; @@ -191,6 +193,18 @@ public Builder rpcOptions(RpcOptions rpcOptions) { return this; } + /** + * Set `TlsOptions` to use secure connection between client and server. Set to `null` to use + * plaintext connection instead. + * + * @param tlsOptions for configure secure connection, set to null to use plaintext + * @return this builder + */ + public Builder tlsOptions(TlsOptions tlsOptions) { + this.tlsOptions = tlsOptions; + return this; + } + /** * In some case of failure, a retry of write can be attempted. * @@ -280,24 +294,16 @@ public Builder router(Router router) { return this; } - /** - * Set `TlsOptions` to use secure connection between client and server. Set to `null` to use - * plaintext connection instead. - * - * @param tlsOptions for configure secure connection, set to null to use plaintext - * @return this builder - */ - public Builder tlsOptions(TlsOptions tlsOptions) { - this.rpcOptions.setTlsOptions(tlsOptions); - return this; - } - /** * A good start, happy coding. * * @return nice things */ public GreptimeOptions build() { + // Set tls options to rpc options if tls options is not null + if (this.tlsOptions != null && this.rpcOptions != null) { + this.rpcOptions.setTlsOptions(this.tlsOptions); + } GreptimeOptions opts = new GreptimeOptions(); opts.setEndpoints(this.endpoints); opts.setRpcOptions(this.rpcOptions); diff --git a/ingester-rpc/src/main/java/io/greptime/rpc/RpcOptions.java b/ingester-rpc/src/main/java/io/greptime/rpc/RpcOptions.java index 6df2aee..08d19d8 100644 --- a/ingester-rpc/src/main/java/io/greptime/rpc/RpcOptions.java +++ b/ingester-rpc/src/main/java/io/greptime/rpc/RpcOptions.java @@ -105,14 +105,14 @@ public class RpcOptions implements Copiable { * Set `TlsOptions` to use secure connection between client and server. Set to `null` to use * plaintext connection instead. */ - private Optional tlsOptions = Optional.empty(); + private TlsOptions tlsOptions; - public Optional getTlsOptions() { + public TlsOptions getTlsOptions() { return tlsOptions; } public void setTlsOptions(TlsOptions tlsOptions) { - this.tlsOptions = Optional.ofNullable(tlsOptions); + this.tlsOptions = tlsOptions; } public boolean isUseRpcSharedPool() { diff --git a/ingester-rpc/src/main/java/io/greptime/rpc/TlsOptions.java b/ingester-rpc/src/main/java/io/greptime/rpc/TlsOptions.java index 954b37f..3abc187 100644 --- a/ingester-rpc/src/main/java/io/greptime/rpc/TlsOptions.java +++ b/ingester-rpc/src/main/java/io/greptime/rpc/TlsOptions.java @@ -26,73 +26,65 @@ */ public class TlsOptions implements Copiable { - private Optional clientCertChain = Optional.empty(); + private File clientCertChain; - private Optional privateKey = Optional.empty(); + private File privateKey; - private Optional privateKeyPassword = Optional.empty(); + private String privateKeyPassword; - private Optional rootCerts = Optional.empty(); + private File rootCerts; @Override public TlsOptions copy() { TlsOptions that = new TlsOptions(); - that.setClientCertChain(this.getClientCertChain()); - that.setPrivateKey(this.getPrivateKey()); - that.setPrivateKeyPassword(this.getPrivateKeyPassword()); - that.setRootCerts(this.getRootCerts()); + that.setClientCertChain(this.clientCertChain); + that.setPrivateKey(this.privateKey); + that.setPrivateKeyPassword(this.privateKeyPassword); + that.setRootCerts(this.rootCerts); return that; } public Optional getClientCertChain() { - return clientCertChain; + return Optional.ofNullable(this.clientCertChain); } - public void setClientCertChain(Optional clientCertChain) { + public void setClientCertChain(File clientCertChain) { this.clientCertChain = clientCertChain; } public Optional getPrivateKey() { - return privateKey; + return Optional.ofNullable(this.privateKey); } - public void setPrivateKey(Optional privateKey) { + public void setPrivateKey(File privateKey) { this.privateKey = privateKey; } public Optional getPrivateKeyPassword() { - return privateKeyPassword; + return Optional.ofNullable(this.privateKeyPassword); } - public void setPrivateKeyPassword(Optional privateKeyPassword) { + public void setPrivateKeyPassword(String privateKeyPassword) { this.privateKeyPassword = privateKeyPassword; } public Optional getRootCerts() { - return rootCerts; + return Optional.ofNullable(this.rootCerts); } - public void setRootCerts(Optional rootCerts) { + public void setRootCerts(File rootCerts) { this.rootCerts = rootCerts; } @Override public String toString() { - return "TlsOptions{" - + // - "clientCertChain=" - + this.clientCertChain - + // - ", privateKey=" - + this.privateKey - + // - ", privateKeyPassword=" - + this.privateKeyPassword.map((v) -> "****") - + // - ", rootCerts=" - + this.rootCerts - + '}'; + return "TlsOptions{" + // + "clientCertChain=" + clientCertChain + // + ", privateKey=" + privateKey + // + ", privateKeyPassword='" + getPrivateKeyPassword().map((v) -> "****") + '\'' + // + ", rootCerts=" + rootCerts + // + '}'; } }