Skip to content

Commit

Permalink
chore: minor refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
fengjiachun committed Dec 26, 2023
1 parent ad75b23 commit 11571db
Show file tree
Hide file tree
Showing 12 changed files with 94 additions and 148 deletions.
8 changes: 4 additions & 4 deletions ingester-protocol/src/main/java/io/greptime/GreptimeDB.java
Original file line number Diff line number Diff line change
Expand Up @@ -152,8 +152,8 @@ public StreamWriter<List<?>, WriteOk> streamWriterPOJOs(int maxPointsPerSecond,
return new StreamWriter<List<?>, WriteOk>() {
@Override
public StreamWriter<List<?>, WriteOk> write(List<?> val, WriteOp writeOp) {
Table rows = pojoMapper.toTableData(val);
delegate.write(rows, writeOp);
Table table = pojoMapper.toTableData(val);
delegate.write(table, writeOp);
return this;
}

Expand All @@ -165,9 +165,9 @@ public CompletableFuture<WriteOk> completed() {
}

@Override
public CompletableFuture<Result<WriteOk, Err>> write(Collection<Table> rows, WriteOp writeOp, Context ctx) {
public CompletableFuture<Result<WriteOk, Err>> write(Collection<Table> tables, WriteOp writeOp, Context ctx) {
ensureInitialized();
return this.writeClient.write(rows, writeOp, attachCtx(ctx));
return this.writeClient.write(tables, writeOp, attachCtx(ctx));
}

@Override
Expand Down
14 changes: 7 additions & 7 deletions ingester-protocol/src/main/java/io/greptime/Write.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,26 +34,26 @@ public interface Write {
/**
* @see #write(Collection, WriteOp, Context)
*/
default CompletableFuture<Result<WriteOk, Err>> write(Collection<Table> rows) {
return write(rows, WriteOp.Insert, Context.newDefault());
default CompletableFuture<Result<WriteOk, Err>> write(Collection<Table> tables) {
return write(tables, WriteOp.Insert, Context.newDefault());
}

/**
* @see #write(Collection, WriteOp, Context)
*/
default CompletableFuture<Result<WriteOk, Err>> write(Collection<Table> rows, WriteOp writeOp) {
return write(rows, writeOp, Context.newDefault());
default CompletableFuture<Result<WriteOk, Err>> write(Collection<Table> tables, WriteOp writeOp) {
return write(tables, writeOp, Context.newDefault());
}

/**
* Write multi tables multi rows data to database.
* Write multi tables multi tables data to database.
*
* @param rows rows with multi tables
* @param tables rows with multi tables
* @param writeOp write operation(insert or delete)
* @param ctx invoke context
* @return write result
*/
CompletableFuture<Result<WriteOk, Err>> write(Collection<Table> rows, WriteOp writeOp, Context ctx);
CompletableFuture<Result<WriteOk, Err>> write(Collection<Table> tables, WriteOp writeOp, Context ctx);

/**
* @see #streamWriter(int, Context)
Expand Down
52 changes: 25 additions & 27 deletions ingester-protocol/src/main/java/io/greptime/WriteClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
import io.greptime.models.WriteOk;
import io.greptime.models.Table;
import io.greptime.models.TableHelper;
import io.greptime.models.WriteTable;
import io.greptime.models.WriteTables;
import io.greptime.options.WriteOptions;
import io.greptime.rpc.Context;
import io.greptime.rpc.Observer;
Expand Down Expand Up @@ -79,13 +79,13 @@ public void shutdownGracefully() {
}

@Override
public CompletableFuture<Result<WriteOk, Err>> write(Collection<Table> rows, WriteOp writeOp, Context ctx) {
Ensures.ensureNonNull(rows, "null `rows`");
Ensures.ensure(!rows.isEmpty(), "empty `rows`");
public CompletableFuture<Result<WriteOk, Err>> write(Collection<Table> tables, WriteOp writeOp, Context ctx) {
Ensures.ensureNonNull(tables, "null `rows`");
Ensures.ensure(!tables.isEmpty(), "empty `rows`");

long startCall = Clock.defaultClock().getTick();

return this.writeLimiter.acquireAndDo(rows, () -> write0(rows, writeOp, ctx, 0).whenCompleteAsync((r, e) -> {
WriteTables writeTables = new WriteTables(tables, writeOp);
return this.writeLimiter.acquireAndDo(tables, () -> write0(writeTables, ctx, 0).whenCompleteAsync((r, e) -> {
InnerMetricHelper.writeQps().mark();
if (r != null) {
if (Util.isRwLogging()) {
Expand Down Expand Up @@ -118,11 +118,11 @@ public StreamWriter<Table, WriteOk> streamWriter(int maxPointsPerSecond, Context
.thenApply(reqObserver -> new RateLimitingStreamWriter(reqObserver, permitsPerSecond) {

@Override
public StreamWriter<Table, WriteOk> write(Table rows, WriteOp writeOp) {
public StreamWriter<Table, WriteOk> write(Table table, WriteOp writeOp) {
if (respFuture.isCompletedExceptionally()) {
respFuture.getNow(null); // throw the exception now
}
return super.write(rows, writeOp); // may wait
return super.write(table, writeOp); // may wait
}

@Override
Expand All @@ -133,11 +133,11 @@ public CompletableFuture<WriteOk> completed() {
}).join();
}

private CompletableFuture<Result<WriteOk, Err>> write0(Collection<Table> rows, WriteOp writeOp, Context ctx, int retries) {
private CompletableFuture<Result<WriteOk, Err>> write0(WriteTables writeTables, Context ctx, int retries) {
InnerMetricHelper.writeByRetries(retries).mark();

return this.routerClient.route()
.thenComposeAsync(endpoint -> writeTo(endpoint, rows, writeOp, ctx, retries), this.asyncPool)
.thenComposeAsync(endpoint -> writeTo(endpoint, writeTables, ctx, retries), this.asyncPool)
.thenComposeAsync(r -> {
if (r.isOk()) {
LOG.debug("Success to write to {}, ok={}.", Keys.DB_NAME, r.getOk());
Expand All @@ -155,14 +155,14 @@ private CompletableFuture<Result<WriteOk, Err>> write0(Collection<Table> rows, W
return Util.completedCf(r);
}

return write0(rows, writeOp, ctx, retries + 1);
return write0(writeTables, ctx, retries + 1);
}, this.asyncPool);
}

private CompletableFuture<Result<WriteOk, Err>> writeTo(Endpoint endpoint, Collection<Table> rows, WriteOp writeOp, Context ctx, int retries) {
private CompletableFuture<Result<WriteOk, Err>> writeTo(Endpoint endpoint, WriteTables writeTables, Context ctx, int retries) {
String database = this.opts.getDatabase();
AuthInfo authInfo = this.opts.getAuthInfo();
Database.GreptimeRequest req = TableHelper.toGreptimeRequest(rows, writeOp, database, authInfo);
Database.GreptimeRequest req = TableHelper.toGreptimeRequest(writeTables, database, authInfo);
ctx.with("retries", retries);

CompletableFuture<Database.GreptimeResponse> future = this.routerClient.invoke(endpoint, req, ctx);
Expand All @@ -175,12 +175,12 @@ private CompletableFuture<Result<WriteOk, Err>> writeTo(Endpoint endpoint, Colle
int affectedRows = resp.getAffectedRows().getValue();
return WriteOk.ok(affectedRows, 0).mapToResult();
} else {
return Err.writeErr(statusCode, new ServerException(status.getErrMsg()), endpoint, rows).mapToResult();
return Err.writeErr(statusCode, new ServerException(status.getErrMsg()), endpoint).mapToResult();
}
}, this.asyncPool);
}

private Observer<WriteTable> streamWriteTo(Endpoint endpoint, Context ctx, Observer<WriteOk> respObserver) {
private Observer<WriteTables> streamWriteTo(Endpoint endpoint, Context ctx, Observer<WriteOk> respObserver) {
Observer<Database.GreptimeRequest> rpcObserver =
this.routerClient.invokeClientStreaming(endpoint, Database.GreptimeRequest.getDefaultInstance(), ctx,
new Observer<Database.GreptimeResponse>() {
Expand All @@ -207,15 +207,13 @@ public void onCompleted() {
}
});

return new Observer<WriteTable>() {
return new Observer<WriteTables>() {

@Override
public void onNext(WriteTable writeTable) {
Table rows = writeTable.getRows();
WriteOp writeOp = writeTable.getWriteOp();
public void onNext(WriteTables writeTables) {
String database = WriteClient.this.opts.getDatabase();
AuthInfo authInfo = WriteClient.this.opts.getAuthInfo();
Database.GreptimeRequest req = TableHelper.toGreptimeRequest(rows, writeOp, database, authInfo);
Database.GreptimeRequest req = TableHelper.toGreptimeRequest(writeTables, database, authInfo);
rpcObserver.onNext(req);
}

Expand Down Expand Up @@ -317,17 +315,17 @@ public Result<WriteOk, Err> rejected(Collection<Table> in, RejectedState state)
state.acquirePermits(), //
state.maxPermits(), //
state.availablePermits());
return Result.err(Err.writeErr(Result.FLOW_CONTROL, new LimitedException(errMsg), null, in));
return Result.err(Err.writeErr(Result.FLOW_CONTROL, new LimitedException(errMsg), null));
}
}

@SuppressWarnings("UnstableApiUsage")
static abstract class RateLimitingStreamWriter implements StreamWriter<Table, WriteOk> {

private final Observer<WriteTable> observer;
private final Observer<WriteTables> observer;
private final RateLimiter rateLimiter;

RateLimitingStreamWriter(Observer<WriteTable> observer, double permitsPerSecond) {
RateLimitingStreamWriter(Observer<WriteTables> observer, double permitsPerSecond) {
this.observer = observer;
if (permitsPerSecond > 0) {
this.rateLimiter = RateLimiter.create(permitsPerSecond);
Expand All @@ -337,14 +335,14 @@ static abstract class RateLimitingStreamWriter implements StreamWriter<Table, Wr
}

@Override
public StreamWriter<Table, WriteOk> write(Table rows, WriteOp writeOp) {
Ensures.ensureNonNull(rows, "null `rows`");
public StreamWriter<Table, WriteOk> write(Table table, WriteOp writeOp) {
Ensures.ensureNonNull(table, "null `table`");

if (this.rateLimiter != null) {
double timeSpent = this.rateLimiter.acquire(rows.pointCount());
double timeSpent = this.rateLimiter.acquire(table.pointCount());
InnerMetricHelper.writeStreamLimiterTimeSpent().update((long) timeSpent);
}
this.observer.onNext(new WriteTable(rows, writeOp));
this.observer.onNext(new WriteTables(table, writeOp));
return this;
}
}
Expand Down
41 changes: 1 addition & 40 deletions ingester-protocol/src/main/java/io/greptime/models/Err.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,6 @@ public class Err {
private Throwable error;
// the server address where the error occurred
private Endpoint errTo;
// the data of wrote failed, can be used to retry
private Collection<Table> rowsFailed;
// the QL failed to query
private String failedQl;

/**
* Returns the error code.
Expand All @@ -56,20 +52,6 @@ public Endpoint getErrTo() {
return errTo;
}

/**
* Returns the data of wrote failed, can be used to retry.
*/
public Collection<Table> getRowsFailed() {
return rowsFailed;
}

/**
* Returns the QL failed to query.
*/
public String getFailedQl() {
return failedQl;
}

/**
* Returns a {@link Result} containing this error.
*/
Expand All @@ -83,7 +65,6 @@ public String toString() {
"code=" + code + //
", error='" + error + '\'' + //
", errTo=" + errTo + //
", failedQl=" + failedQl + //
'}';
}

Expand All @@ -93,33 +74,13 @@ public String toString() {
* @param code the error code
* @param error the error
* @param errTo the server address where the error occurred
* @param rowsFailed the data of wrote failed, can be used to retry
* @return a new {@link Err} for write error
*/
public static Err writeErr(int code, Throwable error, Endpoint errTo, Collection<Table> rowsFailed) {
Err err = new Err();
err.code = code;
err.error = error;
err.errTo = errTo;
err.rowsFailed = rowsFailed;
return err;
}

/**
* Creates a new {@link Err} for query error.
*
* @param code the error code
* @param error the error
* @param errTo the server address where the error occurred
* @param failedQl the QL failed to query
* @return a new {@link Err} for query error
*/
public static Err queryErr(int code, Throwable error, Endpoint errTo, String failedQl) {
public static Err writeErr(int code, Throwable error, Endpoint errTo) {
Err err = new Err();
err.code = code;
err.error = error;
err.errTo = errTo;
err.failedQl = failedQl;
return err;
}
}
24 changes: 12 additions & 12 deletions ingester-protocol/src/main/java/io/greptime/models/Table.java
Original file line number Diff line number Diff line change
Expand Up @@ -109,28 +109,28 @@ public Table build() {
Ensures.ensure(columnCount == dataTypes.size(), "Column names size not equal to data types size");
Ensures.ensure(columnCount == dataTypeExtensions.size(), "Column names size not equal to data type extensions size");

return buildRow(tableName, columnCount, columnNames, semanticTypes, dataTypes, dataTypeExtensions);
return buildTable(tableName, columnCount, columnNames, semanticTypes, dataTypes, dataTypeExtensions);
}

private static Table buildRow(String tableName, //
int columnCount, //
List<String> columnNames, //
List<Common.SemanticType> semanticTypes, //
List<Common.ColumnDataType> dataTypes, //
List<Common.ColumnDataTypeExtension> dataTypeExtensions) {
RowBasedTable rows = new RowBasedTable();
rows.tableName = tableName;
rows.columnSchemas = new ArrayList<>(columnCount);
private static Table buildTable(String tableName, //
int columnCount, //
List<String> columnNames, //
List<Common.SemanticType> semanticTypes, //
List<Common.ColumnDataType> dataTypes, //
List<Common.ColumnDataTypeExtension> dataTypeExtensions) {
RowBasedTable table = new RowBasedTable();
table.tableName = tableName;
table.columnSchemas = new ArrayList<>(columnCount);

for (int i = 0; i < columnCount; i++) {
RowData.ColumnSchema.Builder builder = RowData.ColumnSchema.newBuilder();
builder.setColumnName(columnNames.get(i)) //
.setSemanticType(semanticTypes.get(i)) //
.setDatatype(dataTypes.get(i)) //
.setDatatypeExtension(dataTypeExtensions.get(i));
rows.columnSchemas.add(builder.build());
table.columnSchemas.add(builder.build());
}
return rows;
return table;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,7 @@
*/
public class TableHelper {

public static Database.GreptimeRequest toGreptimeRequest(Table rows, //
WriteOp writeOp, //
String database, //
AuthInfo authInfo) {
return toGreptimeRequest(Collections.singleton(rows), writeOp, database, authInfo);
}

public static Database.GreptimeRequest toGreptimeRequest(Collection<Table> rows, //
WriteOp writeOp, //
String database, //
AuthInfo authInfo) {
public static Database.GreptimeRequest toGreptimeRequest(WriteTables writeTables, String database, AuthInfo authInfo) {
Common.RequestHeader.Builder headerBuilder = Common.RequestHeader.newBuilder();
if (database != null) {
headerBuilder.setDbname(database);
Expand All @@ -45,20 +35,23 @@ public static Database.GreptimeRequest toGreptimeRequest(Collection<Table> rows,
headerBuilder.setAuthorization(authInfo.into());
}

Collection<Table> tables = writeTables.getTables();
WriteOp writeOp = writeTables.getWriteOp();

switch (writeOp) {
case Insert:
Database.RowInsertRequests.Builder insertBuilder = Database.RowInsertRequests.newBuilder();
for (Table r : rows) {
insertBuilder.addInserts(r.intoRowInsertRequest());
for (Table t : tables) {
insertBuilder.addInserts(t.intoRowInsertRequest());
}
return Database.GreptimeRequest.newBuilder() //
.setHeader(headerBuilder.build()) //
.setRowInserts(insertBuilder.build()) //
.build();
case Delete:
Database.RowDeleteRequests.Builder deleteBuilder = Database.RowDeleteRequests.newBuilder();
for (Table r : rows) {
deleteBuilder.addDeletes(r.intoRowDeleteRequest());
for (Table t : tables) {
deleteBuilder.addDeletes(t.intoRowDeleteRequest());
}
return Database.GreptimeRequest.newBuilder() //
.setHeader(headerBuilder.build()) //
Expand Down
Loading

0 comments on commit 11571db

Please sign in to comment.