Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

maxwellHA on zookeeper #1948

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions bin/maxwell-leaders
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
#!/bin/bash
set -e

base_dir="$(dirname "$0")/.."
lib_dir="$base_dir/lib"
lib_dir_development="$base_dir/target/lib"

if [ ! -e "$lib_dir" -a -e "$lib_dir_development" ]; then
lib_dir="$lib_dir_development"
CLASSPATH="$CLASSPATH:$base_dir/target/classes"
fi

CLASSPATH="$CLASSPATH:$lib_dir/*"

if [ -z "$JAVA_HOME" ]; then
JAVA="java"
else
JAVA="$JAVA_HOME/bin/java"
fi

export LANG="en_US.UTF-8"
exec $JAVA -Dlog4j.shutdownCallbackRegistry=com.djdch.log4j.StaticShutdownCallbackRegistry -cp $CLASSPATH com.zendesk.maxwell.util.MaxwellLeaders "$@"
28 changes: 28 additions & 0 deletions docs/docs/high_availability.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,36 @@ which can be worked around by forcing the JVM onto an ipv4 stack:
JAVA_OPTS="-Djava.net.preferIPv4Stack=true" bin/maxwell --ha --raft_member_id=B
```

# High Availabilty on Zookeeper

High availability through zookeeper

## Getting started
Prepare two or more servers to serve as the maxwell host server and a zookeeper cluster. (The maxwell host server and a zookeeper cluster can communicate.)

Example Running Scripts:

```
bin/maxwell --log_level='INFO' --user='<user>' --password='<passwd>' --host='<host>' --producer=stdout --client_id='<client_id>' --ha='zookeeper' --zookeeper_server ='<host1:port>,<host2:port>,<host3:port>'
```

Run the preceding command on each maxwell host.

Get which host is the leader script Example:
```
bin/maxwell-leaders --ha='zookeeper' --zookeeper_server ='<host1:port>,<host2:port>,<host3:port>' --client_id='<client_id>'
```
You can get:
```
[INFO] MaxwellLeaders: clientID:<clientID>:leaders now are -> <leader host>
```

## Getting deeper
If a timeout error occurs between the maxwell host and the zookeeper cluster or the connection is abnormal due to network instability, you can set the following parameters:
```
--zookeeper_session_timeout_ms=<session timeout duration>
--zookeeper_connection_timeout_ms=<internal default wait time for the client to establish a connection with the zk>
--zookeeper_max_retries=<number of retries>
--zookeeper_retry_wait_ms=<retry time interval>
```

21 changes: 21 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,27 @@
<version>1.18.0</version>
<scope>test</scope>
</dependency>
<!--zookeeper as ha-->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.12.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-client</artifactId>
<version>2.12.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>2.12.0</version>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.13</version>
</dependency>

</dependencies>

Expand Down
16 changes: 13 additions & 3 deletions src/main/java/com/zendesk/maxwell/Maxwell.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import com.zendesk.maxwell.schema.*;
import com.zendesk.maxwell.schema.columndef.ColumnDefCastException;
import com.zendesk.maxwell.util.Logging;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -331,9 +332,18 @@ public void run() {

LOGGER.info("Starting Maxwell. maxMemory: " + Runtime.getRuntime().maxMemory() + " bufferMemoryUsage: " + config.bufferMemoryUsage);

if ( config.haMode ) {
new MaxwellHA(maxwell, config.jgroupsConf, config.raftMemberID, config.clientID).startHA();
} else {
if ( null != config.haMode){
if ( "jgroups-raft".equals(config.haMode)){
new MaxwellHA(maxwell, config.jgroupsConf, config.raftMemberID, config.clientID).startHAJGroups();
} else if ( "zookeeper".equals(config.haMode)){
if( StringUtils.isBlank(config.zookeeperServer)){
throw new Exception("In high availability mode 'zookeeperServer' does not allow Null. --zookeeper_server = " + config.zookeeperServer);
}
new MaxwellHA(maxwell, config.zookeeperServer, config.zookeeperSessionTimeoutMs, config.zookeeperConnectionTimeoutMs, config.zookeeperMaxRetries, config.zookeeperRetryWaitMs, config.clientID).startHAZookeeper();
} else {
throw new Exception("The value of ha is not in (jgroups-raft,zookeeper). ha = " + config.haMode);
}
} else{
maxwell.start();
}
} catch ( SQLException e ) {
Expand Down
52 changes: 47 additions & 5 deletions src/main/java/com/zendesk/maxwell/MaxwellConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -609,9 +609,9 @@ public class MaxwellConfig extends AbstractConfig {
public Scripting scripting;

/**
* Enable high available support (via jgroups-raft)
* Enable high available support (via jgroups-raft or zookeeper)
*/
public boolean haMode;
public String haMode;

/**
* Path to raft.xml file that configures high availability support
Expand All @@ -629,6 +629,32 @@ public class MaxwellConfig extends AbstractConfig {
*/
public int binlogEventQueueSize;

/**
* HA zookeeper address
*/
public String zookeeperServer;

/**
* session time
*/
public int zookeeperSessionTimeoutMs;

/**
* connection time
*/
public int zookeeperConnectionTimeoutMs;

/**
* maxRetries
*/
public int zookeeperMaxRetries;

/**
* retryWaitMs
*/
public int zookeeperRetryWaitMs;


/**
* Build a default configuration object.
*/
Expand Down Expand Up @@ -741,12 +767,22 @@ protected MaxwellOptionParser buildOptionParser() {
.withRequiredArg();
parser.separator();

parser.accepts( "ha", "enable high-availability mode via jgroups-raft" )
.withOptionalArg().ofType(Boolean.class);
parser.accepts( "ha", "enable high-availability mode via jgroups-raft or zookeeper" )
.withOptionalArg();
parser.accepts( "jgroups_config", "location of jgroups xml configuration file" )
.withRequiredArg();
parser.accepts( "raft_member_id", "raft memberID. (may also be specified in raft.xml)" )
.withRequiredArg();
parser.accepts("zookeeper_server","enable maxwell High Availability using zookeeper")
.withRequiredArg();
parser.accepts("zookeeper_session_timeout_ms","session timeout duration (maxwellHA on zk)")
.withRequiredArg().ofType(Integer.class);
wanghangyu817 marked this conversation as resolved.
Show resolved Hide resolved
parser.accepts("zookeeper_connection_timeout_ms","connection timeout duration (maxwellHA on zk)")
.withRequiredArg().ofType(Integer.class);
wanghangyu817 marked this conversation as resolved.
Show resolved Hide resolved
parser.accepts("zookeeper_max_retries","maximum retry (maxwellHA on zk)")
.withRequiredArg().ofType(Integer.class);
wanghangyu817 marked this conversation as resolved.
Show resolved Hide resolved
parser.accepts("zookeeper_retry_wait_ms","initial retry wait time (maxwellHA on zk)")
.withRequiredArg().ofType(Integer.class);
wanghangyu817 marked this conversation as resolved.
Show resolved Hide resolved

parser.separator();

Expand Down Expand Up @@ -1206,11 +1242,17 @@ private void setup(OptionSet options, Properties properties) {

setupEncryptionOptions(options, properties);

this.haMode = fetchBooleanOption("ha", options, properties, false);
this.haMode = fetchStringOption("ha", options, properties, null);
this.jgroupsConf = fetchStringOption("jgroups_config", options, properties, "raft.xml");
this.raftMemberID = fetchStringOption("raft_member_id", options, properties, null);
this.replicationReconnectionRetries = fetchIntegerOption("replication_reconnection_retries", options, properties, 1);

this.zookeeperServer = fetchStringOption("zookeeper_server", options, properties, null);
this.zookeeperSessionTimeoutMs = fetchIntegerOption("zookeeper_session_timeout_ms", options, properties, 6000);
this.zookeeperConnectionTimeoutMs = fetchIntegerOption("zookeeper_connection_timeout_ms", options, properties, 6000);
this.zookeeperMaxRetries = fetchIntegerOption("zookeeper_max_retries", options, properties, 3);
this.zookeeperRetryWaitMs = fetchIntegerOption("zookeeper_retry_wait_ms", options, properties, 1000);

this.binlogEventQueueSize = fetchIntegerOption("binlog_event_queue_size", options, properties, BinlogConnectorReplicator.BINLOG_QUEUE_SIZE);
}

Expand Down
108 changes: 105 additions & 3 deletions src/main/java/com/zendesk/maxwell/MaxwellHA.java
Original file line number Diff line number Diff line change
@@ -1,21 +1,32 @@
package com.zendesk.maxwell;

import com.zendesk.maxwell.util.CuratorUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
import org.jgroups.JChannel;
import org.jgroups.protocols.raft.Log;
import org.jgroups.protocols.raft.Role;
import org.jgroups.raft.RaftHandle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.InetAddress;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
* Class that joins a jgroups-raft cluster of servers
* Class that joins a jgroups-raft cluster of servers or zookeeper
*/
public class MaxwellHA {
static final Logger LOGGER = LoggerFactory.getLogger(MaxwellHA.class);

private final Maxwell maxwell;
private final String jgroupsConf, raftMemberID, clientID;
private String jgroupsConf, raftMemberID, clientID;
private String zookeeperServer;
private int sessionTimeoutMs, connectionTimeoutMs, maxRetries, baseSleepTimeMs;
private boolean hasRun = false;
private AtomicBoolean isRaftLeader = new AtomicBoolean(false);

Expand All @@ -33,6 +44,26 @@ public MaxwellHA(Maxwell maxwell, String jgroupsConf, String raftMemberID, Strin
this.clientID = clientID;
}

/**
* Build a MaxwellHA object
* @param maxwell The Maxwell instance that will be run when an election is won
* @param zookeeperServer zookeeper adds
* @param sessionTimeoutMs
* @param connectionTimeoutMs
* @param maxRetries
* @param baseSleepTimeMs
* @param clientID The maxwell clientID. This will be the only one through which the actual path is stored
*/
public MaxwellHA(Maxwell maxwell, String zookeeperServer, int sessionTimeoutMs, int connectionTimeoutMs, int maxRetries, int baseSleepTimeMs, String clientID) {
this.maxwell = maxwell;
this.zookeeperServer = zookeeperServer;
this.sessionTimeoutMs = sessionTimeoutMs;
this.connectionTimeoutMs = connectionTimeoutMs;
this.maxRetries = maxRetries;
this.baseSleepTimeMs = baseSleepTimeMs;
this.clientID = clientID;
}

private void run() {
try {
if (hasRun)
Expand All @@ -53,7 +84,7 @@ private void run() {
* Does not return.
* @throws Exception if there's any issues
*/
public void startHA() throws Exception {
public void startHAJGroups() throws Exception {
JChannel ch=new JChannel(jgroupsConf);
RaftHandle handle=new RaftHandle(ch, null);
if ( raftMemberID != null )
Expand Down Expand Up @@ -83,4 +114,75 @@ public void startHA() throws Exception {

Thread.sleep(Long.MAX_VALUE);
}

/**
* indicates that Ha is started in zookeeper mode
* @throws Exception
*/
public void startHAZookeeper() throws Exception {

Lock lock = new ReentrantLock();
String hostAddress = InetAddress.getLocalHost().getHostAddress();

String electPath = "/" + clientID + "/services";
String masterPath = "/" + clientID + "/leader";
CuratorUtils cu = new CuratorUtils();
cu.setZookeeperServer(zookeeperServer);
cu.setSessionTimeoutMs(sessionTimeoutMs);
cu.setConnectionTimeoutMs(connectionTimeoutMs);
cu.setMaxRetries(maxRetries);
cu.setBaseSleepTimeMs(baseSleepTimeMs);
cu.setClientId(clientID);
cu.setElectPath(electPath);
cu.setMasterPath(masterPath);
cu.init();
CuratorFramework client = cu.getClient();
LeaderLatch leader = new LeaderLatch(client, cu.getElectPath(),hostAddress,LeaderLatch.CloseMode.NOTIFY_LEADER);
leader.start();
LOGGER.info("this node:" + hostAddress + " is participating in the election of the leader ....");
leader.addListener(new LeaderLatchListener() {
@Override
public void isLeader() {
try {
lock.lock();
cu.register();
} catch (Exception e) {
e.printStackTrace();
LOGGER.error("The node registration is abnormal, check whether the maxwell host communicates properly with the zookeeper network");
cu.stop();
System.exit(1);
}finally {
lock.unlock();
}
LOGGER.info("node:" + hostAddress + " is current leader, starting Maxwell....");
LOGGER.info("hasLeadership = " + leader.hasLeadership());

run();

try {
leader.close();
} catch (IOException e) {
e.printStackTrace();
}
cu.stop();
}

@Override
public void notLeader() {
try {
lock.lock();
LOGGER.warn("node:" + hostAddress + " lost leader");
LOGGER.warn("master-slave switchover......");
LOGGER.warn("The leadership went from " + hostAddress + " to " + leader.getLeader());
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this case shut down the current maxwell process given that it has lost the leadership status?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand what you mean

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I mean by the time we get this call, it means the current Maxwell process has lost leadership. That means we should probably stop the process altogether or shut down the replicator and go back into the election mode waiting for our turn once again.

Logging a warn does nothing and means we are going to keep replicator threads running and pumping duplicate data into whatever producer is configured. Additionally, the positions store will start getting conflicting writes from two different processes.

@osheroff Do I understand it correctly?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand. I have tested this point. When I try to kill the process, the code will go straight to this location and print the results we want, and also print the node information of the next leader. If there are other exceptions that make it impossible to execute this code, please tell me and I will solve it

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll leave it to Ben to make a call on what should happen in this case, but I feel simply shutting down the process gracefully may be the easiest way to avoid conflicts after a leadership loss. Alternatively, it may be possible to call maxwell.terminate(); the way the JGroups-based HA implementation does it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In my opinion, when the leader becomes a follower, it only means that the zookeeper connection is abnormal or the current node does not meet the conditions of the leader. During this period, the program does not care about maxwell connector status (of course, maxwell status monitoring can be added later). It's a simple switch from one node to another. If the zookeeper connection problem causes the master/slave switchover, the program will not quit, but become a follower. Next, I will add maxwell to the indicator monitoring in iterations to determine whether to switch the master/slave according to these indicators, which requires Ben @osheroff guidance

Copy link

@kovyrin kovyrin Feb 13, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

During this period, the program does not care about maxwell connector status
If we have lost ZK connection and, consequently, lost the leader status, then the current Maxwell instance will at the very least start producing duplicate events since the other instance that is the leader now is replicating the same set of changes already. Additionally, there is a chance of both instances overwriting each other's position information in maxwell's database, which AFAIU can have negative consequences as well.

If the zookeeper connection problem causes the master/slave switchover, the program will not quit, but become a follower

What do you feel a should it mean for a Maxwell instance to become a follower? (AFAIU, there is no notion of a follower mode in current maxwell codebase)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For example,if I start three maxwell instances,we label them as 1,2,3, where 1 is the leader and 2,3 are the followers. When 1 is the leader,2,3 is just a daemon process that doesn't do anything. When 1 exit the leader(not the exit caused by maxwell,but the server failure:For example,exit caused by restart,memory overflow,disk space, etc.),then 2 or 3 takes over from 1 to continue the collection task. If the maxwell process caused by mysql exits,no matter how many instances are started, the problem still presists. This is not something that can be fixed by high availability. What I need to do is to ensure that maxwell itself is highly available.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Your solution absolutely solves the scenarios described. One thing I feel it is missing (or I may be confused!) is a scenario when a leader, doing its leader stuff, replicating data data, etc, loses its leadership while remaining alive and seemingly healthy (due to ZK connectivity issues, ZK restart, any other issues that force a new election). In those cases the old leader needs to step down and stop doing its usual leader things and move to a quiet follower mode (stop binlog replicator, don't write into the position store anymore, etc).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have tested the scenario you described and got corresponding results. Please let me know if you have any other problems

}catch (Exception e){
e.printStackTrace();
}finally {
lock.unlock();
}
}
});

Thread.sleep(Long.MAX_VALUE);
}

}
Loading