Skip to content
This repository has been archived by the owner on Apr 6, 2022. It is now read-only.

Feature/to cql #17

Open
wants to merge 18 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 20 additions & 17 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -51,24 +51,22 @@
<version>2.5</version>
</dependency>

<!-- Hector -->
<dependency>
<groupId>org.hectorclient</groupId>
<artifactId>hector-core</artifactId>
<version>1.1-4</version>
<exclusions>
<exclusion>
<groupId>org.apache.thrift</groupId>
<artifactId>libthrift</artifactId>
</exclusion>
</exclusions>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.22</version>
</dependency>

<dependency>
<groupId>org.apache.thrift</groupId>
<artifactId>libthrift</artifactId>
<version>0.9.3</version>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.5</version>
</dependency>
<dependency>
<groupId>com.datastax.cassandra</groupId>
<artifactId>cassandra-driver-core</artifactId>
<version>3.3.0</version>
</dependency>

<!-- Monitoring -->
<dependency>
<groupId>com.appmetr</groupId>
Expand Down Expand Up @@ -136,9 +134,6 @@
</testResource>
</testResources>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-resources-plugin</artifactId>
Expand Down Expand Up @@ -167,6 +162,14 @@
<artifactId>maven-release-plugin</artifactId>
<version>2.5</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>

Expand Down
2 changes: 1 addition & 1 deletion src/main/java/com/appmetr/hercules/FieldFilter.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,5 @@

public interface FieldFilter {

public boolean accept(Field field);
boolean accept(Field field);
}
63 changes: 35 additions & 28 deletions src/main/java/com/appmetr/hercules/Hercules.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,17 @@
import com.appmetr.hercules.mutations.ExecutableMutation;
import com.appmetr.hercules.mutations.MutationsQueue;
import com.appmetr.hercules.partition.PartitioningStarter;
import com.datastax.driver.core.AbstractTableMetadata;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.TableMetadata;
import com.datastax.driver.core.TypeCodec;
import com.google.inject.Inject;
import com.google.inject.Injector;
import me.prettyprint.hector.api.Cluster;
import me.prettyprint.hector.api.Keyspace;
import me.prettyprint.hector.api.ddl.ColumnFamilyDefinition;
import me.prettyprint.hector.api.ddl.ComparatorType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;
import java.util.stream.Collectors;

