Skip to content

Commit

Permalink
check and warn for invalid partition weight
Browse files Browse the repository at this point in the history
  • Loading branch information
bohhyang committed Oct 24, 2024
1 parent 5f9ef25 commit 512fa07
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 23 deletions.
6 changes: 5 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ and what APIs have changed, if applicable.

## [Unreleased]

## [29.60.1] - 2024-10-24
- Check and warn for invalid partition weight

## [29.60.0] - 2024-10-17
- Restore the old constructor to avoid incompatible issue

Expand Down Expand Up @@ -5749,7 +5752,8 @@ patch operations can re-use these classes for generating patch messages.

## [0.14.1]

[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.60.0...master
[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.60.1...master
[29.60.1]: https://github.com/linkedin/rest.li/compare/v29.60.0...v29.60.1
[29.60.0]: https://github.com/linkedin/rest.li/compare/v29.59.0...v29.60.0
[29.59.0]: https://github.com/linkedin/rest.li/compare/v29.58.11...v29.59.0
[29.58.11]: https://github.com/linkedin/rest.li/compare/v29.58.10...v29.58.11
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

package com.linkedin.d2.balancer.servers;

import java.math.BigDecimal;
import java.net.URI;
import java.util.ArrayDeque;
import java.util.Collections;
Expand Down Expand Up @@ -93,6 +94,7 @@ public class ZooKeeperAnnouncer implements D2ServiceDiscoveryEventHelper
private final AtomicLong _markDownStartAtRef = new AtomicLong(Long.MAX_VALUE);

private volatile Map<Integer, PartitionData> _partitionDataMap;
private final Integer _maxWeight; // max weight for the weight in PartitionData, if null, no max weight is set.
private volatile Map<String, Object> _uriSpecificProperties;

private ServiceDiscoveryEventEmitter _eventEmitter;
Expand Down Expand Up @@ -123,13 +125,13 @@ public class ZooKeeperAnnouncer implements D2ServiceDiscoveryEventHelper
private volatile boolean _markUpFailed;

// ScheduledExecutorService to schedule the end of dark warm-up, defaults to null
private ScheduledExecutorService _executorService;
private final ScheduledExecutorService _executorService;

// Boolean flag to indicate if dark warm-up is enabled, defaults to false
private boolean _isDarkWarmupEnabled;
private final boolean _isDarkWarmupEnabled;

// String to store the name of the dark warm-up cluster, defaults to null
private String _warmupClusterName;
private final String _warmupClusterName;
// Similar as _znodePath and _znodeData above but for the warm up cluster.
private final AtomicReference<String> _warmupClusterZnodePathRef = new AtomicReference<>();
private final AtomicReference<String> _warmupClusterZnodeDataRef = new AtomicReference<>();
Expand All @@ -139,7 +141,7 @@ public class ZooKeeperAnnouncer implements D2ServiceDiscoveryEventHelper
private final AtomicLong _warmupClusterMarkDownStartAtRef = new AtomicLong(Long.MAX_VALUE);

// Field to store the dark warm-up time duration in seconds, defaults to zero
private int _warmupDuration;
private final int _warmupDuration;

/**
* @deprecated Use the constructor {@link #ZooKeeperAnnouncer(LoadBalancerServer)} instead.
Expand Down Expand Up @@ -195,26 +197,18 @@ public ZooKeeperAnnouncer(LoadBalancerServer server, boolean initialIsUp,
public ZooKeeperAnnouncer(ZooKeeperServer server, boolean initialIsUp,
boolean isDarkWarmupEnabled, String warmupClusterName, int warmupDuration, ScheduledExecutorService executorService, ServiceDiscoveryEventEmitter eventEmitter)
{
_server = server;
// initialIsUp is used for delay mark up. If it's false, there won't be markup when the announcer is started.
_isUp = initialIsUp;
_isWarmingUp = false;
_isRetryWarmup = false;
_pendingMarkDown = new ArrayDeque<>();
_pendingMarkUp = new ArrayDeque<>();
_pendingWarmupMarkDown = new ArrayDeque<>();

_isDarkWarmupEnabled = isDarkWarmupEnabled;
_warmupClusterName = warmupClusterName;
_warmupDuration = warmupDuration;
_executorService = executorService;
_eventEmitter = eventEmitter;

server.setServiceDiscoveryEventHelper(this);
this(server, initialIsUp, isDarkWarmupEnabled, warmupClusterName, warmupDuration, executorService, eventEmitter, null);
}

public ZooKeeperAnnouncer(LoadBalancerServer server, boolean initialIsUp,
boolean isDarkWarmupEnabled, String warmupClusterName, int warmupDuration, ScheduledExecutorService executorService, ServiceDiscoveryEventEmitter eventEmitter)
{
this(server, initialIsUp, isDarkWarmupEnabled, warmupClusterName, warmupDuration, executorService, eventEmitter, null);
}

public ZooKeeperAnnouncer(LoadBalancerServer server, boolean initialIsUp,
boolean isDarkWarmupEnabled, String warmupClusterName, int warmupDuration, ScheduledExecutorService executorService,
ServiceDiscoveryEventEmitter eventEmitter, Integer maxWeight)
{
_server = server;
// initialIsUp is used for delay mark up. If it's false, there won't be markup when the announcer is started.
Expand All @@ -230,6 +224,7 @@ public ZooKeeperAnnouncer(LoadBalancerServer server, boolean initialIsUp,
_warmupDuration = warmupDuration;
_executorService = executorService;
_eventEmitter = eventEmitter;
_maxWeight = maxWeight;

if (server instanceof ZooKeeperServer)
{
Expand Down Expand Up @@ -750,11 +745,12 @@ public void setWeight(double weight)

Map<Integer, PartitionData> partitionDataMap = new HashMap<>(1);
partitionDataMap.put(partitionId, new PartitionData(weight));
_partitionDataMap = Collections.unmodifiableMap(partitionDataMap);
setPartitionData(partitionDataMap);
}

public void setPartitionData(Map<Integer, PartitionData> partitionData)
{
validatePartitionData(partitionData);
_partitionDataMap = Collections.unmodifiableMap(new HashMap<>(partitionData));
}

Expand Down Expand Up @@ -826,4 +822,25 @@ private ImmutablePair<String, String> getZnodePathAndData(String cluster) {
}
return new ImmutablePair<>(nodePath, nodeData);
}


private void validatePartitionData(Map<Integer, PartitionData> partitionData) {
for (Map.Entry<Integer, PartitionData> entry : partitionData.entrySet()) {
double weight = entry.getValue().getWeight();
// check max weight
if (_maxWeight != null && weight > _maxWeight) {
_log.warn("", new IllegalArgumentException(String.format("[ACTION NEEDED] Weight %f in Partition %d is greater"
+ " than the max weight allowed: %d. Please correct the weight. It will be force-capped to the max weight "
+ "in the future.", weight, entry.getKey(), _maxWeight)));
// TODO: throw exception or cap weight to max weight
}

// check decimal places
if (BigDecimal.valueOf(weight).scale() > 2) {
_log.warn("", new IllegalArgumentException(String.format("Weight %f in Partition %d has more than 2 decimal "
+ "places. It will be rounded to 2 decimal places in the future.", weight, entry.getKey())));
// TODO: round weight to 2 decimal places.
}
}
}
}
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
version=29.60.0
version=29.60.1
group=com.linkedin.pegasus
org.gradle.configureondemand=true
org.gradle.parallel=true
Expand Down

0 comments on commit 512fa07

Please sign in to comment.