diff --git a/ingester-example/src/main/java/io/greptime/TestConnector.java b/ingester-example/src/main/java/io/greptime/TestConnector.java
index 01d8197..471a553 100644
--- a/ingester-example/src/main/java/io/greptime/TestConnector.java
+++ b/ingester-example/src/main/java/io/greptime/TestConnector.java
@@ -54,9 +54,9 @@ public static GreptimeDB connectToDefaultDB() {
.writeMaxRetries(1)
// Optional, the default value is fine.
//
- // Write flow limit: maximum number of data rows in-flight. It does not take effect on `StreamWriter`
- // The default is 65536
- .maxInFlightWriteRows(65536)
+ // Write flow limit: maximum number of data points in-flight. It does not take effect on `StreamWriter`
+ // The default is 10 * 65536
+ .maxInFlightWritePoints(10 * 65536)
// Optional, the default value is fine.
//
// Write flow limit: the policy to use when the write flow limit is exceeded.
diff --git a/ingester-protocol/src/main/java/io/greptime/WriteClient.java b/ingester-protocol/src/main/java/io/greptime/WriteClient.java
index 347253b..4cfc509 100644
--- a/ingester-protocol/src/main/java/io/greptime/WriteClient.java
+++ b/ingester-protocol/src/main/java/io/greptime/WriteClient.java
@@ -73,7 +73,8 @@ public boolean init(WriteOptions opts) {
Executor pool = this.opts.getAsyncPool();
this.asyncPool = pool != null ? pool : new SerializingExecutor("write_client");
this.asyncPool = new MetricExecutor(this.asyncPool, "async_write_pool.time");
- this.writeLimiter = new DefaultWriteLimiter(this.opts.getMaxInFlightWriteRows(), this.opts.getLimitedPolicy());
+ this.writeLimiter =
+ new DefaultWriteLimiter(this.opts.getMaxInFlightWritePoints(), this.opts.getLimitedPolicy());
return true;
}
@@ -313,7 +314,7 @@ public DefaultWriteLimiter(int maxInFlight, LimitedPolicy policy) {
@Override
public int calculatePermits(Collection
in) {
- return in.stream().map(Table::rowCount).reduce(0, Integer::sum);
+ return in.stream().map(Table::pointCount).reduce(0, Integer::sum);
}
@Override
diff --git a/ingester-protocol/src/main/java/io/greptime/options/GreptimeOptions.java b/ingester-protocol/src/main/java/io/greptime/options/GreptimeOptions.java
index 9a3ff81..43f7597 100644
--- a/ingester-protocol/src/main/java/io/greptime/options/GreptimeOptions.java
+++ b/ingester-protocol/src/main/java/io/greptime/options/GreptimeOptions.java
@@ -145,8 +145,8 @@ public static final class Builder {
// Rpc options, in general the default configuration is fine.
private RpcOptions rpcOptions = RpcOptions.newDefault();
private int writeMaxRetries = 1;
- // Write flow limit: maximum number of data rows in-flight.
- private int maxInFlightWriteRows = 65536;
+ // Write flow limit: maximum number of data points in-flight.
+ private int maxInFlightWritePoints = 10 * 65536;
private LimitedPolicy writeLimitedPolicy = LimitedPolicy.defaultWriteLimitedPolicy();
private int defaultStreamMaxWritePointsPerSecond = 10 * 65536;
// Refresh frequency of route tables. The background refreshes all route tables periodically.
@@ -202,13 +202,13 @@ public Builder writeMaxRetries(int maxRetries) {
}
/**
- * Write flow limit: maximum number of data rows in-flight.
+ * Write flow limit: maximum number of data points in-flight.
*
- * @param maxInFlightWriteRows max in-flight rows
+ * @param maxInFlightWritePoints max in-flight points
* @return this builder
*/
- public Builder maxInFlightWriteRows(int maxInFlightWriteRows) {
- this.maxInFlightWriteRows = maxInFlightWriteRows;
+ public Builder maxInFlightWritePoints(int maxInFlightWritePoints) {
+ this.maxInFlightWritePoints = maxInFlightWritePoints;
return this;
}
@@ -308,7 +308,7 @@ private WriteOptions writeOptions() {
writeOpts.setAuthInfo(this.authInfo);
writeOpts.setAsyncPool(this.asyncPool);
writeOpts.setMaxRetries(this.writeMaxRetries);
- writeOpts.setMaxInFlightWriteRows(this.maxInFlightWriteRows);
+ writeOpts.setMaxInFlightWritePoints(this.maxInFlightWritePoints);
writeOpts.setLimitedPolicy(this.writeLimitedPolicy);
writeOpts.setDefaultStreamMaxWritePointsPerSecond(this.defaultStreamMaxWritePointsPerSecond);
return writeOpts;
diff --git a/ingester-protocol/src/main/java/io/greptime/options/WriteOptions.java b/ingester-protocol/src/main/java/io/greptime/options/WriteOptions.java
index cdfd7de..6449a94 100644
--- a/ingester-protocol/src/main/java/io/greptime/options/WriteOptions.java
+++ b/ingester-protocol/src/main/java/io/greptime/options/WriteOptions.java
@@ -33,7 +33,7 @@ public class WriteOptions implements Copiable {
private Executor asyncPool;
private int maxRetries = 1;
// Write flow limit: maximum number of data rows in-flight.
- private int maxInFlightWriteRows = 65536;
+ private int maxInFlightWritePoints = 10 * 65536;
private LimitedPolicy limitedPolicy = LimitedPolicy.defaultWriteLimitedPolicy();
// Default rate limit for stream writer
private int defaultStreamMaxWritePointsPerSecond = 10 * 65536;
@@ -78,12 +78,12 @@ public void setMaxRetries(int maxRetries) {
this.maxRetries = maxRetries;
}
- public int getMaxInFlightWriteRows() {
- return maxInFlightWriteRows;
+ public int getMaxInFlightWritePoints() {
+ return maxInFlightWritePoints;
}
- public void setMaxInFlightWriteRows(int maxInFlightWriteRows) {
- this.maxInFlightWriteRows = maxInFlightWriteRows;
+ public void setMaxInFlightWritePoints(int maxInFlightWritePoints) {
+ this.maxInFlightWritePoints = maxInFlightWritePoints;
}
public LimitedPolicy getLimitedPolicy() {
@@ -110,7 +110,7 @@ public WriteOptions copy() {
opts.routerClient = this.routerClient;
opts.asyncPool = this.asyncPool;
opts.maxRetries = this.maxRetries;
- opts.maxInFlightWriteRows = this.maxInFlightWriteRows;
+ opts.maxInFlightWritePoints = this.maxInFlightWritePoints;
opts.limitedPolicy = this.limitedPolicy;
opts.defaultStreamMaxWritePointsPerSecond = this.defaultStreamMaxWritePointsPerSecond;
return opts;
@@ -124,7 +124,7 @@ public String toString() {
", routerClient=" + routerClient + //
", asyncPool=" + asyncPool + //
", maxRetries=" + maxRetries + //
- ", maxInFlightWriteRows=" + maxInFlightWriteRows + //
+ ", maxInFlightWritePoints=" + maxInFlightWritePoints + //
", limitedPolicy=" + limitedPolicy + //
", defaultStreamMaxWritePointsPerSecond=" + defaultStreamMaxWritePointsPerSecond + //
'}';
diff --git a/ingester-protocol/src/test/java/io/greptime/options/GreptimeOptionsTest.java b/ingester-protocol/src/test/java/io/greptime/options/GreptimeOptionsTest.java
index 1b182b8..6ecf8dc 100644
--- a/ingester-protocol/src/test/java/io/greptime/options/GreptimeOptionsTest.java
+++ b/ingester-protocol/src/test/java/io/greptime/options/GreptimeOptionsTest.java
@@ -38,7 +38,7 @@ public void testAllOptions() {
Executor asyncPool = command -> System.out.println("asyncPool");
RpcOptions rpcOptions = RpcOptions.newDefault();
int writeMaxRetries = 2;
- int maxInFlightWriteRows = 999;
+ int maxInFlightWritePoints = 9990;
LimitedPolicy limitedPolicy = new LimitedPolicy.DiscardPolicy();
int defaultStreamMaxWritePointsPerSecond = 100000;
long routeTableRefreshPeriodSeconds = 99;
@@ -49,7 +49,7 @@ public void testAllOptions() {
.asyncPool(asyncPool) //
.rpcOptions(rpcOptions) //
.writeMaxRetries(writeMaxRetries) //
- .maxInFlightWriteRows(maxInFlightWriteRows) //
+ .maxInFlightWritePoints(maxInFlightWritePoints) //
.writeLimitedPolicy(limitedPolicy) //
.defaultStreamMaxWritePointsPerSecond(defaultStreamMaxWritePointsPerSecond) //
.routeTableRefreshPeriodSeconds(routeTableRefreshPeriodSeconds) //
@@ -71,7 +71,7 @@ public void testAllOptions() {
Assert.assertNotNull(writeOptions);
Assert.assertEquals(asyncPool, writeOptions.getAsyncPool());
Assert.assertEquals(writeMaxRetries, writeOptions.getMaxRetries());
- Assert.assertEquals(maxInFlightWriteRows, writeOptions.getMaxInFlightWriteRows());
+ Assert.assertEquals(maxInFlightWritePoints, writeOptions.getMaxInFlightWritePoints());
Assert.assertEquals(limitedPolicy, writeOptions.getLimitedPolicy());
Assert.assertEquals(defaultStreamMaxWritePointsPerSecond, writeOptions.getDefaultStreamMaxWritePointsPerSecond());
Assert.assertEquals(authInfo, writeOptions.getAuthInfo());