Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use Node instead of D2Node and D2URIMap instead of NodeMap for xDS flow #944

Merged
merged 13 commits into from
Nov 13, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.linkedin.d2.balancer.properties;

import com.google.protobuf.Struct;
import com.linkedin.d2.balancer.properties.util.PropertyUtil;
import com.linkedin.d2.balancer.util.JacksonUtil;
import com.linkedin.d2.discovery.PropertyBuilder;
Expand All @@ -33,6 +34,7 @@
import java.util.Set;

import static com.linkedin.d2.balancer.properties.util.PropertyUtil.mapGet;
import static com.linkedin.d2.balancer.properties.util.PropertyUtil.protoStructToMap;


/**
Expand Down Expand Up @@ -84,6 +86,20 @@ public ClusterProperties fromBytes(byte[] bytes, long version) throws PropertySe
clusterProperties.setVersion(version);
return clusterProperties;
}

public ClusterProperties fromStruct(Struct struct, long version) throws PropertySerializationException
{
try
{
ClusterProperties clusterProperties = fromMap(protoStructToMap(struct));
clusterProperties.setVersion(version);
return clusterProperties;
}
catch (Exception e)
{
throw new PropertySerializationException(e);
}
}

@SuppressWarnings("unchecked")
private static <T> T mapGetOrDefault(Map<String, Object> map, String key, T defaultValue)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.linkedin.d2.balancer.properties;


import com.google.protobuf.Struct;
import com.linkedin.d2.balancer.properties.util.PropertyUtil;
import com.linkedin.d2.balancer.subsetting.SubsettingStrategy;
import com.linkedin.d2.balancer.util.JacksonUtil;
Expand All @@ -37,8 +38,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static com.linkedin.d2.balancer.properties.util.PropertyUtil.mapGet;
import static com.linkedin.d2.balancer.properties.util.PropertyUtil.mapGetOrDefault;
import static com.linkedin.d2.balancer.properties.util.PropertyUtil.*;


/**
Expand Down Expand Up @@ -202,6 +202,20 @@ public ServiceProperties fromBytes(byte[] bytes, long version) throws PropertySe
return serviceProperties;
}

public ServiceProperties fromStruct(Struct struct, long version) throws PropertySerializationException
{
try
{
ServiceProperties serviceProperties = fromMap(protoStructToMap(struct));
serviceProperties.setVersion(version);
return serviceProperties;
}
catch (Exception e)
{
throw new PropertySerializationException(e);
}
}

/**
* Always return the composite class {@link ServiceStoreProperties} to include ALL properties stored on service registry (like Zookeeper),
* such as canary configs, distribution strategy, etc.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,20 @@

package com.linkedin.d2.balancer.properties;


import com.google.protobuf.Struct;
import com.linkedin.d2.balancer.properties.util.PropertyUtil;
import com.linkedin.d2.balancer.util.JacksonUtil;
import com.linkedin.d2.balancer.util.partitions.DefaultPartitionAccessor;
import com.linkedin.d2.discovery.PropertyBuilder;
import com.linkedin.d2.discovery.PropertySerializationException;
import com.linkedin.d2.discovery.PropertySerializer;
import java.util.Collections;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import indis.XdsD2;
import java.net.URI;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class UriPropertiesJsonSerializer implements PropertySerializer<UriProperties>, PropertyBuilder<UriProperties>
{
Expand Down Expand Up @@ -109,6 +109,40 @@ public UriProperties fromBytes(byte[] bytes, long version) throws PropertySerial
return uriProperties;
}

public UriProperties fromProto(XdsD2.D2URI uri) throws PropertySerializationException
PapaCharlie marked this conversation as resolved.
Show resolved Hide resolved
{
try
{
Map<URI, Map<String, Object>> applicationProperties = new HashMap<>(uri.getUriSpecificPropertiesCount());
for (Map.Entry<String, Struct> entry : uri.getUriSpecificPropertiesMap().entrySet())
{
applicationProperties.put(URI.create(entry.getKey()), PropertyUtil.protoStructToMap(entry.getValue()));
}

Map<URI, Map<Integer, PartitionData>> partitionDesc = new HashMap<>();
for (Map.Entry<String, XdsD2.D2URI.PartitionData> entry : uri.getPartitionDescMap().entrySet())
{
Map<Integer, PartitionData> partitions = new HashMap<>(entry.getValue().getWeightsCount());
for (Map.Entry<Integer, Double> partition : entry.getValue().getWeightsMap().entrySet())
{
partitions.put(partition.getKey(), new PartitionData(partition.getValue()));
}
partitionDesc.put(URI.create(entry.getKey()), partitions);
}

return new UriProperties(
uri.getClusterName(),
partitionDesc,
applicationProperties,
uri.getStat().getMzxid()
);
}
catch (Exception e)
{
throw new PropertySerializationException(e);
}
}

@Override
@SuppressWarnings("unchecked")
public UriProperties fromMap(Map<String, Object> map)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,12 @@

package com.linkedin.d2.balancer.properties.util;

import com.linkedin.data.template.TemplateOutputCastException;
import com.google.protobuf.Struct;
import com.google.protobuf.Value;
import com.linkedin.util.ArgumentUtil;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PropertyUtil
Expand Down Expand Up @@ -152,4 +155,60 @@ else if (value instanceof Double && clazz.equals(Integer.class))
}
return (T) value;
}

/**
* Efficiently translates a proto JSON {@link Struct} into a {@code Map<String, Object>} without additional
* serialization or deserialization.
*/
public static Map<String, Object> protoStructToMap(Struct struct)
{
Map<String, Object> map = new HashMap<>(struct.getFieldsMap().size());
for (Map.Entry<String, Value> entry : struct.getFieldsMap().entrySet())
{
map.put(entry.getKey(), valueToObject(entry.getValue()));
}
return map;
}

