Skip to content

4.x: Add advanced shard awareness #517

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: scylla-4.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,17 @@ public enum DefaultDriverOption implements DriverOption {
* <p>Value-type: boolean
*/
CONNECTION_WARN_INIT_ERROR("advanced.connection.warn-on-init-error"),
/**
* Whether to use advanced shard awareness.
*
* <p>Value-type: boolean
*/
CONNECTION_ADVANCED_SHARD_AWARENESS_ENABLED(
"advanced.connection.advanced-shard-awareness.enabled"),
/** Inclusive lower bound of port range to use in advanced shard awareness */
ADVANCED_SHARD_AWARENESS_PORT_LOW("advanced.connection.advanced-shard-awareness.port-low"),
/** Inclusive upper bound of port range to use in advanced shard awareness */
ADVANCED_SHARD_AWARENESS_PORT_HIGH("advanced.connection.advanced-shard-awareness.port-high"),
/**
* The number of connections in the LOCAL pool.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,9 @@ protected static void fillWithDriverDefaults(OptionsMap map) {
map.put(TypedDriverOption.CONNECTION_MAX_REQUESTS, 1024);
map.put(TypedDriverOption.CONNECTION_MAX_ORPHAN_REQUESTS, 256);
map.put(TypedDriverOption.CONNECTION_WARN_INIT_ERROR, true);
map.put(TypedDriverOption.CONNECTION_ADVANCED_SHARD_AWARENESS_ENABLED, true);
map.put(TypedDriverOption.ADVANCED_SHARD_AWARENESS_PORT_LOW, 10000);
map.put(TypedDriverOption.ADVANCED_SHARD_AWARENESS_PORT_HIGH, 60000);
map.put(TypedDriverOption.RECONNECT_ON_INIT, false);
map.put(TypedDriverOption.RECONNECTION_POLICY_CLASS, "ExponentialReconnectionPolicy");
map.put(TypedDriverOption.RECONNECTION_BASE_DELAY, Duration.ofSeconds(1));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,18 @@ public String toString() {
/** Whether to log non-fatal errors when the driver tries to open a new connection. */
public static final TypedDriverOption<Boolean> CONNECTION_WARN_INIT_ERROR =
new TypedDriverOption<>(DefaultDriverOption.CONNECTION_WARN_INIT_ERROR, GenericType.BOOLEAN);
/** Whether to use advanced shard awareness */
public static final TypedDriverOption<Boolean> CONNECTION_ADVANCED_SHARD_AWARENESS_ENABLED =
new TypedDriverOption<>(
DefaultDriverOption.CONNECTION_ADVANCED_SHARD_AWARENESS_ENABLED, GenericType.BOOLEAN);
/** Inclusive lower bound of port range to use in advanced shard awareness */
public static final TypedDriverOption<Integer> ADVANCED_SHARD_AWARENESS_PORT_LOW =
new TypedDriverOption<>(
DefaultDriverOption.ADVANCED_SHARD_AWARENESS_PORT_LOW, GenericType.INTEGER);
/** Inclusive upper bound of port range to use in advanced shard awareness */
public static final TypedDriverOption<Integer> ADVANCED_SHARD_AWARENESS_PORT_HIGH =
new TypedDriverOption<>(
DefaultDriverOption.ADVANCED_SHARD_AWARENESS_PORT_HIGH, GenericType.INTEGER);
/** The number of connections in the LOCAL pool. */
public static final TypedDriverOption<Integer> CONNECTION_POOL_LOCAL_SIZE =
new TypedDriverOption<>(DefaultDriverOption.CONNECTION_POOL_LOCAL_SIZE, GenericType.INTEGER);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
import com.datastax.oss.driver.api.core.metadata.EndPoint;
import com.datastax.oss.driver.api.core.metadata.Node;
import com.datastax.oss.driver.api.core.metadata.NodeShardingInfo;
import com.datastax.oss.driver.api.core.metrics.DefaultNodeMetric;
import com.datastax.oss.driver.api.core.metrics.DefaultSessionMetric;
import com.datastax.oss.driver.internal.core.config.typesafe.TypesafeDriverConfig;
Expand All @@ -51,6 +52,8 @@
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import java.net.InetSocketAddress;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -153,12 +156,27 @@ public CompletionStage<DriverChannel> connect(Node node, DriverChannelOptions op
} else {
nodeMetricUpdater = NoopNodeMetricUpdater.INSTANCE;
}
return connect(node.getEndPoint(), options, nodeMetricUpdater);
return connect(node.getEndPoint(), null, null, options, nodeMetricUpdater);
}

