Skip to content

Commit

Permalink
Initial commit of streamzi event-registry project
Browse files Browse the repository at this point in the history
  • Loading branch information
hhiden committed Jun 12, 2018
1 parent f56bb2e commit 1a08dd7
Show file tree
Hide file tree
Showing 15 changed files with 933 additions and 0 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,4 @@

# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml
hs_err_pid*
/target/
69 changes: 69 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<name>event-registry</name>
<version>0.0.1-SNAPSHOT</version>
<groupId>io.streamzi</groupId>
<artifactId>core</artifactId>
<packaging>jar</packaging>

<properties>
<runSuite>io.streamzi.registry.tests.CoreTests</runSuite>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<test>CoreTestSuite</test>
</configuration>
</plugin>
</plugins>
</build>

<dependencies>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.12.0</version>
</dependency>

<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-test</artifactId>
<version>2.12.0</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-yaml</artifactId>
<version>2.3.0</version>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.2.3</version>
</dependency>


<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-core</artifactId>
<version>1.3</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
235 changes: 235 additions & 0 deletions src/main/java/io/streamzi/registry/RegistryConnection.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,235 @@
package io.streamzi.registry;

import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.logging.Logger;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.retry.ExponentialBackoffRetry;

/**
* This class maintains a connection to a Zookeper server
*
* @author hhiden
*/
public class RegistryConnection implements Closeable {
private static final Logger logger = Logger.getLogger(RegistryConnection.class.getName());

private String baseKey = "/strombrau";
private ExecutorService executor = Executors.newSingleThreadExecutor();
private CopyOnWriteArrayList<RegistryKeyListener> listeners = new CopyOnWriteArrayList<>();


@Override
public void close() throws IOException {
// No more operations
executor.shutdownNow();

// Shutdown the listners
for(RegistryKeyListener l : listeners){
l.close();
}

// CLose the client
client.close();
}

/**
* Perform an operation synchronously
*/
public void executeSync(RegistryOperation operation) throws RegistryException {
assertConnected();
operation.setConnection(this);
operation.performOperation();
}

/**
* Perform an operation on the registry
* @param operation to be performed
* @return a Future object that can be used late r
* @throws RegistryException if the registry connection has been closed
*/
public CompletableFuture execute(RegistryOperation operation) {
if(!executor.isShutdown()){
CompletableFuture<RegistryOperation> future = new CompletableFuture<>();
operation.setConnection(this);
executor.submit(()->{
try {
operation.performOperation();
future.complete(operation);
} catch (Exception e){
future.completeExceptionally(e);
}
});
return future;
} else {
CompletableFuture<RegistryOperation> future = new CompletableFuture<>();
executor.submit(()->{
future.completeExceptionally(new RegistryException("Registry not connected"));
});
return future;
}
}

/**
* Add a listener
*/
public RegistryKeyListener addKeyListener(String key, RegistryKeyListener listener) throws RegistryException {
assertConnected();
try {
// Add the key if needed
if(!keyExists(buildKey(key))){
client.create().creatingParentContainersIfNeeded().forPath(buildKey(key));
}

synchronized(listeners){
listener.setup(key, this);
listeners.add(listener);
}
return listener;
} catch (Exception e){
throw new RegistryException("Error adding RegistryKeyListener: " + e.getMessage(), e);
}
}

/**
* Remove a listener
*/
protected void removeKeyListener(RegistryKeyListener listener){
synchronized(listeners){
listeners.remove(listener);
}
}


/**
* URL pointing to the zookeeper instance that maintains all of the state data
*/
private String zookeeperUrl;

private CuratorFramework client;
private volatile boolean connected = false;

public String getBaseKey() {
return baseKey;
}

public void setBaseKey(String baseKey) {
this.baseKey = baseKey;
}

public RegistryConnection(String zookeeperUrl) {
this.zookeeperUrl = zookeeperUrl;
}

public void connect() {
try {
client = CuratorFrameworkFactory.newClient(zookeeperUrl, new ExponentialBackoffRetry(1000, 3));
client.start();
client.blockUntilConnected();

client.getConnectionStateListenable().addListener(new ConnectionStateListener() {
@Override
public void stateChanged(CuratorFramework client, ConnectionState newState) {
logger.info("Connection state change: " + newState.toString());
}
});

connected = true;

} catch (Exception e) {
connected = false;
}
}

public CuratorFramework getClient() {
return client;
}

private void assertConnected() throws RegistryException {
if (!connected) {
throw new RegistryException("Registry not connected to Zookeper");
}
}

public void addChildForKey(String key, String child) throws RegistryException {
assertConnected();
try {
if(!keyExists(buildKey(key) + "/" + child)){
client.create().creatingParentContainersIfNeeded().forPath(buildKey(key) + "/" + child);
}
} catch (Exception e){
throw new RegistryException("Error adding child: " + child, e);
}
}

public void setStringForKey(String key, String value) throws RegistryException {
assertConnected();
try {
if(!keyExists(buildKey(key))){
client.create().creatingParentContainersIfNeeded().forPath(buildKey(key));
}
client.setData().forPath(buildKey(key), value.getBytes());
} catch (Exception e){
throw new RegistryException("Error setting data for key: " + key, e);
}
}

public List<String> getChildStringsForKey(String key) throws RegistryException {
assertConnected();
try {
List<String> children = client.getChildren().forPath(buildKey(key));
for(String c : children){
logger.info(c);
}
return children;
} catch (Exception e){
throw new RegistryException("Error listing children for key: " + key, e);
}
}

public String getStringForKey(String key) throws RegistryException {
assertConnected();
try {
byte[] data = client.getData().forPath(buildKey(key));
return new String(data);
} catch (Exception e) {
throw new RegistryException("Error getting data for: " + key, e);
}
}

public void removeKey(String key) throws RegistryException {
assertConnected();
try {
if(keyExists(buildKey(key))){
client.delete().forPath(buildKey(key));
}
} catch (Exception e){
throw new RegistryException("Error removing key: " + key, e);
}
}

private boolean keyExists(String key) throws RegistryException {
assertConnected();
try {
return(client.checkExists().forPath(key)!=null);
} catch (Exception e){
throw new RegistryException("Error checking if key exists: " + key, e);
}
}

protected String buildKey(String key){
return baseKey + "/" + key;
}

public boolean isConnected() {
return connected;
}
}
32 changes: 32 additions & 0 deletions src/main/java/io/streamzi/registry/RegistryException.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package io.streamzi.registry;

/**
*
* @author hhiden
*/
public class RegistryException extends Exception {

/**
* Creates a new instance of <code>RegistryException</code> without detail message.
*/
public RegistryException() {
}

/**
* Constructs an instance of <code>RegistryException</code> with the specified detail message.
*
* @param msg the detail message.
*/
public RegistryException(String msg) {
super(msg);
}

/**
* Counstructs an exception with both a detail message and a cause
* @param msg detail message
* @param cause underlying cause
*/
public RegistryException(String msg, Throwable cause){
super(msg, cause);
}
}
Loading

0 comments on commit 1a08dd7

Please sign in to comment.