Skip to content

Commit

Permalink
chore: minor refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
fengjiachun committed Jan 10, 2024
1 parent d8e3766 commit e520dad
Show file tree
Hide file tree
Showing 26 changed files with 95 additions and 188 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import java.nio.ByteBuffer;

/**
* A {@code ByteString} helper.
* A {@code ByteString} helper, avoid some memory copying to improve performance.
*
* @author jiachun.fjc
*/
Expand Down
4 changes: 3 additions & 1 deletion ingester-common/src/main/java/io/greptime/common/Keys.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,15 @@
package io.greptime.common;

/**
* System properties option keys
* Constant string keys.
*
* @author jiachun.fjc
*/
public final class Keys {
public static final String DB_NAME = "GreptimeDB";
public static final String VERSION_KEY = "client.version";
public static final String ID_KEY = "client.id";
public static final String NODE_KEY = "client.node";
public static final String OS_NAME = "os.name";
public static final String USE_OS_SIGNAL = "greptimedb.use_os_signal";
public static final String AVAILABLE_CPUS = "greptimedb.available_cpus";
Expand Down
33 changes: 0 additions & 33 deletions ingester-common/src/main/java/io/greptime/common/Streamable.java

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@

/**
* File signal.
* <p>
* Adopt the method of creating files with specified names to interact
* with the Client process and implement signal transmission, achieve the
* purpose of controlling the process to output specified content through this.
*
* @author jiachun.fjc
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public DirectExecutor(String name) {
this.executeTimer = MetricsUtil.timer("direct_executor_timer", name);
}

@SuppressWarnings("NullableProblems")
@Override
public void execute(Runnable cmd) {
this.executeTimer.time(cmd);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,26 +38,10 @@ public class LogScheduledThreadPoolExecutor extends ScheduledThreadPoolExecutor
private final int corePoolSize;
private final String name;

public LogScheduledThreadPoolExecutor(int corePoolSize, String name) {
super(corePoolSize);
this.corePoolSize = corePoolSize;
this.name = name;
}

public LogScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, String name) {
super(corePoolSize, threadFactory);
this.corePoolSize = corePoolSize;
this.name = name;
}

public LogScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler, String name) {
super(corePoolSize, handler);
this.corePoolSize = corePoolSize;
this.name = name;
}

public LogScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory,
RejectedExecutionHandler handler, String name) {
public LogScheduledThreadPoolExecutor(int corePoolSize, //
ThreadFactory threadFactory, //
RejectedExecutionHandler handler, //
String name) {
super(corePoolSize, threadFactory, handler);
this.corePoolSize = corePoolSize;
this.name = name;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,13 @@ public class LogThreadPoolExecutor extends ThreadPoolExecutor {
private final int maximumPoolSize;
private final String name;

public LogThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler,
public LogThreadPoolExecutor(int corePoolSize, //
int maximumPoolSize, //
long keepAliveTime, //
TimeUnit unit, //
BlockingQueue<Runnable> workQueue, //
ThreadFactory threadFactory, //
RejectedExecutionHandler handler, //
String name) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
this.corePoolSize = corePoolSize;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,10 @@
*/
public class MetricScheduledThreadPoolExecutor extends LogScheduledThreadPoolExecutor {

public MetricScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory,
RejectedExecutionHandler handler, String name) {
public MetricScheduledThreadPoolExecutor(int corePoolSize, //
ThreadFactory threadFactory, //
RejectedExecutionHandler handler, //
String name) {
super(corePoolSize, threadFactory, handler, name);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public static String emptyToNull(String string) {
* Returns {@code true} if the given string is null or is the empty string.
*/
public static boolean isNullOrEmpty(String str) {
return str == null || str.length() == 0;
return str == null || str.isEmpty();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@

/**
* ClientInterceptor that enforces per service and/or per method concurrent
* request limits and returns a Status.UNAVAILABLE when that limit has been
* request limits and returns a `Status.UNAVAILABLE` when that limit has been
* reached.
* <p>
* Refer to `concurrency-limit-grpc`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT
private static final class HeaderAttachingClientCall<ReqT, RespT> extends
ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT> {

// Non private to avoid synthetic class
HeaderAttachingClientCall(ClientCall<ReqT, RespT> delegate) {
super(delegate);
}
Expand Down
11 changes: 7 additions & 4 deletions ingester-protocol/src/main/java/io/greptime/GreptimeDB.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.codahale.metrics.Meter;
import io.greptime.common.Display;
import io.greptime.common.Endpoint;
import io.greptime.common.Keys;
import io.greptime.common.Lifecycle;
import io.greptime.common.signal.SignalHandlersLoader;
import io.greptime.common.util.MetricsUtil;
Expand All @@ -40,6 +41,7 @@
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
Expand All @@ -55,11 +57,11 @@ public class GreptimeDB implements Write, WritePOJO, Lifecycle<GreptimeOptions>,

private static final Logger LOG = LoggerFactory.getLogger(GreptimeDB.class);


private static final Map<Integer, GreptimeDB> INSTANCES = new ConcurrentHashMap<>();
private static final AtomicInteger ID = new AtomicInteger(0);
private static final String ID_KEY = "greptimedb.client.id";
private static final String VERSION_KEY = "greptimedb.client.version";
private static final String VERSION = Util.clientVersion();
private static final String NODE_ID = UUID.randomUUID().toString();

private final int id;
private final AtomicBoolean started = new AtomicBoolean(false);
Expand Down Expand Up @@ -215,8 +217,9 @@ public String toString() {

private Context attachCtx(Context ctx) {
Context newCtx = ctx == null ? Context.newDefault() : ctx;
return newCtx.with(ID_KEY, this.id) //
.with(VERSION_KEY, VERSION);
return newCtx.with(Keys.VERSION_KEY, VERSION) //
.with(Keys.NODE_KEY, NODE_ID) //
.with(Keys.ID_KEY, this.id);
}

private static RpcClient makeRpcClient(GreptimeOptions opts) {
Expand Down
13 changes: 0 additions & 13 deletions ingester-protocol/src/main/java/io/greptime/Util.java
Original file line number Diff line number Diff line change
Expand Up @@ -139,19 +139,6 @@ public static <U> CompletableFuture<U> completedCf(U value) {
return CompletableFuture.completedFuture(value);
}

/**
* Returns a new CompletableFuture that is already exceptionally with the given
* error.
*
* @param t the given exception
* @param <U> the type of the value
* @return the exceptionally {@link CompletableFuture}
*/
public static <U> CompletableFuture<U> errorCf(Throwable t) {
final CompletableFuture<U> err = new CompletableFuture<>();
err.completeExceptionally(t);
return err;
}

public static <V> Observer<V> toObserver(CompletableFuture<V> future) {
return new Observer<V>() {
Expand Down
17 changes: 9 additions & 8 deletions ingester-protocol/src/main/java/io/greptime/WritePOJO.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,9 @@ default CompletableFuture<Result<WriteOk, Err>> writePOJOs(Collection<List<?>> p
}

/**
* Write multi tables multi rows data to database.
* Write multiple rows of data (which can belong to multiple tables) to the database at once.
*
* @param pojos rows with multi tables
* @param pojos a collection of data to be written, classified by table
* @param writeOp write operation(insert or delete)
* @param ctx invoke context
* @return write result
Expand All @@ -76,14 +76,15 @@ default StreamWriter<List<?>, WriteOk> streamWriterPOJOs(int maxPointsPerSecond)
}

/**
* Create a stream to continuously write data to the database, typically used in data import
* scenarios. After completion, the stream needs to be closed(Call `StreamWriter#completed()`),
* and the write result can be obtained from the database server.
* Create a `Stream` to write POJO data.
* You can hold on to this `Stream` and continuously write to it. After you are finished
* writing, remember to close (call `StreamWriter#completed()`) it.
* <p>
* It is important to note that each write operation can write a List of POJOs.
* However, the POJO objects in the List must have the same type. If you need to
* write different types of POJO objects, you can perform multiple write operations
* on the `Stream`, dividing them into separate writes when you obtain the `Stream`.
* It is important to note that each write operation can write a List of POJOs. However,
* the POJO objects in the List must have the same type. If you need to write different types
* of POJO objects, you can perform multiple write operations on the `Stream`, dividing them
* into separate writes when you obtain the `Stream`.
*
* @param maxPointsPerSecond The max number of points that can be written per second,
* exceeding which may cause blockage.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
*
* @author jiachun.fjc
*/
@SuppressWarnings("unused")
public class LimitedException extends RuntimeException {

private static final long serialVersionUID = -1L;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
/**
* @author jiachun.fjc
*/
@SuppressWarnings("unused")
public class PojoException extends RuntimeException {

public PojoException() {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
/**
* @author jiachun.fjc
*/
@SuppressWarnings("unused")
public class ServerException extends RuntimeException {

private static final long serialVersionUID = -1L;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@
package io.greptime.errors;

/**
* Error about stream-query or stream-write.
* Error about stream-write.
*
* @author jiachun.fjc
*/
@SuppressWarnings("unused")
public class StreamException extends RuntimeException {

private static final long serialVersionUID = -1L;
Expand Down

This file was deleted.

Loading

0 comments on commit e520dad

Please sign in to comment.