public CompletionStage<DriverChannel> connect(
Node node, Integer shardId, DriverChannelOptions options) {
NodeMetricUpdater nodeMetricUpdater;
if (node instanceof DefaultNode) {
nodeMetricUpdater = ((DefaultNode) node).getMetricUpdater();
} else {
nodeMetricUpdater = NoopNodeMetricUpdater.INSTANCE;
}
return connect(node.getEndPoint(), node.getShardingInfo(), shardId, options, nodeMetricUpdater);
}

@VisibleForTesting
CompletionStage<DriverChannel> connect(
EndPoint endPoint, DriverChannelOptions options, NodeMetricUpdater nodeMetricUpdater) {
EndPoint endPoint,
NodeShardingInfo shardingInfo,
Integer shardId,
DriverChannelOptions options,
NodeMetricUpdater nodeMetricUpdater) {
CompletableFuture<DriverChannel> resultFuture = new CompletableFuture<>();

ProtocolVersion currentVersion;
Expand All @@ -172,13 +190,31 @@ CompletionStage<DriverChannel> connect(
isNegotiating = true;
}

PortIterator portIterator = null;

if (shardId != null && shardingInfo != null) {
int lowestPort =
context
.getConfig()
.getDefaultProfile()
.getInt(DefaultDriverOption.ADVANCED_SHARD_AWARENESS_PORT_LOW);
int highestPort =
context
.getConfig()
.getDefaultProfile()
.getInt(DefaultDriverOption.ADVANCED_SHARD_AWARENESS_PORT_HIGH);
portIterator =
new PortIterator(lowestPort, highestPort, shardingInfo.getShardsCount(), shardId);
}

connect(
endPoint,
options,
nodeMetricUpdater,
currentVersion,
isNegotiating,
attemptedVersions,
portIterator,
resultFuture);
return resultFuture;
}
Expand All @@ -190,6 +226,7 @@ private void connect(
ProtocolVersion currentVersion,
boolean isNegotiating,
List<ProtocolVersion> attemptedVersions,
PortIterator portIterator,
CompletableFuture<DriverChannel> resultFuture) {

NettyOptions nettyOptions = context.getNettyOptions();
Expand All @@ -204,7 +241,16 @@ private void connect(

nettyOptions.afterBootstrapInitialized(bootstrap);

ChannelFuture connectFuture = bootstrap.connect(endPoint.resolve());
ChannelFuture connectFuture;
if (portIterator == null) {
connectFuture = bootstrap.connect(endPoint.resolve());
} else {
if (!portIterator.hasNext()) {
portIterator.reset();
}
connectFuture =
bootstrap.connect(endPoint.resolve(), new InetSocketAddress(portIterator.next()));
}

connectFuture.addListener(
cf -> {
Expand Down Expand Up @@ -258,12 +304,23 @@ private void connect(
downgraded.get(),
true,
attemptedVersions,
portIterator,
resultFuture);
} else {
resultFuture.completeExceptionally(
UnsupportedProtocolVersionException.forNegotiation(
endPoint, attemptedVersions));
}
} else if (isBindException(error)) {
connect(
endPoint,
options,
nodeMetricUpdater,
currentVersion,
true,
attemptedVersions,
portIterator,
resultFuture);
} else {
// Note: might be completed already if the failure happened in initializer(), this is
// fine
Expand All @@ -273,6 +330,16 @@ private void connect(
});
}

private static boolean isBindException(Throwable error) {
while (error != null) {
if (error instanceof java.net.BindException) {
return true;
}
error = error.getCause();
}
return false;
}

@VisibleForTesting
ChannelInitializer<Channel> initializer(
EndPoint endPoint,
Expand All @@ -282,7 +349,7 @@ ChannelInitializer<Channel> initializer(
CompletableFuture<DriverChannel> resultFuture) {
return new ChannelFactoryInitializer(
endPoint, protocolVersion, options, nodeMetricUpdater, resultFuture);
};
}

class ChannelFactoryInitializer extends ChannelInitializer<Channel> {

Expand Down Expand Up @@ -391,4 +458,36 @@ protected void initChannel(Channel channel) {
}
}
}

