diff --git a/kyuubi-ha/pom.xml b/kyuubi-ha/pom.xml index e1b46c8589a..749ab063266 100644 --- a/kyuubi-ha/pom.xml +++ b/kyuubi-ha/pom.xml @@ -132,6 +132,13 @@ ${project.version} test + + + org.apache.kyuubi + kyuubi-hive-jdbc + ${project.version} + test + diff --git a/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/zookeeper/ZookeeperDiscoveryClientSuite.scala b/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/zookeeper/ZookeeperDiscoveryClientSuite.scala index 34ed0559383..ba3d0650d19 100644 --- a/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/zookeeper/ZookeeperDiscoveryClientSuite.scala +++ b/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/zookeeper/ZookeeperDiscoveryClientSuite.scala @@ -34,8 +34,9 @@ import org.apache.kyuubi.ha.HighAvailabilityConf._ import org.apache.kyuubi.ha.client._ import org.apache.kyuubi.ha.client.DiscoveryClientProvider.withDiscoveryClient import org.apache.kyuubi.ha.client.zookeeper.ZookeeperClientProvider._ +import org.apache.kyuubi.jdbc.hive.strategy.{ServerSelectStrategy, ServerSelectStrategyFactory} import org.apache.kyuubi.service._ -import org.apache.kyuubi.shaded.curator.framework.CuratorFrameworkFactory +import org.apache.kyuubi.shaded.curator.framework.{CuratorFramework, CuratorFrameworkFactory} import org.apache.kyuubi.shaded.curator.retry.ExponentialBackoffRetry import org.apache.kyuubi.shaded.zookeeper.ZooDefs import org.apache.kyuubi.shaded.zookeeper.data.ACL @@ -227,4 +228,41 @@ abstract class ZookeeperDiscoveryClientSuite extends DiscoveryClientTests discovery.stop() } } + + test("server select strategy with zookeeper") { + val zkClient = CuratorFrameworkFactory.builder() + .connectString(getConnectString) + .sessionTimeoutMs(5000) + .retryPolicy(new ExponentialBackoffRetry(1000, 3)) + .build + zkClient.start() + + val namespace = "kyuubi-strategy-test" + val testServerHosts = Seq( + "testNode1", + "testNode2", + "testNode3").asJava + // test polling strategy + val pollingStrategy = ServerSelectStrategyFactory.createStrategy("polling") + 1 to testServerHosts.size() * 2 foreach { _ => + assertResult(f"testNode1")(pollingStrategy.chooseServer(testServerHosts, zkClient, namespace)) + assertResult(f"testNode2")(pollingStrategy.chooseServer(testServerHosts, zkClient, namespace)) + assertResult(f"testNode3")(pollingStrategy.chooseServer(testServerHosts, zkClient, namespace)) + } + + // test only get first serverHost strategy + val customStrategy = new ServerSelectStrategy { + override def chooseServer( + serverHosts: util.List[String], + zkClient: CuratorFramework, + namespace: String): String = serverHosts.get(0) + } + 1 to testServerHosts.size() * 2 foreach { _ => + assertResult("testNode1") { + customStrategy.chooseServer(testServerHosts, zkClient, namespace) + } + } + + zkClient.close() + } } diff --git a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/JdbcConnectionParams.java b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/JdbcConnectionParams.java index b3884c694fd..0db99da7100 100644 --- a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/JdbcConnectionParams.java +++ b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/JdbcConnectionParams.java @@ -79,6 +79,7 @@ public class JdbcConnectionParams { // Use ZooKeeper for indirection while using dynamic service discovery static final String SERVICE_DISCOVERY_MODE_ZOOKEEPER = "zooKeeper"; static final String ZOOKEEPER_NAMESPACE = "zooKeeperNamespace"; + static final String SERVER_SELECT_STRATEGY = "serverSelectStrategy"; // Default namespace value on ZooKeeper. // This value is used if the param "zooKeeperNamespace" is not specified in the JDBC Uri. static final String ZOOKEEPER_DEFAULT_NAMESPACE = "hiveserver2"; diff --git a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/ZooKeeperHiveClientHelper.java b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/ZooKeeperHiveClientHelper.java index 948fd333463..f94bdb431e6 100644 --- a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/ZooKeeperHiveClientHelper.java +++ b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/ZooKeeperHiveClientHelper.java @@ -22,9 +22,11 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.concurrent.ThreadLocalRandom; import java.util.regex.Matcher; import java.util.regex.Pattern; +import org.apache.kyuubi.jdbc.hive.strategy.ServerSelectStrategy; +import org.apache.kyuubi.jdbc.hive.strategy.ServerSelectStrategyFactory; +import org.apache.kyuubi.jdbc.hive.strategy.zk.RandomSelectStrategy; import org.apache.kyuubi.shaded.curator.framework.CuratorFramework; import org.apache.kyuubi.shaded.curator.framework.CuratorFrameworkFactory; import org.apache.kyuubi.shaded.curator.retry.ExponentialBackoffRetry; @@ -111,7 +113,7 @@ static void configureConnParams(JdbcConnectionParams connParams) try (CuratorFramework zooKeeperClient = getZkClient(connParams)) { List serverHosts = getServerHosts(connParams, zooKeeperClient); // Now pick a server node randomly - String serverNode = serverHosts.get(ThreadLocalRandom.current().nextInt(serverHosts.size())); + String serverNode = chooseServer(connParams, serverHosts, zooKeeperClient); updateParamsWithZKServerNode(connParams, zooKeeperClient, serverNode); } catch (Exception e) { throw new ZooKeeperHiveClientException( @@ -120,6 +122,22 @@ static void configureConnParams(JdbcConnectionParams connParams) // Close the client connection with ZooKeeper } + private static String chooseServer( + JdbcConnectionParams connParams, List serverHosts, CuratorFramework zkClient) { + String zooKeeperNamespace = getZooKeeperNamespace(connParams); + String strategyName = + connParams + .getSessionVars() + .getOrDefault( + JdbcConnectionParams.SERVER_SELECT_STRATEGY, RandomSelectStrategy.strategyName); + try { + ServerSelectStrategy strategy = ServerSelectStrategyFactory.createStrategy(strategyName); + return strategy.chooseServer(serverHosts, zkClient, zooKeeperNamespace); + } catch (Exception e) { + throw new RuntimeException("Failed to choose server with strategy " + strategyName, e); + } + } + static List getDirectParamsList(JdbcConnectionParams connParams) throws ZooKeeperHiveClientException { try (CuratorFramework zooKeeperClient = getZkClient(connParams)) { diff --git a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/strategy/ServerSelectStrategy.java b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/strategy/ServerSelectStrategy.java new file mode 100644 index 00000000000..740c3577637 --- /dev/null +++ b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/strategy/ServerSelectStrategy.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.jdbc.hive.strategy; + +import java.util.List; +import org.apache.kyuubi.shaded.curator.framework.CuratorFramework; + +public interface ServerSelectStrategy { + String chooseServer(List serverHosts, CuratorFramework zkClient, String namespace); +} diff --git a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/strategy/ServerSelectStrategyFactory.java b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/strategy/ServerSelectStrategyFactory.java new file mode 100644 index 00000000000..9950097adec --- /dev/null +++ b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/strategy/ServerSelectStrategyFactory.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.jdbc.hive.strategy; + +import java.lang.reflect.Constructor; +import org.apache.kyuubi.jdbc.hive.strategy.zk.PollingSelectStrategy; +import org.apache.kyuubi.jdbc.hive.strategy.zk.RandomSelectStrategy; + +public class ServerSelectStrategyFactory { + public static ServerSelectStrategy createStrategy(String strategyName) { + try { + switch (strategyName) { + case PollingSelectStrategy.strategyName: + return new PollingSelectStrategy(); + case RandomSelectStrategy.strategyName: + return new RandomSelectStrategy(); + default: + Class clazz = Class.forName(strategyName); + if (ServerSelectStrategy.class.isAssignableFrom(clazz)) { + Constructor constructor = + clazz.asSubclass(ServerSelectStrategy.class).getConstructor(); + return constructor.newInstance(); + } else { + throw new ClassNotFoundException( + "The loaded class does not implement ServerSelectStrategy"); + } + } + } catch (Exception e) { + throw new RuntimeException("Failed to init server select strategy", e); + } + } +} diff --git a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/strategy/zk/PollingSelectStrategy.java b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/strategy/zk/PollingSelectStrategy.java new file mode 100644 index 00000000000..664c76defa5 --- /dev/null +++ b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/strategy/zk/PollingSelectStrategy.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.jdbc.hive.strategy.zk; + +import java.util.List; +import org.apache.kyuubi.jdbc.hive.strategy.ServerSelectStrategy; +import org.apache.kyuubi.shaded.curator.framework.CuratorFramework; +import org.apache.kyuubi.shaded.curator.framework.recipes.atomic.AtomicValue; +import org.apache.kyuubi.shaded.curator.framework.recipes.atomic.DistributedAtomicInteger; +import org.apache.kyuubi.shaded.curator.retry.RetryForever; + +public class PollingSelectStrategy implements ServerSelectStrategy { + public static final String strategyName = "polling"; + + private static final String COUNTER_PATH_PREFIX = "/"; + private static final String COUNTER_PATH_SUFFIX = "-counter"; + + @Override + public String chooseServer( + List serverHosts, CuratorFramework zkClient, String namespace) { + String counterPath = COUNTER_PATH_PREFIX + namespace + COUNTER_PATH_SUFFIX; + try { + return serverHosts.get(getAndIncrement(zkClient, counterPath) % serverHosts.size()); + } catch (Exception e) { + throw new RuntimeException("Failed to choose server by polling select strategy", e); + } + } + + private int getAndIncrement(CuratorFramework zkClient, String path) throws Exception { + DistributedAtomicInteger dai = + new DistributedAtomicInteger(zkClient, path, new RetryForever(3000)); + AtomicValue atomicVal; + do { + atomicVal = dai.add(1); + } while (atomicVal == null || !atomicVal.succeeded()); + return atomicVal.preValue(); + } +} diff --git a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/strategy/zk/RandomSelectStrategy.java b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/strategy/zk/RandomSelectStrategy.java new file mode 100644 index 00000000000..f42fd529481 --- /dev/null +++ b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/strategy/zk/RandomSelectStrategy.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.jdbc.hive.strategy.zk; + +import java.util.List; +import java.util.concurrent.ThreadLocalRandom; +import org.apache.kyuubi.jdbc.hive.strategy.ServerSelectStrategy; +import org.apache.kyuubi.shaded.curator.framework.CuratorFramework; + +public class RandomSelectStrategy implements ServerSelectStrategy { + public static final String strategyName = "random"; + + @Override + public String chooseServer( + List serverHosts, CuratorFramework zkClient, String namespace) { + return serverHosts.get(ThreadLocalRandom.current().nextInt(serverHosts.size())); + } +}