private static Object valueToObject(Value value)
{
if (value.hasBoolValue())
{
return value.getBoolValue();
}
else if (value.hasStringValue())
{
return value.getStringValue();
}
else if (value.hasNumberValue())
{
return value.getNumberValue();
}
else if (value.hasNullValue())
{
return null;
}
else if (value.hasStructValue())
{
Map<String, Object> map = new HashMap<>(value.getStructValue().getFieldsCount());
for (Map.Entry<String, Value> entry : value.getStructValue().getFieldsMap().entrySet())
{
map.put(entry.getKey(), valueToObject(entry.getValue()));
}
return map;
}
else if (value.hasListValue())
{
List<Object> list = new ArrayList<>(value.getListValue().getValuesCount());
for (Value element : value.getListValue().getValuesList())
{
list.add(valueToObject(element));
}
return list;
}
else
{
throw new RuntimeException("Unexpected proto value of unknown type: " + value);
}
}
}
24 changes: 12 additions & 12 deletions d2/src/main/java/com/linkedin/d2/xds/XdsClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public abstract class XdsClient
{
private static final String D2_NODE_TYPE_URL = "type.googleapis.com/indis.D2Node";
PapaCharlie marked this conversation as resolved.
Show resolved Hide resolved
private static final String D2_SYMLINK_NODE_TYPE_URL = "type.googleapis.com/indis.D2SymlinkNode";
private static final String D2_NODE_MAP_TYPE_URL = "type.googleapis.com/indis.D2NodeMap";
private static final String D2_URI_MAP_TYPE_URL = "type.googleapis.com/indis.D2URIMap";

interface ResourceWatcher
{
Expand All @@ -51,9 +51,9 @@ interface D2SymlinkNodeResourceWatcher extends ResourceWatcher
void onChanged(String resourceName, D2SymlinkNodeUpdate update);
}

interface D2NodeMapResourceWatcher extends ResourceWatcher
interface D2URIMapResourceWatcher extends ResourceWatcher
{
void onChanged(D2NodeMapUpdate update);
void onChanged(D2URIMapUpdate update);
}

interface ResourceUpdate
Expand Down Expand Up @@ -105,18 +105,18 @@ public String getVersion()
}
}

