Skip to content

Commit

Permalink
Merge pull request #205 from Netflix/Use_TokenMapSupplier_Hosts
Browse files Browse the repository at this point in the history
Use Hosts from Token Map Supplier
  • Loading branch information
shailesh33 authored Feb 14, 2018
2 parents 555c1a9 + b59cdee commit 4d42719
Show file tree
Hide file tree
Showing 9 changed files with 110 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ public InetSocketAddress getSocketAddress() {
/**
* Equality checks will fail in collections between Host objects created
* from the HostSupplier, which may not know the Dynomite port, and the Host
* objects created by the load balancer.
* objects created by the token map supplier.
*/
@Override
public int hashCode() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
******************************************************************************/
package com.netflix.dyno.connectionpool;

import java.util.Collection;
import java.util.List;

/**
* Interface for a supplier of host objects that map to the dynomite cluster. The {@link ConnectionPool} object can use this to
Expand All @@ -27,8 +27,8 @@
public interface HostSupplier {

/**
* Return a collection of dynomite hosts for the connection pool
* @return Collection<Host>
* Return a list of dynomite hosts for the connection pool
* @return List<Host>
*/
public Collection<Host> getHosts();
public List<Host> getHosts();
}
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ public ConnectionPoolImpl(ConnectionFactory<CL> cFactory, ConnectionPoolConfigur
}
;

