Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

local_dc from first connection and default keyspace #26

Open
wants to merge 6 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ The main configuration is via the normal config.json file for a Vert.x module.
{
"cassandra": {
"seeds": [<seeds>],

"KEYSPACE": <string>,
"query": {
"consistency_level": <string>,
"serial_consistency_level": <string>,
Expand Down Expand Up @@ -77,11 +77,12 @@ The main configuration is via the normal config.json file for a Vert.x module.

* `seeds` - an array of string seed IP or host names. At least one seed must be provided.
* `lb_policy_name` - (optional) the load balancing policy name. The following values are accepted:
* "DCAwareRoundRobinPolicy" - requires string field `local_dc` and optional numeric field `used_hosts_per_remote_dc`
* "DCAwareRoundRobinPolicy" - optional string field `local_dc` and optional numeric field `used_hosts_per_remote_dc`
* Any FQCN such of a class that implements `LoadBalancingPolicy`
* `reconnect_policy_name` - (optional) the reconnect policy name. The following values are accepted:
* "constant"|"ConstantReconnectionPolicy" - creates a `ConstantReconnectionPolicy` policy. Expects additional numeric field `delay` in ms.
* "exponential"|"ExponentialReconnectionPolicy" - creates an `ExponentialReconnectionPolicy` policy. Expects additional numeric fields `base_delay` and `max_delay` in ms.
* 'KEYSPACE' - the default keyspace name, optional.

Refer to the [Cassandra Java driver documentation](http://www.datastax.com/documentation/developer/java-driver/2.0/index.html) for a description of the remaining configuration options.

Expand Down
20 changes: 16 additions & 4 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@
<!--Dependency versions-->
<cassandra.driver.version>3.3.2</cassandra.driver.version>
<cassandra.version>3.11.1</cassandra.version>
<vertx.version>3.5.0</vertx.version>
<vertx.version>3.6.0</vertx.version>
<vertx.hk2.version>2.4.0</vertx.hk2.version>
<vertx.guice.version>2.3.0</vertx.guice.version>
<vertx.guice.version>2.3.1</vertx.guice.version>
<vertx.when.version>4.2.0</vertx.when.version>
<vertx.curator.version>3.1.0</vertx.curator.version>
<netty.version>4.1.15.Final</netty.version>
<netty.version>4.1.19.Final</netty.version>

<!--Test properties-->
<test.cassandra.seeds>"localhost"</test.cassandra.seeds>
Expand Down Expand Up @@ -145,6 +145,18 @@
</systemPropertyVariables>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
<executions>
<execution>
<id>attach-sources</id>
<goals>
<goal>jar</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
Expand Down Expand Up @@ -174,5 +186,5 @@
<tag>HEAD</tag>
<url>https://github.com/ef-labs/${project.artifactId}</url>
</scm>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -82,4 +82,6 @@ public interface CassandraConfigurator {
* @param callback
*/
void onReady(Handler<AsyncResult<Void>> callback);

String getKeyspace();
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.englishtown.vertx.cassandra.CassandraConfigurator;
import com.englishtown.vertx.cassandra.CassandraSession;
import com.englishtown.vertx.cassandra.FutureUtils;
import com.google.common.base.Strings;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.ListenableFuture;
import io.vertx.core.AsyncResult;
Expand Down Expand Up @@ -104,6 +105,9 @@ protected void init(CassandraConfigurator configurator) {
if (!configurator.getMetricsOptions().isJMXReportingEnabled()) {
clusterBuilder.withoutJMXReporting();
}
if (!configurator.getMetricsOptions().isEnabled()){
clusterBuilder.withoutMetrics();
}
}

if (configurator.getAuthProvider() != null) {
Expand Down Expand Up @@ -176,7 +180,8 @@ public void reconnect() {
public void reconnectAsync(Handler<AsyncResult<Void>> callback) {
logger.debug("Call to reconnect the session has been made");
Session oldSession = session;
FutureUtils.addCallback(cluster.connectAsync(), new FutureCallback<Session>() {
FutureUtils.addCallback(Strings.isNullOrEmpty(this.getConfigurator().getKeyspace()) ? cluster.connectAsync()
: cluster.connectAsync(this.getConfigurator().getKeyspace()), new FutureCallback<Session>() {
@Override
public void onSuccess(Session session) {
DefaultCassandraSession.this.session = session;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ public class EnvironmentCassandraConfigurator extends JsonCassandraConfigurator
public static final String ENV_VAR_LOCAL_DC = "CASSANDRA_LOCAL_DC";
public static final String ENV_VAR_USERNAME = "CASSANDRA_USERNAME";
public static final String ENV_VAR_PASSWORD = "CASSANDRA_PASSWORD";
public static final String ENV_KEYSPACE = "KEYSPACE";

public static final Logger logger = LoggerFactory.getLogger(EnvironmentCassandraConfigurator.class);
private final EnvVarDelegate envVarDelegate;
Expand All @@ -42,6 +43,7 @@ private void init() {
initSeeds();
initLoadBalancingPolicy();
initAuthProvider();
initKeyspace();
}

private void initSeeds() {
Expand Down Expand Up @@ -88,4 +90,11 @@ public String get(String name) {
}
}

private void initKeyspace(){
String keyspace = envVarDelegate.get(ENV_KEYSPACE);
if (!Strings.isNullOrEmpty(keyspace)){
this.keyspace = keyspace;
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import io.vertx.core.Vertx;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import javafx.util.Builder;

import javax.inject.Inject;
import java.util.ArrayList;
Expand All @@ -31,6 +32,8 @@ public class JsonCassandraConfigurator implements CassandraConfigurator {
protected MetricsOptions metricsOptions;
protected AuthProvider authProvider;

protected String keyspace;

protected final List<String> DEFAULT_SEEDS = ImmutableList.of("127.0.0.1");

public static final String CONFIG_CASSANDRA = "cassandra";
Expand Down Expand Up @@ -61,6 +64,8 @@ public class JsonCassandraConfigurator implements CassandraConfigurator {
public static final String CONSISTENCY_SERIAL = "SERIAL";
public static final String CONSISTENCY_LOCAL_SERIAL = "LOCAL_SERIAL";

public static final String KEYSPACE = "KEYSPACE";

@Inject
public JsonCassandraConfigurator(Vertx vertx) {
this(vertx.getOrCreateContext().config().getJsonObject(CONFIG_CASSANDRA, new JsonObject()));
Expand Down Expand Up @@ -135,6 +140,7 @@ protected void init(JsonObject config) {
initQueryOptions(config.getJsonObject(CONFIG_QUERY, config));
initMetricsOptions(config.getJsonObject(CONFIG_METRICS));
initAuthProvider(config.getJsonObject(CONFIG_AUTH));
initKeyspace(config.getString(KEYSPACE));

}

Expand All @@ -152,6 +158,12 @@ protected void initSeeds(JsonArray seeds) {
}
}

protected void initKeyspace(String keyspace){
if (!Strings.isNullOrEmpty(keyspace)){
this.keyspace = keyspace;
}
}

protected void initPort(JsonObject config) {
Integer i = config.getInteger(CONFIG_PORT);

Expand Down Expand Up @@ -189,14 +201,16 @@ protected void initLoadBalancingPolicy(JsonObject loadBalancing) {
String localDc = loadBalancing.getString("local_dc");
int usedHostsPerRemoteDc = loadBalancing.getInteger("used_hosts_per_remote_dc", 0);

if (localDc == null || localDc.isEmpty()) {
throw new IllegalArgumentException("A DCAwareRoundRobinPolicy requires a local_dc in configuration.");
}
// if (localDc == null || localDc.isEmpty()) {
// throw new IllegalArgumentException("A DCAwareRoundRobinPolicy requires a local_dc in configuration.");
// }

loadBalancingPolicy = DCAwareRoundRobinPolicy.builder()
.withLocalDc(localDc)
.withUsedHostsPerRemoteDc(usedHostsPerRemoteDc)
.build();
DCAwareRoundRobinPolicy.Builder builder= DCAwareRoundRobinPolicy.builder()
.withUsedHostsPerRemoteDc(usedHostsPerRemoteDc);
if (!Strings.isNullOrEmpty(localDc)){
builder.withLocalDc(localDc);
}
loadBalancingPolicy = builder.build();

} else {

Expand Down Expand Up @@ -238,7 +252,7 @@ protected void initReconnectionPolicy(JsonObject reconnection) {
throw new IllegalArgumentException("ConstantReconnectionPolicy requires a delay in configuration");
}

reconnectionPolicy = new ConstantReconnectionPolicy(delay.intValue());
reconnectionPolicy = new ConstantReconnectionPolicy(delay);

} else if ("ExponentialReconnectionPolicy".equalsIgnoreCase(name) || "exponential".equalsIgnoreCase(name)) {
Long baseDelay = reconnection.getLong("base_delay");
Expand All @@ -248,7 +262,7 @@ protected void initReconnectionPolicy(JsonObject reconnection) {
throw new IllegalArgumentException("ExponentialReconnectionPolicy requires base_delay and max_delay in configuration");
}

reconnectionPolicy = new ExponentialReconnectionPolicy(baseDelay.longValue(), maxDelay.longValue());
reconnectionPolicy = new ExponentialReconnectionPolicy(baseDelay, maxDelay);

} else {
Class<?> clazz;
Expand Down Expand Up @@ -471,4 +485,7 @@ protected void initAuthProvider(JsonObject auth) {

}

public String getKeyspace() {
return keyspace;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.englishtown.vertx.cassandra.impl.EnvironmentCassandraConfigurator;
import com.englishtown.vertx.curator.CuratorClient;
import com.englishtown.vertx.curator.promises.WhenConfiguratorHelper;
import com.google.common.base.Strings;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
Expand Down Expand Up @@ -135,6 +136,18 @@ private void initZooKeeper() {
}));
}

if (Strings.isNullOrEmpty(keyspace)){
promises.add(helper.getConfigElement(ZKPaths.makePath(getPathPrefix(), "KEYSPACE")).then(
value -> {
String keyspace = value.asString();
if (!Strings.isNullOrEmpty(keyspace)){
initKeyspace(keyspace);
}
return null;
}
));
}

when.all(promises)
.then(aVoid -> {
runOnReadyCallbacks(Future.succeededFuture(null));
Expand Down Expand Up @@ -164,5 +177,4 @@ public void onReady(Handler<AsyncResult<Void>> callback) {
protected String getPathPrefix() {
return pathPrefix;
}

}