Skip to content

Commit

Permalink
Merge pull request #177 from wl-net/add-zookeeper-cluster-mgmt
Browse files Browse the repository at this point in the history
Add preliminary replacement for Cassandra service management
  • Loading branch information
AndrewX192 authored Jan 19, 2020
2 parents 3c7150a + e09d927 commit 0f076b7
Show file tree
Hide file tree
Showing 5 changed files with 335 additions and 2 deletions.
1 change: 1 addition & 0 deletions platform/arcus-lib/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ dependencies {
compile libraries.prometheus_servlet
compile libraries.jetty_servlet

compile libraries.zookeeper
compile libraries.kafka
compile libraries.cassandraDriver
compile libraries.guiceCore
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,15 @@ public class ClusterConfig {
private boolean exitOnDeregistered = true;
// NOTE(review): the property name says seconds, but the field name and its default
// (20 seconds converted to milliseconds) are in milliseconds — confirm which unit
// the injected value is expected to be in.
@Inject(optional = true) @Named("cluster.heartbeatTimeoutSec")
private long timeoutMs = TimeUnit.SECONDS.toMillis(20);

// Path prefix for cluster data in ZooKeeper; defaults to "/arcus".
@Inject(optional = true) @Named("cluster.zk.pathPrefix")
private String clusterZkPathPrefix = "/arcus";
// ZooKeeper host; has no default, so it stays null unless cluster.zk.host is configured.
@Inject(optional = true) @Named("cluster.zk.host")
private String clusterZkHost;
// ZooKeeper port; defaults to 2181.
@Inject(optional = true) @Named("cluster.zk.port")
private int clusterZkPort = 2181;
// ZooKeeper timeout; defaults to 3000 (presumably milliseconds — TODO confirm).
@Inject(optional = true) @Named("cluster.zk.timeout")
private int clusterZkTimeout = 3000;

public ClusterConfig() {
// Intentionally empty: the fields declared above carry their defaults at declaration.
}
Expand Down Expand Up @@ -114,5 +122,60 @@ public void setExitOnDeregistered(boolean exitOnDeregistered) {
this.exitOnDeregistered = exitOnDeregistered;
}

/**
 * Returns the configured ZooKeeper path prefix, normalised so that it always
 * ends with a single trailing {@code '/'} appended when one is missing.
 *
 * @return the cluster zk path prefix, terminated by {@code '/'}
 */
public String getClusterZkPathPrefix() {
   if (clusterZkPathPrefix.endsWith("/")) {
      return clusterZkPathPrefix;
   }
   return clusterZkPathPrefix + '/';
}

/**
* Sets the ZooKeeper path prefix.
*
* @param clusterZkPathPrefix the cluster zk path prefix
*/
public void setClusterZkPathPrefix(String clusterZkPathPrefix) {
this.clusterZkPathPrefix = clusterZkPathPrefix;
}

/**
 * Returns the configured ZooKeeper host.
 *
 * @return the cluster zk host; {@code null} when none has been configured
 */
public String getClusterZkHost() {
   return this.clusterZkHost;
}

/**
* Sets the ZooKeeper host.
*
* @param clusterZkHost the cluster zk host
*/
public void setClusterZkHost(String clusterZkHost) {
this.clusterZkHost = clusterZkHost;
}

/**
 * Returns the configured ZooKeeper port.
 *
 * @return the cluster zk port
 */
public int getClusterZkPort() {
   return this.clusterZkPort;
}

/**
* Sets the ZooKeeper port.
*
* @param clusterZkPort the cluster zk port
*/
public void setClusterZkPort(int clusterZkPort) {
this.clusterZkPort = clusterZkPort;
}

/**
* Sets the ZooKeeper timeout.
*
* @param clusterZkTimeout the cluster zk timeout
*/
public void setClusterZkTimeout(int clusterZkTimeout) {
this.clusterZkTimeout = clusterZkTimeout;
}

/**
 * Returns the configured ZooKeeper timeout.
 *
 * @return the cluster zk timeout
 */
public int getClusterZkTimeout() {
   return this.clusterZkTimeout;
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;

import com.google.inject.Inject;
import com.google.inject.Provides;
import com.google.inject.Singleton;
import com.google.inject.TypeLiteral;
Expand All @@ -30,21 +31,41 @@
import com.iris.bootstrap.guice.AbstractIrisModule;
import com.iris.core.dao.cassandra.CassandraModule;
import com.iris.platform.cluster.cassandra.CassandraClusterServiceDao;
import com.iris.platform.cluster.zookeeper.ZookeeperClusterServiceDao;
import com.iris.util.ThreadPoolBuilder;
import com.netflix.governator.annotations.Modules;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

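/**
* Guice module for cluster membership: binds the {@link Clock}, the eagerly created
* {@link ClusterService}, and the {@link ClusterServiceDao} implementation selected
* by the optional {@code cluster.service.dao} setting.
*/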
@Modules(include = { CassandraModule.class })
public class ClusterModule extends AbstractIrisModule {
private static final Logger logger = LoggerFactory.getLogger(ClusterModule.class);

@Inject(optional = true) @Named("cluster.service.dao")
private String clusterServiceDao = null;

/**
 * Binds the clock, the cluster service, and exactly one {@link ClusterServiceDao}
 * implementation.
 *
 * <p>The implementation is chosen from {@code cluster.service.dao}:
 * {@code "zookeeper"} selects {@link ZookeeperClusterServiceDao}; {@code "cassandra"},
 * {@code "default"}, or an unset value select {@link CassandraClusterServiceDao};
 * any other value logs a warning and falls back to Cassandra.
 */
@Override
protected void configure() {
   // TODO move Clock to a more generic module
   bind(Clock.class).toInstance(Clock.systemUTC());
   bind(ClusterService.class).asEagerSingleton();

   // The setting is optional and the field defaults to null; switching on a null
   // String throws NullPointerException, so normalise it to "default" first.
   String daoName = clusterServiceDao == null ? "default" : clusterServiceDao;

   // ClusterServiceDao must be bound exactly once: an unconditional Cassandra binding
   // ahead of this switch would conflict with the zookeeper binding below.
   switch (daoName) {
      case "zookeeper":
         logger.info("using zookeeper for cluster registration");
         bind(ClusterServiceDao.class).to(ZookeeperClusterServiceDao.class);
         break;
      case "default":
      case "cassandra":
         bind(ClusterServiceDao.class).to(CassandraClusterServiceDao.class);
         break;
      default:
         logger.warn("unknown cluster dao {}: using default instead", clusterServiceDao);
         bind(ClusterServiceDao.class).to(CassandraClusterServiceDao.class);
         break;
   }

   OptionalBinder.newOptionalBinder(binder(), new TypeLiteral<Set<ClusterServiceListener>>() {});
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
/*
* Copyright 2020 Arcus Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.iris.platform.cluster.zookeeper;

import com.codahale.metrics.Counter;
import com.codahale.metrics.Timer;
import com.google.gson.Gson;
import com.google.inject.Inject;
import com.google.inject.name.Named;
import com.iris.core.IrisApplicationModule;
import com.iris.core.dao.metrics.DaoMetrics;
import com.iris.info.IrisApplicationInfo;
import com.iris.platform.cluster.ClusterConfig;
import com.iris.platform.cluster.ClusterServiceDao;
import com.iris.platform.cluster.ClusterServiceRecord;
import com.iris.platform.cluster.exception.ClusterIdUnavailableException;
import com.iris.platform.cluster.exception.ClusterServiceDaoException;
import com.iris.platform.partition.PartitionConfig;
import org.apache.zookeeper.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.time.Clock;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Date;
import java.util.List;
import java.util.stream.Collectors;

/**
 * {@link ClusterServiceDao} that keeps cluster membership in ZooKeeper.
 *
 * <p>Each member is stored as an ephemeral node at
 * {@code <pathPrefix><service>/<memberId>} whose data is the JSON form of its
 * {@link ClusterServiceRecord}. Session events are forwarded to a
 * {@link ZookeeperMonitor}.
 */
public class ZookeeperClusterServiceDao implements ClusterServiceDao, Watcher {
   private static final Logger logger = LoggerFactory.getLogger(ZookeeperClusterServiceDao.class);

   private final Clock clock;
   private final ZooKeeper zk;
   private final ZookeeperMonitor monitor;
   private final String service;
   private final int members;
   private final String host;
   private final String zkPathPrefix;

   private final Gson gson;

   /**
    * Opens the ZooKeeper connection described by {@code clusterConfig}.
    *
    * @param clock         source of the registration timestamp
    * @param config        supplies the number of member ids available to this service
    * @param clusterConfig supplies the ZooKeeper host, port, timeout and path prefix
    * @param service       name of the service being registered
    * @throws IOException if the ZooKeeper client cannot be created
    * @throws IllegalArgumentException if no ZooKeeper host is configured
    */
   @Inject
   public ZookeeperClusterServiceDao(
         Clock clock,
         PartitionConfig config,
         ClusterConfig clusterConfig,
         @Named(IrisApplicationModule.NAME_APPLICATION_NAME) String service) throws IOException {
      this.clock = clock;
      this.members = config.getMembers();
      this.host = IrisApplicationInfo.getHostName();
      this.service = service;
      this.gson = new Gson();
      this.zkPathPrefix = clusterConfig.getClusterZkPathPrefix();

      // Create the client last: it is handed `this` as its watcher and may start
      // delivering events from its own thread as soon as it is constructed.
      String connect = connectString(clusterConfig.getClusterZkHost(), clusterConfig.getClusterZkPort());
      this.zk = new ZooKeeper(connect, clusterConfig.getClusterZkTimeout(), this);
      this.monitor = new ZookeeperMonitor(zk);
   }

   /**
    * Builds the ZooKeeper connect string so that the configured port is honoured.
    * A host value that already looks like a full connect string (contains a port,
    * a server list, or a chroot) is used verbatim.
    */
   private static String connectString(String zkHost, int zkPort) {
      if (zkHost == null || zkHost.trim().isEmpty()) {
         throw new IllegalArgumentException("cluster.zk.host must be set to use the zookeeper cluster dao");
      }
      if (zkHost.indexOf(':') >= 0 || zkHost.indexOf(',') >= 0 || zkHost.indexOf('/') >= 0) {
         return zkHost;
      }
      return zkHost + ':' + zkPort;
   }

   /**
    * Claims the lowest member id that is not currently registered for this service.
    *
    * @return the record stored for the claimed member id
    * @throws ClusterIdUnavailableException if every member id is taken or could not be written
    */
   @Override
   public ClusterServiceRecord register() throws ClusterIdUnavailableException {
      // Only membership matters below, so the ids are collected without ordering them.
      List<Integer> others =
            listMembersByService(service)
               .stream()
               .map(ClusterServiceRecord::getMemberId)
               .collect(Collectors.toList());
      try (Timer.Context timer = ClusterServiceMetrics.registerTimer.time()) {
         Instant heartbeat = clock.instant();
         // grab an empty cluster id, or wait
         for (int memberId = 0; memberId < members; memberId++) {
            if (others.contains(memberId)) {
               continue;
            }

            ClusterServiceRecord csr = tryInsert(memberId, heartbeat);
            if (csr != null) {
               ClusterServiceMetrics.clusterIdRegisteredCounter.inc();
               return csr;
            }
            ClusterServiceMetrics.clusterRegistrationMissCounter.inc();
         }

         ClusterServiceMetrics.clusterRegistrationFailedCounter.inc();
         throw new ClusterIdUnavailableException("No cluster ids for service [" + service + "] were available");
      }
   }

   /**
    * Attempts to create the ephemeral node for {@code memberId}.
    *
    * @return the stored record, or {@code null} if the id is already taken or the write failed
    */
   private ClusterServiceRecord tryInsert(int memberId, Instant heartbeat) {
      // Millisecond precision, as before (the value previously round-tripped through java.util.Date).
      Instant ts = Instant.ofEpochMilli(heartbeat.toEpochMilli());
      ClusterServiceRecord csr = new ClusterServiceRecord();
      csr.setHost(host);
      csr.setRegistered(ts);
      csr.setLastHeartbeat(ts);
      csr.setService(service);
      csr.setMemberId(memberId);

      String servicePath = zkPathPrefix + service;
      String memberPath = servicePath + '/' + memberId;
      // Explicit charset so every host reads and writes the same bytes.
      byte[] payload = gson.toJson(csr).getBytes(java.nio.charset.StandardCharsets.UTF_8);

      try {
         try {
            zk.create(memberPath, payload, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
         } catch (KeeperException.NoNodeException missingParent) {
            logger.info("Creating path in zookeeper for {} ", service);
            try {
               ensurePersistentPath(servicePath);
            } catch (KeeperException e1) {
               logger.error("Failed to create path for service", e1);
               return null;
            }
            // Retry now that the parent exists; otherwise this member id would be
            // skipped, and a single-member service could never register.
            zk.create(memberPath, payload, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
         }
         return csr;
      } catch (KeeperException.NodeExistsException taken) {
         logger.debug("Member id {} for {} is already registered", memberId, service);
         return null;
      } catch (KeeperException e) {
         logger.warn("Failed to write to zk", e);
         return null;
      } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
         logger.error("Failed to write to zk", e);
         return null;
      }
   }

   /** Creates {@code path} and any missing ancestors as persistent nodes. */
   private void ensurePersistentPath(String path) throws KeeperException, InterruptedException {
      int end = path.indexOf('/', 1);
      while (true) {
         String node = end < 0 ? path : path.substring(0, end);
         if (zk.exists(node, false) == null) {
            try {
               zk.create(node, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            } catch (KeeperException.NodeExistsException raced) {
               // Another member created it between the exists check and the create.
            }
         }
         if (end < 0) {
            return;
         }
         end = path.indexOf('/', end + 1);
      }
   }

   /**
    * No write is needed: ZooKeeper removes ephemeral nodes when the session ends.
    *
    * @return the record that was passed in, unchanged, so callers that keep using
    *         the returned record are not handed {@code null}
    */
   @Override
   public ClusterServiceRecord heartbeat(ClusterServiceRecord service) throws ClusterServiceDaoException {
      // Not required for this implementation - ZooKeeper will automatically expire ephemeral nodes.
      return service;
   }

   /**
    * Not implemented: nothing is deleted and {@code false} is always returned.
    * NOTE(review): the ephemeral node therefore remains until the ZooKeeper session ends.
    */
   @Override
   public boolean deregister(ClusterServiceRecord record) {
      return false;
   }

   /**
    * Reads every member record currently stored for {@code service}.
    *
    * @return the records that could be read; empty if the service path does not
    *         exist or ZooKeeper could not be reached
    */
   @Override
   public List<ClusterServiceRecord> listMembersByService(String service) {
      List<ClusterServiceRecord> records = new ArrayList<>();
      String servicePath = zkPathPrefix + service;
      try (Timer.Context timer = ClusterServiceMetrics.listByServiceTimer.time()) {
         for (String child : zk.getChildren(servicePath, false)) {
            ClusterServiceRecord record = readMember(servicePath + '/' + child);
            if (record != null) {
               records.add(record);
            }
         }
      } catch (InterruptedException e) {
         // Preserve the interrupt for the caller instead of silently clearing it.
         Thread.currentThread().interrupt();
      } catch (KeeperException e) {
         if (e.code() == KeeperException.Code.NONODE) {
            logger.info("{} hasn't been registered in zookeeper before, will need to be created", service);
         } else {
            logger.warn("Failed to list members of service", e);
         }
         if (e.code() == KeeperException.Code.CONNECTIONLOSS) {
            logger.info("Unable to communicate with zookeeper {}", e.getMessage());
         }
      }

      return records;
   }

   /**
    * Reads one member node. A node that disappears between the child listing and
    * the read (ephemeral nodes can vanish at any time) is skipped, not treated as
    * a failure of the whole listing.
    */
   private ClusterServiceRecord readMember(String path) throws KeeperException, InterruptedException {
      try {
         return transform(zk.getData(path, false, null));
      } catch (KeeperException.NoNodeException gone) {
         logger.debug("Member node {} disappeared while listing", path);
         return null;
      }
   }

   /** Decodes node data into a record; returns {@code null} for a node without data. */
   private ClusterServiceRecord transform(byte[] zkdata) {
      if (zkdata == null || zkdata.length == 0) {
         return null;
      }
      String json = new String(zkdata, java.nio.charset.StandardCharsets.UTF_8);
      return gson.fromJson(json, ClusterServiceRecord.class);
   }

   /** Forwards ZooKeeper events to the monitor. */
   @Override
   public void process(WatchedEvent event) {
      // The client receives this watcher inside the constructor, so an event can
      // arrive from its thread before `monitor` has been assigned.
      ZookeeperMonitor delegate = monitor;
      if (delegate != null) {
         delegate.process(event);
      }
   }

   private static class ClusterServiceMetrics {
      static final Timer registerTimer = DaoMetrics.upsertTimer(ClusterServiceDao.class, "register");
      static final Timer deregisterTimer = DaoMetrics.deleteTimer(ClusterServiceDao.class, "deregister");
      static final Timer listByServiceTimer = DaoMetrics.readTimer(ClusterServiceDao.class, "listMembersByService");
      static final Counter clusterIdRegisteredCounter = DaoMetrics.counter(ClusterServiceDao.class, "clusterid.registered");
      static final Counter clusterIdLostCounter = DaoMetrics.counter(ClusterServiceDao.class, "clusterid.lost");
      static final Counter clusterRegistrationMissCounter = DaoMetrics.counter(ClusterServiceDao.class, "clusterregistration.collision");
      static final Counter clusterRegistrationFailedCounter = DaoMetrics.counter(ClusterServiceDao.class, "clusterregistration.failed");
   }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright 2020 Arcus Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.iris.platform.cluster.zookeeper;

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.AsyncCallback.StatCallback;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Reacts to ZooKeeper session-state events. When the session is reported as
 * expired the process logs the fact and exits with status {@code -1}; every
 * other event is ignored.
 */
public class ZookeeperMonitor implements Watcher, StatCallback {
   private static final Logger logger = LoggerFactory.getLogger(ZookeeperMonitor.class);

   private final ZooKeeper zk;

   /**
    * @param zk the ZooKeeper client this monitor is associated with
    */
   public ZookeeperMonitor(ZooKeeper zk) {
      this.zk = zk;
   }

   /**
    * Handles a watch event. Only connection-state events (type {@code None})
    * are inspected; node events are ignored.
    */
   @Override
   public void process(WatchedEvent event) {
      if (event.getType() != Event.EventType.None) {
         return;
      }

      switch (event.getState()) {
         case Expired:
            // TODO: move this elsewhere?
            logger.error("SHUTTING DOWN -- zookeeper session has been marked as expired");
            System.err.println("SHUTTING DOWN -- zookeeper session has been marked as expired");
            System.exit(-1);
            break;
         case SyncConnected:
         default:
            // nothing to do for any other session state
            break;
      }
   }

   /** Stat callback; intentionally does nothing. */
   @Override
   public void processResult(int rc, String path, Object ctx, Stat stat) {
   }
}

0 comments on commit 0f076b7

Please sign in to comment.