Skip to content

Commit

Permalink
[Feature-3934] Add zookeeper registry for streampark (apache#4127)
Browse files Browse the repository at this point in the history
* zookeeper registry

* add registry interface comment

* handle InterruptedException

* refactor
  • Loading branch information
HxpSerein authored Oct 30, 2024
1 parent 3cb63e1 commit ef78fdd
Show file tree
Hide file tree
Showing 8 changed files with 317 additions and 7 deletions.
7 changes: 7 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@
<hadoop.version>3.3.4</hadoop.version>
<hbase.version>2.1.10</hbase.version>
<redis.version>3.3.0</redis.version>
<zoopkeeper.version>3.6.3</zoopkeeper.version>
<es.version>6.2.3</es.version>
<influxdb.version>2.17</influxdb.version>
<protobuf.version>2.5.0</protobuf.version>
Expand Down Expand Up @@ -242,6 +243,12 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>${zoopkeeper.version}</version>
</dependency>

<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,11 +100,13 @@ sso:
# Optional, change by authentication client
# Please replace and fill in your client config below when enabled SSO

registry:
# default using jdbc as registry
type: jdbc
heartbeat-refresh-interval: 1s
session-timeout: 3s
high-availability:
enable: false # true
# The list of ZooKeeper quorum peers that coordinate the high-availability
# setup. This must be a list of the form:
# "host1:clientPort,host2:clientPort,..." (default clientPort: 2181)
#
zookeeper.quorum: 192.168.100.128:2181,192.168.100.129:2181

network:
# network interface preferred like eth0, default: empty
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,5 +54,4 @@ public static void main(String[] args) throws Exception {
.sources(StreamParkConsoleBootstrap.class)
.run(args);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,11 @@
import org.apache.streampark.common.fs.FsOperator;
import org.apache.streampark.common.util.AssertUtils;
import org.apache.streampark.common.util.SystemPropertyUtils;
import org.apache.streampark.console.base.util.SpringContextUtils;
import org.apache.streampark.console.base.util.WebUtils;
import org.apache.streampark.console.core.entity.FlinkEnv;
import org.apache.streampark.console.core.entity.SparkEnv;
import org.apache.streampark.console.core.service.RegistryService;
import org.apache.streampark.console.core.service.SettingService;

import org.apache.commons.lang3.StringUtils;
Expand Down Expand Up @@ -81,6 +83,9 @@ public void run(ApplicationArguments args) throws Exception {
// init InternalConfig
initConfig();

// init RegistryService
initRegistryService();

boolean isTest = Arrays.asList(context.getEnvironment().getActiveProfiles()).contains("test");
if (!isTest) {
// initialize local file system resources
Expand Down Expand Up @@ -110,6 +115,18 @@ private void initConfig() {
overrideSystemProp(ConfigKeys.KEY_HADOOP_USER_NAME(), hadoopUserName);
}

private void initRegistryService() {
boolean enable = SystemPropertyUtils.get("high-availability.enable", "false").equals("true");
if (enable) {
RegistryService registryService = SpringContextUtils.getBean(RegistryService.class);
registryService.registry();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
registryService.unRegister();
log.info("RegistryService unRegister success");
}));
}
}

private void overrideSystemProp(String key, String defaultValue) {
String value = context.getEnvironment().getProperty(key, defaultValue);
log.info("initialize system properties: key:{}, value:{}", key, value);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.streampark.console.core.service;

public interface RegistryService {

/**
* Registry the service.
*/
void registry();

/**
* Close the registry service.
*/
void unRegister();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.streampark.console.core.service.impl;

import org.apache.streampark.common.util.SystemPropertyUtils;
import org.apache.streampark.console.core.service.DistributedTaskService;
import org.apache.streampark.console.core.service.RegistryService;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.net.InetAddress;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import static org.apache.zookeeper.ZooDefs.Ids.OPEN_ACL_UNSAFE;

@Slf4j
@Service
public class RegistryServiceImpl implements RegistryService {

private static final String REGISTRY_PATH = "/services";
private static final int HEARTBEAT_INTERVAL = 10000;
private static final int HEARTBEAT_TIMEOUT = 60000;

private String zkAddress;
private ZooKeeper zk;
private String nodePath;
private Watcher watcher = event -> {
if (event.getType() == Watcher.Event.EventType.NodeChildrenChanged
&& event.getPath().equals(REGISTRY_PATH)) {
handleNodeChanges();
}
};

@Getter
private Set<String> currentNodes = new HashSet<>();

@Autowired
private DistributedTaskService distributedTaskService;

private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);

public void registry() {
try {
zkAddress = SystemPropertyUtils.get("high-availability.zookeeper.quorum", "localhost:2181");
zk = new ZooKeeper(zkAddress, HEARTBEAT_TIMEOUT, watcher);

if (zk.exists(REGISTRY_PATH, false) == null) {
zk.create(REGISTRY_PATH, new byte[0], OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}

String ip = InetAddress.getLocalHost().getHostAddress();
String port = SystemPropertyUtils.get("server.port", "10000");
String server_id = ip + ":" + port;
nodePath = zk.create(REGISTRY_PATH + "/" + server_id, new byte[0],
OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);

currentNodes.add(nodePath);

doRegister();
} catch (Exception e) {
log.error("Failed to init ZooKeeper client", e);
}
}

public void doRegister() {
try {
distributedTaskService.init(currentNodes, nodePath);
startHeartbeat();
startHeartbeatChecker();
handleNodeChanges();
log.info("ZooKeeper client started: {}", nodePath);
} catch (Exception e) {
log.error("Failed to start ZooKeeper client", e);
}
}

private void startHeartbeat() {
scheduler.scheduleAtFixedRate(() -> {
try {
zk.setData(nodePath, new byte[0], -1);
log.info("Heartbeat updated for node: {}", nodePath);
} catch (KeeperException e) {
log.info("Zookeeper session expired, attempting to reconnect...");
reconnectAndRegister();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("Failed to update heartbeat for node: {}", nodePath, e);
}
}, 0, HEARTBEAT_INTERVAL, TimeUnit.MILLISECONDS);
}

private void startHeartbeatChecker() {
scheduler.scheduleAtFixedRate(() -> {
try {
long now = System.currentTimeMillis();
List<String> servers = zk.getChildren(REGISTRY_PATH, false);
for (String server : servers) {
String serverPath = REGISTRY_PATH + "/" + server;
Stat stat = zk.exists(serverPath, false);
if (stat != null && (now - stat.getMtime() > HEARTBEAT_TIMEOUT)) {
zk.delete(serverPath, -1);
log.info("Deleted stale node: {}", serverPath);
}
}
} catch (KeeperException e) {
log.info("Zookeeper session expired, attempting to reconnect...");
reconnectAndRegister();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("Failed to check heartbeat", e);
}
}, HEARTBEAT_TIMEOUT, HEARTBEAT_TIMEOUT, TimeUnit.MILLISECONDS);
}

private synchronized void handleNodeChanges() {
try {
List<String> nodes = zk.getChildren(REGISTRY_PATH, true);
Set<String> newNodes = new HashSet<>(nodes);

for (String node : newNodes) {
if (!currentNodes.contains(node)) {
log.info("Node added: {}", node);
distributedTaskService.addServer(node);
}
}

for (String node : currentNodes) {
if (!newNodes.contains(node)) {
log.info("Node removed: {}", node);
distributedTaskService.removeServer(node);
}
}

currentNodes = newNodes;
log.info("Online servers: {}", currentNodes);
} catch (KeeperException e) {
log.info("Zookeeper session expired, attempting to reconnect...");
reconnectAndRegister();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("Failed to handle node changes", e);
}
}

private void reconnectAndRegister() {
int retries = 5;
while (retries > 0) {
try {
zk.close();
zk = new ZooKeeper(zkAddress, HEARTBEAT_TIMEOUT, watcher);
zk.create(nodePath, new byte[0], OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
return;
} catch (Exception e) {
retries--;
log.warn("Retrying connection, attempts left: {}", retries, e);
try {
Thread.sleep(3000);
} catch (InterruptedException ignored) {
Thread.currentThread().interrupt();
}
}
}
log.error("Failed to reconnect and register node after multiple attempts.");
}

@Override
public void unRegister() {
try {
zk.close();
scheduler.shutdown();
log.info("ZooKeeper client closed: {}", nodePath);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("Failed to close ZooKeeper client", e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.streampark.console.core.service;

import org.apache.streampark.common.util.SystemPropertyUtils;
import org.apache.streampark.console.core.service.impl.RegistryServiceImpl;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

public class RegistryServiceTest {

private final RegistryServiceImpl registryService = new RegistryServiceImpl();

@Test
public void testRegister() {
if (enableHA()) {
try {
registryService.registry();
} catch (Exception e) {
Assertions.assertEquals(1, registryService.getCurrentNodes().size());
registryService.unRegister();
}
}
}

public boolean enableHA() {
return SystemPropertyUtils.get("high-availability.enable", "false").equals("true");
}

}
4 changes: 3 additions & 1 deletion tools/dependencies/known-dependencies.txt
Original file line number Diff line number Diff line change
Expand Up @@ -377,5 +377,7 @@ xml-apis-1.4.01.jar
xnio-api-3.8.7.Final.jar
xnio-nio-3.8.7.Final.jar
xz-1.5.jar
zookeeper-3.4.14.jar
zookeeper-3.6.3.jar
icu4j-67.1.jar
zookeeper-jute-3.6.3.jar
netty-transport-native-epoll-4.1.91.Final.jar

0 comments on commit ef78fdd

Please sign in to comment.