Skip to content

Commit

Permalink
🎨 Better cluster topology
Browse files Browse the repository at this point in the history
  • Loading branch information
mathieucarbou committed Jun 17, 2016
1 parent 0501594 commit 5300175
Show file tree
Hide file tree
Showing 17 changed files with 391 additions and 526 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
/**
* @author Mathieu Carbou
*/
public final class Client extends AbstractNodeWithManageable<Cluster, Client> implements Serializable {
public final class Client extends AbstractNode<Cluster> implements Serializable {

private static final long serialVersionUID = 1;

Expand All @@ -44,12 +44,22 @@ public final class Client extends AbstractNodeWithManageable<Cluster, Client> im
private final ClientIdentifier clientIdentifier;
private final SortedSet<String> tags = new TreeSet<>();
private String hostName;
private ManagementRegistry managementRegistry;

private Client(ClientIdentifier clientIdentifier) {
super(clientIdentifier.getClientId());
this.clientIdentifier = Objects.requireNonNull(clientIdentifier);
}

public Optional<ManagementRegistry> getManagementRegistry() {
return Optional.ofNullable(managementRegistry);
}

public Client setManagementRegistry(ManagementRegistry managementRegistry) {
this.managementRegistry = managementRegistry;
return this;
}

public SortedSet<String> getTags() {
return tags;
}
Expand Down Expand Up @@ -161,16 +171,14 @@ public Optional<Connection> removeConnection(String id) {
return connection;
}

public Stream<Manageable> serverManageableStream() {
public Stream<ServerEntity> fetchedServerEntityStream() {
return connectionStream()
.filter(Connection::isConnectedToActiveServer)
.flatMap(Connection::serverManageableStream);
.flatMap(Connection::fetchedServerEntityStream);
}

public int getServerManageableCount() {
public int getFetchedServerEntityCount() {
return connectionStream()
.filter(Connection::isConnectedToActiveServer)
.mapToInt(Connection::getServerManageableCount).sum();
.mapToInt(Connection::getFetchedServerEntityCount).sum();
}

@Override
Expand All @@ -189,16 +197,12 @@ public boolean isConnected() {
return connectionStream().filter(Connection::isConnected).findFirst().isPresent();
}

public boolean isConnectedToActive() {
return connectionStream().filter(Connection::isConnectedToActiveServer).findFirst().isPresent();
}

public boolean isConnectedToServerManageable(String name, String type) {
return getServerManageable(name, type).isPresent();
public boolean hasFetchedServerEntity(String name, String type) {
return getFetchedServerEntity(name, type).isPresent();
}

public Optional<Manageable> getServerManageable(String name, String type) {
return serverManageableStream().filter(manageable -> manageable.is(name, type)).findFirst();
public Optional<ServerEntity> getFetchedServerEntity(String name, String type) {
return fetchedServerEntityStream().filter(serverEntity -> serverEntity.is(name, type)).findFirst();
}

@Override
Expand All @@ -216,7 +220,9 @@ public boolean equals(Object o) {

if (!connections.equals(client.connections)) return false;
if (!clientIdentifier.equals(client.clientIdentifier)) return false;
return hostName != null ? hostName.equals(client.hostName) : client.hostName == null;
if (!tags.equals(client.tags)) return false;
if (hostName != null ? !hostName.equals(client.hostName) : client.hostName != null) return false;
return managementRegistry != null ? managementRegistry.equals(client.managementRegistry) : client.managementRegistry == null;

}

Expand All @@ -225,7 +231,9 @@ public int hashCode() {
int result = super.hashCode();
result = 31 * result + connections.hashCode();
result = 31 * result + clientIdentifier.hashCode();
result = 31 * result + tags.hashCode();
result = 31 * result + (hostName != null ? hostName.hashCode() : 0);
result = 31 * result + (managementRegistry != null ? managementRegistry.hashCode() : 0);
return result;
}

Expand All @@ -241,6 +249,7 @@ public Map<String, Object> toMap() {
map.put("hostName", getHostName());
map.put("tags", tags);
map.put("connections", connectionStream().sorted((o1, o2) -> o1.getId().compareTo(o2.getId())).map(Connection::toMap).collect(Collectors.toList()));
map.put("managementRegistry", managementRegistry == null ? null : managementRegistry.toMap());
return map;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,12 +128,8 @@ public Optional<Stripe> removeStripe(String id) {
return stripe;
}

public Optional<Manageable> getManageable(Context context) {
Optional<Manageable> manageable = getStripe(context).flatMap(s -> s.getActiveManageable(context));
if (manageable.isPresent()) {
return manageable;
}
return getClient(context).flatMap(c -> c.getManageable(context));
public Optional<ServerEntity> getServerEntity(Context context) {
return getStripe(context).flatMap(s -> s.getServerEntity(context));
}

public Optional<Server> getServer(Context context) {
Expand All @@ -146,13 +142,10 @@ public List<? extends Node> getNodes(Context context) {
nodes.add(stripe1);
stripe1.getServer(context).ifPresent(server -> {
nodes.add(server);
server.getManageable(context).ifPresent(nodes::add);
server.getServerEntity(context).ifPresent(nodes::add);
});
});
getClient(context).ifPresent(client -> {
nodes.add(client);
client.getManageable(context).ifPresent(nodes::add);
});
getClient(context).ifPresent(nodes::add);
return nodes;
}

Expand All @@ -165,16 +158,8 @@ public String getPath(Context context) {
return sb.toString();
}

public Stream<Manageable> allManageableStream() {
return Stream.concat(serverManageableStream(), clientManageableStream());
}

public Stream<Manageable> serverManageableStream() {
return stripeStream().flatMap(Stripe::activeManageableStream);
}

public Stream<Manageable> clientManageableStream() {
return clientStream().flatMap(Client::manageableStream);
public Stream<ServerEntity> serverEntityStream() {
return stripeStream().flatMap(Stripe::serverEntityStream);
}

public Stream<Server> serverStream() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public final class Connection extends AbstractNode<Client> implements Serializab

public static final String KEY = "connectionId";

private final Collection<String> manageableIds = new ConcurrentSkipListSet<>();
private final Collection<String> serverEntityIds = new ConcurrentSkipListSet<>();
private final Endpoint clientEndpoint;
private final String stripeId;
private final String serverId;
Expand Down Expand Up @@ -77,19 +77,19 @@ public Optional<Server> getServer() {
}
}

public Stream<Manageable> serverManageableStream() {
public Stream<ServerEntity> fetchedServerEntityStream() {
return getServer()
.map(server -> manageableIds.stream()
.map(server::getManageable)
.map(server -> serverEntityIds.stream()
.map(server::getServerEntity)
.filter(Optional::isPresent)
.map(Optional::get))
.orElse(Stream.empty());
}

public int getServerManageableCount() {
public int getFetchedServerEntityCount() {
return getServer()
.map(server -> manageableIds.stream()
.map(server::getManageable)
.map(server -> serverEntityIds.stream()
.map(server::getServerEntity)
.filter(Optional::isPresent)
.count())
.orElse(0L).intValue();
Expand Down Expand Up @@ -118,7 +118,7 @@ public boolean equals(Object o) {

Connection that = (Connection) o;

if (!manageableIds.equals(that.manageableIds)) return false;
if (!serverEntityIds.equals(that.serverEntityIds)) return false;
if (!clientEndpoint.equals(that.clientEndpoint)) return false;
if (stripeId != null ? !stripeId.equals(that.stripeId) : that.stripeId != null) return false;
return serverId != null ? serverId.equals(that.serverId) : that.serverId == null;
Expand All @@ -131,7 +131,7 @@ public int hashCode() {
// and can be different whether we opened/closed several connections in our different tests
//int result = super.hashCode();
int result = 0;
result = 31 * result + manageableIds.hashCode();
result = 31 * result + serverEntityIds.hashCode();
result = 31 * result + clientEndpoint.hashCode();
result = 31 * result + (stripeId != null ? stripeId.hashCode() : 0);
result = 31 * result + (serverId != null ? serverId.hashCode() : 0);
Expand All @@ -145,34 +145,30 @@ public Map<String, Object> toMap() {
map.put("clientEndpoint", clientEndpoint.toMap());
map.put("stripeId", this.stripeId);
map.put("serverId", this.serverId);
map.put("manageableIds", this.manageableIds);
map.put("serverEntityIds", this.serverEntityIds);
return map;
}

public void disconnectServerManageable(String name, String type) {
manageableIds.remove(Manageable.key(name, type));
public void unfetchServerEntity(String name, String type) {
serverEntityIds.remove(ServerEntity.key(name, type));
}

public boolean connectServerManageable(Manageable manageable) {
public boolean fetchServerEntity(ServerEntity serverEntity) {
if (!isConnected()) {
throw new IllegalStateException("not connnected");
}
if (!(manageable.getParent() instanceof Server)) {
throw new IllegalArgumentException(String.valueOf(manageable.getParent()));
if (!(serverEntity.getParent() instanceof Server)) {
throw new IllegalArgumentException(String.valueOf(serverEntity.getParent()));
}
Server server = (Server) manageable.getParent();
Server server = (Server) serverEntity.getParent();
if (server != getServer().get()) {
throw new IllegalStateException("wrong server manageable");
throw new IllegalStateException("wrong server serverEntity");
}
return manageableIds.add(manageable.getId());
return serverEntityIds.add(serverEntity.getId());
}

public boolean isConnectedToServerManageable(String name, String type) {
return serverManageableStream().filter(manageable -> manageable.is(name, type)).findFirst().isPresent();
}

public boolean isConnectedToActiveServer() {
return getServer().filter(Server::isActive).isPresent();
public boolean hasFetchedServerEntity(String name, String type) {
return fetchedServerEntityStream().filter(serverEntity -> serverEntity.is(name, type)).findFirst().isPresent();
}

public boolean isConnectedTo(Server server) {
Expand Down
Loading

0 comments on commit 5300175

Please sign in to comment.