Skip to content

Commit

Permalink
Revert "add kafka announcement only logic (linkedin#1027)"
Browse files Browse the repository at this point in the history
This reverts commit bd54a98.
  • Loading branch information
dg-builder committed Oct 18, 2024
1 parent bd54a98 commit 83e1038
Show file tree
Hide file tree
Showing 6 changed files with 86 additions and 150 deletions.
6 changes: 1 addition & 5 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,6 @@ and what APIs have changed, if applicable.

## [Unreleased]

## [29.59.0] - 2024-10-07
- Add support for announcing/deannoucing service only to INDIS

## [29.58.11] - 2024-10-03
- Add getters in ZookeeperAnnouncer

Expand Down Expand Up @@ -5746,8 +5743,7 @@ patch operations can re-use these classes for generating patch messages.

## [0.14.1]

[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.59.0...master
[29.59.0]: https://github.com/linkedin/rest.li/compare/v29.58.11...v29.59.0
[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.58.11...master
[29.58.11]: https://github.com/linkedin/rest.li/compare/v29.58.10...v29.58.11
[29.58.10]: https://github.com/linkedin/rest.li/compare/v29.58.9...v29.58.10
[29.58.9]: https://github.com/linkedin/rest.li/compare/v29.58.8...v29.58.9
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,4 @@ void addUriSpecificProperty(String clusterName,
void start(Callback<None> callback);

void shutdown(Callback<None> callback);

String getConnectString();
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,14 @@

import com.linkedin.common.callback.Callback;
import com.linkedin.common.util.None;
import com.linkedin.d2.balancer.LoadBalancerServer;
import com.linkedin.d2.balancer.properties.PartitionData;
import com.linkedin.d2.balancer.properties.PropertyKeys;
import com.linkedin.d2.balancer.properties.UriProperties;
import com.linkedin.d2.balancer.util.partitions.DefaultPartitionAccessor;
import com.linkedin.d2.discovery.event.D2ServiceDiscoveryEventHelper;
import com.linkedin.d2.discovery.event.LogOnlyServiceDiscoveryEventEmitter;
import com.linkedin.d2.discovery.event.ServiceDiscoveryEventEmitter;
import com.linkedin.d2.discovery.event.ServiceDiscoveryEventEmitter.StatusUpdateActionType;
import com.linkedin.d2.discovery.event.D2ServiceDiscoveryEventHelper;
import com.linkedin.d2.discovery.stores.zk.ZooKeeperEphemeralStore;
import com.linkedin.util.ArgumentUtil;

Expand Down Expand Up @@ -67,7 +66,7 @@ public class ZooKeeperAnnouncer implements D2ServiceDiscoveryEventHelper
public static final int DEFAULT_DARK_WARMUP_DURATION = 0;
public static final String DEFAULT_DARK_WARMUP_CLUSTER_NAME = null;

private final LoadBalancerServer _server;
private final ZooKeeperServer _server;
private static final Logger _log = LoggerFactory.getLogger(ZooKeeperAnnouncer.class);
private volatile String _cluster;
private volatile URI _uri;
Expand Down Expand Up @@ -141,24 +140,24 @@ public class ZooKeeperAnnouncer implements D2ServiceDiscoveryEventHelper
// Field to store the dark warm-up time duration in seconds, defaults to zero
private int _warmupDuration;

public ZooKeeperAnnouncer(LoadBalancerServer server)
public ZooKeeperAnnouncer(ZooKeeperServer server)
{
this(server, true);
}

public ZooKeeperAnnouncer(LoadBalancerServer server, boolean initialIsUp)
public ZooKeeperAnnouncer(ZooKeeperServer server, boolean initialIsUp)
{
this(server, initialIsUp, DEFAULT_DARK_WARMUP_ENABLED, DEFAULT_DARK_WARMUP_CLUSTER_NAME, DEFAULT_DARK_WARMUP_DURATION, (ScheduledExecutorService) null);
}

public ZooKeeperAnnouncer(LoadBalancerServer server, boolean initialIsUp,
public ZooKeeperAnnouncer(ZooKeeperServer server, boolean initialIsUp,
boolean isDarkWarmupEnabled, String warmupClusterName, int warmupDuration, ScheduledExecutorService executorService)
{
this(server, initialIsUp, isDarkWarmupEnabled, warmupClusterName, warmupDuration, executorService,
new LogOnlyServiceDiscoveryEventEmitter()); // default to use log-only event emitter
}

public ZooKeeperAnnouncer(LoadBalancerServer server, boolean initialIsUp,
public ZooKeeperAnnouncer(ZooKeeperServer server, boolean initialIsUp,
boolean isDarkWarmupEnabled, String warmupClusterName, int warmupDuration, ScheduledExecutorService executorService, ServiceDiscoveryEventEmitter eventEmitter)
{
_server = server;
Expand All @@ -176,10 +175,7 @@ public ZooKeeperAnnouncer(LoadBalancerServer server, boolean initialIsUp,
_executorService = executorService;
_eventEmitter = eventEmitter;

if (server instanceof ZooKeeperServer)
{
((ZooKeeperServer) server).setServiceDiscoveryEventHelper(this);
}
_server.setServiceDiscoveryEventHelper(this);
}

/**
Expand Down Expand Up @@ -559,21 +555,18 @@ private void drain(Deque<Callback<None>> callbacks, @Nullable Throwable t)

public void setStore(ZooKeeperEphemeralStore<UriProperties> store)
{
if (_server instanceof ZooKeeperServer)
{
store.setZnodePathAndDataCallback((cluster, path, data) -> {
if (cluster.equals(_cluster)) {
_znodePathRef.set(path);
_znodeDataRef.set(data);
} else if (cluster.equals(_warmupClusterName)) {
_warmupClusterZnodePathRef.set(path);
_warmupClusterZnodeDataRef.set(data);
} else {
_log.warn("znode path and data callback is called with unknown cluster: " + cluster + ", node path: " + path + ", and data: " + data);
}
});
((ZooKeeperServer) _server).setStore(store);
}
store.setZnodePathAndDataCallback((cluster, path, data) -> {
if (cluster.equals(_cluster)) {
_znodePathRef.set(path);
_znodeDataRef.set(data);
} else if (cluster.equals(_warmupClusterName)) {
_warmupClusterZnodePathRef.set(path);
_warmupClusterZnodeDataRef.set(data);
} else {
_log.warn("znode path and data callback is called with unknown cluster: " + cluster + ", node path: " + path + ", and data: " + data);
}
});
_server.setStore(store);
}

public synchronized void changeWeight(final Callback<None> callback, boolean doNotSlowStart)
Expand Down Expand Up @@ -725,13 +718,6 @@ public void setEventEmitter(ServiceDiscoveryEventEmitter emitter) {

@Override
public void emitSDStatusActiveUpdateIntentAndWriteEvents(String cluster, boolean isMarkUp, boolean succeeded, long startAt) {
// since SD event is sent in IndisAnnouncer for INDIS-write-only, inside ZookeeperAnnouncer, any calls to
// "emitSDStatusActiveUpdateIntentAndWriteEvents" should only happen when _server is an instance of
// ZooKeeperServer (which means it only emits the event when it's doing zk-only or dual write).
if (!(_server instanceof ZooKeeperServer))
{
return;
}
if (_eventEmitter == null) {
_log.info("Service discovery event emitter in ZookeeperAnnouncer is null. Skipping emitting events.");
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
* @version $Revision: $
*/

public class ZooKeeperConnectionManager extends ConnectionManager
public class ZooKeeperConnectionManager
{
private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperConnectionManager.class);

Expand Down Expand Up @@ -79,7 +79,6 @@ public ZooKeeperConnectionManager(ZKPersistentConnection zkConnection,
ZKStoreFactory<UriProperties,ZooKeeperEphemeralStore<UriProperties>> factory,
ZooKeeperAnnouncer... servers)
{
super(servers);
_zkBasePath = zkBasePath;
_zkConnection = zkConnection;
_factory = factory;
Expand All @@ -95,7 +94,6 @@ public ZooKeeperConnectionManager(String zkConnectString, int zkSessionTimeout,
ZKStoreFactory<UriProperties,ZooKeeperEphemeralStore<UriProperties>> factory,
ZooKeeperAnnouncer... servers)
{
super(servers);
_zkConnectString = zkConnectString;
_zkSessionTimeout = zkSessionTimeout;
_zkBasePath = zkBasePath;
Expand Down Expand Up @@ -134,7 +132,6 @@ public ZooKeeperConnectionManager(String zkConnectString, int zkSessionTimeout,
this(zkConnectString, zkSessionTimeout, zkBasePath, factory, servers);
}

@Override
public void start(Callback<None> callback)
{
_managerStarted = true;
Expand All @@ -157,7 +154,6 @@ public void start(Callback<None> callback)
}
}

@Override
public void shutdown(final Callback<None> callback)
{
_managerStarted = false;
Expand All @@ -184,6 +180,68 @@ protected None convertResponse(None none) throws Exception
}
}

public void markDownAllServers(final Callback<None> callback)
{
Callback<None> markDownCallback;
if (callback != null)
{
markDownCallback = callback;
}
else
{
markDownCallback = new Callback<None>()
{
@Override
public void onError(Throwable e)
{
LOG.error("failed to mark down servers", e);
}

@Override
public void onSuccess(None result)
{
LOG.info("mark down all servers successful");
}
};
}
Callback<None> multiCallback = Callbacks.countDown(markDownCallback, _servers.length);
for (ZooKeeperAnnouncer server : _servers)
{
server.markDown(multiCallback);
}
}

public void markUpAllServers(final Callback<None> callback)
{
Callback<None> markUpCallback;
if (callback != null)
{
markUpCallback = callback;
}
else
{
markUpCallback = new Callback<None>()
{
@Override
public void onError(Throwable e)
{
LOG.error("failed to mark up servers", e);
}

@Override
public void onSuccess(None result)
{
LOG.info("mark up all servers successful");
}
};
}
Callback<None> multiCallback = Callbacks.countDown(markUpCallback, _servers.length);
for (ZooKeeperAnnouncer server : _servers)
{
server.markUp(multiCallback);
}
}

private class Listener implements ZKPersistentConnection.EventListener
{
@Override
Expand Down Expand Up @@ -295,10 +353,9 @@ public interface ZKStoreFactory<P, Z extends ZooKeeperStore<P>>
Z createStore(ZKConnection connection, String path);
}

@Override
public String getAnnouncementTargetIdentifier()
public ZooKeeperAnnouncer[] getAnnouncers()
{
return getZooKeeperConnectString();
return _servers;
}

public boolean isSessionEstablished()
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
version=29.59.0
version=29.58.11
group=com.linkedin.pegasus
org.gradle.configureondemand=true
org.gradle.parallel=true
Expand Down

0 comments on commit 83e1038

Please sign in to comment.