static class PortIterator implements Iterator<Integer> {
private final int highestPort;
private final int startPort;
private int currentPort;
private final int shardCount;

PortIterator(int lowestPort, int highestPort, int shardCount, int shardId) {
this.highestPort = highestPort;
this.currentPort = lowestPort + (shardCount - lowestPort % shardCount) + shardId;
this.startPort = currentPort;
this.shardCount = shardCount;
}

@Override
public Integer next() {
try {
return currentPort;
} finally {
this.currentPort += shardCount;
}
}

@Override
public boolean hasNext() {
return currentPort <= highestPort;
}

public void reset() {
this.currentPort = startPort;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
Expand Down Expand Up @@ -489,9 +490,23 @@ private CompletionStage<Boolean> addMissingChannels() {
channels.length * wantedCount - Arrays.stream(channels).mapToInt(ChannelSet::size).sum();
LOG.debug("[{}] Trying to create {} missing channels", logPrefix, missing);
DriverChannelOptions options = buildDriverOptions();
for (int i = 0; i < missing; i++) {
CompletionStage<DriverChannel> channelFuture = channelFactory.connect(node, options);
pendingChannels.add(channelFuture);
for (int shard = 0; shard < channels.length; shard++) {
LOG.trace(
"[{}] Missing {} channels for shard {}",
logPrefix,
wantedCount - channels[shard].size(),
shard);
for (int p = channels[shard].size(); p < wantedCount; p++) {
CompletionStage<DriverChannel> channelFuture;
if (config
.getDefaultProfile()
.getBoolean(DefaultDriverOption.CONNECTION_ADVANCED_SHARD_AWARENESS_ENABLED)) {
channelFuture = channelFactory.connect(node, shard, options);
} else {
channelFuture = channelFactory.connect(node, options);
}
pendingChannels.add(channelFuture);
}
}
return CompletableFutures.allDone(pendingChannels)
.thenApplyAsync(this::onAllConnected, adminExecutor);
Expand Down Expand Up @@ -551,6 +566,23 @@ private boolean onAllConnected(@SuppressWarnings("unused") Void v) {
channel);
channel.forceClose();
} else {
if (config
.getDefaultProfile()
.getBoolean(DefaultDriverOption.CONNECTION_ADVANCED_SHARD_AWARENESS_ENABLED)
&& channel.localAddress() instanceof InetSocketAddress
&& channel.getShardingInfo() != null) {
int port = ((InetSocketAddress) channel.localAddress()).getPort();
int actualShard = channel.getShardId();
int targetShard = port % channel.getShardingInfo().getShardsCount();
if (actualShard != targetShard) {
LOG.warn(
"[{}] New channel {} connected to shard {}, but shard {} was requested. If this is not transient check your driver AND cluster configuration of shard aware port.",
logPrefix,
channel,
actualShard,
targetShard);
}
}
LOG.debug("[{}] New channel added {}", logPrefix, channel);
if (channels[channel.getShardId()].size() < wantedCount) {
addChannel(channel);
Expand Down
35 changes: 35 additions & 0 deletions core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -535,6 +535,41 @@ datastax-java-driver {
# change.
# Overridable in a profile: no
warn-on-init-error = true


advanced-shard-awareness {
# Whether to use advanced shard awareness when trying to open new connections.
#
# Having this enabled makes sense only for ScyllaDB clusters.
# Results in smaller connection storms in multi-client settings.
# If set to false the driver will not attempt to use this feature.
# If set to true the driver will attempt to use it and will log warnings each time something
# makes it not possible.
# If the node for some reason does not report it's sharding info the driver
# will log a warning and create connection the same way as if this feature was disabled.
# If the cluster ignores the request for specific shard warning will also be logged,
# although the local port will be chosen according to advanced shard awareness rules.
#
# Required: yes
# Modifiable at runtime: yes, the new value will be used for connections created after the
# change.
# Overridable in a profile: no
enabled = true

# Inclusive lower bound of port range to use in advanced shard awareness
# The driver will attempt to reserve ports for connection only within the range.
# Required: yes
# Modifiable at runtime: yes, the new value will be used for calls after the
# change.
port-low = 10000

# Inclusive upper bound of port range to use in advanced shard awareness.
# The driver will attempt to reserve ports for connection only within the range.
# Required: yes
# Modifiable at runtime: yes, the new value will be used for calls after the
# change.
port-high = 60000
}
}

# Advanced options for the built-in load-balancing policies.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,11 @@ public void should_report_available_ids() {
// When
CompletionStage<DriverChannel> channelFuture =
factory.connect(
SERVER_ADDRESS, DriverChannelOptions.builder().build(), NoopNodeMetricUpdater.INSTANCE);
SERVER_ADDRESS,
null,
null,
DriverChannelOptions.builder().build(),
NoopNodeMetricUpdater.INSTANCE);
completeSimpleChannelInit();

// Then
Expand Down
Loading
Loading