Skip to content

Commit

Permalink
Add metadata suport for consul (#560)
Browse files Browse the repository at this point in the history
Co-authored-by: yong.teng <[email protected]>
  • Loading branch information
eduosi and yong.teng authored Nov 8, 2024
1 parent 0095abd commit 695134f
Showing 1 changed file with 310 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,39 +18,174 @@
package org.apache.dubbo.metadata.store.consul;

import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.config.configcenter.ConfigChangeType;
import org.apache.dubbo.common.config.configcenter.ConfigChangedEvent;
import org.apache.dubbo.common.config.configcenter.ConfigItem;
import org.apache.dubbo.common.config.configcenter.ConfigurationListener;
import org.apache.dubbo.common.constants.CommonConstants;
import org.apache.dubbo.common.constants.LoggerCodeConstants;
import org.apache.dubbo.common.utils.JsonUtils;
import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.configcenter.consul.ConsulConstants;
import org.apache.dubbo.metadata.MappingChangedEvent;
import org.apache.dubbo.metadata.MappingListener;
import org.apache.dubbo.metadata.MetadataInfo;
import org.apache.dubbo.metadata.ServiceNameMapping;
import org.apache.dubbo.metadata.report.identifier.BaseMetadataIdentifier;
import org.apache.dubbo.metadata.report.identifier.KeyTypeEnum;
import org.apache.dubbo.metadata.report.identifier.MetadataIdentifier;
import org.apache.dubbo.metadata.report.identifier.ServiceMetadataIdentifier;
import org.apache.dubbo.metadata.report.identifier.SubscriberMetadataIdentifier;
import org.apache.dubbo.metadata.report.support.AbstractMetadataReport;
import org.apache.dubbo.rpc.Constants;
import org.apache.dubbo.rpc.RpcException;

import com.ecwid.consul.v1.ConsulClient;
import com.ecwid.consul.v1.Response;
import com.ecwid.consul.v1.kv.model.GetValue;
import com.google.common.net.HostAndPort;
import com.orbitz.consul.Consul;
import com.orbitz.consul.KeyValueClient;
import com.orbitz.consul.cache.ConsulCache;
import com.orbitz.consul.cache.KVCache;
import com.orbitz.consul.model.kv.Value;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.StringJoiner;
import java.util.concurrent.ConcurrentHashMap;


/**
* metadata report impl for consul
*/
public class ConsulMetadataReport extends AbstractMetadataReport {

private final Map<String, ConsulListener> watchListenerMap = new ConcurrentHashMap<>();

private final Map<String, MappingDataListener> casListenerMap = new ConcurrentHashMap<>();

private ConsulClient client;

private KeyValueClient kvClient;

private final int watchTimeout;

/**
* The ACL token
*/
private final String token;

private final String root;

public ConsulMetadataReport(URL url) {
super(url);

token = url.getParameter(Constants.TOKEN_KEY, (String) null);
root = url.getGroup(DEFAULT_ROOT);
String host = url.getHost();
int port = ConsulConstants.INVALID_PORT != url.getPort() ? url.getPort() : ConsulConstants.DEFAULT_PORT;
client = new ConsulClient(host, port);

Consul.Builder builder = Consul.builder().withHostAndPort(HostAndPort.fromParts(host, port));
if (StringUtils.isNotEmpty(token)) {
builder.withAclToken(token);
}

this.kvClient = builder.build().keyValueClient();
this.watchTimeout = url.getParameter(ConsulConstants.WATCH_TIMEOUT, ConsulConstants.DEFAULT_WATCH_TIMEOUT);
}

@Override
public void publishAppMetadata(SubscriberMetadataIdentifier identifier, MetadataInfo metadataInfo) {
if (metadataInfo.getContent() != null) {
this.storeMetadata(identifier, metadataInfo.getContent());
}
}

@Override
public void unPublishAppMetadata(SubscriberMetadataIdentifier identifier, MetadataInfo metadataInfo) {
this.deleteMetadata(identifier);
}

@Override
public MetadataInfo getAppMetadata(SubscriberMetadataIdentifier identifier, Map<String, String> instanceMetadata) {
String content = new String(Base64.getDecoder().decode(getMetadata(identifier)));
return JsonUtils.toJavaObject(content, MetadataInfo.class);
}

@Override
public ConfigItem getConfigItem(String serviceKey, String group) {
String key = buildMappingKey(group, serviceKey);
String content = getConfig(key);
return new ConfigItem(content, content);
}

@Override
public boolean registerServiceAppMapping(String serviceInterface, String defaultMappingGroup,
String newConfigContent, Object ticket) {
try {
if (null != ticket && !(ticket instanceof String)) {
throw new IllegalArgumentException("redis publishConfigCas requires stat type ticket");
}

String key = buildMappingKey(defaultMappingGroup, serviceInterface);
Response<Boolean> response;

if (token == null) {
response = client.setKVValue(key, newConfigContent);
} else {
response = client.setKVValue(key, newConfigContent, token, null);
}

return response != null && response.getValue();
} catch (Exception e) {
logger.warn(LoggerCodeConstants.TRANSPORT_FAILED_RESPONSE, "", "", "consul publishConfigCas failed.", e);
return false;
}
}

@Override
public Set<String> getServiceAppMapping(String serviceKey, MappingListener listener, URL url) {
String group = ServiceNameMapping.DEFAULT_MAPPING_GROUP;

if (casListenerMap.get(buildListenerKey(serviceKey, group)) == null) {
addCasServiceMappingListener(serviceKey, group, listener);
}

String key = buildMappingKey(group, serviceKey);
String content = getConfig(key);
return ServiceNameMapping.getAppNames(content);
}

@Override
public Set<String> getServiceAppMapping(String serviceKey, URL url) {
String group = url.getGroup(ServiceNameMapping.DEFAULT_MAPPING_GROUP);
String key = buildMappingKey(group, serviceKey);
String content = getConfig(key);
return ServiceNameMapping.getAppNames(content);
}

@Override
public void removeServiceAppMappingListener(String serviceKey, MappingListener listener) {
String group = ServiceNameMapping.DEFAULT_MAPPING_GROUP;

MappingDataListener mappingDataListener = casListenerMap.get(buildListenerKey(serviceKey, group));
if (mappingDataListener != null) {
removeCasServiceMappingListener(serviceKey, group, listener);
}
}

@Override
public String getServiceDefinition(MetadataIdentifier metadataIdentifier) {
return getMetadata(metadataIdentifier);
}

@Override
Expand Down Expand Up @@ -80,7 +215,7 @@ protected List<String> doGetExportedURLs(ServiceMetadataIdentifier metadataIdent
if (StringUtils.isEmpty(content)) {
return Collections.emptyList();
}
return new ArrayList<String>(Arrays.asList(URL.decode(content)));
return Arrays.asList(URL.decode(content));
}

@Override
Expand All @@ -93,9 +228,30 @@ protected String doGetSubscribedURLs(SubscriberMetadataIdentifier subscriberMeta
return getMetadata(subscriberMetadataIdentifier);
}

private String getConfig(String key) {
Response<GetValue> response;

if (token == null) {
response = client.getKVValue(key);
} else {
response = client.getKVValue(key, token);
}

if (response == null || response.getValue() == null) {
return null;
}

Base64.Decoder decoder = Base64.getDecoder();
return new String(decoder.decode(response.getValue().getValue()));
}

private void storeMetadata(BaseMetadataIdentifier identifier, String v) {
try {
client.setKVValue(identifier.getUniqueKey(KeyTypeEnum.UNIQUE_KEY), v);
if (token == null) {
client.setKVValue(identifier.getUniqueKey(KeyTypeEnum.UNIQUE_KEY), v);
} else {
client.setKVValue(identifier.getUniqueKey(KeyTypeEnum.UNIQUE_KEY), v, token, null);
}
} catch (Throwable t) {
logger.error("Failed to put " + identifier + " to consul " + v + ", cause: " + t.getMessage(), t);
throw new RpcException("Failed to put " + identifier + " to consul " + v + ", cause: " + t.getMessage(), t);
Expand All @@ -104,7 +260,11 @@ private void storeMetadata(BaseMetadataIdentifier identifier, String v) {

private void deleteMetadata(BaseMetadataIdentifier identifier) {
try {
client.deleteKVValue(identifier.getUniqueKey(KeyTypeEnum.UNIQUE_KEY));
if (token == null) {
client.deleteKVValue(identifier.getUniqueKey(KeyTypeEnum.UNIQUE_KEY));
} else {
client.deleteKVValue(identifier.getUniqueKey(KeyTypeEnum.UNIQUE_KEY), token);
}
} catch (Throwable t) {
logger.error("Failed to delete " + identifier + " from consul , cause: " + t.getMessage(), t);
throw new RpcException("Failed to delete " + identifier + " from consul , cause: " + t.getMessage(), t);
Expand All @@ -113,11 +273,18 @@ private void deleteMetadata(BaseMetadataIdentifier identifier) {

private String getMetadata(BaseMetadataIdentifier identifier) {
try {
Response<GetValue> value = client.getKVValue(identifier.getUniqueKey(KeyTypeEnum.UNIQUE_KEY));
Response<GetValue> response;

if (token == null) {
response = client.getKVValue(identifier.getUniqueKey(KeyTypeEnum.UNIQUE_KEY));
} else {
response = client.getKVValue(identifier.getUniqueKey(KeyTypeEnum.UNIQUE_KEY), token);
}

//FIXME CHECK
if (value != null && value.getValue() != null) {
if (response != null && response.getValue() != null) {
//todo check decode value and value diff
return value.getValue().getValue();
return response.getValue().getValue();
}
return null;
} catch (Throwable t) {
Expand All @@ -126,8 +293,141 @@ private String getMetadata(BaseMetadataIdentifier identifier) {
}
}

@Override
public String getServiceDefinition(MetadataIdentifier metadataIdentifier) {
return getMetadata(metadataIdentifier);
private void addCasServiceMappingListener(String serviceKey, String group, MappingListener listener) {
String listenerKey = buildListenerKey(ServiceNameMapping.DEFAULT_MAPPING_GROUP, serviceKey);
MappingDataListener mappingDataListener = casListenerMap.computeIfAbsent(listenerKey,
(k) -> new MappingDataListener(serviceKey));

mappingDataListener.addListener(listener);

ConsulListener consulListener = watchListenerMap.computeIfAbsent(listenerKey,
(k) -> new ConsulListener(serviceKey, group));
consulListener.addListener(mappingDataListener);
}

private void removeCasServiceMappingListener(String serviceKey, String group, MappingListener listener) {
String listenerKey = buildListenerKey(ServiceNameMapping.DEFAULT_MAPPING_GROUP, serviceKey);
MappingDataListener mappingDataListener = casListenerMap.get(listenerKey);

if (mappingDataListener != null) {
mappingDataListener.removeListener(listener);
if (mappingDataListener.isEmpty()) {
ConsulListener consulListener = watchListenerMap.get(listenerKey);

if (consulListener != null) {
consulListener.removeListener(mappingDataListener);
watchListenerMap.remove(listenerKey);
}

casListenerMap.remove(listenerKey, mappingDataListener);
}
}
}

private String buildMappingKey(String group, String serviceKey) {
StringJoiner joiner = new StringJoiner(CommonConstants.GROUP_CHAR_SEPARATOR);

joiner.add(root).add(group).add(serviceKey);

return joiner.toString();
}

private String buildListenerKey(String group, String serviceKey) {
StringJoiner joiner = new StringJoiner(CommonConstants.PROPERTIES_CHAR_SEPARATOR);

joiner.add(root).add(group).add(serviceKey);

return joiner.toString();
}

class ConsulListener implements ConsulCache.Listener<String, Value> {

private KVCache kvCache;

private final Set<ConfigurationListener> listeners = new LinkedHashSet<>();

private final String normalizedKey;

private final String group;

public ConsulListener(String normalizedKey, String group) {
this.normalizedKey = normalizedKey;
this.group = group;
this.initKVCache();
}

private void initKVCache() {
this.kvCache = KVCache.newCache(ConsulMetadataReport.this.kvClient, normalizedKey,
ConsulMetadataReport.this.watchTimeout);
this.kvCache.addListener(this);
this.kvCache.start();
}

public void notify(Map<String, Value> newValues) {
Optional<Value> newValue = newValues.values().stream().filter((value) -> value.getKey().equals(normalizedKey))
.findAny();
newValue.ifPresent((value) -> {
Optional<String> decodedValue = newValue.get().getValueAsString();
decodedValue.ifPresent((v) -> {
this.listeners.forEach((l) -> {
ConfigChangedEvent event = new ConfigChangedEvent(normalizedKey, group, v,
ConfigChangeType.MODIFIED);
l.process(event);
});
});
});
}

private void addListener(ConfigurationListener listener) {
this.listeners.add(listener);
}

private void removeListener(ConfigurationListener listener) {
this.listeners.remove(listener);
}

}

static class MappingDataListener implements ConfigurationListener {

private String serviceKey;

private Set<MappingListener> listeners;

public MappingDataListener(String serviceKey) {
this.serviceKey = serviceKey;
this.listeners = new HashSet<>();
}

public void addListener(MappingListener listener) {
this.listeners.add(listener);
}

public void removeListener(MappingListener listener) {
this.listeners.remove(listener);
}

public boolean isEmpty() {
return listeners.isEmpty();
}

@Override
public void process(ConfigChangedEvent event) {
if (ConfigChangeType.DELETED == event.getChangeType()) {
return;
}

if (!serviceKey.equals(event.getKey()) || !serviceKey.equals(event.getGroup())) {
return;
}

Set<String> apps = ServiceNameMapping.getAppNames(event.getContent());

MappingChangedEvent mappingChangedEvent = new MappingChangedEvent(serviceKey, apps);

listeners.forEach((listener) -> listener.onEvent(mappingChangedEvent));
}

}

}

0 comments on commit 695134f

Please sign in to comment.