Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add kafka announcement only logic #1027

Merged
merged 12 commits into from
Oct 15, 2024
6 changes: 5 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ and what APIs have changed, if applicable.

## [Unreleased]

## [29.59.0] - 2024-10-07
- Add support for announcing/deannoucing service only to INDIS

## [29.58.11] - 2024-10-03
- Add getters in ZookeeperAnnouncer

Expand Down Expand Up @@ -5743,7 +5746,8 @@ patch operations can re-use these classes for generating patch messages.

## [0.14.1]

[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.58.11...master
[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.59.0...master
[29.59.0]: https://github.com/linkedin/rest.li/compare/v29.58.11...v29.59.0
[29.58.11]: https://github.com/linkedin/rest.li/compare/v29.58.10...v29.58.11
[29.58.10]: https://github.com/linkedin/rest.li/compare/v29.58.9...v29.58.10
[29.58.9]: https://github.com/linkedin/rest.li/compare/v29.58.8...v29.58.9
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,4 +69,6 @@ void addUriSpecificProperty(String clusterName,
void start(Callback<None> callback);

void shutdown(Callback<None> callback);

String getConnectString();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package com.linkedin.d2.balancer.servers;

import com.linkedin.common.callback.Callback;
import com.linkedin.common.callback.Callbacks;
import com.linkedin.common.util.None;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* ConnectionManager is an abstract class responsible for managing connections to external systems.
* It can be extended to handle specific service registries (e.g., Zookeeper).
* For example, see {@link com.linkedin.d2.balancer.servers.ZooKeeperConnectionManager} for managing Zookeeper
* connections during D2 server announcements.
* This class provides basic functionalities such as start, shutdown, markDownAllServers, and markUpAllServers which
* is called during D2 server announcements/de-announcement.
*/
public abstract class ConnectionManager
brycezhongqing marked this conversation as resolved.
Show resolved Hide resolved
{
private final ZooKeeperAnnouncer[] _servers;

private static final Logger LOG = LoggerFactory.getLogger(ConnectionManager.class);

protected ConnectionManager(ZooKeeperAnnouncer[] servers)
{
_servers = servers;
}

abstract public void start(Callback<None> callback);

abstract public void shutdown(final Callback<None> callback);

abstract public String getAnnouncementTargetIdentifier();

public void markDownAllServers(final Callback<None> callback)
{
Callback<None> markDownCallback;
if (callback != null)
{
markDownCallback = callback;
}
else
{
markDownCallback = new Callback<None>()
{
@Override
public void onError(Throwable e)
{
LOG.error("failed to mark down servers", e);
}

@Override
public void onSuccess(None result)
{
LOG.info("mark down all servers successful");
}
};
}
Callback<None> multiCallback = Callbacks.countDown(markDownCallback, _servers.length);
for (ZooKeeperAnnouncer server : _servers)
{
server.markDown(multiCallback);
}
}

public void markUpAllServers(final Callback<None> callback)
{
Callback<None> markUpCallback;
if (callback != null)
{
markUpCallback = callback;
}
else
{
markUpCallback = new Callback<None>()
{
@Override
public void onError(Throwable e)
{
LOG.error("failed to mark up servers", e);
}

@Override
public void onSuccess(None result)
{
LOG.info("mark up all servers successful");
}
};
}
Callback<None> multiCallback = Callbacks.countDown(markUpCallback, _servers.length);
for (ZooKeeperAnnouncer server : _servers)
{
server.markUp(multiCallback);
}

}

public ZooKeeperAnnouncer[] getAnnouncers()
{
return _servers;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,15 @@

import com.linkedin.common.callback.Callback;
import com.linkedin.common.util.None;
import com.linkedin.d2.balancer.LoadBalancerServer;
import com.linkedin.d2.balancer.properties.PartitionData;
import com.linkedin.d2.balancer.properties.PropertyKeys;
import com.linkedin.d2.balancer.properties.UriProperties;
import com.linkedin.d2.balancer.util.partitions.DefaultPartitionAccessor;
import com.linkedin.d2.discovery.event.D2ServiceDiscoveryEventHelper;
import com.linkedin.d2.discovery.event.LogOnlyServiceDiscoveryEventEmitter;
import com.linkedin.d2.discovery.event.ServiceDiscoveryEventEmitter;
import com.linkedin.d2.discovery.event.ServiceDiscoveryEventEmitter.StatusUpdateActionType;
import com.linkedin.d2.discovery.event.D2ServiceDiscoveryEventHelper;
import com.linkedin.d2.discovery.stores.zk.ZooKeeperEphemeralStore;
import com.linkedin.util.ArgumentUtil;

Expand Down Expand Up @@ -66,7 +67,7 @@ public class ZooKeeperAnnouncer implements D2ServiceDiscoveryEventHelper
public static final int DEFAULT_DARK_WARMUP_DURATION = 0;
public static final String DEFAULT_DARK_WARMUP_CLUSTER_NAME = null;

private final ZooKeeperServer _server;
private final LoadBalancerServer _server;
private static final Logger _log = LoggerFactory.getLogger(ZooKeeperAnnouncer.class);
private volatile String _cluster;
private volatile URI _uri;
Expand Down Expand Up @@ -140,24 +141,24 @@ public class ZooKeeperAnnouncer implements D2ServiceDiscoveryEventHelper
// Field to store the dark warm-up time duration in seconds, defaults to zero
private int _warmupDuration;

public ZooKeeperAnnouncer(ZooKeeperServer server)
public ZooKeeperAnnouncer(LoadBalancerServer server)
{
this(server, true);
}

public ZooKeeperAnnouncer(ZooKeeperServer server, boolean initialIsUp)
public ZooKeeperAnnouncer(LoadBalancerServer server, boolean initialIsUp)
{
this(server, initialIsUp, DEFAULT_DARK_WARMUP_ENABLED, DEFAULT_DARK_WARMUP_CLUSTER_NAME, DEFAULT_DARK_WARMUP_DURATION, (ScheduledExecutorService) null);
}

public ZooKeeperAnnouncer(ZooKeeperServer server, boolean initialIsUp,
public ZooKeeperAnnouncer(LoadBalancerServer server, boolean initialIsUp,
boolean isDarkWarmupEnabled, String warmupClusterName, int warmupDuration, ScheduledExecutorService executorService)
{
this(server, initialIsUp, isDarkWarmupEnabled, warmupClusterName, warmupDuration, executorService,
new LogOnlyServiceDiscoveryEventEmitter()); // default to use log-only event emitter
}

public ZooKeeperAnnouncer(ZooKeeperServer server, boolean initialIsUp,
public ZooKeeperAnnouncer(LoadBalancerServer server, boolean initialIsUp,
boolean isDarkWarmupEnabled, String warmupClusterName, int warmupDuration, ScheduledExecutorService executorService, ServiceDiscoveryEventEmitter eventEmitter)
{
_server = server;
Expand All @@ -175,7 +176,9 @@ public ZooKeeperAnnouncer(ZooKeeperServer server, boolean initialIsUp,
_executorService = executorService;
_eventEmitter = eventEmitter;

_server.setServiceDiscoveryEventHelper(this);
if(server instanceof ZooKeeperServer){
((ZooKeeperServer) server).setServiceDiscoveryEventHelper(this);
}
}

/**
Expand Down Expand Up @@ -555,18 +558,21 @@ private void drain(Deque<Callback<None>> callbacks, @Nullable Throwable t)

public void setStore(ZooKeeperEphemeralStore<UriProperties> store)
{
store.setZnodePathAndDataCallback((cluster, path, data) -> {
if (cluster.equals(_cluster)) {
_znodePathRef.set(path);
_znodeDataRef.set(data);
} else if (cluster.equals(_warmupClusterName)) {
_warmupClusterZnodePathRef.set(path);
_warmupClusterZnodeDataRef.set(data);
} else {
_log.warn("znode path and data callback is called with unknown cluster: " + cluster + ", node path: " + path + ", and data: " + data);
}
});
_server.setStore(store);
if (_server instanceof ZooKeeperServer)
{
store.setZnodePathAndDataCallback((cluster, path, data) -> {
if (cluster.equals(_cluster)) {
_znodePathRef.set(path);
_znodeDataRef.set(data);
} else if (cluster.equals(_warmupClusterName)) {
_warmupClusterZnodePathRef.set(path);
_warmupClusterZnodeDataRef.set(data);
} else {
_log.warn("znode path and data callback is called with unknown cluster: " + cluster + ", node path: " + path + ", and data: " + data);
}
});
((ZooKeeperServer) _server).setStore(store);
}
}

public synchronized void changeWeight(final Callback<None> callback, boolean doNotSlowStart)
Expand Down Expand Up @@ -718,6 +724,13 @@ public void setEventEmitter(ServiceDiscoveryEventEmitter emitter) {

@Override
public void emitSDStatusActiveUpdateIntentAndWriteEvents(String cluster, boolean isMarkUp, boolean succeeded, long startAt) {
// since SD event is sent in IndisAnnouncer for INDIS-write-only, inside ZookeeperAnnouncer, any calls to
// "emitSDStatusActiveUpdateIntentAndWriteEvents" should only happen when _server is an instance of
// ZooKeeperServer (which means it only emits the event when it's doing zk-only or dual write).
if (!(_server instanceof ZooKeeperServer))
brycezhongqing marked this conversation as resolved.
Show resolved Hide resolved
{
return;
}
if (_eventEmitter == null) {
_log.info("Service discovery event emitter in ZookeeperAnnouncer is null. Skipping emitting events.");
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
* @version $Revision: $
*/

public class ZooKeeperConnectionManager
public class ZooKeeperConnectionManager extends ConnectionManager
{
private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperConnectionManager.class);

Expand Down Expand Up @@ -79,6 +79,7 @@ public ZooKeeperConnectionManager(ZKPersistentConnection zkConnection,
ZKStoreFactory<UriProperties,ZooKeeperEphemeralStore<UriProperties>> factory,
ZooKeeperAnnouncer... servers)
{
super(servers);
_zkBasePath = zkBasePath;
_zkConnection = zkConnection;
_factory = factory;
Expand All @@ -94,6 +95,7 @@ public ZooKeeperConnectionManager(String zkConnectString, int zkSessionTimeout,
ZKStoreFactory<UriProperties,ZooKeeperEphemeralStore<UriProperties>> factory,
ZooKeeperAnnouncer... servers)
{
super(servers);
_zkConnectString = zkConnectString;
_zkSessionTimeout = zkSessionTimeout;
_zkBasePath = zkBasePath;
Expand Down Expand Up @@ -132,6 +134,7 @@ public ZooKeeperConnectionManager(String zkConnectString, int zkSessionTimeout,
this(zkConnectString, zkSessionTimeout, zkBasePath, factory, servers);
}

@Override
public void start(Callback<None> callback)
{
_managerStarted = true;
Expand All @@ -154,6 +157,7 @@ public void start(Callback<None> callback)
}
}

@Override
public void shutdown(final Callback<None> callback)
{
_managerStarted = false;
Expand All @@ -180,68 +184,6 @@ protected None convertResponse(None none) throws Exception
}
}

public void markDownAllServers(final Callback<None> callback)
{
Callback<None> markDownCallback;
if (callback != null)
{
markDownCallback = callback;
}
else
{
markDownCallback = new Callback<None>()
{
@Override
public void onError(Throwable e)
{
LOG.error("failed to mark down servers", e);
}

@Override
public void onSuccess(None result)
{
LOG.info("mark down all servers successful");
}
};
}
Callback<None> multiCallback = Callbacks.countDown(markDownCallback, _servers.length);
for (ZooKeeperAnnouncer server : _servers)
{
server.markDown(multiCallback);
}
}

public void markUpAllServers(final Callback<None> callback)
{
Callback<None> markUpCallback;
if (callback != null)
{
markUpCallback = callback;
}
else
{
markUpCallback = new Callback<None>()
{
@Override
public void onError(Throwable e)
{
LOG.error("failed to mark up servers", e);
}

@Override
public void onSuccess(None result)
{
LOG.info("mark up all servers successful");
}
};
}
Callback<None> multiCallback = Callbacks.countDown(markUpCallback, _servers.length);
for (ZooKeeperAnnouncer server : _servers)
{
server.markUp(multiCallback);
}
}

private class Listener implements ZKPersistentConnection.EventListener
{
@Override
Expand Down Expand Up @@ -353,9 +295,10 @@ public interface ZKStoreFactory<P, Z extends ZooKeeperStore<P>>
Z createStore(ZKConnection connection, String path);
}

public ZooKeeperAnnouncer[] getAnnouncers()
@Override
public String getAnnouncementTargetIdentifier()
{
return _servers;
return getZooKeeperConnectString();
}

public boolean isSessionEstablished()
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
version=29.58.11
version=29.59.0
group=com.linkedin.pegasus
org.gradle.configureondemand=true
org.gradle.parallel=true
Expand Down
Loading