Skip to content

Commit

Permalink
Add announcer status delegate interface and support getting server an…
Browse files Browse the repository at this point in the history
…nounce mode (#1035)

* Add announcer status delegate interface and support getting server announce mode

* adjust log msg and comment

* adjust announce mode enum order

* comment on the announcement mode enum instead of reordering it

* adjust comment
  • Loading branch information
bohhyang authored Nov 23, 2024
1 parent 5ec7eb9 commit 1eeec25
Show file tree
Hide file tree
Showing 9 changed files with 121 additions and 9 deletions.
6 changes: 5 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ and what APIs have changed, if applicable.

## [Unreleased]

## [29.63.0] - 2024-11-06
- Add announcer status delegate interface

## [29.62.1] - 2024-11-05
- Enhancements in ByteString and its ByteIterator to reduce object allocation

Expand Down Expand Up @@ -5758,7 +5761,8 @@ patch operations can re-use these classes for generating patch messages.

## [0.14.1]

[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.62.1...master
[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.63.0...master
[29.63.0]: https://github.com/linkedin/rest.li/compare/v29.62.1...v29.63.0
[29.62.1]: https://github.com/linkedin/rest.li/compare/v29.62.0...v29.62.1
[29.62.0]: https://github.com/linkedin/rest.li/compare/v29.61.0...v29.62.0
[29.61.0]: https://github.com/linkedin/rest.li/compare/v29.60.0...v29.61.0
Expand Down
22 changes: 22 additions & 0 deletions d2/src/main/java/com/linkedin/d2/balancer/LoadBalancerServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -71,4 +71,26 @@ void addUriSpecificProperty(String clusterName,
void shutdown(Callback<None> callback);

String getConnectString();

/**
* Get announce mode of the server. Some server may have different announce mode, e.g. dual write mode, force announce
* mode.
*/
AnnounceMode getAnnounceMode();

/**
* NOTE the order in this enum shows the migration progress from an old service registry to a new one.
* The ordinal is used in JMX --- each number higher means one more step completed in the migration --- which can
* ease devs to know the status.
*/
enum AnnounceMode
{
STATIC_OLD_SR_ONLY, // statically only announce to old service registry
DYNAMIC_OLD_SR_ONLY, // dynamically only announce to old service registry
DYNAMIC_DUAL_WRITE, // dynamically announce to both service registries
DYNAMIC_NEW_SR_ONLY, // dynamically only announce to new service registry
DYNAMIC_FORCE_DUAL_WRITE, // Using dynamic server yet forced to announce to both service registries
STATIC_NEW_SR_ONLY, // statically only announce to new service registry
STATIC_NEW_SR_ONLY_NO_WRITE_BACK // statically only announce to new service registry without writing back to old service registry
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package com.linkedin.d2.balancer.servers;

import java.net.URI;


public interface AnnouncerStatusDelegate
{
/**
* @return true if the markup intent has been sent.
*/
boolean isMarkUpIntentSent();

/**
* @return true if the dark warmup mark up intent has been sent.
*/
boolean isDarkWarmupMarkUpIntentSent();

/**
* @return the name of the regular cluster that the announcer manages.
*/
String getCluster();

/**
* @return the name of the warmup cluster that the announcer manages.
*/
String getWarmupCluster();

/**
* @return the uri that the announcer manages.
*/
URI getURI();
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@
* @author Francesco Capponi ([email protected])
*/

public class ZooKeeperAnnouncer implements D2ServiceDiscoveryEventHelper
public class ZooKeeperAnnouncer implements D2ServiceDiscoveryEventHelper, AnnouncerStatusDelegate
{
public static final boolean DEFAULT_DARK_WARMUP_ENABLED = false;
public static final int DEFAULT_DARK_WARMUP_DURATION = 0;
Expand Down Expand Up @@ -705,6 +705,13 @@ public void onSuccess(None result)
};
}

@Override
public String getWarmupCluster()
{
return _warmupClusterName;
}

@Override
public String getCluster()
{
return _cluster;
Expand All @@ -720,6 +727,12 @@ public String getUri()
return _uri.toString();
}

@Override
public URI getURI()
{
return _uri;
}

public void setUri(String uri)
{
_uri = URI.create(uri);
Expand Down Expand Up @@ -816,11 +829,13 @@ public boolean isMarkUpFailed()
return _markUpFailed;
}

@Override
public boolean isMarkUpIntentSent()
{
return _isMarkUpIntentSent.get();
}

@Override
public boolean isDarkWarmupMarkUpIntentSent()
{
return _isDarkWarmupMarkUpIntentSent.get();
Expand All @@ -836,16 +851,22 @@ public int getWeightDecimalPlacesBreachedCount()
return _weightDecimalPlacesBreachedCount.get();
}

public LoadBalancerServer.AnnounceMode getServerAnnounceMode()
{
return _server.getAnnounceMode();
}

public void setEventEmitter(ServiceDiscoveryEventEmitter emitter) {
_eventEmitter = emitter;
}

@Override
public void emitSDStatusActiveUpdateIntentAndWriteEvents(String cluster, boolean isMarkUp, boolean succeeded, long startAt) {
// since SD event is sent in IndisAnnouncer for INDIS-write-only, inside ZookeeperAnnouncer, any calls to
// "emitSDStatusActiveUpdateIntentAndWriteEvents" should only happen when _server is an instance of
// ZooKeeperServer (which means it only emits the event when it's doing zk-only or dual write).
if (!(_server instanceof ZooKeeperServer))
// In this class, SD event should be sent only when the announcing mode is to old service registry or dual write,
// so we can directly return when _server is NOT an instance of ZooKeeperServer or the announcement mode is dynamic
// new SR only.
if (!(_server instanceof ZooKeeperServer)
|| _server.getAnnounceMode() == LoadBalancerServer.AnnounceMode.DYNAMIC_NEW_SR_ONLY)
{
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
package com.linkedin.d2.balancer.servers;

import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicReference;

Expand Down Expand Up @@ -74,6 +76,9 @@ public class ZooKeeperConnectionManager extends ConnectionManager

private volatile ZooKeeperEphemeralStore<UriProperties> _store;

// Additional watchers that want to watch the connection status
private final Set<ZooKeeperConnectionWatcher> _zooKeeperConnectionWatchers = ConcurrentHashMap.newKeySet();

public ZooKeeperConnectionManager(ZKPersistentConnection zkConnection,
String zkBasePath,
ZKStoreFactory<UriProperties,ZooKeeperEphemeralStore<UriProperties>> factory,
Expand Down Expand Up @@ -292,6 +297,8 @@ public void notifyEvent(ZKPersistentConnection.Event event)
{
server.retry(Callbacks.empty());
}

_zooKeeperConnectionWatchers.forEach(ZooKeeperConnectionWatcher::onConnected);
}
break;
}
Expand All @@ -302,6 +309,11 @@ public void notifyEvent(ZKPersistentConnection.Event event)
}
}

public interface ZooKeeperConnectionWatcher
{
void onConnected();
}

/**
* Store should only be started if two conditions are satisfied
* 1. store is ready. store is ready when connection is established
Expand Down Expand Up @@ -401,4 +413,9 @@ public String getZooKeeperBasePath()
{
return _zkBasePath;
}

public void addConnectionWatcher(ZooKeeperConnectionWatcher watcher)
{
_zooKeeperConnectionWatchers.add(watcher);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ public String getConnectString() {
return _store.getConnectString();
}

@Override
public AnnounceMode getAnnounceMode() {
return AnnounceMode.STATIC_OLD_SR_ONLY;
}

@Override
public void start(Callback<None> callback)
{
Expand Down Expand Up @@ -189,7 +194,7 @@ else if (!uris.Uris().contains(uri))
}
else
{
warn(_log, _store, " marked down for cluster ", clusterName, "with uri: ", uri);
warn(_log, _store, " marked down for cluster ", clusterName, " with uri: ", uri);
Map<URI, Map<Integer, PartitionData>> partitionData = new HashMap<>(2);
partitionData.put(uri, Collections.emptyMap());
_store.removePartial(clusterName, new UriProperties(clusterName, partitionData), callback);
Expand Down Expand Up @@ -367,7 +372,7 @@ protected UriProperties constructUriPropertiesForNode(final String clusterName,
return new UriProperties(clusterName, partitionDesc, uriToUriSpecificProperties);
}

private void storeGet(final String clusterName, final Callback<UriProperties> callback)
protected void storeGet(final String clusterName, final Callback<UriProperties> callback)
{
if (_store == null)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,4 +206,9 @@ public int getWeightDecimalPlacesBreachedCount()
{
return _announcer.getWeightDecimalPlacesBreachedCount();
}

@Override
public int getServerAnnounceMode() {
return _announcer.getServerAnnounceMode().ordinal();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

package com.linkedin.d2.jmx;

import com.linkedin.d2.balancer.LoadBalancerServer;
import com.linkedin.d2.balancer.properties.PartitionData;
import com.linkedin.d2.discovery.stores.PropertyStoreException;

Expand Down Expand Up @@ -95,4 +96,9 @@ void setPartitionDataUsingJson(String partitionDataJson)
* @return the times that the max number of decimal places on weight has been breached.
*/
int getWeightDecimalPlacesBreachedCount();

/**
* @return the server announce mode corresponding to {@link LoadBalancerServer#getAnnounceMode()}
*/
int getServerAnnounceMode();
}
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
version=29.62.1
version=29.63.0
group=com.linkedin.pegasus
org.gradle.configureondemand=true
org.gradle.parallel=true
Expand Down

0 comments on commit 1eeec25

Please sign in to comment.