diff --git a/dubbo-metadata-report-extensions/dubbo-metadata-report-consul/src/main/java/org/apache/dubbo/metadata/store/consul/ConsulMetadataReport.java b/dubbo-metadata-report-extensions/dubbo-metadata-report-consul/src/main/java/org/apache/dubbo/metadata/store/consul/ConsulMetadataReport.java index e678bd463..fc4069030 100644 --- a/dubbo-metadata-report-extensions/dubbo-metadata-report-consul/src/main/java/org/apache/dubbo/metadata/store/consul/ConsulMetadataReport.java +++ b/dubbo-metadata-report-extensions/dubbo-metadata-report-consul/src/main/java/org/apache/dubbo/metadata/store/consul/ConsulMetadataReport.java @@ -18,24 +18,49 @@ package org.apache.dubbo.metadata.store.consul; import org.apache.dubbo.common.URL; +import org.apache.dubbo.common.config.configcenter.ConfigChangeType; +import org.apache.dubbo.common.config.configcenter.ConfigChangedEvent; +import org.apache.dubbo.common.config.configcenter.ConfigItem; +import org.apache.dubbo.common.config.configcenter.ConfigurationListener; +import org.apache.dubbo.common.constants.CommonConstants; +import org.apache.dubbo.common.constants.LoggerCodeConstants; +import org.apache.dubbo.common.utils.JsonUtils; import org.apache.dubbo.common.utils.StringUtils; import org.apache.dubbo.configcenter.consul.ConsulConstants; +import org.apache.dubbo.metadata.MappingChangedEvent; +import org.apache.dubbo.metadata.MappingListener; +import org.apache.dubbo.metadata.MetadataInfo; +import org.apache.dubbo.metadata.ServiceNameMapping; import org.apache.dubbo.metadata.report.identifier.BaseMetadataIdentifier; import org.apache.dubbo.metadata.report.identifier.KeyTypeEnum; import org.apache.dubbo.metadata.report.identifier.MetadataIdentifier; import org.apache.dubbo.metadata.report.identifier.ServiceMetadataIdentifier; import org.apache.dubbo.metadata.report.identifier.SubscriberMetadataIdentifier; import org.apache.dubbo.metadata.report.support.AbstractMetadataReport; +import org.apache.dubbo.rpc.Constants; import org.apache.dubbo.rpc.RpcException; import com.ecwid.consul.v1.ConsulClient; import com.ecwid.consul.v1.Response; import com.ecwid.consul.v1.kv.model.GetValue; +import com.google.common.net.HostAndPort; +import com.orbitz.consul.Consul; +import com.orbitz.consul.KeyValueClient; +import com.orbitz.consul.cache.ConsulCache; +import com.orbitz.consul.cache.KVCache; +import com.orbitz.consul.model.kv.Value; -import java.util.ArrayList; import java.util.Arrays; +import java.util.Base64; import java.util.Collections; +import java.util.HashSet; +import java.util.LinkedHashSet; import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.StringJoiner; +import java.util.concurrent.ConcurrentHashMap; /** @@ -43,14 +68,124 @@ */ public class ConsulMetadataReport extends AbstractMetadataReport { + private final Map watchListenerMap = new ConcurrentHashMap<>(); + + private final Map casListenerMap = new ConcurrentHashMap<>(); + private ConsulClient client; + private KeyValueClient kvClient; + + private final int watchTimeout; + + /** + * The ACL token + */ + private final String token; + + private final String root; + public ConsulMetadataReport(URL url) { super(url); + token = url.getParameter(Constants.TOKEN_KEY, (String) null); + root = url.getGroup(DEFAULT_ROOT); String host = url.getHost(); int port = ConsulConstants.INVALID_PORT != url.getPort() ? url.getPort() : ConsulConstants.DEFAULT_PORT; client = new ConsulClient(host, port); + + Consul.Builder builder = Consul.builder().withHostAndPort(HostAndPort.fromParts(host, port)); + if (StringUtils.isNotEmpty(token)) { + builder.withAclToken(token); + } + + this.kvClient = builder.build().keyValueClient(); + this.watchTimeout = url.getParameter(ConsulConstants.WATCH_TIMEOUT, ConsulConstants.DEFAULT_WATCH_TIMEOUT); + } + + @Override + public void publishAppMetadata(SubscriberMetadataIdentifier identifier, MetadataInfo metadataInfo) { + if (metadataInfo.getContent() != null) { + this.storeMetadata(identifier, metadataInfo.getContent()); + } + } + + @Override + public void unPublishAppMetadata(SubscriberMetadataIdentifier identifier, MetadataInfo metadataInfo) { + this.deleteMetadata(identifier); + } + + @Override + public MetadataInfo getAppMetadata(SubscriberMetadataIdentifier identifier, Map instanceMetadata) { + String content = new String(Base64.getDecoder().decode(getMetadata(identifier))); + return JsonUtils.toJavaObject(content, MetadataInfo.class); + } + + @Override + public ConfigItem getConfigItem(String serviceKey, String group) { + String key = buildMappingKey(group, serviceKey); + String content = getConfig(key); + return new ConfigItem(content, content); + } + + @Override + public boolean registerServiceAppMapping(String serviceInterface, String defaultMappingGroup, + String newConfigContent, Object ticket) { + try { + if (null != ticket && !(ticket instanceof String)) { + throw new IllegalArgumentException("redis publishConfigCas requires stat type ticket"); + } + + String key = buildMappingKey(defaultMappingGroup, serviceInterface); + Response response; + + if (token == null) { + response = client.setKVValue(key, newConfigContent); + } else { + response = client.setKVValue(key, newConfigContent, token, null); + } + + return response != null && response.getValue(); + } catch (Exception e) { + logger.warn(LoggerCodeConstants.TRANSPORT_FAILED_RESPONSE, "", "", "consul publishConfigCas failed.", e); + return false; + } + } + + @Override + public Set getServiceAppMapping(String serviceKey, MappingListener listener, URL url) { + String group = ServiceNameMapping.DEFAULT_MAPPING_GROUP; + + if (casListenerMap.get(buildListenerKey(serviceKey, group)) == null) { + addCasServiceMappingListener(serviceKey, group, listener); + } + + String key = buildMappingKey(group, serviceKey); + String content = getConfig(key); + return ServiceNameMapping.getAppNames(content); + } + + @Override + public Set getServiceAppMapping(String serviceKey, URL url) { + String group = url.getGroup(ServiceNameMapping.DEFAULT_MAPPING_GROUP); + String key = buildMappingKey(group, serviceKey); + String content = getConfig(key); + return ServiceNameMapping.getAppNames(content); + } + + @Override + public void removeServiceAppMappingListener(String serviceKey, MappingListener listener) { + String group = ServiceNameMapping.DEFAULT_MAPPING_GROUP; + + MappingDataListener mappingDataListener = casListenerMap.get(buildListenerKey(serviceKey, group)); + if (mappingDataListener != null) { + removeCasServiceMappingListener(serviceKey, group, listener); + } + } + + @Override + public String getServiceDefinition(MetadataIdentifier metadataIdentifier) { + return getMetadata(metadataIdentifier); } @Override @@ -80,7 +215,7 @@ protected List doGetExportedURLs(ServiceMetadataIdentifier metadataIdent if (StringUtils.isEmpty(content)) { return Collections.emptyList(); } - return new ArrayList(Arrays.asList(URL.decode(content))); + return Arrays.asList(URL.decode(content)); } @Override @@ -93,9 +228,30 @@ protected String doGetSubscribedURLs(SubscriberMetadataIdentifier subscriberMeta return getMetadata(subscriberMetadataIdentifier); } + private String getConfig(String key) { + Response response; + + if (token == null) { + response = client.getKVValue(key); + } else { + response = client.getKVValue(key, token); + } + + if (response == null || response.getValue() == null) { + return null; + } + + Base64.Decoder decoder = Base64.getDecoder(); + return new String(decoder.decode(response.getValue().getValue())); + } + private void storeMetadata(BaseMetadataIdentifier identifier, String v) { try { - client.setKVValue(identifier.getUniqueKey(KeyTypeEnum.UNIQUE_KEY), v); + if (token == null) { + client.setKVValue(identifier.getUniqueKey(KeyTypeEnum.UNIQUE_KEY), v); + } else { + client.setKVValue(identifier.getUniqueKey(KeyTypeEnum.UNIQUE_KEY), v, token, null); + } } catch (Throwable t) { logger.error("Failed to put " + identifier + " to consul " + v + ", cause: " + t.getMessage(), t); throw new RpcException("Failed to put " + identifier + " to consul " + v + ", cause: " + t.getMessage(), t); @@ -104,7 +260,11 @@ private void storeMetadata(BaseMetadataIdentifier identifier, String v) { private void deleteMetadata(BaseMetadataIdentifier identifier) { try { - client.deleteKVValue(identifier.getUniqueKey(KeyTypeEnum.UNIQUE_KEY)); + if (token == null) { + client.deleteKVValue(identifier.getUniqueKey(KeyTypeEnum.UNIQUE_KEY)); + } else { + client.deleteKVValue(identifier.getUniqueKey(KeyTypeEnum.UNIQUE_KEY), token); + } } catch (Throwable t) { logger.error("Failed to delete " + identifier + " from consul , cause: " + t.getMessage(), t); throw new RpcException("Failed to delete " + identifier + " from consul , cause: " + t.getMessage(), t); @@ -113,11 +273,18 @@ private void deleteMetadata(BaseMetadataIdentifier identifier) { private String getMetadata(BaseMetadataIdentifier identifier) { try { - Response value = client.getKVValue(identifier.getUniqueKey(KeyTypeEnum.UNIQUE_KEY)); + Response response; + + if (token == null) { + response = client.getKVValue(identifier.getUniqueKey(KeyTypeEnum.UNIQUE_KEY)); + } else { + response = client.getKVValue(identifier.getUniqueKey(KeyTypeEnum.UNIQUE_KEY), token); + } + //FIXME CHECK - if (value != null && value.getValue() != null) { + if (response != null && response.getValue() != null) { //todo check decode value and value diff - return value.getValue().getValue(); + return response.getValue().getValue(); } return null; } catch (Throwable t) { @@ -126,8 +293,141 @@ private String getMetadata(BaseMetadataIdentifier identifier) { } } - @Override - public String getServiceDefinition(MetadataIdentifier metadataIdentifier) { - return getMetadata(metadataIdentifier); + private void addCasServiceMappingListener(String serviceKey, String group, MappingListener listener) { + String listenerKey = buildListenerKey(ServiceNameMapping.DEFAULT_MAPPING_GROUP, serviceKey); + MappingDataListener mappingDataListener = casListenerMap.computeIfAbsent(listenerKey, + (k) -> new MappingDataListener(serviceKey)); + + mappingDataListener.addListener(listener); + + ConsulListener consulListener = watchListenerMap.computeIfAbsent(listenerKey, + (k) -> new ConsulListener(serviceKey, group)); + consulListener.addListener(mappingDataListener); + } + + private void removeCasServiceMappingListener(String serviceKey, String group, MappingListener listener) { + String listenerKey = buildListenerKey(ServiceNameMapping.DEFAULT_MAPPING_GROUP, serviceKey); + MappingDataListener mappingDataListener = casListenerMap.get(listenerKey); + + if (mappingDataListener != null) { + mappingDataListener.removeListener(listener); + if (mappingDataListener.isEmpty()) { + ConsulListener consulListener = watchListenerMap.get(listenerKey); + + if (consulListener != null) { + consulListener.removeListener(mappingDataListener); + watchListenerMap.remove(listenerKey); + } + + casListenerMap.remove(listenerKey, mappingDataListener); + } + } } + + private String buildMappingKey(String group, String serviceKey) { + StringJoiner joiner = new StringJoiner(CommonConstants.GROUP_CHAR_SEPARATOR); + + joiner.add(root).add(group).add(serviceKey); + + return joiner.toString(); + } + + private String buildListenerKey(String group, String serviceKey) { + StringJoiner joiner = new StringJoiner(CommonConstants.PROPERTIES_CHAR_SEPARATOR); + + joiner.add(root).add(group).add(serviceKey); + + return joiner.toString(); + } + + class ConsulListener implements ConsulCache.Listener { + + private KVCache kvCache; + + private final Set listeners = new LinkedHashSet<>(); + + private final String normalizedKey; + + private final String group; + + public ConsulListener(String normalizedKey, String group) { + this.normalizedKey = normalizedKey; + this.group = group; + this.initKVCache(); + } + + private void initKVCache() { + this.kvCache = KVCache.newCache(ConsulMetadataReport.this.kvClient, normalizedKey, + ConsulMetadataReport.this.watchTimeout); + this.kvCache.addListener(this); + this.kvCache.start(); + } + + public void notify(Map newValues) { + Optional newValue = newValues.values().stream().filter((value) -> value.getKey().equals(normalizedKey)) + .findAny(); + newValue.ifPresent((value) -> { + Optional decodedValue = newValue.get().getValueAsString(); + decodedValue.ifPresent((v) -> { + this.listeners.forEach((l) -> { + ConfigChangedEvent event = new ConfigChangedEvent(normalizedKey, group, v, + ConfigChangeType.MODIFIED); + l.process(event); + }); + }); + }); + } + + private void addListener(ConfigurationListener listener) { + this.listeners.add(listener); + } + + private void removeListener(ConfigurationListener listener) { + this.listeners.remove(listener); + } + + } + + static class MappingDataListener implements ConfigurationListener { + + private String serviceKey; + + private Set listeners; + + public MappingDataListener(String serviceKey) { + this.serviceKey = serviceKey; + this.listeners = new HashSet<>(); + } + + public void addListener(MappingListener listener) { + this.listeners.add(listener); + } + + public void removeListener(MappingListener listener) { + this.listeners.remove(listener); + } + + public boolean isEmpty() { + return listeners.isEmpty(); + } + + @Override + public void process(ConfigChangedEvent event) { + if (ConfigChangeType.DELETED == event.getChangeType()) { + return; + } + + if (!serviceKey.equals(event.getKey()) || !serviceKey.equals(event.getGroup())) { + return; + } + + Set apps = ServiceNameMapping.getAppNames(event.getContent()); + + MappingChangedEvent mappingChangedEvent = new MappingChangedEvent(serviceKey, apps); + + listeners.forEach((listener) -> listener.onEvent(mappingChangedEvent)); + } + + } + }