public class Hercules {

Expand All @@ -54,14 +55,14 @@ public class Hercules {

@Inject HerculesConfig config;

private Map<Class, EntityMetadata> entityClassMetadataCache = new HashMap<Class, EntityMetadata>();
private Map<Class, WideEntityMetadata> wideEntityClassMetadataCache = new HashMap<Class, WideEntityMetadata>();
private Map<Class, EntityMetadata> entityClassMetadataCache = new HashMap<>();
private Map<Class, WideEntityMetadata> wideEntityClassMetadataCache = new HashMap<>();

@Inject EntityMetadataExtractor metadataExtractor;
@Inject WideEntityMetadataExtractor wideMetadataExtractor;

private Cluster cluster;
private Keyspace keyspace;
private String keyspace;

@Inject private Injector injector;
@Inject private DataDriver dataDriver;
Expand All @@ -73,9 +74,15 @@ public class Hercules {


public void init() {
cluster = dataDriver.getOrCreateCluster(config.getClusterName(), config.getCassandraHost(),
config.getMaxActiveConnections(), config.getMaxConnectTimeMillis(),
config.getCassandraThriftSocketTimeout(), config.getMaxWaitTimeWhenExhausted());
cluster = dataDriver.getOrCreateCluster(
config.getClusterName(),
config.getCassandraHost(),
config.getMaxActiveConnections(),
config.getMaxConnectTimeMillis()
);
initCodecs();

cluster.connect();
keyspace = dataDriver.getOrCreateKeyspace(config.getKeyspaceName(), config.getReplicationFactor(), cluster);

initEntities();
Expand All @@ -84,6 +91,12 @@ public void init() {
new Thread(partitioningStarter).start();
}

private void initCodecs() {
for (TypeCodec typeCodec : config.getCodecs()) {
cluster.getConfiguration().getCodecRegistry().register(typeCodec);
}
}

public void shutdown() {
partitioningStarter.stop();
mutationsQueue.stop();
Expand All @@ -92,22 +105,15 @@ public void shutdown() {
}

public Set<String> getColumnFamilies() {
List<ColumnFamilyDefinition> columnFamilies = cluster.describeKeyspace(getKeyspaceName()).getCfDefs();
Set<String> columnFamiliesNames = new HashSet<String>();

for (ColumnFamilyDefinition cf : columnFamilies) {
columnFamiliesNames.add(cf.getName());
}

return columnFamiliesNames;
Collection<TableMetadata> tables = cluster.getMetadata().getKeyspace(getKeyspaceName()).getTables();
return tables
.stream()
.map(AbstractTableMetadata::getName)
.collect(Collectors.toSet());
}

public boolean checkAndCreateColumnFamily(String cfName) {
return checkAndCreateColumnFamily(cfName, ComparatorType.UTF8TYPE);
}

public boolean checkAndCreateColumnFamily(String cfName, ComparatorType comparator) {
return dataDriver.checkAndCreateColumnFamily(cluster, config.getKeyspaceName(), cfName, comparator);
return dataDriver.checkAndCreateColumnFamily(cluster, config.getKeyspaceName(), cfName);
}

public boolean deleteColumnFamily(String cfName) {
Expand All @@ -130,7 +136,7 @@ private void initPlainEntities() {
EntityMetadata metadata = metadataExtractor.extract(entityClass);
entityClassMetadataCache.put(entityClass, metadata);

checkAndCreateColumnFamily(metadata.getColumnFamily(), metadata.getComparatorType());
checkAndCreateColumnFamily(metadata.getColumnFamily());

}
//We should have extracted metadata before create indexes
Expand All @@ -148,7 +154,7 @@ private void initiateWideEntities() {
WideEntityMetadata metadata = wideMetadataExtractor.extract(wideEntityClass);
wideEntityClassMetadataCache.put(wideEntityClass, metadata);

checkAndCreateColumnFamily(metadata.getColumnFamily(), metadata.getComparatorType());
checkAndCreateColumnFamily(metadata.getColumnFamily());
}
}

Expand All @@ -175,7 +181,7 @@ public WideEntityMetadata getWideMetadata(Class wideEntityClass) {
public Set<ExecutableMutation> getPartitionMutations() {
final Set<String> columnFamilies = getColumnFamilies();

Set<ExecutableMutation> mutations = new HashSet<ExecutableMutation>();
Set<ExecutableMutation> mutations = new HashSet<>();

for (final Class partitionEntityClass : config.getWideEntityClasses()) {
if (!partitionEntityClass.isAnnotationPresent(Partitioned.class)) {
Expand All @@ -188,7 +194,7 @@ public Set<ExecutableMutation> getPartitionMutations() {
if (!columnFamilies.contains(cfFullName)) {
mutations.add(new ExecutableMutation(ExecutableMutation.MutationType.CREATE, cfFullName) {
@Override public void execute() throws Exception {
checkAndCreateColumnFamily(getCfName(), getWideMetadata(partitionEntityClass).getComparatorType());
checkAndCreateColumnFamily(getCfName());
columnFamilies.add(getCfName());
logger.info("Created partition: " + getCfName());
}
Expand All @@ -208,7 +214,7 @@ public Set<ExecutableMutation> getPartitionMutations() {

public Cluster getCluster() { return cluster; }

public Keyspace getKeyspace() { return keyspace; }
public String getKeyspace() { return keyspace; }

public String getKeyspaceName() { return config.getKeyspaceName(); }

Expand All @@ -217,6 +223,7 @@ public Set<ExecutableMutation> getPartitionMutations() {
public int getReplicationFactor() { return config.getReplicationFactor(); }

public boolean isSchemaModificationEnabled() { return config.isSchemaModificationEnabled(); }
public boolean isSchemaModificationDisabled() { return !isSchemaModificationEnabled(); }

public Injector getInjector() { return injector; }

Expand Down
25 changes: 18 additions & 7 deletions src/main/java/com/appmetr/hercules/HerculesConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@

import com.appmetr.hercules.annotations.Entity;
import com.appmetr.hercules.annotations.WideEntity;
import com.datastax.driver.core.TypeCodec;

import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

Expand All @@ -14,13 +16,14 @@ public class HerculesConfig {
private int maxActiveConnections;
private int replicationFactor;
private Boolean schemaModificationEnabled;
private long maxConnectTimeMillis = -1;
private int maxConnectTimeMillis = 10_000;
private int cassandraThriftSocketTimeout;
private long maxWaitTimeWhenExhausted;

/* Fields */
private Set<Class> entityClasses;
private Set<Class> wideEntityClasses;
private Set<Class> entityClasses = Collections.emptySet();
private Set<Class> wideEntityClasses = Collections.emptySet();
private Set<TypeCodec> codecs = Collections.emptySet();

public HerculesConfig() {
}
Expand All @@ -33,8 +36,8 @@ public HerculesConfig(String clusterName, String keyspaceName, String cassandraH
this.replicationFactor = replicationFactor;
this.schemaModificationEnabled = schemaModificationEnabled;

this.entityClasses = new HashSet<Class>();
this.wideEntityClasses = new HashSet<Class>();
this.entityClasses = new HashSet<>();
this.wideEntityClasses = new HashSet<>();

for (Class entityClass : entityClasses) {
if (entityClass.isAnnotationPresent(Entity.class)) {
Expand Down Expand Up @@ -93,6 +96,14 @@ public Set<Class> getEntityClasses() {
return entityClasses;
}

public Set<TypeCodec> getCodecs() {
return codecs;
}

public void setCodecs(Set<TypeCodec> codecs) {
this.codecs = codecs;
}

public void setEntityClasses(Set<Class> entityClasses) {
this.entityClasses = entityClasses;
}
Expand All @@ -105,11 +116,11 @@ public void setWideEntityClasses(Set<Class> wideEntityClasses) {
this.wideEntityClasses = wideEntityClasses;
}

public long getMaxConnectTimeMillis() {
public int getMaxConnectTimeMillis() {
return maxConnectTimeMillis;
}

public void setMaxConnectTimeMillis(long maxConnectTimeMillis) {
public void setMaxConnectTimeMillis(int maxConnectTimeMillis) {
this.maxConnectTimeMillis = maxConnectTimeMillis;
}

Expand Down
4 changes: 2 additions & 2 deletions src/main/java/com/appmetr/hercules/HerculesFactory.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.appmetr.hercules;

import com.appmetr.hercules.driver.CqlDataDriver;
import com.appmetr.hercules.driver.DataDriver;
import com.appmetr.hercules.driver.ThriftDataDriver;
import com.appmetr.hercules.manager.EntityManager;
import com.appmetr.hercules.manager.IndexManager;
import com.appmetr.hercules.manager.WideEntityManager;
Expand All @@ -18,7 +18,7 @@ public static Hercules create(final HerculesConfig config) {
@Override protected void configure() {
bind(HerculesConfig.class).toInstance(config);

bind(DataDriver.class).to(ThriftDataDriver.class).in(Scopes.SINGLETON);
bind(DataDriver.class).to(CqlDataDriver.class).in(Scopes.SINGLETON);
bind(EntityManager.class).in(Scopes.SINGLETON);
bind(WideEntityManager.class).in(Scopes.SINGLETON);
bind(IndexManager.class).in(Scopes.SINGLETON);
Expand Down
12 changes: 6 additions & 6 deletions src/main/java/com/appmetr/hercules/HerculesMonitoringGroup.java
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package com.appmetr.hercules;

public interface HerculesMonitoringGroup {
public static final String HERCULES_EM = "hercules.em";
public static final String HERCULES_WM = "hercules.wm";
public static final String HERCULES_DD = "hercules.dd";
String HERCULES_EM = "hercules.em";
String HERCULES_WM = "hercules.wm";
String HERCULES_DD = "hercules.dd";

public static final String EM_SIMPLE = "em.simple";
public static final String EM_WIDE = "em.wide";
public static final String EM_PARTITION = "em.partition";
String EM_SIMPLE = "em.simple";
String EM_WIDE = "em.wide";
String EM_PARTITION = "em.partition";
}
6 changes: 3 additions & 3 deletions src/main/java/com/appmetr/hercules/annotations/Id.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.appmetr.hercules.annotations;

import com.appmetr.hercules.serializers.AbstractHerculesSerializer;
import com.datastax.driver.core.TypeCodec;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
Expand All @@ -10,8 +10,8 @@
@Target({ElementType.METHOD, ElementType.FIELD})
@Retention(RetentionPolicy.RUNTIME)
public @interface Id {
Class<? extends AbstractHerculesSerializer> value() default AbstractHerculesSerializer.class;
Class<? extends TypeCodec> value() default TypeCodec.class;

Class keyClass() default Object.class;
Class<? extends AbstractHerculesSerializer> serializer() default AbstractHerculesSerializer.class;
Class<? extends TypeCodec> serializer() default TypeCodec.class;
}
4 changes: 2 additions & 2 deletions src/main/java/com/appmetr/hercules/annotations/Index.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.appmetr.hercules.annotations;

import com.appmetr.hercules.keys.ForeignKey;
import com.appmetr.hercules.serializers.AbstractHerculesSerializer;
import com.datastax.driver.core.TypeCodec;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
Expand All @@ -13,5 +13,5 @@
public @interface Index {
Class<? extends ForeignKey> keyClass();

Class<? extends AbstractHerculesSerializer> serializer() default AbstractHerculesSerializer.class;
Class<? extends TypeCodec> serializer() default TypeCodec.class;
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.appmetr.hercules.annotations;

import com.appmetr.hercules.keys.CollectionKeysExtractor;
import com.appmetr.hercules.serializers.AbstractHerculesSerializer;
import com.datastax.driver.core.TypeCodec;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
Expand All @@ -16,7 +16,7 @@

Class itemClass() default Object.class;

Class<? extends AbstractHerculesSerializer> serializer() default AbstractHerculesSerializer.class;
Class<? extends TypeCodec> serializer() default TypeCodec.class;

Class<? extends CollectionKeysExtractor> keyExtractorClass() default CollectionKeysExtractor.class;

Expand Down
6 changes: 3 additions & 3 deletions src/main/java/com/appmetr/hercules/annotations/RowKey.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.appmetr.hercules.annotations;

import com.appmetr.hercules.serializers.AbstractHerculesSerializer;
import com.datastax.driver.core.TypeCodec;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
Expand All @@ -10,8 +10,8 @@
@Target({ElementType.TYPE, ElementType.FIELD})
@Retention(RetentionPolicy.RUNTIME)
public @interface RowKey {
Class<? extends AbstractHerculesSerializer> value() default AbstractHerculesSerializer.class;
Class<? extends TypeCodec> value() default TypeCodec.class;

Class keyClass() default Object.class;
Class<? extends AbstractHerculesSerializer> serializer() default AbstractHerculesSerializer.class;
Class<? extends TypeCodec> serializer() default TypeCodec.class;
}
Loading