diff --git a/dyno-core/src/main/java/com/netflix/dyno/connectionpool/Host.java b/dyno-core/src/main/java/com/netflix/dyno/connectionpool/Host.java index 36fd1201..9ecde973 100644 --- a/dyno-core/src/main/java/com/netflix/dyno/connectionpool/Host.java +++ b/dyno-core/src/main/java/com/netflix/dyno/connectionpool/Host.java @@ -150,7 +150,7 @@ public InetSocketAddress getSocketAddress() { /** * Equality checks will fail in collections between Host objects created * from the HostSupplier, which may not know the Dynomite port, and the Host - * objects created by the load balancer. + * objects created by the token map supplier. */ @Override public int hashCode() { diff --git a/dyno-core/src/main/java/com/netflix/dyno/connectionpool/HostSupplier.java b/dyno-core/src/main/java/com/netflix/dyno/connectionpool/HostSupplier.java index 2dfa1fa0..91a5b109 100644 --- a/dyno-core/src/main/java/com/netflix/dyno/connectionpool/HostSupplier.java +++ b/dyno-core/src/main/java/com/netflix/dyno/connectionpool/HostSupplier.java @@ -15,7 +15,7 @@ ******************************************************************************/ package com.netflix.dyno.connectionpool; -import java.util.Collection; +import java.util.List; /** * Interface for a supplier of host objects that map to the dynomite cluster. The {@link ConnectionPool} object can use this to @@ -27,8 +27,8 @@ public interface HostSupplier { /** - * Return a collection of dynomite hosts for the connection pool - * @return Collection + * Return a list of dynomite hosts for the connection pool + * @return List */ - public Collection getHosts(); + public List getHosts(); } diff --git a/dyno-core/src/main/java/com/netflix/dyno/connectionpool/impl/ConnectionPoolImpl.java b/dyno-core/src/main/java/com/netflix/dyno/connectionpool/impl/ConnectionPoolImpl.java index c469e5dc..6220ebea 100644 --- a/dyno-core/src/main/java/com/netflix/dyno/connectionpool/impl/ConnectionPoolImpl.java +++ b/dyno-core/src/main/java/com/netflix/dyno/connectionpool/impl/ConnectionPoolImpl.java @@ -125,7 +125,7 @@ public ConnectionPoolImpl(ConnectionFactory cFactory, ConnectionPoolConfigur } ; - this.hostsUpdater = new HostsUpdater(cpConfiguration.getHostSupplier()); + this.hostsUpdater = new HostsUpdater(cpConfiguration.getHostSupplier(), cpConfiguration.getTokenSupplier()); } diff --git a/dyno-core/src/main/java/com/netflix/dyno/connectionpool/impl/HostsUpdater.java b/dyno-core/src/main/java/com/netflix/dyno/connectionpool/impl/HostsUpdater.java index 4878cf84..19f5eda4 100644 --- a/dyno-core/src/main/java/com/netflix/dyno/connectionpool/impl/HostsUpdater.java +++ b/dyno-core/src/main/java/com/netflix/dyno/connectionpool/impl/HostsUpdater.java @@ -15,43 +15,50 @@ */ package com.netflix.dyno.connectionpool.impl; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; +import java.util.*; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import com.netflix.dyno.connectionpool.Host; import com.netflix.dyno.connectionpool.HostSupplier; +import com.netflix.dyno.connectionpool.TokenMapSupplier; +import com.netflix.dyno.connectionpool.exception.DynoException; import com.netflix.dyno.connectionpool.exception.NoAvailableHostsException; +import com.netflix.dyno.connectionpool.impl.lb.HostToken; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class HostsUpdater { - private final HostSupplier hostSupplier; + private static final Logger Logger = LoggerFactory.getLogger(ConnectionPoolImpl.class); + + private final HostSupplier hostSupplier; + private final TokenMapSupplier tokenMapSupplier; private final AtomicBoolean stop = new AtomicBoolean(false); private final AtomicReference hostTracker = new AtomicReference(null); - public HostsUpdater(HostSupplier hSupplier) { + public HostsUpdater(HostSupplier hSupplier, TokenMapSupplier tokenMapSupplier) { this.hostSupplier = hSupplier; + this.tokenMapSupplier = tokenMapSupplier; this.hostTracker.set(new HostStatusTracker()); } public HostStatusTracker refreshHosts() { - + if (stop.get() || Thread.currentThread().isInterrupted()) { return null; } - Collection allHosts = hostSupplier.getHosts(); + List allHosts = hostSupplier.getHosts(); if (allHosts == null || allHosts.isEmpty()) { throw new NoAvailableHostsException("No available hosts when starting HostsUpdater"); } - - List hostsUp = new ArrayList(); - List hostsDown = new ArrayList(); - + + List hostsUp = new ArrayList<>(); + List hostsDown = new ArrayList<>(); + for (Host host : allHosts) { if (host.isUp()) { hostsUp.add(host); @@ -59,10 +66,54 @@ public HostStatusTracker refreshHosts() { hostsDown.add(host); } } - + + // if nothing has changed, just return the earlier hosttracker. + if (!hostTracker.get().checkIfChanged(new HashSet<>(hostsUp), new HashSet<>(hostsDown))) { + return hostTracker.get(); + } + + /** + * HostTracker should return the hosts that we get from TokenMapSupplier. + * Hence get the hosts from HostSupplier and map them to TokenMapSupplier + * and return them. + */ + Collections.sort(allHosts); + Set hostSet = new HashSet<>(allHosts); + // Create a list of host/Tokens + List hostTokens; + if (tokenMapSupplier != null) { + Logger.info("Getting Hosts from TokenMapSupplier"); + hostTokens = tokenMapSupplier.getTokens(hostSet); + + if (hostTokens.isEmpty()) { + throw new DynoException("No hosts in the TokenMapSupplier"); + } + } else { + throw new DynoException("TokenMapSupplier not provided"); + } + + // The key here really needs to be a object that is overlapping between + // the host from HostSupplier and TokenMapSupplier. Since that is a + // subset of the Host object itself, Host is the key as well as value here. + Map allHostSetFromTokenMapSupplier = new HashMap<>(); + for (HostToken ht : hostTokens) { + allHostSetFromTokenMapSupplier.put(ht.getHost(), ht.getHost()); + } + + hostsUp.clear(); + hostsDown.clear(); + + for (Host host : allHosts) { + if (host.isUp()) { + hostsUp.add(allHostSetFromTokenMapSupplier.get(host)); + } else { + hostsDown.add(allHostSetFromTokenMapSupplier.get(host)); + } + } + HostStatusTracker newTracker = hostTracker.get().computeNewHostStatus(hostsUp, hostsDown); hostTracker.set(newTracker); - + return hostTracker.get(); } diff --git a/dyno-core/src/test/java/com/netflix/dyno/connectionpool/impl/ConnectionPoolImplTest.java b/dyno-core/src/test/java/com/netflix/dyno/connectionpool/impl/ConnectionPoolImplTest.java index eacce574..25c25344 100644 --- a/dyno-core/src/test/java/com/netflix/dyno/connectionpool/impl/ConnectionPoolImplTest.java +++ b/dyno-core/src/test/java/com/netflix/dyno/connectionpool/impl/ConnectionPoolImplTest.java @@ -163,7 +163,7 @@ public void beforeTest() { cpConfig.withHostSupplier(new HostSupplier() { @Override - public Collection getHosts() { + public List getHosts() { return hostSupplierHosts; } }); diff --git a/dyno-demo/src/main/java/com/netflix/dyno/demo/redis/CustomTokenSupplierExample.java b/dyno-demo/src/main/java/com/netflix/dyno/demo/redis/CustomTokenSupplierExample.java index ef7e54c7..1530bf10 100644 --- a/dyno-demo/src/main/java/com/netflix/dyno/demo/redis/CustomTokenSupplierExample.java +++ b/dyno-demo/src/main/java/com/netflix/dyno/demo/redis/CustomTokenSupplierExample.java @@ -54,7 +54,7 @@ public void init() throws Exception { final HostSupplier localHostSupplier = new HostSupplier() { @Override - public Collection getHosts() { + public List getHosts() { return Collections.singletonList(localHost); } }; diff --git a/dyno-demo/src/main/java/com/netflix/dyno/demo/redis/DynoJedisDemo.java b/dyno-demo/src/main/java/com/netflix/dyno/demo/redis/DynoJedisDemo.java index ea947f35..e093dee4 100644 --- a/dyno-demo/src/main/java/com/netflix/dyno/demo/redis/DynoJedisDemo.java +++ b/dyno-demo/src/main/java/com/netflix/dyno/demo/redis/DynoJedisDemo.java @@ -73,19 +73,20 @@ public void initWithLocalHost() throws Exception { final int port = 6379; - final Host localHost = new Host("localhost", port, "localrack", Status.Up); final HostSupplier localHostSupplier = new HostSupplier() { + final Host hostSupplierHost = new Host("localhost", localRack, Status.Up); @Override - public Collection getHosts() { - return Collections.singletonList(localHost); + public List getHosts() { + return Collections.singletonList(hostSupplierHost); } }; final TokenMapSupplier tokenSupplier = new TokenMapSupplier() { - final HostToken localHostToken = new HostToken(100000L, localHost); + final Host tokenHost = new Host("localhost", port, localRack, Status.Up); + final HostToken localHostToken = new HostToken(100000L, tokenHost); @Override public List getTokens(Set activeHosts) { @@ -105,7 +106,7 @@ private void initWithRemoteCluster(final List hosts, final int port) throw final HostSupplier clusterHostSupplier = new HostSupplier() { @Override - public Collection getHosts() { + public List getHosts() { return hosts; } }; @@ -125,6 +126,7 @@ public void init(HostSupplier hostSupplier, int port, TokenMapSupplier tokenSupp client = new DynoJedisClient.Builder().withApplicationName("demo").withDynomiteClusterName("dyno_dev") .withHostSupplier(hostSupplier) + .withTokenMapSupplier(tokenSupplier) // .withCPConfig( // new ConnectionPoolConfigurationImpl("demo") // .setCompressionStrategy(ConnectionPoolConfiguration.CompressionStrategy.THRESHOLD) @@ -956,6 +958,7 @@ public static void main(String args[]) throws IOException { } else { demo.initWithRemoteClusterFromEurekaUrl(clusterName, port); } +// demo.initWithLocalHost(); System.out.println("Connected"); diff --git a/dyno-jedis/src/main/java/com/netflix/dyno/jedis/DynoJedisClient.java b/dyno-jedis/src/main/java/com/netflix/dyno/jedis/DynoJedisClient.java index adbfac1d..96938867 100644 --- a/dyno-jedis/src/main/java/com/netflix/dyno/jedis/DynoJedisClient.java +++ b/dyno-jedis/src/main/java/com/netflix/dyno/jedis/DynoJedisClient.java @@ -3673,12 +3673,12 @@ public long pfcount(final byte[] key) { } /** - * NOT SUPPORTED ! Use {@link #dyno_scan(CursorBasedResult, String...)} + * NOT SUPPORTED ! Use {@link #dyno_scan(CursorBasedResult, int, String...)} * instead. * * @param cursor * @return nothing -- throws UnsupportedOperationException when invoked - * @see #dyno_scan(CursorBasedResult, String...) + * @see #dyno_scan(CursorBasedResult, int, String...) */ @Override public ScanResult scan(int cursor) { @@ -3686,12 +3686,12 @@ public ScanResult scan(int cursor) { } /** - * NOT SUPPORTED ! Use {@link #dyno_scan(CursorBasedResult, String...)} + * NOT SUPPORTED ! Use {@link #dyno_scan(CursorBasedResult, int, String...)} * instead. * * @param cursor * @return nothing -- throws UnsupportedOperationException when invoked - * @see #dyno_scan(CursorBasedResult, String...) + * @see #dyno_scan(CursorBasedResult, int, String...) */ @Override public ScanResult scan(String cursor) { @@ -3766,6 +3766,8 @@ public static class Builder { private DynoDualWriterClient.Dial dualWriteDial; private ConnectionPoolMonitor cpMonitor; private SSLSocketFactory sslSocketFactory; + private TokenMapSupplier tokenMapSupplier; + private TokenMapSupplier dualWriteTokenMapSupplier; public Builder() { } @@ -3790,6 +3792,11 @@ public Builder withHostSupplier(HostSupplier hSupplier) { return this; } + public Builder withTokenMapSupplier(TokenMapSupplier tokenMapSupplier) { + this.tokenMapSupplier = tokenMapSupplier; + return this; + } + @Deprecated public Builder withDiscoveryClient(DiscoveryClient client) { discoveryClient = client; @@ -3811,6 +3818,11 @@ public Builder withDualWriteHostSupplier(HostSupplier dualWriteHostSupplier) { return this; } + public Builder withDualWriteTokenMapSupplier(TokenMapSupplier dualWriteTokenMapSupplier) { + this.dualWriteTokenMapSupplier = dualWriteTokenMapSupplier; + return this; + } + public Builder withDualWriteDial(DynoDualWriterClient.Dial dial) { this.dualWriteDial = dial; return this; @@ -3850,7 +3862,8 @@ private DynoDualWriterClient buildDynoDualWriterClient() { // client application startup shadowConfig.setFailOnStartupIfNoHosts(false); - HostSupplier shadowSupplier = null; + //Initialize the Host Supplier + HostSupplier shadowSupplier; if (dualWriteHostSupplier == null) { if (hostSupplier != null && hostSupplier instanceof EurekaHostsSupplier) { EurekaHostsSupplier eurekaSupplier = (EurekaHostsSupplier) hostSupplier; @@ -3868,6 +3881,8 @@ private DynoDualWriterClient buildDynoDualWriterClient() { shadowConfig.withHostSupplier(shadowSupplier); + if (dualWriteTokenMapSupplier != null) + shadowConfig.withTokenSupplier(dualWriteTokenMapSupplier); setLoadBalancingStrategy(shadowConfig); setHashtagConnectionPool(shadowSupplier, shadowConfig); @@ -3924,6 +3939,8 @@ private ConnectionPoolImpl createConnectionPool(String appName, DynoOPMon } cpConfig.withHostSupplier(hostSupplier); + if (tokenMapSupplier != null) + cpConfig.withTokenSupplier(tokenMapSupplier); setLoadBalancingStrategy(cpConfig); setHashtagConnectionPool(hostSupplier, cpConfig); JedisConnectionFactory connFactory = new JedisConnectionFactory(opMonitor, sslSocketFactory); @@ -3987,15 +4004,13 @@ private void setLoadBalancingStrategy(ConnectionPoolConfigurationImpl config) { /** * Set the hash to the connection pool if is provided by Dynomite * @param hostSupplier + * @param config */ private void setHashtagConnectionPool(HostSupplier hostSupplier, ConnectionPoolConfigurationImpl config) { // Find the hosts from host supplier - Collection hosts = hostSupplier.getHosts(); - // Convert the returned collection to an arraylist - ArrayList arrayHosts = new ArrayList(hosts); - Collections.sort(arrayHosts); + List hosts = hostSupplier.getHosts(); + Collections.sort(hosts); // Convert the arraylist to set - Set hostSet = new HashSet(arrayHosts); // Take the token map supplier (aka the token topology from // Dynomite) @@ -4004,6 +4019,7 @@ private void setHashtagConnectionPool(HostSupplier hostSupplier, ConnectionPoolC // Create a list of host/Tokens List hostTokens; if (tokenMapSupplier != null) { + Set hostSet = new HashSet(hosts); hostTokens = tokenMapSupplier.getTokens(hostSet); /* Dyno cannot reach the TokenMapSupplier endpoint, * therefore no nodes can be retrieved. diff --git a/dyno-jedis/src/test/java/com/netflix/dyno/jedis/JedisConnectionFactoryIntegrationTest.java b/dyno-jedis/src/test/java/com/netflix/dyno/jedis/JedisConnectionFactoryIntegrationTest.java index 71f91798..74337130 100644 --- a/dyno-jedis/src/test/java/com/netflix/dyno/jedis/JedisConnectionFactoryIntegrationTest.java +++ b/dyno-jedis/src/test/java/com/netflix/dyno/jedis/JedisConnectionFactoryIntegrationTest.java @@ -39,12 +39,14 @@ public class JedisConnectionFactoryIntegrationTest { private final String rack = "rack1"; + private final String datacenter = "rack"; + private final Host localHost = new Host("localhost", port, rack, Host.Status.Up); private final HostSupplier localHostSupplier = new HostSupplier() { @Override - public Collection getHosts() { + public List getHosts() { return Collections.singletonList(localHost); } }; @@ -115,6 +117,7 @@ private DynoJedisClient constructJedisClient(final boolean withSsl) throws Excep final ConnectionPoolConfigurationImpl connectionPoolConfiguration = new ConnectionPoolConfigurationImpl(rack); connectionPoolConfiguration.withTokenSupplier(supplier); connectionPoolConfiguration.setLocalRack(rack); + connectionPoolConfiguration.setLocalDataCenter(datacenter); final SSLContext sslContext = createAndInitSSLContext("client.jks");