this.hostsUpdater = new HostsUpdater(cpConfiguration.getHostSupplier());
this.hostsUpdater = new HostsUpdater(cpConfiguration.getHostSupplier(), cpConfiguration.getTokenSupplier());

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,54 +15,105 @@
*/
package com.netflix.dyno.connectionpool.impl;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import com.netflix.dyno.connectionpool.Host;
import com.netflix.dyno.connectionpool.HostSupplier;
import com.netflix.dyno.connectionpool.TokenMapSupplier;
import com.netflix.dyno.connectionpool.exception.DynoException;
import com.netflix.dyno.connectionpool.exception.NoAvailableHostsException;
import com.netflix.dyno.connectionpool.impl.lb.HostToken;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HostsUpdater {

private final HostSupplier hostSupplier;
private static final Logger Logger = LoggerFactory.getLogger(ConnectionPoolImpl.class);

private final HostSupplier hostSupplier;
private final TokenMapSupplier tokenMapSupplier;

private final AtomicBoolean stop = new AtomicBoolean(false);
private final AtomicReference<HostStatusTracker> hostTracker = new AtomicReference<HostStatusTracker>(null);

public HostsUpdater(HostSupplier hSupplier) {
public HostsUpdater(HostSupplier hSupplier, TokenMapSupplier tokenMapSupplier) {
this.hostSupplier = hSupplier;
this.tokenMapSupplier = tokenMapSupplier;
this.hostTracker.set(new HostStatusTracker());
}


public HostStatusTracker refreshHosts() {

if (stop.get() || Thread.currentThread().isInterrupted()) {
return null;
}

Collection<Host> allHosts = hostSupplier.getHosts();
List<Host> allHosts = hostSupplier.getHosts();
if (allHosts == null || allHosts.isEmpty()) {
throw new NoAvailableHostsException("No available hosts when starting HostsUpdater");
}
List<Host> hostsUp = new ArrayList<Host>();
List<Host> hostsDown = new ArrayList<Host>();

List<Host> hostsUp = new ArrayList<>();
List<Host> hostsDown = new ArrayList<>();

for (Host host : allHosts) {
if (host.isUp()) {
hostsUp.add(host);
} else {
hostsDown.add(host);
}
}


// if nothing has changed, just return the earlier hosttracker.
if (!hostTracker.get().checkIfChanged(new HashSet<>(hostsUp), new HashSet<>(hostsDown))) {
return hostTracker.get();
}

/**
* HostTracker should return the hosts that we get from TokenMapSupplier.
* Hence get the hosts from HostSupplier and map them to TokenMapSupplier
* and return them.
*/
Collections.sort(allHosts);
Set<Host> hostSet = new HashSet<>(allHosts);
// Create a list of host/Tokens
List<HostToken> hostTokens;
if (tokenMapSupplier != null) {
Logger.info("Getting Hosts from TokenMapSupplier");
hostTokens = tokenMapSupplier.getTokens(hostSet);

if (hostTokens.isEmpty()) {
throw new DynoException("No hosts in the TokenMapSupplier");
}
} else {
throw new DynoException("TokenMapSupplier not provided");
}

// The key here really needs to be a object that is overlapping between
// the host from HostSupplier and TokenMapSupplier. Since that is a
// subset of the Host object itself, Host is the key as well as value here.
Map<Host, Host> allHostSetFromTokenMapSupplier = new HashMap<>();
for (HostToken ht : hostTokens) {
allHostSetFromTokenMapSupplier.put(ht.getHost(), ht.getHost());
}

hostsUp.clear();
hostsDown.clear();

for (Host host : allHosts) {
if (host.isUp()) {
hostsUp.add(allHostSetFromTokenMapSupplier.get(host));
} else {
hostsDown.add(allHostSetFromTokenMapSupplier.get(host));
}
}

HostStatusTracker newTracker = hostTracker.get().computeNewHostStatus(hostsUp, hostsDown);
hostTracker.set(newTracker);

return hostTracker.get();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ public void beforeTest() {
cpConfig.withHostSupplier(new HostSupplier() {

@Override
public Collection<Host> getHosts() {
public List<Host> getHosts() {
return hostSupplierHosts;
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public void init() throws Exception {
final HostSupplier localHostSupplier = new HostSupplier() {

@Override
public Collection<Host> getHosts() {
public List<Host> getHosts() {
return Collections.singletonList(localHost);
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,19 +73,20 @@ public void initWithLocalHost() throws Exception {

final int port = 6379;

final Host localHost = new Host("localhost", port, "localrack", Status.Up);

final HostSupplier localHostSupplier = new HostSupplier() {
final Host hostSupplierHost = new Host("localhost", localRack, Status.Up);

@Override
public Collection<Host> getHosts() {
return Collections.singletonList(localHost);
public List<Host> getHosts() {
return Collections.singletonList(hostSupplierHost);
}
};

final TokenMapSupplier tokenSupplier = new TokenMapSupplier() {

final HostToken localHostToken = new HostToken(100000L, localHost);
final Host tokenHost = new Host("localhost", port, localRack, Status.Up);
final HostToken localHostToken = new HostToken(100000L, tokenHost);

@Override
public List<HostToken> getTokens(Set<Host> activeHosts) {
Expand All @@ -105,7 +106,7 @@ private void initWithRemoteCluster(final List<Host> hosts, final int port) throw
final HostSupplier clusterHostSupplier = new HostSupplier() {

@Override
public Collection<Host> getHosts() {
public List<Host> getHosts() {
return hosts;
}
};
Expand All @@ -125,6 +126,7 @@ public void init(HostSupplier hostSupplier, int port, TokenMapSupplier tokenSupp

client = new DynoJedisClient.Builder().withApplicationName("demo").withDynomiteClusterName("dyno_dev")
.withHostSupplier(hostSupplier)
.withTokenMapSupplier(tokenSupplier)
// .withCPConfig(
// new ConnectionPoolConfigurationImpl("demo")
// .setCompressionStrategy(ConnectionPoolConfiguration.CompressionStrategy.THRESHOLD)
Expand Down Expand Up @@ -956,6 +958,7 @@ public static void main(String args[]) throws IOException {
} else {
demo.initWithRemoteClusterFromEurekaUrl(clusterName, port);
}
// demo.initWithLocalHost();

System.out.println("Connected");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3673,25 +3673,25 @@ public long pfcount(final byte[] key) {
}

/**
* NOT SUPPORTED ! Use {@link #dyno_scan(CursorBasedResult, String...)}
* NOT SUPPORTED ! Use {@link #dyno_scan(CursorBasedResult, int, String...)}
* instead.
*
* @param cursor
* @return nothing -- throws UnsupportedOperationException when invoked
* @see #dyno_scan(CursorBasedResult, String...)
* @see #dyno_scan(CursorBasedResult, int, String...)
*/
@Override
public ScanResult<String> scan(int cursor) {
throw new UnsupportedOperationException("Not supported - use dyno_scan(String, CursorBasedResult");
}

/**
* NOT SUPPORTED ! Use {@link #dyno_scan(CursorBasedResult, String...)}
* NOT SUPPORTED ! Use {@link #dyno_scan(CursorBasedResult, int, String...)}
* instead.
*
* @param cursor
* @return nothing -- throws UnsupportedOperationException when invoked
* @see #dyno_scan(CursorBasedResult, String...)
* @see #dyno_scan(CursorBasedResult, int, String...)
*/
@Override
public ScanResult<String> scan(String cursor) {
Expand Down Expand Up @@ -3766,6 +3766,8 @@ public static class Builder {
private DynoDualWriterClient.Dial dualWriteDial;
private ConnectionPoolMonitor cpMonitor;
private SSLSocketFactory sslSocketFactory;
private TokenMapSupplier tokenMapSupplier;
private TokenMapSupplier dualWriteTokenMapSupplier;

public Builder() {
}
Expand All @@ -3790,6 +3792,11 @@ public Builder withHostSupplier(HostSupplier hSupplier) {
return this;
}

public Builder withTokenMapSupplier(TokenMapSupplier tokenMapSupplier) {
this.tokenMapSupplier = tokenMapSupplier;
return this;
}

@Deprecated
public Builder withDiscoveryClient(DiscoveryClient client) {
discoveryClient = client;
Expand All @@ -3811,6 +3818,11 @@ public Builder withDualWriteHostSupplier(HostSupplier dualWriteHostSupplier) {
return this;
}

public Builder withDualWriteTokenMapSupplier(TokenMapSupplier dualWriteTokenMapSupplier) {
this.dualWriteTokenMapSupplier = dualWriteTokenMapSupplier;
return this;
}

public Builder withDualWriteDial(DynoDualWriterClient.Dial dial) {
this.dualWriteDial = dial;
return this;
Expand Down Expand Up @@ -3850,7 +3862,8 @@ private DynoDualWriterClient buildDynoDualWriterClient() {
// client application startup
shadowConfig.setFailOnStartupIfNoHosts(false);

HostSupplier shadowSupplier = null;
//Initialize the Host Supplier
HostSupplier shadowSupplier;
if (dualWriteHostSupplier == null) {
if (hostSupplier != null && hostSupplier instanceof EurekaHostsSupplier) {
EurekaHostsSupplier eurekaSupplier = (EurekaHostsSupplier) hostSupplier;
Expand All @@ -3868,6 +3881,8 @@ private DynoDualWriterClient buildDynoDualWriterClient() {

shadowConfig.withHostSupplier(shadowSupplier);

if (dualWriteTokenMapSupplier != null)
shadowConfig.withTokenSupplier(dualWriteTokenMapSupplier);
setLoadBalancingStrategy(shadowConfig);
setHashtagConnectionPool(shadowSupplier, shadowConfig);

Expand Down Expand Up @@ -3924,6 +3939,8 @@ private ConnectionPoolImpl<Jedis> createConnectionPool(String appName, DynoOPMon
}

cpConfig.withHostSupplier(hostSupplier);
if (tokenMapSupplier != null)
cpConfig.withTokenSupplier(tokenMapSupplier);
setLoadBalancingStrategy(cpConfig);
setHashtagConnectionPool(hostSupplier, cpConfig);
JedisConnectionFactory connFactory = new JedisConnectionFactory(opMonitor, sslSocketFactory);
Expand Down Expand Up @@ -3987,15 +4004,13 @@ private void setLoadBalancingStrategy(ConnectionPoolConfigurationImpl config) {
/**
* Set the hash to the connection pool if is provided by Dynomite
* @param hostSupplier
* @param config
*/
private void setHashtagConnectionPool(HostSupplier hostSupplier, ConnectionPoolConfigurationImpl config) {
// Find the hosts from host supplier
Collection<Host> hosts = hostSupplier.getHosts();
// Convert the returned collection to an arraylist
ArrayList<Host> arrayHosts = new ArrayList<Host>(hosts);
Collections.sort(arrayHosts);
List<Host> hosts = hostSupplier.getHosts();
Collections.sort(hosts);
// Convert the arraylist to set
Set<Host> hostSet = new HashSet<Host>(arrayHosts);

// Take the token map supplier (aka the token topology from
// Dynomite)
Expand All @@ -4004,6 +4019,7 @@ private void setHashtagConnectionPool(HostSupplier hostSupplier, ConnectionPoolC
// Create a list of host/Tokens
List<HostToken> hostTokens;
if (tokenMapSupplier != null) {
Set<Host> hostSet = new HashSet<Host>(hosts);
hostTokens = tokenMapSupplier.getTokens(hostSet);
/* Dyno cannot reach the TokenMapSupplier endpoint,
* therefore no nodes can be retrieved.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,14 @@ public class JedisConnectionFactoryIntegrationTest {

private final String rack = "rack1";

private final String datacenter = "rack";

private final Host localHost = new Host("localhost", port, rack, Host.Status.Up);

private final HostSupplier localHostSupplier = new HostSupplier() {

@Override
public Collection<Host> getHosts() {
public List<Host> getHosts() {
return Collections.singletonList(localHost);
}
};
Expand Down Expand Up @@ -115,6 +117,7 @@ private DynoJedisClient constructJedisClient(final boolean withSsl) throws Excep
final ConnectionPoolConfigurationImpl connectionPoolConfiguration = new ConnectionPoolConfigurationImpl(rack);
connectionPoolConfiguration.withTokenSupplier(supplier);
connectionPoolConfiguration.setLocalRack(rack);
connectionPoolConfiguration.setLocalDataCenter(datacenter);

final SSLContext sslContext = createAndInitSSLContext("client.jks");

Expand Down

0 comments on commit 4d42719

Please sign in to comment.