diff --git a/ingester-common/src/main/java/com/google/protobuf/ByteStringHelper.java b/ingester-common/src/main/java/com/google/protobuf/ByteStringHelper.java index 29ee2ed..d4c403c 100644 --- a/ingester-common/src/main/java/com/google/protobuf/ByteStringHelper.java +++ b/ingester-common/src/main/java/com/google/protobuf/ByteStringHelper.java @@ -19,7 +19,7 @@ import java.nio.ByteBuffer; /** - * A {@code ByteString} helper. + * A {@code ByteString} helper, avoid some memory copying to improve performance. * * @author jiachun.fjc */ diff --git a/ingester-common/src/main/java/io/greptime/common/Keys.java b/ingester-common/src/main/java/io/greptime/common/Keys.java index fef33b1..2576556 100644 --- a/ingester-common/src/main/java/io/greptime/common/Keys.java +++ b/ingester-common/src/main/java/io/greptime/common/Keys.java @@ -16,13 +16,15 @@ package io.greptime.common; /** - * System properties option keys + * Constant string keys. * * @author jiachun.fjc */ public final class Keys { public static final String DB_NAME = "GreptimeDB"; public static final String VERSION_KEY = "client.version"; + public static final String ID_KEY = "client.id"; + public static final String NODE_KEY = "client.node"; public static final String OS_NAME = "os.name"; public static final String USE_OS_SIGNAL = "greptimedb.use_os_signal"; public static final String AVAILABLE_CPUS = "greptimedb.available_cpus"; diff --git a/ingester-common/src/main/java/io/greptime/common/Streamable.java b/ingester-common/src/main/java/io/greptime/common/Streamable.java deleted file mode 100644 index fe7567c..0000000 --- a/ingester-common/src/main/java/io/greptime/common/Streamable.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Copyright 2023 Greptime Team - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.greptime.common; - -import java.util.stream.Stream; - -/** - * Streamable interface. - * - * @author jiachun.fjc - */ -public interface Streamable { - - /** - * Returns a sequential {@code Stream} over the elements. - * - * @return a sequential {@code Stream}. - */ - Stream stream(); -} diff --git a/ingester-common/src/main/java/io/greptime/common/signal/FileSignal.java b/ingester-common/src/main/java/io/greptime/common/signal/FileSignal.java index 99f626a..4d65639 100644 --- a/ingester-common/src/main/java/io/greptime/common/signal/FileSignal.java +++ b/ingester-common/src/main/java/io/greptime/common/signal/FileSignal.java @@ -19,6 +19,10 @@ /** * File signal. + *

+ * Adopt the method of creating files with specified names to interact + * with the Client process and implement signal transmission, achieve the + * purpose of controlling the process to output specified content through this. * * @author jiachun.fjc */ diff --git a/ingester-common/src/main/java/io/greptime/common/util/DirectExecutor.java b/ingester-common/src/main/java/io/greptime/common/util/DirectExecutor.java index 88fa04c..3256902 100644 --- a/ingester-common/src/main/java/io/greptime/common/util/DirectExecutor.java +++ b/ingester-common/src/main/java/io/greptime/common/util/DirectExecutor.java @@ -32,6 +32,7 @@ public DirectExecutor(String name) { this.executeTimer = MetricsUtil.timer("direct_executor_timer", name); } + @SuppressWarnings("NullableProblems") @Override public void execute(Runnable cmd) { this.executeTimer.time(cmd); diff --git a/ingester-common/src/main/java/io/greptime/common/util/LogScheduledThreadPoolExecutor.java b/ingester-common/src/main/java/io/greptime/common/util/LogScheduledThreadPoolExecutor.java index 98ed2d0..74f5a31 100644 --- a/ingester-common/src/main/java/io/greptime/common/util/LogScheduledThreadPoolExecutor.java +++ b/ingester-common/src/main/java/io/greptime/common/util/LogScheduledThreadPoolExecutor.java @@ -38,26 +38,10 @@ public class LogScheduledThreadPoolExecutor extends ScheduledThreadPoolExecutor private final int corePoolSize; private final String name; - public LogScheduledThreadPoolExecutor(int corePoolSize, String name) { - super(corePoolSize); - this.corePoolSize = corePoolSize; - this.name = name; - } - - public LogScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, String name) { - super(corePoolSize, threadFactory); - this.corePoolSize = corePoolSize; - this.name = name; - } - - public LogScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler, String name) { - super(corePoolSize, handler); - this.corePoolSize = corePoolSize; - this.name = name; - } - - public LogScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, - RejectedExecutionHandler handler, String name) { + public LogScheduledThreadPoolExecutor(int corePoolSize, // + ThreadFactory threadFactory, // + RejectedExecutionHandler handler, // + String name) { super(corePoolSize, threadFactory, handler); this.corePoolSize = corePoolSize; this.name = name; diff --git a/ingester-common/src/main/java/io/greptime/common/util/LogThreadPoolExecutor.java b/ingester-common/src/main/java/io/greptime/common/util/LogThreadPoolExecutor.java index 63e812b..c0e3dde 100644 --- a/ingester-common/src/main/java/io/greptime/common/util/LogThreadPoolExecutor.java +++ b/ingester-common/src/main/java/io/greptime/common/util/LogThreadPoolExecutor.java @@ -40,8 +40,13 @@ public class LogThreadPoolExecutor extends ThreadPoolExecutor { private final int maximumPoolSize; private final String name; - public LogThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, - BlockingQueue workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler, + public LogThreadPoolExecutor(int corePoolSize, // + int maximumPoolSize, // + long keepAliveTime, // + TimeUnit unit, // + BlockingQueue workQueue, // + ThreadFactory threadFactory, // + RejectedExecutionHandler handler, // String name) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); this.corePoolSize = corePoolSize; diff --git a/ingester-common/src/main/java/io/greptime/common/util/MetricScheduledThreadPoolExecutor.java b/ingester-common/src/main/java/io/greptime/common/util/MetricScheduledThreadPoolExecutor.java index 0339e96..9630386 100644 --- a/ingester-common/src/main/java/io/greptime/common/util/MetricScheduledThreadPoolExecutor.java +++ b/ingester-common/src/main/java/io/greptime/common/util/MetricScheduledThreadPoolExecutor.java @@ -28,8 +28,10 @@ */ public class MetricScheduledThreadPoolExecutor extends LogScheduledThreadPoolExecutor { - public MetricScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, - RejectedExecutionHandler handler, String name) { + public MetricScheduledThreadPoolExecutor(int corePoolSize, // + ThreadFactory threadFactory, // + RejectedExecutionHandler handler, // + String name) { super(corePoolSize, threadFactory, handler, name); } diff --git a/ingester-common/src/main/java/io/greptime/common/util/Strings.java b/ingester-common/src/main/java/io/greptime/common/util/Strings.java index a9a2c07..aeb6e2c 100644 --- a/ingester-common/src/main/java/io/greptime/common/util/Strings.java +++ b/ingester-common/src/main/java/io/greptime/common/util/Strings.java @@ -51,7 +51,7 @@ public static String emptyToNull(String string) { * Returns {@code true} if the given string is null or is the empty string. */ public static boolean isNullOrEmpty(String str) { - return str == null || str.length() == 0; + return str == null || str.isEmpty(); } /** diff --git a/ingester-grpc/src/main/java/io/greptime/rpc/interceptors/ClientRequestLimitInterceptor.java b/ingester-grpc/src/main/java/io/greptime/rpc/interceptors/ClientRequestLimitInterceptor.java index 4292611..5be99e2 100644 --- a/ingester-grpc/src/main/java/io/greptime/rpc/interceptors/ClientRequestLimitInterceptor.java +++ b/ingester-grpc/src/main/java/io/greptime/rpc/interceptors/ClientRequestLimitInterceptor.java @@ -33,7 +33,7 @@ /** * ClientInterceptor that enforces per service and/or per method concurrent - * request limits and returns a Status.UNAVAILABLE when that limit has been + * request limits and returns a `Status.UNAVAILABLE` when that limit has been * reached. *

* Refer to `concurrency-limit-grpc` diff --git a/ingester-grpc/src/main/java/io/greptime/rpc/interceptors/ContextToHeadersInterceptor.java b/ingester-grpc/src/main/java/io/greptime/rpc/interceptors/ContextToHeadersInterceptor.java index 8a998c7..5614616 100644 --- a/ingester-grpc/src/main/java/io/greptime/rpc/interceptors/ContextToHeadersInterceptor.java +++ b/ingester-grpc/src/main/java/io/greptime/rpc/interceptors/ContextToHeadersInterceptor.java @@ -47,7 +47,6 @@ public ClientCall interceptCall(MethodDescriptor extends ForwardingClientCall.SimpleForwardingClientCall { - // Non private to avoid synthetic class HeaderAttachingClientCall(ClientCall delegate) { super(delegate); } diff --git a/ingester-protocol/src/main/java/io/greptime/GreptimeDB.java b/ingester-protocol/src/main/java/io/greptime/GreptimeDB.java index 817fc0f..f0c8426 100644 --- a/ingester-protocol/src/main/java/io/greptime/GreptimeDB.java +++ b/ingester-protocol/src/main/java/io/greptime/GreptimeDB.java @@ -19,6 +19,7 @@ import com.codahale.metrics.Meter; import io.greptime.common.Display; import io.greptime.common.Endpoint; +import io.greptime.common.Keys; import io.greptime.common.Lifecycle; import io.greptime.common.signal.SignalHandlersLoader; import io.greptime.common.util.MetricsUtil; @@ -40,6 +41,7 @@ import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; @@ -55,11 +57,11 @@ public class GreptimeDB implements Write, WritePOJO, Lifecycle, private static final Logger LOG = LoggerFactory.getLogger(GreptimeDB.class); + private static final Map INSTANCES = new ConcurrentHashMap<>(); private static final AtomicInteger ID = new AtomicInteger(0); - private static final String ID_KEY = "greptimedb.client.id"; - private static final String VERSION_KEY = "greptimedb.client.version"; private static final String VERSION = Util.clientVersion(); + private static final String NODE_ID = UUID.randomUUID().toString(); private final int id; private final AtomicBoolean started = new AtomicBoolean(false); @@ -215,8 +217,9 @@ public String toString() { private Context attachCtx(Context ctx) { Context newCtx = ctx == null ? Context.newDefault() : ctx; - return newCtx.with(ID_KEY, this.id) // - .with(VERSION_KEY, VERSION); + return newCtx.with(Keys.VERSION_KEY, VERSION) // + .with(Keys.NODE_KEY, NODE_ID) // + .with(Keys.ID_KEY, this.id); } private static RpcClient makeRpcClient(GreptimeOptions opts) { diff --git a/ingester-protocol/src/main/java/io/greptime/Util.java b/ingester-protocol/src/main/java/io/greptime/Util.java index 403185c..e32d404 100644 --- a/ingester-protocol/src/main/java/io/greptime/Util.java +++ b/ingester-protocol/src/main/java/io/greptime/Util.java @@ -139,19 +139,6 @@ public static CompletableFuture completedCf(U value) { return CompletableFuture.completedFuture(value); } - /** - * Returns a new CompletableFuture that is already exceptionally with the given - * error. - * - * @param t the given exception - * @param the type of the value - * @return the exceptionally {@link CompletableFuture} - */ - public static CompletableFuture errorCf(Throwable t) { - final CompletableFuture err = new CompletableFuture<>(); - err.completeExceptionally(t); - return err; - } public static Observer toObserver(CompletableFuture future) { return new Observer() { diff --git a/ingester-protocol/src/main/java/io/greptime/WritePOJO.java b/ingester-protocol/src/main/java/io/greptime/WritePOJO.java index ef440cc..b17d9cf 100644 --- a/ingester-protocol/src/main/java/io/greptime/WritePOJO.java +++ b/ingester-protocol/src/main/java/io/greptime/WritePOJO.java @@ -52,9 +52,9 @@ default CompletableFuture> writePOJOs(Collection> p } /** - * Write multi tables multi rows data to database. + * Write multiple rows of data (which can belong to multiple tables) to the database at once. * - * @param pojos rows with multi tables + * @param pojos a collection of data to be written, classified by table * @param writeOp write operation(insert or delete) * @param ctx invoke context * @return write result @@ -76,14 +76,15 @@ default StreamWriter, WriteOk> streamWriterPOJOs(int maxPointsPerSecond) } /** + * Create a stream to continuously write data to the database, typically used in data import + * scenarios. After completion, the stream needs to be closed(Call `StreamWriter#completed()`), + * and the write result can be obtained from the database server. * Create a `Stream` to write POJO data. - * You can hold on to this `Stream` and continuously write to it. After you are finished - * writing, remember to close (call `StreamWriter#completed()`) it. *

- * It is important to note that each write operation can write a List of POJOs. - * However, the POJO objects in the List must have the same type. If you need to - * write different types of POJO objects, you can perform multiple write operations - * on the `Stream`, dividing them into separate writes when you obtain the `Stream`. + * It is important to note that each write operation can write a List of POJOs. However, + * the POJO objects in the List must have the same type. If you need to write different types + * of POJO objects, you can perform multiple write operations on the `Stream`, dividing them + * into separate writes when you obtain the `Stream`. * * @param maxPointsPerSecond The max number of points that can be written per second, * exceeding which may cause blockage. diff --git a/ingester-protocol/src/main/java/io/greptime/errors/LimitedException.java b/ingester-protocol/src/main/java/io/greptime/errors/LimitedException.java index 319ff62..425c63a 100644 --- a/ingester-protocol/src/main/java/io/greptime/errors/LimitedException.java +++ b/ingester-protocol/src/main/java/io/greptime/errors/LimitedException.java @@ -20,6 +20,7 @@ * * @author jiachun.fjc */ +@SuppressWarnings("unused") public class LimitedException extends RuntimeException { private static final long serialVersionUID = -1L; diff --git a/ingester-protocol/src/main/java/io/greptime/errors/PojoException.java b/ingester-protocol/src/main/java/io/greptime/errors/PojoException.java index 80aad98..f120f96 100644 --- a/ingester-protocol/src/main/java/io/greptime/errors/PojoException.java +++ b/ingester-protocol/src/main/java/io/greptime/errors/PojoException.java @@ -18,6 +18,7 @@ /** * @author jiachun.fjc */ +@SuppressWarnings("unused") public class PojoException extends RuntimeException { public PojoException() {} diff --git a/ingester-protocol/src/main/java/io/greptime/errors/ServerException.java b/ingester-protocol/src/main/java/io/greptime/errors/ServerException.java index 2e90522..ee04767 100644 --- a/ingester-protocol/src/main/java/io/greptime/errors/ServerException.java +++ b/ingester-protocol/src/main/java/io/greptime/errors/ServerException.java @@ -18,6 +18,7 @@ /** * @author jiachun.fjc */ +@SuppressWarnings("unused") public class ServerException extends RuntimeException { private static final long serialVersionUID = -1L; diff --git a/ingester-protocol/src/main/java/io/greptime/errors/StreamException.java b/ingester-protocol/src/main/java/io/greptime/errors/StreamException.java index 2428295..d00efe0 100644 --- a/ingester-protocol/src/main/java/io/greptime/errors/StreamException.java +++ b/ingester-protocol/src/main/java/io/greptime/errors/StreamException.java @@ -16,10 +16,11 @@ package io.greptime.errors; /** - * Error about stream-query or stream-write. + * Error about stream-write. * * @author jiachun.fjc */ +@SuppressWarnings("unused") public class StreamException extends RuntimeException { private static final long serialVersionUID = -1L; diff --git a/ingester-protocol/src/main/java/io/greptime/models/DataTypeWithExtension.java b/ingester-protocol/src/main/java/io/greptime/models/DataTypeWithExtension.java deleted file mode 100644 index 0047333..0000000 --- a/ingester-protocol/src/main/java/io/greptime/models/DataTypeWithExtension.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Copyright 2023 Greptime Team - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.greptime.models; - -/** - * `DataType` with extension info, now only used for `Decimal128`. - * - * @author jiachun.fjc - */ -public class DataTypeWithExtension { - private final DataType columnDataType; - private final DataType.DecimalTypeExtension decimalTypeExtension; - - public static DataTypeWithExtension of(DataType columnDataType) { - if (columnDataType == DataType.Decimal128) { - return new DataTypeWithExtension(columnDataType, DataType.DecimalTypeExtension.DEFAULT); - } - return new DataTypeWithExtension(columnDataType, null); - } - - public static DataTypeWithExtension of(DataType columnDataType, DataType.DecimalTypeExtension decimalTypeExtension) { - if (columnDataType == DataType.Decimal128) { - return new DataTypeWithExtension(columnDataType, - decimalTypeExtension == null ? DataType.DecimalTypeExtension.DEFAULT : decimalTypeExtension); - } - return new DataTypeWithExtension(columnDataType, null); - } - - DataTypeWithExtension(DataType columnDataType, DataType.DecimalTypeExtension decimalTypeExtension) { - this.columnDataType = columnDataType; - this.decimalTypeExtension = decimalTypeExtension; - } - - public DataType getColumnDataType() { - return columnDataType; - } - - public DataType.DecimalTypeExtension getDecimalTypeExtension() { - return decimalTypeExtension; - } -} diff --git a/ingester-protocol/src/main/java/io/greptime/models/TableSchema.java b/ingester-protocol/src/main/java/io/greptime/models/TableSchema.java index d1af871..3e35ffa 100644 --- a/ingester-protocol/src/main/java/io/greptime/models/TableSchema.java +++ b/ingester-protocol/src/main/java/io/greptime/models/TableSchema.java @@ -19,8 +19,6 @@ import io.greptime.v1.Common; import java.util.ArrayList; import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; /** * Table schema for write. @@ -29,8 +27,6 @@ */ public class TableSchema { - private static final Map TABLE_SCHEMA_CACHE = new ConcurrentHashMap<>(); - private String tableName; private List columnNames; private List semanticTypes; @@ -59,18 +55,6 @@ public List getDataTypeExtensions() { return dataTypeExtensions; } - public static TableSchema findSchema(String tableName) { - return TABLE_SCHEMA_CACHE.get(tableName); - } - - public static TableSchema removeSchema(String tableName) { - return TABLE_SCHEMA_CACHE.remove(tableName); - } - - public static void clearAllSchemas() { - TABLE_SCHEMA_CACHE.clear(); - } - public static Builder newBuilder(String tableName) { return new Builder(tableName); } @@ -99,9 +83,15 @@ public Builder addColumn(String name, SemanticType semanticType, DataType dataTy this.columnNames.add(name); this.semanticTypes.add(semanticType.toProtoValue()); this.dataTypes.add(dataType.toProtoValue()); - this.dataTypeExtensions.add(decimalTypeExtension == null ? Common.ColumnDataTypeExtension - .getDefaultInstance() : Common.ColumnDataTypeExtension.newBuilder() - .setDecimalType(decimalTypeExtension.into()).build()); + if (decimalTypeExtension == null) { + this.dataTypeExtensions.add(Common.ColumnDataTypeExtension.getDefaultInstance()); + } else { + Ensures.ensure(dataType == DataType.Decimal128, "Only decimal type can have decimal type extension"); + Common.ColumnDataTypeExtension ext = Common.ColumnDataTypeExtension.newBuilder() // + .setDecimalType(decimalTypeExtension.into()) // + .build(); + this.dataTypeExtensions.add(ext); + } return this; } @@ -128,11 +118,5 @@ public TableSchema build() { tableSchema.dataTypeExtensions = this.dataTypeExtensions; return tableSchema; } - - public TableSchema buildAndCache() { - TableSchema tableSchema = build(); - TABLE_SCHEMA_CACHE.putIfAbsent(tableSchema.tableName, tableSchema); - return tableSchema; - } } } diff --git a/ingester-protocol/src/main/java/io/greptime/models/Util.java b/ingester-protocol/src/main/java/io/greptime/models/Util.java index 7dca85b..e670c81 100644 --- a/ingester-protocol/src/main/java/io/greptime/models/Util.java +++ b/ingester-protocol/src/main/java/io/greptime/models/Util.java @@ -101,6 +101,9 @@ static Common.Decimal128 getDecimal128Value(Common.ColumnDataTypeExtension dataT long high64Bits = unscaledValue.shiftRight(64).longValue(); long low64Bits = unscaledValue.longValue(); - return Common.Decimal128.newBuilder().setHi(high64Bits).setLo(low64Bits).build(); + return Common.Decimal128.newBuilder() // + .setHi(high64Bits) // + .setLo(low64Bits) // + .build(); } } diff --git a/ingester-protocol/src/test/java/io/greptime/TestUtil.java b/ingester-protocol/src/test/java/io/greptime/TestUtil.java index 174f848..399d633 100644 --- a/ingester-protocol/src/test/java/io/greptime/TestUtil.java +++ b/ingester-protocol/src/test/java/io/greptime/TestUtil.java @@ -19,6 +19,9 @@ import io.greptime.models.SemanticType; import io.greptime.models.Table; import io.greptime.models.TableSchema; +import io.greptime.v1.Common; +import java.math.BigDecimal; +import java.math.BigInteger; import java.util.Collection; import java.util.Collections; @@ -40,4 +43,18 @@ public static Collection testTable(String tableName, int rowCount) { } return Collections.singleton(table); } + + public static BigDecimal getDecimal(Common.Decimal128 decimal128, int scale) { + long lo = decimal128.getLo(); + BigInteger loValue = BigInteger.valueOf(lo & Long.MAX_VALUE); + if (lo < 0) { + loValue = loValue.add(BigInteger.valueOf(1).shiftLeft(63)); + } + + BigInteger unscaledValue = BigInteger.valueOf(decimal128.getHi()); + unscaledValue = unscaledValue.shiftLeft(64); + unscaledValue = unscaledValue.add(loValue); + + return new BigDecimal(unscaledValue, scale); + } } diff --git a/ingester-protocol/src/test/java/io/greptime/models/TableTest.java b/ingester-protocol/src/test/java/io/greptime/models/TableTest.java index 1d8cc11..6f82dea 100644 --- a/ingester-protocol/src/test/java/io/greptime/models/TableTest.java +++ b/ingester-protocol/src/test/java/io/greptime/models/TableTest.java @@ -15,9 +15,11 @@ */ package io.greptime.models; +import io.greptime.TestUtil; import io.greptime.v1.RowData; import org.junit.Assert; import org.junit.Test; +import java.math.BigDecimal; /** * @author jiachun.fjc @@ -30,18 +32,25 @@ public void testTableNonNull() { .addColumn("col1", SemanticType.Tag, DataType.String) // .addColumn("col2", SemanticType.Tag, DataType.String) // .addColumn("col3", SemanticType.Field, DataType.Int32) // + .addColumn("col4", SemanticType.Field, DataType.Decimal128, new DataType.DecimalTypeExtension(39, 9)) // .build(); Table.RowBasedTable table = (Table.RowBasedTable) Table.from(schema); - table.addRow("1", "11", 111) // - .addRow("2", "22", 222) // - .addRow("3", "33", 333); + table.addRow("1", "11", 111, new BigDecimal("0.1")) // + .addRow("2", "22", 222, new BigDecimal("0.2")) // + .addRow("3", "33", 333, new BigDecimal("0.3")); Assert.assertEquals(3, table.rowCount()); RowData.Rows rawRows = table.into(); Assert.assertEquals(111, rawRows.getRows(0).getValues(2).getI32Value()); Assert.assertEquals(222, rawRows.getRows(1).getValues(2).getI32Value()); Assert.assertEquals(333, rawRows.getRows(2).getValues(2).getI32Value()); + Assert.assertEquals(new BigDecimal("0.100000000"), + TestUtil.getDecimal(rawRows.getRows(0).getValues(3).getDecimal128Value(), 9)); + Assert.assertEquals(new BigDecimal("0.200000000"), + TestUtil.getDecimal(rawRows.getRows(1).getValues(3).getDecimal128Value(), 9)); + Assert.assertEquals(new BigDecimal("0.300000000"), + TestUtil.getDecimal(rawRows.getRows(2).getValues(3).getDecimal128Value(), 9)); } @Test diff --git a/ingester-protocol/src/test/java/io/greptime/models/UtilTest.java b/ingester-protocol/src/test/java/io/greptime/models/UtilTest.java index 0b7c609..8aa19a3 100644 --- a/ingester-protocol/src/test/java/io/greptime/models/UtilTest.java +++ b/ingester-protocol/src/test/java/io/greptime/models/UtilTest.java @@ -15,6 +15,7 @@ */ package io.greptime.models; +import io.greptime.TestUtil; import io.greptime.v1.Common; import org.junit.Assert; import org.junit.Test; @@ -83,17 +84,7 @@ public void testGetDecimal128Value() { BigDecimal value = new BigDecimal(bigInt, scale); Common.Decimal128 result = Util.getDecimal128Value(dataTypeExtension, value); - long lo = result.getLo(); - BigInteger loValue = BigInteger.valueOf(lo & Long.MAX_VALUE); - if (lo < 0) { - loValue = loValue.add(BigInteger.valueOf(1).shiftLeft(63)); - } - - BigInteger unscaledValue = BigInteger.valueOf(result.getHi()); - unscaledValue = unscaledValue.shiftLeft(64); - unscaledValue = unscaledValue.add(loValue); - - BigDecimal value2 = new BigDecimal(unscaledValue, scale); + BigDecimal value2 = TestUtil.getDecimal(result, scale); Assert.assertEquals(value, value2); } diff --git a/ingester-rpc/src/main/java/io/greptime/rpc/Context.java b/ingester-rpc/src/main/java/io/greptime/rpc/Context.java index 08c4154..7246750 100644 --- a/ingester-rpc/src/main/java/io/greptime/rpc/Context.java +++ b/ingester-rpc/src/main/java/io/greptime/rpc/Context.java @@ -21,15 +21,14 @@ import java.util.Set; /** - * Invoke context. + * Invoke context, it can pass some additional information to the + * database server in the form of KV. * * @author jiachun.fjc */ -@SuppressWarnings("unchecked") +@SuppressWarnings({"unchecked", "unused"}) public class Context implements Copiable { - public static final String KEY_ENDPOINT = "Endpoint"; - private final Map ctx = new HashMap<>(); /** @@ -98,7 +97,6 @@ public T remove(String key) { * @return the value * @param the type of the value */ - @SuppressWarnings("unused") public T getOrDefault(String key, T defaultValue) { synchronized (this) { return (T) this.ctx.getOrDefault(key, defaultValue); @@ -108,7 +106,6 @@ public T getOrDefault(String key, T defaultValue) { /** * Clears all key-value pairs from this {@link Context}. */ - @SuppressWarnings("unused") public void clear() { synchronized (this) { this.ctx.clear(); diff --git a/ingester-rpc/src/main/java/io/greptime/rpc/RpcOptions.java b/ingester-rpc/src/main/java/io/greptime/rpc/RpcOptions.java index 4bbabfa..2764371 100644 --- a/ingester-rpc/src/main/java/io/greptime/rpc/RpcOptions.java +++ b/ingester-rpc/src/main/java/io/greptime/rpc/RpcOptions.java @@ -23,6 +23,7 @@ * * @author jiachun.fjc */ +@SuppressWarnings("unused") public class RpcOptions implements Copiable { private boolean useRpcSharedPool = false;