From 1508b94116a95f9417abf31670e10626d5e938f1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rain=E4=B8=A8xiyang?= Date: Mon, 30 Jan 2023 15:20:20 +0800 Subject: [PATCH 1/2] maxwellHA on zookeeper --- bin/maxwell-leaders | 22 ++++ docs/docs/high_availability.md | 28 +++++ pom.xml | 21 ++++ .../java/com/zendesk/maxwell/Maxwell.java | 16 ++- .../com/zendesk/maxwell/MaxwellConfig.java | 52 +++++++- .../java/com/zendesk/maxwell/MaxwellHA.java | 79 +++++++++++- .../zendesk/maxwell/util/CuratorUtils.java | 118 ++++++++++++++++++ .../zendesk/maxwell/util/MaxwellLeaders.java | 57 +++++++++ 8 files changed, 382 insertions(+), 11 deletions(-) create mode 100644 bin/maxwell-leaders create mode 100644 src/main/java/com/zendesk/maxwell/util/CuratorUtils.java create mode 100644 src/main/java/com/zendesk/maxwell/util/MaxwellLeaders.java diff --git a/bin/maxwell-leaders b/bin/maxwell-leaders new file mode 100644 index 000000000..dd8b0822c --- /dev/null +++ b/bin/maxwell-leaders @@ -0,0 +1,22 @@ +#!/bin/bash +set -e + +base_dir="$(dirname "$0")/.." +lib_dir="$base_dir/lib" +lib_dir_development="$base_dir/target/lib" + +if [ ! -e "$lib_dir" -a -e "$lib_dir_development" ]; then + lib_dir="$lib_dir_development" + CLASSPATH="$CLASSPATH:$base_dir/target/classes" +fi + +CLASSPATH="$CLASSPATH:$lib_dir/*" + +if [ -z "$JAVA_HOME" ]; then + JAVA="java" +else + JAVA="$JAVA_HOME/bin/java" +fi + +export LANG="en_US.UTF-8" +exec $JAVA -Dlog4j.shutdownCallbackRegistry=com.djdch.log4j.StaticShutdownCallbackRegistry -cp $CLASSPATH com.zendesk.maxwell.util.MaxwellLeaders "$@" \ No newline at end of file diff --git a/docs/docs/high_availability.md b/docs/docs/high_availability.md index 427cec8f0..8132d2deb 100644 --- a/docs/docs/high_availability.md +++ b/docs/docs/high_availability.md @@ -53,8 +53,36 @@ which can be worked around by forcing the JVM onto an ipv4 stack: JAVA_OPTS="-Djava.net.preferIPv4Stack=true" bin/maxwell --ha --raft_member_id=B ``` +# High Availabilty on Zookeeper +High availability through zookeeper +## Getting started +Prepare two or more servers to serve as the maxwell host server and a zookeeper cluster. (The maxwell host server and a zookeeper cluster can communicate.) + +Example Running Scripts: +``` + bin/maxwell --log_level='INFO' --user='' --password='' --host='' --producer=stdout --client_id='' --ha='zookeeper' --zookeeper_server =',,' +``` +Run the preceding command on each maxwell host. + +Get which host is the leader script Example: +``` + bin/maxwell-leaders --ha='zookeeper' --zookeeper_server =',,' --client_id='' +``` +You can get: +``` + [INFO] MaxwellLeaders: clientID::leaders now are -> +``` + +## Getting deeper +If a timeout error occurs between the maxwell host and the zookeeper cluster or the connection is abnormal due to network instability, you can set the following parameters: +``` +--zookeeper_session_timeout_ms= +--zookeeper_connection_timeout_ms= +--zookeeper_max_retries= +--zookeeper_retry_wait_ms= +``` diff --git a/pom.xml b/pom.xml index 6d7959b0e..d4f4da924 100644 --- a/pom.xml +++ b/pom.xml @@ -387,6 +387,27 @@ 1.18.0 test + + + org.apache.curator + curator-recipes + 2.12.0 + + + org.apache.curator + curator-client + 2.12.0 + + + org.apache.curator + curator-framework + 2.12.0 + + + org.apache.zookeeper + zookeeper + 3.4.13 + diff --git a/src/main/java/com/zendesk/maxwell/Maxwell.java b/src/main/java/com/zendesk/maxwell/Maxwell.java index 04b0741d4..9d181c8eb 100644 --- a/src/main/java/com/zendesk/maxwell/Maxwell.java +++ b/src/main/java/com/zendesk/maxwell/Maxwell.java @@ -13,6 +13,7 @@ import com.zendesk.maxwell.schema.*; import com.zendesk.maxwell.schema.columndef.ColumnDefCastException; import com.zendesk.maxwell.util.Logging; +import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -331,9 +332,18 @@ public void run() { LOGGER.info("Starting Maxwell. maxMemory: " + Runtime.getRuntime().maxMemory() + " bufferMemoryUsage: " + config.bufferMemoryUsage); - if ( config.haMode ) { - new MaxwellHA(maxwell, config.jgroupsConf, config.raftMemberID, config.clientID).startHA(); - } else { + if ( null != config.haMode){ + if ( "jgroups-raft".equals(config.haMode)){ + new MaxwellHA(maxwell, config.jgroupsConf, config.raftMemberID, config.clientID).startHAJGroups(); + } else if ( "zookeeper".equals(config.haMode)){ + if( StringUtils.isBlank(config.zookeeperServer)){ + throw new Exception("In high availability mode 'zookeeperServer' does not allow Null. --zookeeper_server = " + config.zookeeperServer); + } + new MaxwellHA(maxwell, config.zookeeperServer, config.zookeeperSessionTimeoutMs, config.zookeeperConnectionTimeoutMs, config.zookeeperMaxRetries, config.zookeeperRetryWaitMs, config.clientID).startHAZookeeper(); + } else { + throw new Exception("The value of ha is not in (jgroups-raft,zookeeper). ha = " + config.haMode); + } + } else{ maxwell.start(); } } catch ( SQLException e ) { diff --git a/src/main/java/com/zendesk/maxwell/MaxwellConfig.java b/src/main/java/com/zendesk/maxwell/MaxwellConfig.java index 1cac4be78..13248649f 100644 --- a/src/main/java/com/zendesk/maxwell/MaxwellConfig.java +++ b/src/main/java/com/zendesk/maxwell/MaxwellConfig.java @@ -609,9 +609,9 @@ public class MaxwellConfig extends AbstractConfig { public Scripting scripting; /** - * Enable high available support (via jgroups-raft) + * Enable high available support (via jgroups-raft or zookeeper) */ - public boolean haMode; + public String haMode; /** * Path to raft.xml file that configures high availability support @@ -629,6 +629,32 @@ public class MaxwellConfig extends AbstractConfig { */ public int binlogEventQueueSize; + /** + * HA zookeeper address + */ + public String zookeeperServer; + + /** + * session time + */ + public int zookeeperSessionTimeoutMs; + + /** + * connection time + */ + public int zookeeperConnectionTimeoutMs; + + /** + * maxRetries + */ + public int zookeeperMaxRetries; + + /** + * retryWaitMs + */ + public int zookeeperRetryWaitMs; + + /** * Build a default configuration object. */ @@ -741,12 +767,22 @@ protected MaxwellOptionParser buildOptionParser() { .withRequiredArg(); parser.separator(); - parser.accepts( "ha", "enable high-availability mode via jgroups-raft" ) - .withOptionalArg().ofType(Boolean.class); + parser.accepts( "ha", "enable high-availability mode via jgroups-raft or zookeeper" ) + .withOptionalArg(); parser.accepts( "jgroups_config", "location of jgroups xml configuration file" ) .withRequiredArg(); parser.accepts( "raft_member_id", "raft memberID. (may also be specified in raft.xml)" ) .withRequiredArg(); + parser.accepts("zookeeper_server","enable maxwell High Availability using zookeeper") + .withRequiredArg(); + parser.accepts("zookeeper_session_timeout_ms","session timeout duration (maxwellHA on zk)") + .withRequiredArg().ofType(Integer.class); + parser.accepts("zookeeper_connection_timeout_ms","connection timeout duration (maxwellHA on zk)") + .withRequiredArg().ofType(Integer.class); + parser.accepts("zookeeper_max_retries","maximum retry (maxwellHA on zk)") + .withRequiredArg().ofType(Integer.class); + parser.accepts("zookeeper_retry_wait_ms","initial retry wait time (maxwellHA on zk)") + .withRequiredArg().ofType(Integer.class); parser.separator(); @@ -1206,11 +1242,17 @@ private void setup(OptionSet options, Properties properties) { setupEncryptionOptions(options, properties); - this.haMode = fetchBooleanOption("ha", options, properties, false); + this.haMode = fetchStringOption("ha", options, properties, null); this.jgroupsConf = fetchStringOption("jgroups_config", options, properties, "raft.xml"); this.raftMemberID = fetchStringOption("raft_member_id", options, properties, null); this.replicationReconnectionRetries = fetchIntegerOption("replication_reconnection_retries", options, properties, 1); + this.zookeeperServer = fetchStringOption("zookeeper_server", options, properties, null); + this.zookeeperSessionTimeoutMs = fetchIntegerOption("zookeeper_session_timeout_ms", options, properties, 6000); + this.zookeeperConnectionTimeoutMs = fetchIntegerOption("zookeeper_connection_timeout_ms", options, properties, 6000); + this.zookeeperMaxRetries = fetchIntegerOption("zookeeper_max_retries", options, properties, 3); + this.zookeeperRetryWaitMs = fetchIntegerOption("zookeeper_retry_wait_ms", options, properties, 1000); + this.binlogEventQueueSize = fetchIntegerOption("binlog_event_queue_size", options, properties, BinlogConnectorReplicator.BINLOG_QUEUE_SIZE); } diff --git a/src/main/java/com/zendesk/maxwell/MaxwellHA.java b/src/main/java/com/zendesk/maxwell/MaxwellHA.java index e9caf138c..78c2788bf 100644 --- a/src/main/java/com/zendesk/maxwell/MaxwellHA.java +++ b/src/main/java/com/zendesk/maxwell/MaxwellHA.java @@ -1,5 +1,9 @@ package com.zendesk.maxwell; +import com.zendesk.maxwell.util.CuratorUtils; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.recipes.leader.LeaderLatch; +import org.apache.curator.framework.recipes.leader.LeaderLatchListener; import org.jgroups.JChannel; import org.jgroups.protocols.raft.Role; import org.jgroups.raft.RaftHandle; @@ -9,13 +13,15 @@ import java.util.concurrent.atomic.AtomicBoolean; /** - * Class that joins a jgroups-raft cluster of servers + * Class that joins a jgroups-raft cluster of servers or zookeeper */ public class MaxwellHA { static final Logger LOGGER = LoggerFactory.getLogger(MaxwellHA.class); private final Maxwell maxwell; - private final String jgroupsConf, raftMemberID, clientID; + private String jgroupsConf, raftMemberID, clientID; + private String zookeeperServer; + private int sessionTimeoutMs, connectionTimeoutMs, maxRetries, baseSleepTimeMs; private boolean hasRun = false; private AtomicBoolean isRaftLeader = new AtomicBoolean(false); @@ -33,6 +39,26 @@ public MaxwellHA(Maxwell maxwell, String jgroupsConf, String raftMemberID, Strin this.clientID = clientID; } + /** + * Build a MaxwellHA object + * @param maxwell The Maxwell instance that will be run when an election is won + * @param zookeeperServer zookeeper adds + * @param sessionTimeoutMs + * @param connectionTimeoutMs + * @param maxRetries + * @param baseSleepTimeMs + * @param clientID The maxwell clientID. This will be the only one through which the actual path is stored + */ + public MaxwellHA(Maxwell maxwell, String zookeeperServer, int sessionTimeoutMs, int connectionTimeoutMs, int maxRetries, int baseSleepTimeMs, String clientID) { + this.maxwell = maxwell; + this.zookeeperServer = zookeeperServer; + this.sessionTimeoutMs = sessionTimeoutMs; + this.connectionTimeoutMs = connectionTimeoutMs; + this.maxRetries = maxRetries; + this.baseSleepTimeMs = baseSleepTimeMs; + this.clientID = clientID; + } + private void run() { try { if (hasRun) @@ -53,7 +79,7 @@ private void run() { * Does not return. * @throws Exception if there's any issues */ - public void startHA() throws Exception { + public void startHAJGroups() throws Exception { JChannel ch=new JChannel(jgroupsConf); RaftHandle handle=new RaftHandle(ch, null); if ( raftMemberID != null ) @@ -83,4 +109,51 @@ public void startHA() throws Exception { Thread.sleep(Long.MAX_VALUE); } + + /** + * indicates that Ha is started in zookeeper mode + * @throws Exception + */ + public void startHAZookeeper() throws Exception { + String electPath = "/" + clientID + "/services"; + String masterPath = "/" + clientID + "/leader"; + CuratorUtils cu = new CuratorUtils(); + cu.setZookeeperServer(zookeeperServer); + cu.setSessionTimeoutMs(sessionTimeoutMs); + cu.setConnectionTimeoutMs(connectionTimeoutMs); + cu.setMaxRetries(maxRetries); + cu.setBaseSleepTimeMs(baseSleepTimeMs); + cu.setClientId(clientID); + cu.setElectPath(electPath); + cu.setMasterPath(masterPath); + cu.init(); + CuratorFramework client = cu.getClient(); + LeaderLatch leader = new LeaderLatch(client, cu.getElectPath()); + leader.start(); + LOGGER.info("this node is participating in the election of the leader ...."); + leader.addListener(new LeaderLatchListener() { + @Override + public void isLeader() { + try { + cu.register(); + } catch (Exception e) { + e.printStackTrace(); + LOGGER.error("The node registration is abnormal, check whether the maxwell host communicates properly with the zookeeper network"); + cu.stop(); + System.exit(1); + } + LOGGER.info("node is current leader, starting Maxwell...."); + run(); + cu.stop(); + } + + @Override + public void notLeader() { + //LeaderLatch.CloseMode.SILENT mode will not invoke this method + } + }); + + Thread.sleep(Long.MAX_VALUE); + } + } diff --git a/src/main/java/com/zendesk/maxwell/util/CuratorUtils.java b/src/main/java/com/zendesk/maxwell/util/CuratorUtils.java new file mode 100644 index 000000000..7f6511775 --- /dev/null +++ b/src/main/java/com/zendesk/maxwell/util/CuratorUtils.java @@ -0,0 +1,118 @@ +package com.zendesk.maxwell.util; + +import org.apache.curator.RetryPolicy; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.ExponentialBackoffRetry; +import org.apache.zookeeper.CreateMode; + +import java.net.InetAddress; +import java.util.ArrayList; +import java.util.List; + +public class CuratorUtils { + + private CuratorFramework client; + private String zookeeperServer; + private int sessionTimeoutMs; + private int connectionTimeoutMs; + private int baseSleepTimeMs; + private int maxRetries; + private String namespace = "maxwellHA"; + private String clientId; + private String electPath; + private String masterPath; + + public void setZookeeperServer(String zookeeperServer) { + this.zookeeperServer = zookeeperServer; + } + + public String getZookeeperServer() { + return zookeeperServer; + } + + public void setSessionTimeoutMs(int sessionTimeoutMs) { + this.sessionTimeoutMs = sessionTimeoutMs; + } + + public int getSessionTimeoutMs() { + return sessionTimeoutMs; + } + + public void setConnectionTimeoutMs(int connectionTimeoutMs) { + this.connectionTimeoutMs = connectionTimeoutMs; + } + + public int getConnectionTimeoutMs() { + return connectionTimeoutMs; + } + + public void setBaseSleepTimeMs(int baseSleepTimeMs) { + this.baseSleepTimeMs = baseSleepTimeMs; + } + + public int getBaseSleepTimeMs() { + return baseSleepTimeMs; + } + + public void setMaxRetries(int maxRetries) { + this.maxRetries = maxRetries; + } + + public int getMaxRetries() { + return maxRetries; + } + + public void setClientId(String clientId) { + this.clientId = clientId; + } + + public String getElectPath() { + return electPath; + } + + public void setElectPath(String electPath) { + this.electPath = electPath; + } + + public String getMasterPath() { + return masterPath; + } + + public void setMasterPath(String masterPath) { + this.masterPath = masterPath; + } + + public void init() { + RetryPolicy retryPolicy = new ExponentialBackoffRetry(baseSleepTimeMs, maxRetries); + client = CuratorFrameworkFactory.builder().connectString(zookeeperServer).retryPolicy(retryPolicy) + .sessionTimeoutMs(sessionTimeoutMs).connectionTimeoutMs(connectionTimeoutMs) + .namespace(namespace) + .build(); + client.start(); + } + + public void stop() { + client.close(); + } + + public CuratorFramework getClient() { + return client; + } + + public void register() throws Exception { + String rootPath = masterPath; + String hostAddress = InetAddress.getLocalHost().getHostAddress(); + client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(rootPath + "/" + hostAddress); + } + + public List getChildren(String path) throws Exception { + List childrenList = new ArrayList<>(); + childrenList = client.getChildren().forPath(path); + return childrenList; + } + + public List getInstances() throws Exception { + return getChildren(masterPath); + } +} diff --git a/src/main/java/com/zendesk/maxwell/util/MaxwellLeaders.java b/src/main/java/com/zendesk/maxwell/util/MaxwellLeaders.java new file mode 100644 index 000000000..60d3c1ff3 --- /dev/null +++ b/src/main/java/com/zendesk/maxwell/util/MaxwellLeaders.java @@ -0,0 +1,57 @@ +package com.zendesk.maxwell.util; + +import com.zendesk.maxwell.MaxwellConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.util.List; + +/** + * Get maxwell highly available leaders (maxwell on zk) + */ +public class MaxwellLeaders { + + static final Logger LOGGER = LoggerFactory.getLogger(MaxwellLeaders.class); + + public static void main(String[] args) { + + Logging.setupLogBridging(); + MaxwellConfig config = new MaxwellConfig(args); + + if ( config.log_level != null ) { + Logging.setLevel(config.log_level); + }else { + Logging.setLevel("INFO"); + } + + if( "zookeeper".equals(config.haMode)){ + CuratorUtils cu = new CuratorUtils(); + cu.setZookeeperServer(config.zookeeperServer); + cu.setSessionTimeoutMs(config.zookeeperSessionTimeoutMs); + cu.setConnectionTimeoutMs(config.zookeeperConnectionTimeoutMs); + cu.setMaxRetries(config.zookeeperMaxRetries); + cu.setBaseSleepTimeMs(config.zookeeperRetryWaitMs); + cu.setClientId(config.clientID); + String electPath = "/" + config.clientID + "/services"; + String masterPath = "/" + config.clientID + "/leader"; + cu.setElectPath(electPath); + cu.setMasterPath(masterPath); + cu.init(); + List instances = null; + try { + instances = cu.getInstances(); + } catch (Exception e) { + e.printStackTrace(); + LOGGER.error("The path does not exist or is empty. Please check whether the clientID is correct. clientID = " + config.clientID); + System.exit(1); + } + + if(0 == instances.size()){ + LOGGER.info("Maxwell is not a high availability mode Or maxwell is not started"); + }else { + LOGGER.info("clientID:"+config.clientID + ":leaders now are -> {}",instances.get(0)); + } + }else { + LOGGER.error("make sure ha = 'zookeeper'. ha = " + config.haMode); + } + } +} From 7b2fa72801a3d174d88260f25c61f2c324f2b1bf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rain=E4=B8=A8xiyang?= Date: Fri, 3 Feb 2023 14:10:36 +0800 Subject: [PATCH 2/2] Added callback display after leader exits election --- .../java/com/zendesk/maxwell/MaxwellHA.java | 37 +++++++++++++++++-- 1 file changed, 33 insertions(+), 4 deletions(-) diff --git a/src/main/java/com/zendesk/maxwell/MaxwellHA.java b/src/main/java/com/zendesk/maxwell/MaxwellHA.java index 78c2788bf..9c585e1f4 100644 --- a/src/main/java/com/zendesk/maxwell/MaxwellHA.java +++ b/src/main/java/com/zendesk/maxwell/MaxwellHA.java @@ -5,12 +5,17 @@ import org.apache.curator.framework.recipes.leader.LeaderLatch; import org.apache.curator.framework.recipes.leader.LeaderLatchListener; import org.jgroups.JChannel; +import org.jgroups.protocols.raft.Log; import org.jgroups.protocols.raft.Role; import org.jgroups.raft.RaftHandle; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.net.InetAddress; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; /** * Class that joins a jgroups-raft cluster of servers or zookeeper @@ -115,6 +120,10 @@ public void startHAJGroups() throws Exception { * @throws Exception */ public void startHAZookeeper() throws Exception { + + Lock lock = new ReentrantLock(); + String hostAddress = InetAddress.getLocalHost().getHostAddress(); + String electPath = "/" + clientID + "/services"; String masterPath = "/" + clientID + "/leader"; CuratorUtils cu = new CuratorUtils(); @@ -128,28 +137,48 @@ public void startHAZookeeper() throws Exception { cu.setMasterPath(masterPath); cu.init(); CuratorFramework client = cu.getClient(); - LeaderLatch leader = new LeaderLatch(client, cu.getElectPath()); + LeaderLatch leader = new LeaderLatch(client, cu.getElectPath(),hostAddress,LeaderLatch.CloseMode.NOTIFY_LEADER); leader.start(); - LOGGER.info("this node is participating in the election of the leader ...."); + LOGGER.info("this node:" + hostAddress + " is participating in the election of the leader ...."); leader.addListener(new LeaderLatchListener() { @Override public void isLeader() { try { + lock.lock(); cu.register(); } catch (Exception e) { e.printStackTrace(); LOGGER.error("The node registration is abnormal, check whether the maxwell host communicates properly with the zookeeper network"); cu.stop(); System.exit(1); + }finally { + lock.unlock(); } - LOGGER.info("node is current leader, starting Maxwell...."); + LOGGER.info("node:" + hostAddress + " is current leader, starting Maxwell...."); + LOGGER.info("hasLeadership = " + leader.hasLeadership()); + run(); + + try { + leader.close(); + } catch (IOException e) { + e.printStackTrace(); + } cu.stop(); } @Override public void notLeader() { - //LeaderLatch.CloseMode.SILENT mode will not invoke this method + try { + lock.lock(); + LOGGER.warn("node:" + hostAddress + " lost leader"); + LOGGER.warn("master-slave switchover......"); + LOGGER.warn("The leadership went from " + hostAddress + " to " + leader.getLeader()); + }catch (Exception e){ + e.printStackTrace(); + }finally { + lock.unlock(); + } } });