diff --git a/ingester-protocol/src/main/java/io/greptime/GreptimeDB.java b/ingester-protocol/src/main/java/io/greptime/GreptimeDB.java index 83bdae6..eebc2b1 100644 --- a/ingester-protocol/src/main/java/io/greptime/GreptimeDB.java +++ b/ingester-protocol/src/main/java/io/greptime/GreptimeDB.java @@ -53,7 +53,7 @@ /** * The GreptimeDB client. */ -public class GreptimeDB implements Write, WriteObject, Lifecycle, HealthCheck, Display { +public class GreptimeDB implements Write, WriteObject, Lifecycle, Health, Display { private static final Logger LOG = LoggerFactory.getLogger(GreptimeDB.class); @@ -178,7 +178,7 @@ public StreamWriter streamWriter(int maxPointsPerSecond, Context } @Override - public CompletableFuture is_alive() { + public CompletableFuture> checkHealth() { return null; } diff --git a/ingester-protocol/src/main/java/io/greptime/HealthCheck.java b/ingester-protocol/src/main/java/io/greptime/Health.java similarity index 85% rename from ingester-protocol/src/main/java/io/greptime/HealthCheck.java rename to ingester-protocol/src/main/java/io/greptime/Health.java index 04f48f4..ca7bd08 100644 --- a/ingester-protocol/src/main/java/io/greptime/HealthCheck.java +++ b/ingester-protocol/src/main/java/io/greptime/Health.java @@ -16,6 +16,8 @@ package io.greptime; +import io.greptime.common.Endpoint; +import java.util.Map; import java.util.concurrent.CompletableFuture; /** @@ -24,6 +26,6 @@ * * @author jiachun.fjc */ -public interface HealthCheck { - CompletableFuture is_alive(); +public interface Health { + CompletableFuture> checkHealth(); } diff --git a/ingester-protocol/src/main/java/io/greptime/Router.java b/ingester-protocol/src/main/java/io/greptime/Router.java index 88b32af..6731148 100644 --- a/ingester-protocol/src/main/java/io/greptime/Router.java +++ b/ingester-protocol/src/main/java/io/greptime/Router.java @@ -33,17 +33,10 @@ public interface Router { CompletableFuture routeFor(R request); /** - * Refresh the routing table from remote server. - * @return a future that will be completed when the refresh is done - */ - CompletableFuture refresh(); - - /** - * Refresh the routing table. - * We need to get all the endpoints, and this method will overwrite all - * current endpoints. + * Refresh the routing table. By health checker or service discovery. * - * @param endpoints all new endpoints + * @param activities all activities endpoints + * @param inactivities all inactivities endpoints */ - void onRefresh(List endpoints); + void onRefresh(List activities, List inactivities); } diff --git a/ingester-protocol/src/main/java/io/greptime/RouterClient.java b/ingester-protocol/src/main/java/io/greptime/RouterClient.java index ed7756f..7863397 100644 --- a/ingester-protocol/src/main/java/io/greptime/RouterClient.java +++ b/ingester-protocol/src/main/java/io/greptime/RouterClient.java @@ -25,12 +25,17 @@ import io.greptime.rpc.Context; import io.greptime.rpc.Observer; import io.greptime.rpc.RpcClient; +import io.greptime.v1.Health.HealthCheckRequest; +import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; +import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,7 +43,7 @@ * A route rpc client which cached the routing table information locally * and will auto refresh. */ -public class RouterClient implements Lifecycle, Display { +public class RouterClient implements Lifecycle, Health, Display { private static final Logger LOG = LoggerFactory.getLogger(RouterClient.class); @@ -57,19 +62,29 @@ public boolean init(RouterOptions opts) { List endpoints = Ensures.ensureNonNull(this.opts.getEndpoints(), "null `endpoints`"); this.router = new DefaultRouter(); - this.router.onRefresh(endpoints); + this.router.onRefresh(endpoints, null); long refreshPeriod = this.opts.getRefreshPeriodSeconds(); if (refreshPeriod > 0) { this.refresher = REFRESHER_POOL.getObject(); this.refresher.scheduleWithFixedDelay( - () -> this.router.refresh().whenComplete((r, e) -> { - if (e != null) { - LOG.error("Router cache refresh failed.", e); - } else { - LOG.debug("Router cache refresh {}.", r ? "success" : "failed"); + () -> { + try { + Map health = this.checkHealth().get(); + List activities = new ArrayList<>(); + List inactivities = new ArrayList<>(); + for (Map.Entry entry : health.entrySet()) { + if (entry.getValue()) { + activities.add(entry.getKey()); + } else { + inactivities.add(entry.getKey()); + } + } + this.router.onRefresh(activities, inactivities); + } catch (Throwable t) { + LOG.warn("Failed to check health", t); } - }), + }, Util.randomInitialDelay(180), refreshPeriod, TimeUnit.SECONDS); @@ -204,6 +219,22 @@ public String toString() { return "RouterClient{" + "refresher=" + refresher + ", opts=" + opts + ", rpcClient=" + rpcClient + '}'; } + @Override + public CompletableFuture> checkHealth() { + Map> futures = this.opts.getEndpoints().stream() + .collect(Collectors.toMap(Function.identity(), endpoint -> { + HealthCheckRequest req = HealthCheckRequest.newBuilder().build(); + return this.invoke(endpoint, req, Context.newDefault()) + .thenApply(resp -> true) + .exceptionally(t -> false); // Handle failure and return false + })); + + return CompletableFuture.allOf(futures.values().toArray(new CompletableFuture[0])) + .thenApply( + ok -> futures.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue() + .join()))); + } + /** * Request to a `frontend` server, which needs to return all members(frontend server), * or it can return only one domain address, it is also possible to return no address @@ -219,25 +250,46 @@ public String toString() { */ private static class DefaultRouter implements Router { - private final AtomicReference> endpointsRef = new AtomicReference<>(); + private final AtomicReference endpointsRef = new AtomicReference<>(); @Override public CompletableFuture routeFor(Void request) { - List endpoints = this.endpointsRef.get(); + Endpoints endpoints = this.endpointsRef.get(); + + if (endpoints == null) { + return Util.errorCf(new IllegalStateException("null `endpoints`")); + } + ThreadLocalRandom random = ThreadLocalRandom.current(); - int i = random.nextInt(0, endpoints.size()); - return Util.completedCf(endpoints.get(i)); + + if (!endpoints.activities.isEmpty()) { + int i = random.nextInt(0, endpoints.activities.size()); + return Util.completedCf(endpoints.activities.get(i)); + } + + if (!endpoints.inactivities.isEmpty()) { + int i = random.nextInt(0, endpoints.inactivities.size()); + Endpoint goodLuck = endpoints.inactivities.get(i); + LOG.warn("No active endpoint, return an inactive one: {}", goodLuck); + return Util.completedCf(goodLuck); + } + + return Util.errorCf(new IllegalStateException("empty `endpoints`")); } @Override - public CompletableFuture refresh() { - // always return true - return Util.completedCf(true); + public void onRefresh(List activities, List inactivities) { + this.endpointsRef.set(new Endpoints(activities, inactivities)); } + } - @Override - public void onRefresh(List endpoints) { - this.endpointsRef.set(endpoints); + static class Endpoints { + final List activities; + final List inactivities; + + Endpoints(List activities, List inactivities) { + this.activities = activities == null ? new ArrayList<>() : activities; + this.inactivities = inactivities == null ? new ArrayList<>() : inactivities; } } } diff --git a/ingester-protocol/src/main/java/io/greptime/RpcServiceRegister.java b/ingester-protocol/src/main/java/io/greptime/RpcServiceRegister.java index f1c97e6..8fb4c57 100644 --- a/ingester-protocol/src/main/java/io/greptime/RpcServiceRegister.java +++ b/ingester-protocol/src/main/java/io/greptime/RpcServiceRegister.java @@ -31,19 +31,22 @@ public class RpcServiceRegister { public static void registerAllService() { // Handle - MethodDescriptor handleMethod = MethodDescriptor - .of(String.format(DATABASE_METHOD_TEMPLATE, "Handle"), MethodDescriptor.MethodType.UNARY, 1); + MethodDescriptor handleMethod = MethodDescriptor.of( + String.format(DATABASE_METHOD_TEMPLATE, "Handle"), MethodDescriptor.MethodType.UNARY, 1); RpcFactoryProvider.getRpcFactory() - .register(handleMethod, + .register( + handleMethod, Database.GreptimeRequest.class, Database.GreptimeRequest.getDefaultInstance(), Database.GreptimeResponse.getDefaultInstance()); // HandleRequests - MethodDescriptor handleRequestsMethod = MethodDescriptor - .of(String.format(DATABASE_METHOD_TEMPLATE, "HandleRequests"), MethodDescriptor.MethodType.CLIENT_STREAMING); + MethodDescriptor handleRequestsMethod = MethodDescriptor.of( + String.format(DATABASE_METHOD_TEMPLATE, "HandleRequests"), + MethodDescriptor.MethodType.CLIENT_STREAMING); RpcFactoryProvider.getRpcFactory() - .register(handleRequestsMethod, + .register( + handleRequestsMethod, Database.GreptimeRequest.class, Database.GreptimeRequest.getDefaultInstance(), Database.GreptimeResponse.getDefaultInstance()); @@ -52,7 +55,8 @@ public static void registerAllService() { MethodDescriptor healthCheckMethod = MethodDescriptor.of( String.format(HEALTH_METHOD_TEMPLATE, "HealthCheck"), MethodDescriptor.MethodType.UNARY); RpcFactoryProvider.getRpcFactory() - .register(healthCheckMethod, + .register( + healthCheckMethod, Health.HealthCheckRequest.class, Health.HealthCheckRequest.getDefaultInstance(), Health.HealthCheckResponse.getDefaultInstance()); diff --git a/ingester-protocol/src/main/java/io/greptime/Util.java b/ingester-protocol/src/main/java/io/greptime/Util.java index ce5a53d..8b6126d 100644 --- a/ingester-protocol/src/main/java/io/greptime/Util.java +++ b/ingester-protocol/src/main/java/io/greptime/Util.java @@ -138,6 +138,20 @@ public static CompletableFuture completedCf(U value) { return CompletableFuture.completedFuture(value); } + /** + * Returns a new CompletableFuture that is already exceptionally with the given + * error. + * + * @param t the given exception + * @param the type of the value + * @return the exceptionally {@link CompletableFuture} + */ + public static CompletableFuture errorCf(Throwable t) { + CompletableFuture err = new CompletableFuture<>(); + err.completeExceptionally(t); + return err; + } + public static Observer toObserver(CompletableFuture future) { return new Observer() { diff --git a/ingester-protocol/src/test/java/io/greptime/options/GreptimeOptionsTest.java b/ingester-protocol/src/test/java/io/greptime/options/GreptimeOptionsTest.java index a5b10b8..fda491d 100644 --- a/ingester-protocol/src/test/java/io/greptime/options/GreptimeOptionsTest.java +++ b/ingester-protocol/src/test/java/io/greptime/options/GreptimeOptionsTest.java @@ -88,19 +88,13 @@ public void testAllOptions() { private Router createTestRouter() { return new Router() { - @Override public CompletableFuture routeFor(Void request) { return null; } @Override - public CompletableFuture refresh() { - return null; - } - - @Override - public void onRefresh(List endpoints) {} + public void onRefresh(List activities, List inactivities) {} }; } }