Skip to content

Commit

Permalink
add kafka announcement only logic (#1027)
Browse files Browse the repository at this point in the history
  • Loading branch information
brycezhongqing authored Oct 15, 2024
1 parent af7f497 commit bd54a98
Show file tree
Hide file tree
Showing 6 changed files with 150 additions and 86 deletions.
6 changes: 5 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ and what APIs have changed, if applicable.

## [Unreleased]

## [29.59.0] - 2024-10-07
- Add support for announcing/deannoucing service only to INDIS

## [29.58.11] - 2024-10-03
- Add getters in ZookeeperAnnouncer

Expand Down Expand Up @@ -5743,7 +5746,8 @@ patch operations can re-use these classes for generating patch messages.

## [0.14.1]

[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.58.11...master
[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.59.0...master
[29.59.0]: https://github.com/linkedin/rest.li/compare/v29.58.11...v29.59.0
[29.58.11]: https://github.com/linkedin/rest.li/compare/v29.58.10...v29.58.11
[29.58.10]: https://github.com/linkedin/rest.li/compare/v29.58.9...v29.58.10
[29.58.9]: https://github.com/linkedin/rest.li/compare/v29.58.8...v29.58.9
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,4 +69,6 @@ void addUriSpecificProperty(String clusterName,
void start(Callback<None> callback);

void shutdown(Callback<None> callback);

String getConnectString();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package com.linkedin.d2.balancer.servers;

import com.linkedin.common.callback.Callback;
import com.linkedin.common.callback.Callbacks;
import com.linkedin.common.util.None;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* ConnectionManager is an abstract class responsible for managing connections to external systems.
* It can be extended to handle specific service registries (e.g., Zookeeper).
* For example, see {@link com.linkedin.d2.balancer.servers.ZooKeeperConnectionManager} for managing Zookeeper
* connections during D2 server announcements.
* This class provides basic functionalities such as start, shutdown, markDownAllServers, and markUpAllServers which
* is called during D2 server announcements/de-announcement.
*/
public abstract class ConnectionManager
{
private final ZooKeeperAnnouncer[] _servers;

private static final Logger LOG = LoggerFactory.getLogger(ConnectionManager.class);

protected ConnectionManager(ZooKeeperAnnouncer[] servers)
{
_servers = servers;
}

abstract public void start(Callback<None> callback);

abstract public void shutdown(final Callback<None> callback);

abstract public String getAnnouncementTargetIdentifier();

public void markDownAllServers(final Callback<None> callback)
{
Callback<None> markDownCallback;
if (callback != null)
{
markDownCallback = callback;
}
else
{
markDownCallback = new Callback<None>()
{
@Override
public void onError(Throwable e)
{
LOG.error("failed to mark down servers", e);
}

@Override
public void onSuccess(None result)
{
LOG.info("mark down all servers successful");
}
};
}
Callback<None> multiCallback = Callbacks.countDown(markDownCallback, _servers.length);
for (ZooKeeperAnnouncer server : _servers)
{
server.markDown(multiCallback);
}
}

public void markUpAllServers(final Callback<None> callback)
{
Callback<None> markUpCallback;
if (callback != null)
{
markUpCallback = callback;
}
else
{
markUpCallback = new Callback<None>()
{
@Override
public void onError(Throwable e)
{
LOG.error("failed to mark up servers", e);
}

@Override
public void onSuccess(None result)
{
LOG.info("mark up all servers successful");
}
};
}
Callback<None> multiCallback = Callbacks.countDown(markUpCallback, _servers.length);
for (ZooKeeperAnnouncer server : _servers)
{
server.markUp(multiCallback);
}

}

public ZooKeeperAnnouncer[] getAnnouncers()
{
return _servers;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,15 @@

import com.linkedin.common.callback.Callback;
import com.linkedin.common.util.None;
import com.linkedin.d2.balancer.LoadBalancerServer;
import com.linkedin.d2.balancer.properties.PartitionData;
import com.linkedin.d2.balancer.properties.PropertyKeys;
import com.linkedin.d2.balancer.properties.UriProperties;
import com.linkedin.d2.balancer.util.partitions.DefaultPartitionAccessor;
import com.linkedin.d2.discovery.event.D2ServiceDiscoveryEventHelper;
import com.linkedin.d2.discovery.event.LogOnlyServiceDiscoveryEventEmitter;
import com.linkedin.d2.discovery.event.ServiceDiscoveryEventEmitter;
import com.linkedin.d2.discovery.event.ServiceDiscoveryEventEmitter.StatusUpdateActionType;
import com.linkedin.d2.discovery.event.D2ServiceDiscoveryEventHelper;
import com.linkedin.d2.discovery.stores.zk.ZooKeeperEphemeralStore;
import com.linkedin.util.ArgumentUtil;

Expand Down Expand Up @@ -66,7 +67,7 @@ public class ZooKeeperAnnouncer implements D2ServiceDiscoveryEventHelper
public static final int DEFAULT_DARK_WARMUP_DURATION = 0;
public static final String DEFAULT_DARK_WARMUP_CLUSTER_NAME = null;

private final ZooKeeperServer _server;
private final LoadBalancerServer _server;
private static final Logger _log = LoggerFactory.getLogger(ZooKeeperAnnouncer.class);
private volatile String _cluster;
private volatile URI _uri;
Expand Down Expand Up @@ -140,24 +141,24 @@ public class ZooKeeperAnnouncer implements D2ServiceDiscoveryEventHelper
// Field to store the dark warm-up time duration in seconds, defaults to zero
private int _warmupDuration;

public ZooKeeperAnnouncer(ZooKeeperServer server)
public ZooKeeperAnnouncer(LoadBalancerServer server)
{
this(server, true);
}

public ZooKeeperAnnouncer(ZooKeeperServer server, boolean initialIsUp)
public ZooKeeperAnnouncer(LoadBalancerServer server, boolean initialIsUp)
{
this(server, initialIsUp, DEFAULT_DARK_WARMUP_ENABLED, DEFAULT_DARK_WARMUP_CLUSTER_NAME, DEFAULT_DARK_WARMUP_DURATION, (ScheduledExecutorService) null);
}

public ZooKeeperAnnouncer(ZooKeeperServer server, boolean initialIsUp,
public ZooKeeperAnnouncer(LoadBalancerServer server, boolean initialIsUp,
boolean isDarkWarmupEnabled, String warmupClusterName, int warmupDuration, ScheduledExecutorService executorService)
{
this(server, initialIsUp, isDarkWarmupEnabled, warmupClusterName, warmupDuration, executorService,
new LogOnlyServiceDiscoveryEventEmitter()); // default to use log-only event emitter
}

public ZooKeeperAnnouncer(ZooKeeperServer server, boolean initialIsUp,
public ZooKeeperAnnouncer(LoadBalancerServer server, boolean initialIsUp,
boolean isDarkWarmupEnabled, String warmupClusterName, int warmupDuration, ScheduledExecutorService executorService, ServiceDiscoveryEventEmitter eventEmitter)
{
_server = server;
Expand All @@ -175,7 +176,10 @@ public ZooKeeperAnnouncer(ZooKeeperServer server, boolean initialIsUp,
_executorService = executorService;
_eventEmitter = eventEmitter;

_server.setServiceDiscoveryEventHelper(this);
if (server instanceof ZooKeeperServer)
{
((ZooKeeperServer) server).setServiceDiscoveryEventHelper(this);
}
}

/**
Expand Down Expand Up @@ -555,18 +559,21 @@ private void drain(Deque<Callback<None>> callbacks, @Nullable Throwable t)

public void setStore(ZooKeeperEphemeralStore<UriProperties> store)
{
store.setZnodePathAndDataCallback((cluster, path, data) -> {
if (cluster.equals(_cluster)) {
_znodePathRef.set(path);
_znodeDataRef.set(data);
} else if (cluster.equals(_warmupClusterName)) {
_warmupClusterZnodePathRef.set(path);
_warmupClusterZnodeDataRef.set(data);
} else {
_log.warn("znode path and data callback is called with unknown cluster: " + cluster + ", node path: " + path + ", and data: " + data);
}
});
_server.setStore(store);
if (_server instanceof ZooKeeperServer)
{
store.setZnodePathAndDataCallback((cluster, path, data) -> {
if (cluster.equals(_cluster)) {
_znodePathRef.set(path);
_znodeDataRef.set(data);
} else if (cluster.equals(_warmupClusterName)) {
_warmupClusterZnodePathRef.set(path);
_warmupClusterZnodeDataRef.set(data);
} else {
_log.warn("znode path and data callback is called with unknown cluster: " + cluster + ", node path: " + path + ", and data: " + data);
}
});
((ZooKeeperServer) _server).setStore(store);
}
}

public synchronized void changeWeight(final Callback<None> callback, boolean doNotSlowStart)
Expand Down Expand Up @@ -718,6 +725,13 @@ public void setEventEmitter(ServiceDiscoveryEventEmitter emitter) {

@Override
public void emitSDStatusActiveUpdateIntentAndWriteEvents(String cluster, boolean isMarkUp, boolean succeeded, long startAt) {
// since SD event is sent in IndisAnnouncer for INDIS-write-only, inside ZookeeperAnnouncer, any calls to
// "emitSDStatusActiveUpdateIntentAndWriteEvents" should only happen when _server is an instance of
// ZooKeeperServer (which means it only emits the event when it's doing zk-only or dual write).
if (!(_server instanceof ZooKeeperServer))
{
return;
}
if (_eventEmitter == null) {
_log.info("Service discovery event emitter in ZookeeperAnnouncer is null. Skipping emitting events.");
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
* @version $Revision: $
*/

public class ZooKeeperConnectionManager
public class ZooKeeperConnectionManager extends ConnectionManager
{
private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperConnectionManager.class);

Expand Down Expand Up @@ -79,6 +79,7 @@ public ZooKeeperConnectionManager(ZKPersistentConnection zkConnection,
ZKStoreFactory<UriProperties,ZooKeeperEphemeralStore<UriProperties>> factory,
ZooKeeperAnnouncer... servers)
{
super(servers);
_zkBasePath = zkBasePath;
_zkConnection = zkConnection;
_factory = factory;
Expand All @@ -94,6 +95,7 @@ public ZooKeeperConnectionManager(String zkConnectString, int zkSessionTimeout,
ZKStoreFactory<UriProperties,ZooKeeperEphemeralStore<UriProperties>> factory,
ZooKeeperAnnouncer... servers)
{
super(servers);
_zkConnectString = zkConnectString;
_zkSessionTimeout = zkSessionTimeout;
_zkBasePath = zkBasePath;
Expand Down Expand Up @@ -132,6 +134,7 @@ public ZooKeeperConnectionManager(String zkConnectString, int zkSessionTimeout,
this(zkConnectString, zkSessionTimeout, zkBasePath, factory, servers);
}

@Override
public void start(Callback<None> callback)
{
_managerStarted = true;
Expand All @@ -154,6 +157,7 @@ public void start(Callback<None> callback)
}
}

@Override
public void shutdown(final Callback<None> callback)
{
_managerStarted = false;
Expand All @@ -180,68 +184,6 @@ protected None convertResponse(None none) throws Exception
}
}

public void markDownAllServers(final Callback<None> callback)
{
Callback<None> markDownCallback;
if (callback != null)
{
markDownCallback = callback;
}
else
{
markDownCallback = new Callback<None>()
{
@Override
public void onError(Throwable e)
{
LOG.error("failed to mark down servers", e);
}

@Override
public void onSuccess(None result)
{
LOG.info("mark down all servers successful");
}
};
}
Callback<None> multiCallback = Callbacks.countDown(markDownCallback, _servers.length);
for (ZooKeeperAnnouncer server : _servers)
{
server.markDown(multiCallback);
}
}

public void markUpAllServers(final Callback<None> callback)
{
Callback<None> markUpCallback;
if (callback != null)
{
markUpCallback = callback;
}
else
{
markUpCallback = new Callback<None>()
{
@Override
public void onError(Throwable e)
{
LOG.error("failed to mark up servers", e);
}

@Override
public void onSuccess(None result)
{
LOG.info("mark up all servers successful");
}
};
}
Callback<None> multiCallback = Callbacks.countDown(markUpCallback, _servers.length);
for (ZooKeeperAnnouncer server : _servers)
{
server.markUp(multiCallback);
}
}

private class Listener implements ZKPersistentConnection.EventListener
{
@Override
Expand Down Expand Up @@ -353,9 +295,10 @@ public interface ZKStoreFactory<P, Z extends ZooKeeperStore<P>>
Z createStore(ZKConnection connection, String path);
}

public ZooKeeperAnnouncer[] getAnnouncers()
@Override
public String getAnnouncementTargetIdentifier()
{
return _servers;
return getZooKeeperConnectString();
}

public boolean isSessionEstablished()
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
version=29.58.11
version=29.59.0
group=com.linkedin.pegasus
org.gradle.configureondemand=true
org.gradle.parallel=true
Expand Down

0 comments on commit bd54a98

Please sign in to comment.