static final class D2NodeMapUpdate implements ResourceUpdate
static final class D2URIMapUpdate implements ResourceUpdate
{
String _version;
Map<String, XdsD2.D2Node> _nodeDataMap;
Map<String, XdsD2.D2URI> _nodeDataMap;
PapaCharlie marked this conversation as resolved.
Show resolved Hide resolved

D2NodeMapUpdate(String version, Map<String, XdsD2.D2Node> nodeDataMap)
D2URIMapUpdate(String version, Map<String, XdsD2.D2URI> nodeDataMap)
{
_version = version;
_nodeDataMap = nodeDataMap;
}

public Map<String, XdsD2.D2Node> getNodeDataMap()
public Map<String, XdsD2.D2URI> getURIMap()
{
return _nodeDataMap;
}
Expand All @@ -129,7 +129,7 @@ public String getVersion()

enum ResourceType
{
UNKNOWN, D2_NODE, D2_SYMLINK_NODE, D2_NODE_MAP;
UNKNOWN, D2_NODE, D2_SYMLINK_NODE, D2_URI_MAP;

static ResourceType fromTypeUrl(String typeUrl)
{
Expand All @@ -141,9 +141,9 @@ static ResourceType fromTypeUrl(String typeUrl)
{
return D2_SYMLINK_NODE;
}
if (typeUrl.equals(D2_NODE_MAP_TYPE_URL))
if (typeUrl.equals(D2_URI_MAP_TYPE_URL))
{
return D2_NODE_MAP;
return D2_URI_MAP;
}
return UNKNOWN;
}
Expand All @@ -156,8 +156,8 @@ String typeUrl()
return D2_NODE_TYPE_URL;
case D2_SYMLINK_NODE:
return D2_SYMLINK_NODE_TYPE_URL;
case D2_NODE_MAP:
return D2_NODE_MAP_TYPE_URL;
case D2_URI_MAP:
return D2_URI_MAP_TYPE_URL;
case UNKNOWN:
default:
throw new AssertionError("Unknown or missing case in enum switch: " + this);
Expand Down
28 changes: 14 additions & 14 deletions d2/src/main/java/com/linkedin/d2/xds/XdsClientImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public class XdsClientImpl extends XdsClient

private final Map<String, ResourceSubscriber> _d2NodeSubscribers = new HashMap<>();
private final Map<String, ResourceSubscriber> _d2SymlinkNodeSubscribers = new HashMap<>();
private final Map<String, ResourceSubscriber> _d2NodeMapSubscribers = new HashMap<>();
private final Map<String, ResourceSubscriber> _d2URIMapSubscribers = new HashMap<>();

private final Node _node;
private final ManagedChannel _managedChannel;
Expand Down Expand Up @@ -256,19 +256,19 @@ private void handleD2SymlinkNodeResponse(DiscoveryResponseData data)
handleResourceUpdate(updates, data.getResourceType(), data.getNonce(), errors);
}

private void handleD2NodeMapResponse(DiscoveryResponseData data)
private void handleD2URIMapResponse(DiscoveryResponseData data)
{
Map<String, D2NodeMapUpdate> updates = new HashMap<>();
Map<String, D2URIMapUpdate> updates = new HashMap<>();
List<String> errors = new ArrayList<>();

for (Resource resource: data.getResourcesList())
{
String resourceName = resource.getName();
try
{
XdsD2.D2NodeMap d2NodeMap = resource.getResource().unpack(XdsD2.D2NodeMap.class);
Map<String, XdsD2.D2Node> nodeData = d2NodeMap.getNodesMap();
updates.put(resourceName, new D2NodeMapUpdate(resource.getVersion(), nodeData));
XdsD2.D2URIMap uriMap = resource.getResource().unpack(XdsD2.D2URIMap.class);
Map<String, XdsD2.D2URI> nodeData = uriMap.getNodesMap();
updates.put(resourceName, new D2URIMapUpdate(resource.getVersion(), nodeData));
} catch (InvalidProtocolBufferException e)
{
_log.warn("Failed to unpack D2NodeMap response", e);
PapaCharlie marked this conversation as resolved.
Show resolved Hide resolved
Expand Down Expand Up @@ -307,7 +307,7 @@ private void notifyStreamError(Status error) {
for (ResourceSubscriber subscriber : _d2NodeSubscribers.values()) {
subscriber.onError(error);
}
for (ResourceSubscriber subscriber : _d2NodeMapSubscribers.values()) {
for (ResourceSubscriber subscriber : _d2URIMapSubscribers.values()) {
subscriber.onError(error);
}
}
Expand All @@ -316,7 +316,7 @@ private void notifyStreamReconnect() {
for (ResourceSubscriber subscriber : _d2NodeSubscribers.values()) {
subscriber.onReconnect();
}
for (ResourceSubscriber subscriber : _d2NodeMapSubscribers.values()) {
for (ResourceSubscriber subscriber : _d2URIMapSubscribers.values()) {
subscriber.onReconnect();
}
}
Expand All @@ -329,8 +329,8 @@ private Map<String, ResourceSubscriber> getResourceSubscriberMap(ResourceType ty
return _d2NodeSubscribers;
case D2_SYMLINK_NODE:
return _d2SymlinkNodeSubscribers;
case D2_NODE_MAP:
return _d2NodeMapSubscribers;
case D2_URI_MAP:
return _d2URIMapSubscribers;
case UNKNOWN:
default:
throw new AssertionError("Unknown resource type");
Expand Down Expand Up @@ -375,8 +375,8 @@ private void notifyWatcher(ResourceWatcher watcher, ResourceUpdate update)
case D2_SYMLINK_NODE:
((D2SymlinkNodeResourceWatcher) watcher).onChanged(_resource, (D2SymlinkNodeUpdate) update);
break;
case D2_NODE_MAP:
((D2NodeMapResourceWatcher) watcher).onChanged((D2NodeMapUpdate) update);
case D2_URI_MAP:
((D2URIMapResourceWatcher) watcher).onChanged((D2URIMapUpdate) update);
break;
case UNKNOWN:
default:
Expand Down Expand Up @@ -649,8 +649,8 @@ private void handleResponse(DiscoveryResponseData response)
case D2_SYMLINK_NODE:
handleD2SymlinkNodeResponse(response);
break;
case D2_NODE_MAP:
handleD2NodeMapResponse(response);
case D2_URI_MAP:
handleD2URIMapResponse(response);
break;
case UNKNOWN:
_log.warn("Received an unknown type of DiscoveryResponse\n{}", respNonce);
Expand Down
Loading