diff --git a/ingester-protocol/src/main/java/io/greptime/GreptimeDB.java b/ingester-protocol/src/main/java/io/greptime/GreptimeDB.java index 5f7a527..adc6a38 100644 --- a/ingester-protocol/src/main/java/io/greptime/GreptimeDB.java +++ b/ingester-protocol/src/main/java/io/greptime/GreptimeDB.java @@ -53,7 +53,7 @@ /** * The GreptimeDB client. */ -public class GreptimeDB implements Write, WriteObject, Lifecycle, Display { +public class GreptimeDB implements Write, WriteObject, Lifecycle, Health, Display { private static final Logger LOG = LoggerFactory.getLogger(GreptimeDB.class); @@ -177,6 +177,11 @@ public StreamWriter streamWriter(int maxPointsPerSecond, Context return this.writeClient.streamWriter(maxPointsPerSecond, attachCtx(ctx)); } + @Override + public CompletableFuture> checkHealth() { + return this.routerClient.checkHealth(); + } + @Override public void display(Printer out) { out.println("--- GreptimeDB Client ---") diff --git a/ingester-protocol/src/main/java/io/greptime/Health.java b/ingester-protocol/src/main/java/io/greptime/Health.java new file mode 100644 index 0000000..5b93b77 --- /dev/null +++ b/ingester-protocol/src/main/java/io/greptime/Health.java @@ -0,0 +1,29 @@ +/* + * Copyright 2023 Greptime Team + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.greptime; + +import io.greptime.common.Endpoint; +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +/** + * Health check. It just like to probe the database and connections. + * Users can use this status to perform fault tolerance and disaster recovery actions. + */ +public interface Health { + CompletableFuture> checkHealth(); +} diff --git a/ingester-protocol/src/main/java/io/greptime/Router.java b/ingester-protocol/src/main/java/io/greptime/Router.java index 88b32af..6731148 100644 --- a/ingester-protocol/src/main/java/io/greptime/Router.java +++ b/ingester-protocol/src/main/java/io/greptime/Router.java @@ -33,17 +33,10 @@ public interface Router { CompletableFuture routeFor(R request); /** - * Refresh the routing table from remote server. - * @return a future that will be completed when the refresh is done - */ - CompletableFuture refresh(); - - /** - * Refresh the routing table. - * We need to get all the endpoints, and this method will overwrite all - * current endpoints. + * Refresh the routing table. By health checker or service discovery. * - * @param endpoints all new endpoints + * @param activities all activities endpoints + * @param inactivities all inactivities endpoints */ - void onRefresh(List endpoints); + void onRefresh(List activities, List inactivities); } diff --git a/ingester-protocol/src/main/java/io/greptime/RouterClient.java b/ingester-protocol/src/main/java/io/greptime/RouterClient.java index ed7756f..f39736e 100644 --- a/ingester-protocol/src/main/java/io/greptime/RouterClient.java +++ b/ingester-protocol/src/main/java/io/greptime/RouterClient.java @@ -25,12 +25,18 @@ import io.greptime.rpc.Context; import io.greptime.rpc.Observer; import io.greptime.rpc.RpcClient; +import io.greptime.v1.Health.HealthCheckRequest; +import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; +import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,12 +44,14 @@ * A route rpc client which cached the routing table information locally * and will auto refresh. */ -public class RouterClient implements Lifecycle, Display { +public class RouterClient implements Lifecycle, Health, Display { private static final Logger LOG = LoggerFactory.getLogger(RouterClient.class); private static final SharedScheduledPool REFRESHER_POOL = Util.getSharedScheduledPool("route_cache_refresher", 1); + private final AtomicLong refreshSequencer = new AtomicLong(0); + private ScheduledExecutorService refresher; private RouterOptions opts; private RpcClient rpcClient; @@ -57,19 +65,44 @@ public boolean init(RouterOptions opts) { List endpoints = Ensures.ensureNonNull(this.opts.getEndpoints(), "null `endpoints`"); this.router = new DefaultRouter(); - this.router.onRefresh(endpoints); + this.router.onRefresh(endpoints, null); long refreshPeriod = this.opts.getRefreshPeriodSeconds(); if (refreshPeriod > 0) { this.refresher = REFRESHER_POOL.getObject(); this.refresher.scheduleWithFixedDelay( - () -> this.router.refresh().whenComplete((r, e) -> { - if (e != null) { - LOG.error("Router cache refresh failed.", e); - } else { - LOG.debug("Router cache refresh {}.", r ? "success" : "failed"); - } - }), + () -> { + long thisSequence = this.refreshSequencer.incrementAndGet(); + checkHealth().whenComplete((r, t) -> { + if (t != null) { + LOG.warn("Failed to check health", t); + return; + } + + synchronized (this.refreshSequencer) { + // If the next task has started, we will ignore the current result. + // + // I don't want to worry about the overflow issue with long anymore, + // because assuming one increment per second, it will take 292 years + // to overflow. I think that's sufficient. + if (thisSequence < this.refreshSequencer.get()) { + LOG.warn("Skip outdated health check result, sequence: {}", thisSequence); + return; + } + + List activities = new ArrayList<>(); + List inactivities = new ArrayList<>(); + for (Map.Entry entry : r.entrySet()) { + if (entry.getValue()) { + activities.add(entry.getKey()); + } else { + inactivities.add(entry.getKey()); + } + } + this.router.onRefresh(activities, inactivities); + } + }); + }, Util.randomInitialDelay(180), refreshPeriod, TimeUnit.SECONDS); @@ -204,6 +237,22 @@ public String toString() { return "RouterClient{" + "refresher=" + refresher + ", opts=" + opts + ", rpcClient=" + rpcClient + '}'; } + @Override + public CompletableFuture> checkHealth() { + Map> futures = this.opts.getEndpoints().stream() + .collect(Collectors.toMap(Function.identity(), endpoint -> { + HealthCheckRequest req = HealthCheckRequest.newBuilder().build(); + return this.invoke(endpoint, req, Context.newDefault()) + .thenApply(resp -> true) + .exceptionally(t -> false); // Handle failure and return false + })); + + return CompletableFuture.allOf(futures.values().toArray(new CompletableFuture[0])) + .thenApply( + ok -> futures.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue() + .join()))); + } + /** * Request to a `frontend` server, which needs to return all members(frontend server), * or it can return only one domain address, it is also possible to return no address @@ -219,25 +268,47 @@ public String toString() { */ private static class DefaultRouter implements Router { - private final AtomicReference> endpointsRef = new AtomicReference<>(); + private final AtomicReference endpointsRef = new AtomicReference<>(); @Override public CompletableFuture routeFor(Void request) { - List endpoints = this.endpointsRef.get(); + Endpoints endpoints = this.endpointsRef.get(); + + if (endpoints == null) { + return Util.errorCf(new IllegalStateException("null `endpoints`")); + } + ThreadLocalRandom random = ThreadLocalRandom.current(); - int i = random.nextInt(0, endpoints.size()); - return Util.completedCf(endpoints.get(i)); + + if (!endpoints.activities.isEmpty()) { + int i = random.nextInt(0, endpoints.activities.size()); + return Util.completedCf(endpoints.activities.get(i)); + } + + if (!endpoints.inactivities.isEmpty()) { + int i = random.nextInt(0, endpoints.inactivities.size()); + Endpoint goodLuck = endpoints.inactivities.get(i); + LOG.warn("No active endpoint, return an inactive one: {}", goodLuck); + return Util.completedCf(goodLuck); + } + + return Util.errorCf(new IllegalStateException("empty `endpoints`")); } @Override - public CompletableFuture refresh() { - // always return true - return Util.completedCf(true); + public void onRefresh(List activities, List inactivities) { + LOG.info("Router cache refreshed, activities: {}, inactivities: {}", activities, inactivities); + this.endpointsRef.set(new Endpoints(activities, inactivities)); } + } - @Override - public void onRefresh(List endpoints) { - this.endpointsRef.set(endpoints); + static class Endpoints { + final List activities; + final List inactivities; + + Endpoints(List activities, List inactivities) { + this.activities = activities == null ? new ArrayList<>() : activities; + this.inactivities = inactivities == null ? new ArrayList<>() : inactivities; } } } diff --git a/ingester-protocol/src/main/java/io/greptime/RpcServiceRegister.java b/ingester-protocol/src/main/java/io/greptime/RpcServiceRegister.java index 6426578..052ead9 100644 --- a/ingester-protocol/src/main/java/io/greptime/RpcServiceRegister.java +++ b/ingester-protocol/src/main/java/io/greptime/RpcServiceRegister.java @@ -19,31 +19,46 @@ import io.greptime.rpc.MethodDescriptor; import io.greptime.rpc.RpcFactoryProvider; import io.greptime.v1.Database; +import io.greptime.v1.Health; /** * The RPC service register. */ public class RpcServiceRegister { - private static final String METHOD_TEMPLATE = "greptime.v1.GreptimeDatabase/%s"; + private static final String DATABASE_METHOD_TEMPLATE = "greptime.v1.GreptimeDatabase/%s"; + private static final String HEALTH_METHOD_TEMPLATE = "greptime.v1.HealthCheck/%s"; public static void registerAllService() { - // register protobuf serializer + // Handle + MethodDescriptor handleMethod = MethodDescriptor.of( + String.format(DATABASE_METHOD_TEMPLATE, "Handle"), MethodDescriptor.MethodType.UNARY, 1); RpcFactoryProvider.getRpcFactory() .register( - MethodDescriptor.of( - String.format(METHOD_TEMPLATE, "Handle"), MethodDescriptor.MethodType.UNARY, 1), + handleMethod, Database.GreptimeRequest.class, Database.GreptimeRequest.getDefaultInstance(), Database.GreptimeResponse.getDefaultInstance()); + // HandleRequests + MethodDescriptor handleRequestsMethod = MethodDescriptor.of( + String.format(DATABASE_METHOD_TEMPLATE, "HandleRequests"), + MethodDescriptor.MethodType.CLIENT_STREAMING); RpcFactoryProvider.getRpcFactory() .register( - MethodDescriptor.of( - String.format(METHOD_TEMPLATE, "HandleRequests"), - MethodDescriptor.MethodType.CLIENT_STREAMING), + handleRequestsMethod, Database.GreptimeRequest.class, Database.GreptimeRequest.getDefaultInstance(), Database.GreptimeResponse.getDefaultInstance()); + + // HealthCheck + MethodDescriptor healthCheckMethod = MethodDescriptor.of( + String.format(HEALTH_METHOD_TEMPLATE, "HealthCheck"), MethodDescriptor.MethodType.UNARY); + RpcFactoryProvider.getRpcFactory() + .register( + healthCheckMethod, + Health.HealthCheckRequest.class, + Health.HealthCheckRequest.getDefaultInstance(), + Health.HealthCheckResponse.getDefaultInstance()); } } diff --git a/ingester-protocol/src/main/java/io/greptime/Util.java b/ingester-protocol/src/main/java/io/greptime/Util.java index ce5a53d..8b6126d 100644 --- a/ingester-protocol/src/main/java/io/greptime/Util.java +++ b/ingester-protocol/src/main/java/io/greptime/Util.java @@ -138,6 +138,20 @@ public static CompletableFuture completedCf(U value) { return CompletableFuture.completedFuture(value); } + /** + * Returns a new CompletableFuture that is already exceptionally with the given + * error. + * + * @param t the given exception + * @param the type of the value + * @return the exceptionally {@link CompletableFuture} + */ + public static CompletableFuture errorCf(Throwable t) { + CompletableFuture err = new CompletableFuture<>(); + err.completeExceptionally(t); + return err; + } + public static Observer toObserver(CompletableFuture future) { return new Observer() { diff --git a/ingester-protocol/src/main/java/io/greptime/WriteClient.java b/ingester-protocol/src/main/java/io/greptime/WriteClient.java index eb23659..246a0aa 100644 --- a/ingester-protocol/src/main/java/io/greptime/WriteClient.java +++ b/ingester-protocol/src/main/java/io/greptime/WriteClient.java @@ -47,6 +47,7 @@ import io.greptime.v1.Common; import io.greptime.v1.Database; import java.util.Collection; +import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; @@ -56,7 +57,7 @@ /** * Default Write API impl. */ -public class WriteClient implements Write, Lifecycle, Display { +public class WriteClient implements Write, Health, Lifecycle, Display { private static final Logger LOG = LoggerFactory.getLogger(WriteClient.class); @@ -254,6 +255,11 @@ public void onCompleted() { }; } + @Override + public CompletableFuture> checkHealth() { + return this.routerClient.checkHealth(); + } + @Override public void display(Printer out) { out.println("--- WriteClient ---") diff --git a/ingester-protocol/src/main/java/io/greptime/options/GreptimeOptions.java b/ingester-protocol/src/main/java/io/greptime/options/GreptimeOptions.java index f35d6b9..0d321ca 100644 --- a/ingester-protocol/src/main/java/io/greptime/options/GreptimeOptions.java +++ b/ingester-protocol/src/main/java/io/greptime/options/GreptimeOptions.java @@ -35,6 +35,11 @@ * GreptimeDB client options. */ public class GreptimeOptions implements Copiable { + public static final int DEFAULT_WRITE_MAX_RETRIES = 1; + public static final int DEFAULT_MAX_IN_FLIGHT_WRITE_POINTS = 10 * 65536; + public static final int DEFAULT_DEFAULT_STREAM_MAX_WRITE_POINTS_PER_SECOND = 10 * 65536; + public static final long DEFAULT_ROUTE_TABLE_REFRESH_PERIOD_SECONDS = 10 * 60; + private List endpoints; private RpcOptions rpcOptions; private RouterOptions routerOptions; @@ -145,14 +150,14 @@ public static final class Builder { private RpcOptions rpcOptions = RpcOptions.newDefault(); // GreptimeDB secure connection options private TlsOptions tlsOptions; - private int writeMaxRetries = 1; + private int writeMaxRetries = DEFAULT_WRITE_MAX_RETRIES; // Write flow limit: maximum number of data points in-flight. - private int maxInFlightWritePoints = 10 * 65536; + private int maxInFlightWritePoints = DEFAULT_MAX_IN_FLIGHT_WRITE_POINTS; private LimitedPolicy writeLimitedPolicy = LimitedPolicy.defaultWriteLimitedPolicy(); - private int defaultStreamMaxWritePointsPerSecond = 10 * 65536; + private int defaultStreamMaxWritePointsPerSecond = DEFAULT_DEFAULT_STREAM_MAX_WRITE_POINTS_PER_SECOND; // Refresh frequency of route tables. The background refreshes all route tables periodically. // If the value is less than or equal to 0, the route tables will not be refreshed. - private long routeTableRefreshPeriodSeconds = -1; + private long routeTableRefreshPeriodSeconds = DEFAULT_ROUTE_TABLE_REFRESH_PERIOD_SECONDS; // Authentication information private AuthInfo authInfo; // The request router diff --git a/ingester-protocol/src/test/java/io/greptime/options/GreptimeOptionsTest.java b/ingester-protocol/src/test/java/io/greptime/options/GreptimeOptionsTest.java index a5b10b8..fda491d 100644 --- a/ingester-protocol/src/test/java/io/greptime/options/GreptimeOptionsTest.java +++ b/ingester-protocol/src/test/java/io/greptime/options/GreptimeOptionsTest.java @@ -88,19 +88,13 @@ public void testAllOptions() { private Router createTestRouter() { return new Router() { - @Override public CompletableFuture routeFor(Void request) { return null; } @Override - public CompletableFuture refresh() { - return null; - } - - @Override - public void onRefresh(List endpoints) {} + public void onRefresh(List activities, List inactivities) {} }; } }