diff --git a/src/acceptance-test/java/org/zalando/nakadi/repository/db/EventTypeCacheTestAT.java b/src/acceptance-test/java/org/zalando/nakadi/repository/db/EventTypeCacheTestAT.java index 1253908dbf..bf7be36bca 100644 --- a/src/acceptance-test/java/org/zalando/nakadi/repository/db/EventTypeCacheTestAT.java +++ b/src/acceptance-test/java/org/zalando/nakadi/repository/db/EventTypeCacheTestAT.java @@ -15,7 +15,7 @@ import org.mockito.Mockito; import org.zalando.nakadi.config.RepositoriesConfig; import org.zalando.nakadi.domain.EventType; -import org.zalando.nakadi.domain.Storage; +import org.zalando.nakadi.domain.storage.Storage; import org.zalando.nakadi.domain.Timeline; import org.zalando.nakadi.repository.EventTypeRepository; import org.zalando.nakadi.repository.zookeeper.ZooKeeperHolder; diff --git a/src/acceptance-test/java/org/zalando/nakadi/repository/db/StorageDbRepositoryTest.java b/src/acceptance-test/java/org/zalando/nakadi/repository/db/StorageDbRepositoryTest.java index e5fabc6eae..08737d196f 100644 --- a/src/acceptance-test/java/org/zalando/nakadi/repository/db/StorageDbRepositoryTest.java +++ b/src/acceptance-test/java/org/zalando/nakadi/repository/db/StorageDbRepositoryTest.java @@ -3,8 +3,10 @@ import org.junit.Before; import org.junit.Test; import org.zalando.nakadi.domain.EventType; -import org.zalando.nakadi.domain.Storage; +import org.zalando.nakadi.domain.storage.KafkaConfiguration; +import org.zalando.nakadi.domain.storage.Storage; import org.zalando.nakadi.domain.Timeline; +import org.zalando.nakadi.domain.storage.ZookeeperConnection; import org.zalando.nakadi.exceptions.runtime.NoSuchStorageException; import org.zalando.nakadi.exceptions.runtime.StorageIsUsedException; import org.zalando.nakadi.utils.TestUtils; @@ -35,13 +37,8 @@ public void setUp() throws Exception { eventTypeDbRepository = new EventTypeDbRepository(template, TestUtils.OBJECT_MAPPER); } - static Storage createStorage(final String name, - final String exhibitorAddress, - final int exhibitorPort, - final String zkAddress, - final String zkPath) { - final Storage.KafkaConfiguration config = - new Storage.KafkaConfiguration(exhibitorAddress, exhibitorPort, zkAddress, zkPath); + static Storage createStorage(final String name, final ZookeeperConnection zkConnection) { + final KafkaConfiguration config = new KafkaConfiguration(zkConnection); final Storage storage = new Storage(); storage.setId(name); storage.setType(Storage.Type.KAFKA); @@ -50,9 +47,9 @@ static Storage createStorage(final String name, } @Test - public void testStorageCreated() throws Exception { + public void testStorageCreated() { final String name = randomUUID(); - final Storage storage = createStorage(name, "exaddress", 8181, "address", "path"); + final Storage storage = createStorage(name, ZookeeperConnection.valueOf("exhibitor://exaddress:8181/path")); repository.createStorage(storage); @@ -64,15 +61,15 @@ public void testStorageCreated() throws Exception { } @Test - public void testStorageOrdered() throws Exception { + public void testStorageOrdered() { final String namePrefix = randomValidStringOfLength(31); final Storage storage2 = repository.createStorage( - createStorage(namePrefix + "2", "exaddress", 8181, "address1", "path3")); + createStorage(namePrefix + "2", ZookeeperConnection.valueOf("exhibitor://exaddress1:8181/path3"))); final Storage storage1 = repository.createStorage( - createStorage(namePrefix + "1", "exaddress", 8181, "address2", "path2")); + createStorage(namePrefix + "1", ZookeeperConnection.valueOf("exhibitor://exaddress2:8181/path2"))); final Storage storage3 = repository.createStorage( - createStorage(namePrefix + "3", "exaddress", 8181, "address3", "path1")); + createStorage(namePrefix + "3", ZookeeperConnection.valueOf("exhibitor://exaddress3:8181/path1"))); final List storages = repository.listStorages().stream() .filter(st -> st.getId() != null) @@ -86,23 +83,25 @@ public void testStorageOrdered() throws Exception { } @Test - public void testStorageDeleted() throws Exception { + public void testStorageDeleted() { final String name = randomUUID(); - final Storage storage = repository.createStorage(createStorage(name, "exaddress", 8181, "address2", "path2")); + final Storage storage = repository.createStorage( + createStorage(name, ZookeeperConnection.valueOf("exhibitor://exaddress:8181/path"))); assertEquals(storage, repository.getStorage(storage.getId()).get()); repository.deleteStorage(storage.getId()); assertFalse(repository.getStorage(storage.getId()).isPresent()); } @Test(expected = NoSuchStorageException.class) - public void testDeleteNoneExistingStorage() throws Exception { + public void testDeleteNoneExistingStorage() { repository.deleteStorage(randomUUID()); } @Test(expected = StorageIsUsedException.class) - public void testDeleteUsedStorage() throws Exception { + public void testDeleteUsedStorage() { final String name = randomUUID(); - final Storage storage = repository.createStorage(createStorage(name, "exaddress", 8181, "address", "path")); + final Storage storage = repository.createStorage( + createStorage(name, ZookeeperConnection.valueOf("exhibitor://exaddress:8181/path"))); final EventType eventType = eventTypeDbRepository.saveEventType(TestUtils.buildDefaultEventType()); final Timeline timeline = TimelineDbRepositoryTest.createTimeline(storage, UUID.randomUUID(), 0, "topic", diff --git a/src/acceptance-test/java/org/zalando/nakadi/repository/db/TimelineDbRepositoryTest.java b/src/acceptance-test/java/org/zalando/nakadi/repository/db/TimelineDbRepositoryTest.java index 91817c42ed..855c4810a9 100644 --- a/src/acceptance-test/java/org/zalando/nakadi/repository/db/TimelineDbRepositoryTest.java +++ b/src/acceptance-test/java/org/zalando/nakadi/repository/db/TimelineDbRepositoryTest.java @@ -5,8 +5,9 @@ import org.junit.Before; import org.junit.Test; import org.zalando.nakadi.domain.EventType; -import org.zalando.nakadi.domain.Storage; +import org.zalando.nakadi.domain.storage.Storage; import org.zalando.nakadi.domain.Timeline; +import org.zalando.nakadi.domain.storage.ZookeeperConnection; import org.zalando.nakadi.exceptions.runtime.DuplicatedTimelineException; import org.zalando.nakadi.utils.TestUtils; @@ -32,8 +33,8 @@ public void setUp() throws Exception { final StorageDbRepository sRepository = new StorageDbRepository(template, TestUtils.OBJECT_MAPPER); final EventTypeDbRepository eRepository = new EventTypeDbRepository(template, TestUtils.OBJECT_MAPPER); - storage = sRepository.createStorage( - StorageDbRepositoryTest.createStorage(randomUUID(), "localhost", 8181, "test", "path")); + storage = sRepository.createStorage(StorageDbRepositoryTest.createStorage( + randomUUID(), ZookeeperConnection.valueOf("zookeeper://localhost:8181/test"))); testEt = eRepository.saveEventType(TestUtils.buildDefaultEventType()); } diff --git a/src/acceptance-test/java/org/zalando/nakadi/webservice/BaseAT.java b/src/acceptance-test/java/org/zalando/nakadi/webservice/BaseAT.java index 0c84ad6a7b..af3f1c865a 100644 --- a/src/acceptance-test/java/org/zalando/nakadi/webservice/BaseAT.java +++ b/src/acceptance-test/java/org/zalando/nakadi/webservice/BaseAT.java @@ -8,7 +8,9 @@ import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.jdbc.datasource.DriverManagerDataSource; import org.zalando.nakadi.config.JsonConfig; -import org.zalando.nakadi.domain.Storage; +import org.zalando.nakadi.domain.storage.KafkaConfiguration; +import org.zalando.nakadi.domain.storage.Storage; +import org.zalando.nakadi.domain.storage.ZookeeperConnection; import org.zalando.nakadi.exceptions.runtime.DuplicatedStorageException; import org.zalando.nakadi.repository.db.StorageDbRepository; import org.zalando.nakadi.repository.db.TimelineDbRepository; @@ -25,6 +27,8 @@ public abstract class BaseAT { public static final String URL = "http://localhost:" + PORT; protected static final String ZOOKEEPER_URL = "localhost:2181"; + protected static final ZookeeperConnection ZOOKEEPER_CONNECTION = + ZookeeperConnection.valueOf("zookeeper://" + ZOOKEEPER_URL); protected static final String KAFKA_URL = "localhost:29092"; private static final JdbcTemplate JDBC_TEMPLATE = new JdbcTemplate( @@ -47,7 +51,7 @@ public static void createDefaultStorage() { final Storage storage = new Storage(); storage.setId("default"); storage.setType(Storage.Type.KAFKA); - storage.setConfiguration(new Storage.KafkaConfiguration(null, null, ZOOKEEPER_URL, "")); + storage.setConfiguration(new KafkaConfiguration(ZOOKEEPER_CONNECTION)); try { STORAGE_DB_REPOSITORY.createStorage(storage); } catch (final DuplicatedStorageException ignore) { diff --git a/src/acceptance-test/java/org/zalando/nakadi/webservice/StorageControllerAT.java b/src/acceptance-test/java/org/zalando/nakadi/webservice/StorageControllerAT.java index bd5ddb8447..e363abf845 100644 --- a/src/acceptance-test/java/org/zalando/nakadi/webservice/StorageControllerAT.java +++ b/src/acceptance-test/java/org/zalando/nakadi/webservice/StorageControllerAT.java @@ -13,9 +13,14 @@ public class StorageControllerAT extends BaseAT { @Test public void shouldChangeDefaultStorageWhenRequested() throws Exception { given() - .body("{\"id\": \"default-test\",\"kafka_configuration\": {\"exhibitor_address\": null," + - "\"exhibitor_port\": 0,\"zk_address\": \"zookeeper:2181\",\"zk_path\": \"\"}," + - "\"storage_type\": \"kafka\"}") + .body("{" + + "\"id\": \"default-test\"," + + "\"kafka_configuration\": {" + + "\"zookeeper_connection\":{" + + "\"type\": \"zookeeper\"," + + "\"addresses\":[{\"address\":\"zookeeper\", \"port\":2181}]" + + "}" + + "},\"storage_type\": \"kafka\"}") .contentType(JSON).post("/storages"); NakadiTestUtils.createEventTypeInNakadi(EventTypeTestBuilder.builder().name("event_a").build()); diff --git a/src/main/java/org/zalando/nakadi/config/NakadiConfig.java b/src/main/java/org/zalando/nakadi/config/NakadiConfig.java index 89f828cb8f..dd28643b82 100644 --- a/src/main/java/org/zalando/nakadi/config/NakadiConfig.java +++ b/src/main/java/org/zalando/nakadi/config/NakadiConfig.java @@ -12,8 +12,10 @@ import org.springframework.core.task.SimpleAsyncTaskExecutor; import org.springframework.core.task.TaskExecutor; import org.springframework.scheduling.annotation.EnableScheduling; -import org.zalando.nakadi.domain.DefaultStorage; -import org.zalando.nakadi.domain.Storage; +import org.zalando.nakadi.domain.storage.DefaultStorage; +import org.zalando.nakadi.domain.storage.KafkaConfiguration; +import org.zalando.nakadi.domain.storage.Storage; +import org.zalando.nakadi.domain.storage.ZookeeperConnection; import org.zalando.nakadi.exceptions.runtime.DuplicatedStorageException; import org.zalando.nakadi.exceptions.runtime.InternalNakadiException; import org.zalando.nakadi.repository.db.StorageDbRepository; @@ -46,11 +48,8 @@ public DefaultStorage defaultStorage(final StorageDbRepository storageDbReposito final Storage storage = new Storage(); storage.setId(storageId); storage.setType(Storage.Type.KAFKA); - storage.setConfiguration(new Storage.KafkaConfiguration( - environment.getProperty("nakadi.zookeeper.exhibitor.brokers"), - Integer.valueOf(environment.getProperty("nakadi.zookeeper.exhibitor.port", "0")), - environment.getProperty("nakadi.zookeeper.brokers"), - environment.getProperty("nakadi.zookeeper.kafkaNamespace", ""))); + storage.setConfiguration(new KafkaConfiguration( + ZookeeperConnection.valueOf(environment.getProperty("nakadi.zookeeper.connectionString")))); try { storageDbRepository.createStorage(storage); } catch (final DuplicatedStorageException e) { diff --git a/src/main/java/org/zalando/nakadi/controller/EventStreamController.java b/src/main/java/org/zalando/nakadi/controller/EventStreamController.java index ee9168a511..89a4333cba 100644 --- a/src/main/java/org/zalando/nakadi/controller/EventStreamController.java +++ b/src/main/java/org/zalando/nakadi/controller/EventStreamController.java @@ -23,7 +23,7 @@ import org.zalando.nakadi.domain.EventType; import org.zalando.nakadi.domain.NakadiCursor; import org.zalando.nakadi.domain.PartitionStatistics; -import org.zalando.nakadi.domain.Storage; +import org.zalando.nakadi.domain.storage.Storage; import org.zalando.nakadi.domain.Timeline; import org.zalando.nakadi.exceptions.runtime.AccessDeniedException; import org.zalando.nakadi.exceptions.runtime.InternalNakadiException; diff --git a/src/main/java/org/zalando/nakadi/controller/StoragesController.java b/src/main/java/org/zalando/nakadi/controller/StoragesController.java index 1c8f1a3b90..1de1663a1d 100644 --- a/src/main/java/org/zalando/nakadi/controller/StoragesController.java +++ b/src/main/java/org/zalando/nakadi/controller/StoragesController.java @@ -9,7 +9,7 @@ import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.bind.annotation.RestController; import org.springframework.web.context.request.NativeWebRequest; -import org.zalando.nakadi.domain.Storage; +import org.zalando.nakadi.domain.storage.Storage; import org.zalando.nakadi.exceptions.runtime.DbWriteOperationsBlockedException; import org.zalando.nakadi.exceptions.runtime.DuplicatedStorageException; import org.zalando.nakadi.exceptions.runtime.ForbiddenOperationException; diff --git a/src/main/java/org/zalando/nakadi/domain/Timeline.java b/src/main/java/org/zalando/nakadi/domain/Timeline.java index 2260fe8d43..1dda5e0b82 100644 --- a/src/main/java/org/zalando/nakadi/domain/Timeline.java +++ b/src/main/java/org/zalando/nakadi/domain/Timeline.java @@ -1,5 +1,6 @@ package org.zalando.nakadi.domain; +import org.zalando.nakadi.domain.storage.Storage; import org.zalando.nakadi.repository.kafka.KafkaCursor; import org.zalando.nakadi.util.UUIDGenerator; diff --git a/src/main/java/org/zalando/nakadi/domain/storage/AddressPort.java b/src/main/java/org/zalando/nakadi/domain/storage/AddressPort.java new file mode 100644 index 0000000000..8391915c48 --- /dev/null +++ b/src/main/java/org/zalando/nakadi/domain/storage/AddressPort.java @@ -0,0 +1,55 @@ +package org.zalando.nakadi.domain.storage; + +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.Objects; + +public class AddressPort { + private final String address; + private final int port; + + public AddressPort( + @JsonProperty("address") final String address, + @JsonProperty("port") final int port) { + this.address = address; + this.port = port; + } + + public String getAddress() { + return address; + } + + public int getPort() { + return port; + } + + public String asAddressPort() { + return address + ":" + port; + } + + @Override + public String toString() { + return "AddressPort{" + + "address='" + address + '\'' + + ", port=" + port + + '}'; + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final AddressPort that = (AddressPort) o; + return port == that.port && + Objects.equals(address, that.address); + } + + @Override + public int hashCode() { + return Objects.hash(address, port); + } +} diff --git a/src/main/java/org/zalando/nakadi/domain/DefaultStorage.java b/src/main/java/org/zalando/nakadi/domain/storage/DefaultStorage.java similarity index 88% rename from src/main/java/org/zalando/nakadi/domain/DefaultStorage.java rename to src/main/java/org/zalando/nakadi/domain/storage/DefaultStorage.java index fb3675de6c..756060a257 100644 --- a/src/main/java/org/zalando/nakadi/domain/DefaultStorage.java +++ b/src/main/java/org/zalando/nakadi/domain/storage/DefaultStorage.java @@ -1,4 +1,4 @@ -package org.zalando.nakadi.domain; +package org.zalando.nakadi.domain.storage; public class DefaultStorage { diff --git a/src/main/java/org/zalando/nakadi/domain/storage/KafkaConfiguration.java b/src/main/java/org/zalando/nakadi/domain/storage/KafkaConfiguration.java new file mode 100644 index 0000000000..a27ac23258 --- /dev/null +++ b/src/main/java/org/zalando/nakadi/domain/storage/KafkaConfiguration.java @@ -0,0 +1,40 @@ +package org.zalando.nakadi.domain.storage; + +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.Objects; + +public class KafkaConfiguration { + private final ZookeeperConnection zookeeperConnection; + + public KafkaConfiguration( + @JsonProperty(value = "zookeeper_connection") final ZookeeperConnection zookeeperConnection) { + this.zookeeperConnection = zookeeperConnection; + } + + public ZookeeperConnection getZookeeperConnection() { + return zookeeperConnection; + } + + @Override + public String toString() { + return "KafkaConfiguration{" + zookeeperConnection + '}'; + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final KafkaConfiguration that = (KafkaConfiguration) o; + return Objects.equals(zookeeperConnection, that.zookeeperConnection); + } + + @Override + public int hashCode() { + return Objects.hash(zookeeperConnection); + } +} diff --git a/src/main/java/org/zalando/nakadi/domain/Storage.java b/src/main/java/org/zalando/nakadi/domain/storage/Storage.java similarity index 51% rename from src/main/java/org/zalando/nakadi/domain/Storage.java rename to src/main/java/org/zalando/nakadi/domain/storage/Storage.java index ac5df752ae..8ee19bc764 100644 --- a/src/main/java/org/zalando/nakadi/domain/Storage.java +++ b/src/main/java/org/zalando/nakadi/domain/storage/Storage.java @@ -1,7 +1,8 @@ -package org.zalando.nakadi.domain; +package org.zalando.nakadi.domain.storage; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.databind.ObjectMapper; +import org.zalando.nakadi.domain.Timeline; import javax.annotation.Nullable; import java.io.IOException; @@ -19,89 +20,6 @@ public enum Type { } } - public static class KafkaConfiguration { - private String exhibitorAddress; - private Integer exhibitorPort; - private String zkAddress; - private String zkPath; - - public KafkaConfiguration( - @JsonProperty(value="exhibitor_address") final String exhibitorAddress, - @JsonProperty(value="exhibitor_port", defaultValue = "8181") final Integer exhibitorPort, - @JsonProperty(value="zk_address", defaultValue = "zookeeper:2181") final String zkAddress, - @JsonProperty(value="zk_path", defaultValue = "") final String zkPath) { - this.exhibitorAddress = exhibitorAddress; - this.exhibitorPort = exhibitorPort; - this.zkAddress = zkAddress; - this.zkPath = zkPath; - } - - public String getZkAddress() { - return zkAddress; - } - - public void setZkAddress(final String zkAddress) { - this.zkAddress = zkAddress; - } - - public String getZkPath() { - return zkPath; - } - - public void setZkPath(final String zkPath) { - this.zkPath = zkPath; - } - - public String getExhibitorAddress() { - return exhibitorAddress; - } - - public void setExhibitorAddress(final String exhibitorAddress) { - this.exhibitorAddress = exhibitorAddress; - } - - public Integer getExhibitorPort() { - return exhibitorPort; - } - - public void setExhibitorPort(final Integer exhibitorPort) { - this.exhibitorPort = exhibitorPort; - } - - @Override - public boolean equals(final Object o) { - if (this == o) { - return true; - } - - if (o == null || getClass() != o.getClass()) { - return false; - } - - final KafkaConfiguration that = (KafkaConfiguration) o; - return Objects.equals(exhibitorAddress, that.exhibitorAddress) && - Objects.equals(exhibitorPort, that.exhibitorPort) && - Objects.equals(zkAddress, that.zkAddress) && - Objects.equals(zkPath, that.zkPath); - } - - @Override - public int hashCode() { - return Objects.hash(exhibitorAddress, exhibitorPort, zkAddress, zkPath); - } - - @Override - public String toString() { - final StringBuilder sb = new StringBuilder("KafkaConfiguration{"); - sb.append("exhibitorAddress='").append(exhibitorAddress).append('\''); - sb.append(", exhibitorPort=").append(exhibitorPort); - sb.append(", zkAddress='").append(zkAddress).append('\''); - sb.append(", zkPath='").append(zkPath).append('\''); - sb.append('}'); - return sb.toString(); - } - } - private String id; @JsonProperty("storage_type") private Type type; diff --git a/src/main/java/org/zalando/nakadi/domain/storage/ZookeeperConnection.java b/src/main/java/org/zalando/nakadi/domain/storage/ZookeeperConnection.java new file mode 100644 index 0000000000..81bf8206f2 --- /dev/null +++ b/src/main/java/org/zalando/nakadi/domain/storage/ZookeeperConnection.java @@ -0,0 +1,121 @@ +package org.zalando.nakadi.domain.storage; + +import com.fasterxml.jackson.annotation.JsonProperty; +import org.codehaus.jackson.annotate.JsonIgnore; + +import javax.annotation.Nullable; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +public class ZookeeperConnection { + + private final ZookeeperConnectionType type; + + private final List addresses; + + private final String path; + + public ZookeeperConnection( + @JsonProperty("type") final ZookeeperConnectionType type, + @JsonProperty("addresses") final List addresses, + @JsonProperty("path") final String path) { + this.type = type; + this.addresses = addresses; + this.path = path; + } + + public ZookeeperConnectionType getType() { + return type; + } + + public List getAddresses() { + return addresses; + } + + @Nullable + public String getPath() { + return path; + } + + @JsonIgnore + public String getPathPrepared() { + if (null == path) { + return ""; + } else { + return path.startsWith("/") ? path : ("/" + path); + } + } + + @Override + public String toString() { + return "ZookeeperConnection{" + + "type=" + type + + ", addresses=" + addresses + + ", path='" + path + '\'' + + '}'; + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final ZookeeperConnection that = (ZookeeperConnection) o; + return type == that.type && + Objects.equals(addresses, that.addresses) && + Objects.equals(path, that.path); + } + + @Override + public int hashCode() { + return Objects.hash(type, addresses, path); + } + + private static final Pattern ZK_CONN_PATTERN = + Pattern.compile("^(\\w+)://([\\w,:\\.-]*)(/[\\w\\-\\./]*)?$"); + + public static ZookeeperConnection valueOf(final String connStr) { + final Matcher m = ZK_CONN_PATTERN.matcher(connStr); + if (!m.matches()) { + throw new RuntimeException("Unsupported format of zk connection string " + connStr); + } + final String typeString = m.group(1); + final ZookeeperConnectionType type = Arrays.stream(ZookeeperConnectionType.values()) + .filter(v -> v.name().equalsIgnoreCase(typeString)) + .findAny() + .orElseThrow(() -> + new RuntimeException("Can not detect type of connection" + typeString + " in " + connStr)); + + final String[] hostsPorts = m.group(2).split(","); + final List ports = Arrays.stream(hostsPorts) + .filter(v -> v.contains(":")) + .map(v -> v.substring(v.lastIndexOf(':') + 1)) + .map(Integer::parseInt) + .collect(Collectors.toList()); + if (ports.isEmpty()) { + throw new RuntimeException("Ports are not defined for hosts, no default port could be found in " + connStr); + } + final Integer defaultPort = ports.get(ports.size() - 1); + final List addresses = Arrays.stream(hostsPorts).map(hostPort -> { + final int sepIndex = hostPort.lastIndexOf(':'); + if (sepIndex > 0) { + return new AddressPort( + hostPort.substring(0, sepIndex), Integer.parseInt(hostPort.substring(sepIndex + 1))); + } else { + return new AddressPort(hostPort, defaultPort); + } + + }).collect(Collectors.toList()); + final String path = m.groupCount() >= 3 ? m.group(3) : null; + + return new ZookeeperConnection(type, addresses, path); + + } +} diff --git a/src/main/java/org/zalando/nakadi/domain/storage/ZookeeperConnectionType.java b/src/main/java/org/zalando/nakadi/domain/storage/ZookeeperConnectionType.java new file mode 100644 index 0000000000..46d4380711 --- /dev/null +++ b/src/main/java/org/zalando/nakadi/domain/storage/ZookeeperConnectionType.java @@ -0,0 +1,6 @@ +package org.zalando.nakadi.domain.storage; + +public enum ZookeeperConnectionType { + ZOOKEEPER, + EXHIBITOR, +} diff --git a/src/main/java/org/zalando/nakadi/repository/KafkaRepositoryCreator.java b/src/main/java/org/zalando/nakadi/repository/KafkaRepositoryCreator.java index bec0616082..75018dcd88 100644 --- a/src/main/java/org/zalando/nakadi/repository/KafkaRepositoryCreator.java +++ b/src/main/java/org/zalando/nakadi/repository/KafkaRepositoryCreator.java @@ -6,7 +6,8 @@ import org.springframework.stereotype.Component; import org.zalando.nakadi.config.NakadiSettings; import org.zalando.nakadi.domain.NakadiCursor; -import org.zalando.nakadi.domain.Storage; +import org.zalando.nakadi.domain.storage.KafkaConfiguration; +import org.zalando.nakadi.domain.storage.Storage; import org.zalando.nakadi.domain.Timeline; import org.zalando.nakadi.exceptions.runtime.NakadiRuntimeException; import org.zalando.nakadi.exceptions.runtime.TopicRepositoryException; @@ -52,12 +53,9 @@ public KafkaRepositoryCreator( @Override public TopicRepository createTopicRepository(final Storage storage) throws TopicRepositoryException { try { - final Storage.KafkaConfiguration kafkaConfiguration = storage.getKafkaConfiguration(); + final KafkaConfiguration kafkaConfiguration = storage.getKafkaConfiguration(); final ZooKeeperHolder zooKeeperHolder = new ZooKeeperHolder( - kafkaConfiguration.getZkAddress(), - kafkaConfiguration.getZkPath(), - kafkaConfiguration.getExhibitorAddress(), - kafkaConfiguration.getExhibitorPort(), + kafkaConfiguration.getZookeeperConnection(), zookeeperSettings.getZkSessionTimeoutMs(), zookeeperSettings.getZkConnectionTimeoutMs(), nakadiSettings); diff --git a/src/main/java/org/zalando/nakadi/repository/TopicRepositoryCreator.java b/src/main/java/org/zalando/nakadi/repository/TopicRepositoryCreator.java index fd6472b8bd..0cd2dd8ab6 100644 --- a/src/main/java/org/zalando/nakadi/repository/TopicRepositoryCreator.java +++ b/src/main/java/org/zalando/nakadi/repository/TopicRepositoryCreator.java @@ -1,7 +1,7 @@ package org.zalando.nakadi.repository; import org.zalando.nakadi.domain.NakadiCursor; -import org.zalando.nakadi.domain.Storage; +import org.zalando.nakadi.domain.storage.Storage; import org.zalando.nakadi.domain.Timeline; import org.zalando.nakadi.exceptions.runtime.NakadiRuntimeException; import org.zalando.nakadi.exceptions.runtime.TopicRepositoryException; diff --git a/src/main/java/org/zalando/nakadi/repository/TopicRepositoryHolder.java b/src/main/java/org/zalando/nakadi/repository/TopicRepositoryHolder.java index 00c60a7e03..81a98db006 100644 --- a/src/main/java/org/zalando/nakadi/repository/TopicRepositoryHolder.java +++ b/src/main/java/org/zalando/nakadi/repository/TopicRepositoryHolder.java @@ -7,7 +7,7 @@ import org.springframework.stereotype.Component; import org.zalando.nakadi.domain.NakadiCursor; import org.zalando.nakadi.domain.PartitionStatistics; -import org.zalando.nakadi.domain.Storage; +import org.zalando.nakadi.domain.storage.Storage; import org.zalando.nakadi.domain.Timeline; import org.zalando.nakadi.exceptions.runtime.NakadiRuntimeException; import org.zalando.nakadi.exceptions.runtime.ServiceTemporarilyUnavailableException; diff --git a/src/main/java/org/zalando/nakadi/repository/db/StorageDbRepository.java b/src/main/java/org/zalando/nakadi/repository/db/StorageDbRepository.java index 9611453cf5..22c4d8cd2b 100644 --- a/src/main/java/org/zalando/nakadi/repository/db/StorageDbRepository.java +++ b/src/main/java/org/zalando/nakadi/repository/db/StorageDbRepository.java @@ -10,7 +10,7 @@ import org.springframework.jdbc.core.RowMapper; import org.springframework.stereotype.Repository; import org.zalando.nakadi.annotations.DB; -import org.zalando.nakadi.domain.Storage; +import org.zalando.nakadi.domain.storage.Storage; import org.zalando.nakadi.exceptions.runtime.DuplicatedStorageException; import org.zalando.nakadi.exceptions.runtime.NoSuchStorageException; import org.zalando.nakadi.exceptions.runtime.RepositoryProblemException; diff --git a/src/main/java/org/zalando/nakadi/repository/zookeeper/ZooKeeperHolder.java b/src/main/java/org/zalando/nakadi/repository/zookeeper/ZooKeeperHolder.java index c10cb60edd..acc3bad200 100644 --- a/src/main/java/org/zalando/nakadi/repository/zookeeper/ZooKeeperHolder.java +++ b/src/main/java/org/zalando/nakadi/repository/zookeeper/ZooKeeperHolder.java @@ -1,8 +1,8 @@ package org.zalando.nakadi.repository.zookeeper; -import org.apache.curator.RetryPolicy; import org.apache.curator.ensemble.EnsembleProvider; import org.apache.curator.ensemble.exhibitor.DefaultExhibitorRestClient; +import org.apache.curator.ensemble.exhibitor.ExhibitorEnsembleProvider; import org.apache.curator.ensemble.exhibitor.ExhibitorRestClient; import org.apache.curator.ensemble.exhibitor.Exhibitors; import org.apache.curator.ensemble.fixed.FixedEnsembleProvider; @@ -10,13 +10,13 @@ import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; import org.zalando.nakadi.config.NakadiSettings; +import org.zalando.nakadi.domain.storage.AddressPort; +import org.zalando.nakadi.domain.storage.ZookeeperConnection; import org.zalando.nakadi.exceptions.runtime.ZookeeperException; import java.io.Closeable; -import java.io.IOException; -import java.util.Arrays; -import java.util.Collection; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; public class ZooKeeperHolder { @@ -24,29 +24,18 @@ public class ZooKeeperHolder { private static final int EXHIBITOR_RETRY_MAX = 3; private static final int EXHIBITOR_POLLING_MS = 300000; - private final String zookeeperBrokers; - private final String zookeeperKafkaNamespace; - private final String exhibitorAddresses; - private final Integer exhibitorPort; - private final Integer sessionTimeoutMs; private final Integer connectionTimeoutMs; private final long maxCommitTimeoutMs; + private final ZookeeperConnection conn; private CuratorFramework zooKeeper; private CuratorFramework subscriptionCurator; - public ZooKeeperHolder(final String zookeeperBrokers, - final String zookeeperKafkaNamespace, - final String exhibitorAddresses, - final Integer exhibitorPort, + public ZooKeeperHolder(final ZookeeperConnection conn, final Integer sessionTimeoutMs, final Integer connectionTimeoutMs, final NakadiSettings nakadiSettings) throws Exception { - this.zookeeperBrokers = zookeeperBrokers; - this.zookeeperKafkaNamespace = zookeeperKafkaNamespace; - this.exhibitorAddresses = exhibitorAddresses; - this.exhibitorPort = exhibitorPort; - this.sessionTimeoutMs = sessionTimeoutMs; + this.conn = conn; this.connectionTimeoutMs = connectionTimeoutMs; this.maxCommitTimeoutMs = TimeUnit.SECONDS.toMillis(nakadiSettings.getMaxCommitTimeout()); @@ -92,7 +81,7 @@ public StaticCuratorFramework(final CuratorFramework curatorFramework) { } @Override - public void close() throws IOException { + public void close() { // do not ever close this particular instance of curator } } @@ -104,7 +93,7 @@ public DisposableCuratorFramework(final CuratorFramework curatorFramework) { } @Override - public void close() throws IOException { + public void close() { getCuratorFramework().close(); } } @@ -122,32 +111,35 @@ private CuratorFramework createCuratorFramework(final int sessionTimeoutMs, } private EnsembleProvider createEnsembleProvider() throws Exception { - final EnsembleProvider ensembleProvider; - final RetryPolicy retryPolicy = new ExponentialBackoffRetry(EXHIBITOR_RETRY_TIME, EXHIBITOR_RETRY_MAX); - if (exhibitorAddresses != null) { - final Collection exhibitorHosts = Arrays.asList(exhibitorAddresses.split("\\s*,\\s*")); - final Exhibitors exhibitors = new Exhibitors(exhibitorHosts, exhibitorPort, - () -> zookeeperBrokers + zookeeperKafkaNamespace); - final ExhibitorRestClient exhibitorRestClient = new DefaultExhibitorRestClient(); - ensembleProvider = new ExhibitorEnsembleProvider(exhibitors, - exhibitorRestClient, "/exhibitor/v1/cluster/list", EXHIBITOR_POLLING_MS, retryPolicy); - ((ExhibitorEnsembleProvider) ensembleProvider).pollForInitialEnsemble(); - } else { - ensembleProvider = new FixedEnsembleProvider(zookeeperBrokers + zookeeperKafkaNamespace); - } - return ensembleProvider; - } - - private class ExhibitorEnsembleProvider extends org.apache.curator.ensemble.exhibitor.ExhibitorEnsembleProvider { - - ExhibitorEnsembleProvider(final Exhibitors exhibitors, final ExhibitorRestClient restClient, - final String restUriPath, final int pollingMs, final RetryPolicy retryPolicy) { - super(exhibitors, restClient, restUriPath, pollingMs, retryPolicy); - } - - @Override - public String getConnectionString() { - return super.getConnectionString() + zookeeperKafkaNamespace; + switch (conn.getType()) { + case EXHIBITOR: + final Exhibitors exhibitors = new Exhibitors( + conn.getAddresses().stream().map(AddressPort::getAddress).collect(Collectors.toList()), + conn.getAddresses().get(0).getPort(), + () -> { + throw new RuntimeException("There is no backup connection string (or it is wrong)"); + }); + final ExhibitorRestClient exhibitorRestClient = new DefaultExhibitorRestClient(); + final ExhibitorEnsembleProvider result = new ExhibitorEnsembleProvider( + exhibitors, + exhibitorRestClient, + "/exhibitor/v1/cluster/list", + EXHIBITOR_POLLING_MS, + new ExponentialBackoffRetry(EXHIBITOR_RETRY_TIME, EXHIBITOR_RETRY_MAX)) { + @Override + public String getConnectionString() { + return super.getConnectionString() + conn.getPathPrepared(); + } + }; + result.pollForInitialEnsemble(); + return result; + case ZOOKEEPER: + final String address = conn.getAddresses().stream() + .map(AddressPort::asAddressPort) + .collect(Collectors.joining(",")); + return new FixedEnsembleProvider(address + conn.getPathPrepared()); + default: + throw new RuntimeException("Connection type " + conn.getType() + " is not supported"); } } } diff --git a/src/main/java/org/zalando/nakadi/repository/zookeeper/ZookeeperConfig.java b/src/main/java/org/zalando/nakadi/repository/zookeeper/ZookeeperConfig.java index 4a83f3c180..ac987e0774 100644 --- a/src/main/java/org/zalando/nakadi/repository/zookeeper/ZookeeperConfig.java +++ b/src/main/java/org/zalando/nakadi/repository/zookeeper/ZookeeperConfig.java @@ -5,6 +5,7 @@ import org.springframework.context.annotation.Profile; import org.springframework.core.env.Environment; import org.zalando.nakadi.config.NakadiSettings; +import org.zalando.nakadi.domain.storage.ZookeeperConnection; @Configuration @Profile("!test") @@ -14,10 +15,7 @@ public class ZookeeperConfig { public ZooKeeperHolder zooKeeperHolder(final Environment environment, final NakadiSettings nakadiSettings) throws Exception { return new ZooKeeperHolder( - environment.getProperty("nakadi.zookeeper.brokers"), - environment.getProperty("nakadi.zookeeper.kafkaNamespace", ""), - environment.getProperty("nakadi.zookeeper.exhibitor.brokers"), - Integer.parseInt(environment.getProperty("nakadi.zookeeper.exhibitor.port", "0")), + ZookeeperConnection.valueOf(environment.getProperty("nakadi.zookeeper.connectionString")), Integer.parseInt(environment.getProperty("nakadi.zookeeper.sessionTimeoutMs")), Integer.parseInt(environment.getProperty("nakadi.zookeeper.connectionTimeoutMs")), nakadiSettings diff --git a/src/main/java/org/zalando/nakadi/service/CursorOperationsService.java b/src/main/java/org/zalando/nakadi/service/CursorOperationsService.java index c58f81f1c1..2f28f56a4e 100644 --- a/src/main/java/org/zalando/nakadi/service/CursorOperationsService.java +++ b/src/main/java/org/zalando/nakadi/service/CursorOperationsService.java @@ -9,7 +9,7 @@ import org.zalando.nakadi.domain.PartitionEndStatistics; import org.zalando.nakadi.domain.PartitionStatistics; import org.zalando.nakadi.domain.ShiftedNakadiCursor; -import org.zalando.nakadi.domain.Storage; +import org.zalando.nakadi.domain.storage.Storage; import org.zalando.nakadi.domain.Timeline; import org.zalando.nakadi.exceptions.runtime.InternalNakadiException; import org.zalando.nakadi.exceptions.runtime.InvalidCursorOperation; diff --git a/src/main/java/org/zalando/nakadi/service/StaticStorageWorkerFactory.java b/src/main/java/org/zalando/nakadi/service/StaticStorageWorkerFactory.java index aa935b2777..54bbc75fcb 100644 --- a/src/main/java/org/zalando/nakadi/service/StaticStorageWorkerFactory.java +++ b/src/main/java/org/zalando/nakadi/service/StaticStorageWorkerFactory.java @@ -1,6 +1,6 @@ package org.zalando.nakadi.service; -import org.zalando.nakadi.domain.Storage; +import org.zalando.nakadi.domain.storage.Storage; import org.zalando.nakadi.domain.Timeline; import org.zalando.nakadi.exceptions.runtime.InvalidCursorOperation; import org.zalando.nakadi.repository.kafka.KafkaCursor; diff --git a/src/main/java/org/zalando/nakadi/service/StorageService.java b/src/main/java/org/zalando/nakadi/service/StorageService.java index ef2f71a501..18f3acf8a3 100644 --- a/src/main/java/org/zalando/nakadi/service/StorageService.java +++ b/src/main/java/org/zalando/nakadi/service/StorageService.java @@ -12,8 +12,8 @@ import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.stereotype.Service; import org.springframework.transaction.TransactionException; -import org.zalando.nakadi.domain.DefaultStorage; -import org.zalando.nakadi.domain.Storage; +import org.zalando.nakadi.domain.storage.DefaultStorage; +import org.zalando.nakadi.domain.storage.Storage; import org.zalando.nakadi.exceptions.runtime.DbWriteOperationsBlockedException; import org.zalando.nakadi.exceptions.runtime.DuplicatedStorageException; import org.zalando.nakadi.exceptions.runtime.InternalNakadiException; diff --git a/src/main/java/org/zalando/nakadi/service/job/DiskUsageStatsJob.java b/src/main/java/org/zalando/nakadi/service/job/DiskUsageStatsJob.java index 3fa7ccacea..aec713a27d 100644 --- a/src/main/java/org/zalando/nakadi/service/job/DiskUsageStatsJob.java +++ b/src/main/java/org/zalando/nakadi/service/job/DiskUsageStatsJob.java @@ -6,7 +6,7 @@ import org.slf4j.LoggerFactory; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; -import org.zalando.nakadi.domain.Storage; +import org.zalando.nakadi.domain.storage.Storage; import org.zalando.nakadi.domain.Timeline; import org.zalando.nakadi.domain.TopicPartition; import org.zalando.nakadi.repository.TopicRepositoryHolder; diff --git a/src/main/java/org/zalando/nakadi/service/timeline/TimelineService.java b/src/main/java/org/zalando/nakadi/service/timeline/TimelineService.java index 3c715fc3b2..9b3c835075 100644 --- a/src/main/java/org/zalando/nakadi/service/timeline/TimelineService.java +++ b/src/main/java/org/zalando/nakadi/service/timeline/TimelineService.java @@ -13,13 +13,13 @@ import org.springframework.transaction.support.TransactionTemplate; import org.zalando.nakadi.config.NakadiSettings; import org.zalando.nakadi.domain.CleanupPolicy; -import org.zalando.nakadi.domain.DefaultStorage; +import org.zalando.nakadi.domain.storage.DefaultStorage; import org.zalando.nakadi.domain.EventType; import org.zalando.nakadi.domain.EventTypeBase; import org.zalando.nakadi.domain.NakadiCursor; import org.zalando.nakadi.domain.PartitionStatistics; import org.zalando.nakadi.domain.ResourceImpl; -import org.zalando.nakadi.domain.Storage; +import org.zalando.nakadi.domain.storage.Storage; import org.zalando.nakadi.domain.Timeline; import org.zalando.nakadi.exceptions.runtime.AccessDeniedException; import org.zalando.nakadi.exceptions.runtime.ConflictException; diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 354c5792af..a887272519 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -70,8 +70,7 @@ nakadi: delivery.timeout.ms: 30000 # request.timeout.ms + linger.ms max.block.ms: 5000 # kafka default 60000 zookeeper: - kafkaNamespace: - brokers: zookeeper:2181 + connectionString: zookeeper://zookeeper:2181 sessionTimeoutMs: 10000 connectionTimeoutMs: 3000 oauth2: diff --git a/src/test/java/org/zalando/nakadi/config/NakadiConfigTest.java b/src/test/java/org/zalando/nakadi/config/NakadiConfigTest.java index d0800eb4bb..fd8b5da9a5 100644 --- a/src/test/java/org/zalando/nakadi/config/NakadiConfigTest.java +++ b/src/test/java/org/zalando/nakadi/config/NakadiConfigTest.java @@ -7,8 +7,8 @@ import org.junit.Test; import org.mockito.Mockito; import org.springframework.core.env.Environment; -import org.zalando.nakadi.domain.DefaultStorage; -import org.zalando.nakadi.domain.Storage; +import org.zalando.nakadi.domain.storage.DefaultStorage; +import org.zalando.nakadi.domain.storage.Storage; import org.zalando.nakadi.repository.db.StorageDbRepository; import org.zalando.nakadi.repository.zookeeper.ZooKeeperHolder; import org.zalando.nakadi.service.StorageService; @@ -28,7 +28,8 @@ public void setUp() { zooKeeperHolder = Mockito.mock(ZooKeeperHolder.class); Mockito.when(zooKeeperHolder.get()).thenReturn(curatorFramework); Mockito.when(curatorFramework.getData()).thenReturn(dataBuilder); - Mockito.when(environment.getProperty("nakadi.zookeeper.exhibitor.port", "0")).thenReturn("0"); + Mockito.when(environment.getProperty("nakadi.zookeeper.connectionString")) + .thenReturn("exhibitor://localhost:8181/path"); } @Test diff --git a/src/test/java/org/zalando/nakadi/controller/EventStreamControllerTest.java b/src/test/java/org/zalando/nakadi/controller/EventStreamControllerTest.java index d1a77a8ba0..590b85396e 100644 --- a/src/test/java/org/zalando/nakadi/controller/EventStreamControllerTest.java +++ b/src/test/java/org/zalando/nakadi/controller/EventStreamControllerTest.java @@ -18,7 +18,7 @@ import org.zalando.nakadi.domain.EventTypeBase; import org.zalando.nakadi.domain.NakadiCursor; import org.zalando.nakadi.domain.PartitionStatistics; -import org.zalando.nakadi.domain.Storage; +import org.zalando.nakadi.domain.storage.Storage; import org.zalando.nakadi.domain.Timeline; import org.zalando.nakadi.exceptions.runtime.AccessDeniedException; import org.zalando.nakadi.exceptions.runtime.InvalidCursorException; diff --git a/src/test/java/org/zalando/nakadi/controller/PartitionsControllerTest.java b/src/test/java/org/zalando/nakadi/controller/PartitionsControllerTest.java index 805d95dcc7..dc6d0b98b8 100644 --- a/src/test/java/org/zalando/nakadi/controller/PartitionsControllerTest.java +++ b/src/test/java/org/zalando/nakadi/controller/PartitionsControllerTest.java @@ -14,7 +14,7 @@ import org.zalando.nakadi.domain.NakadiCursor; import org.zalando.nakadi.domain.NakadiCursorLag; import org.zalando.nakadi.domain.PartitionStatistics; -import org.zalando.nakadi.domain.Storage; +import org.zalando.nakadi.domain.storage.Storage; import org.zalando.nakadi.domain.Timeline; import org.zalando.nakadi.exceptions.runtime.InternalNakadiException; import org.zalando.nakadi.exceptions.runtime.NoSuchEventTypeException; diff --git a/src/test/java/org/zalando/nakadi/controller/StoragesControllerTest.java b/src/test/java/org/zalando/nakadi/controller/StoragesControllerTest.java index 11494d1c47..38261e5b63 100644 --- a/src/test/java/org/zalando/nakadi/controller/StoragesControllerTest.java +++ b/src/test/java/org/zalando/nakadi/controller/StoragesControllerTest.java @@ -8,7 +8,9 @@ import org.zalando.nakadi.config.SecuritySettings; import org.zalando.nakadi.controller.advice.NakadiProblemExceptionHandler; import org.zalando.nakadi.controller.advice.SettingsExceptionHandler; -import org.zalando.nakadi.domain.Storage; +import org.zalando.nakadi.domain.storage.KafkaConfiguration; +import org.zalando.nakadi.domain.storage.Storage; +import org.zalando.nakadi.domain.storage.ZookeeperConnection; import org.zalando.nakadi.plugin.api.authz.AuthorizationService; import org.zalando.nakadi.security.ClientResolver; import org.zalando.nakadi.service.AdminService; @@ -121,8 +123,8 @@ public static Storage createKafkaStorage(final String id) { final Storage storage = new Storage(); storage.setId(id); storage.setType(Storage.Type.KAFKA); - final Storage.KafkaConfiguration config = - new Storage.KafkaConfiguration("https://localhost", 8181, "https://localhost", "/path/to/kafka"); + final KafkaConfiguration config = new KafkaConfiguration( + ZookeeperConnection.valueOf("exhibitor://localhost:8181/path/to/kafka")); storage.setConfiguration(config); return storage; @@ -133,8 +135,7 @@ private JSONObject createJsonKafkaStorage(final String id) { json.put("id", id); json.put("storage_type", "kafka"); final JSONObject config = new JSONObject(); - config.put("zk_address", "http://localhost"); - config.put("zk_path", "/path/to/kafka"); + config.put("zoookeeper_connection", "exhibitor://localhost:8181/path/to/kafka"); json.put("kafka_configuration", config); return json; diff --git a/src/test/java/org/zalando/nakadi/controller/TimelinesControllerTest.java b/src/test/java/org/zalando/nakadi/controller/TimelinesControllerTest.java index 6e2a6dc44e..66c00be20e 100644 --- a/src/test/java/org/zalando/nakadi/controller/TimelinesControllerTest.java +++ b/src/test/java/org/zalando/nakadi/controller/TimelinesControllerTest.java @@ -12,7 +12,7 @@ import org.springframework.test.web.servlet.setup.MockMvcBuilders; import org.zalando.nakadi.config.SecuritySettings; import org.zalando.nakadi.controller.advice.NakadiProblemExceptionHandler; -import org.zalando.nakadi.domain.Storage; +import org.zalando.nakadi.domain.storage.Storage; import org.zalando.nakadi.domain.Timeline; import org.zalando.nakadi.plugin.api.authz.AuthorizationService; import org.zalando.nakadi.security.ClientResolver; diff --git a/src/test/java/org/zalando/nakadi/domain/storage/ZookeeperConnectionTest.java b/src/test/java/org/zalando/nakadi/domain/storage/ZookeeperConnectionTest.java new file mode 100644 index 0000000000..df00d59396 --- /dev/null +++ b/src/test/java/org/zalando/nakadi/domain/storage/ZookeeperConnectionTest.java @@ -0,0 +1,79 @@ +package org.zalando.nakadi.domain.storage; + +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; + +@RunWith(Parameterized.class) +public class ZookeeperConnectionTest { + + @Parameterized.Parameters + public static Collection data() { + return Arrays.asList( + new Object[]{ + "exhibitor://localhost:8181/path", + new ZookeeperConnection( + ZookeeperConnectionType.EXHIBITOR, + Collections.singletonList(new AddressPort("localhost", 8181)), + "/path") + }, new Object[]{ + "zookeeper://localhost:8181/path", + new ZookeeperConnection( + ZookeeperConnectionType.ZOOKEEPER, + Collections.singletonList(new AddressPort("localhost", 8181)), + "/path") + }, new Object[]{ + "zookeeper://localhost:8181", + new ZookeeperConnection( + ZookeeperConnectionType.ZOOKEEPER, + Collections.singletonList(new AddressPort("localhost", 8181)), + null) + }, new Object[]{ + "zookeeper://localhost:8181,localhost2:2181", + new ZookeeperConnection( + ZookeeperConnectionType.ZOOKEEPER, + Arrays.asList( + new AddressPort("localhost", 8181), + new AddressPort("localhost2", 2181)), + null) + }, new Object[]{ + "zookeeper://localhost:8181,localhost2:2181/testpath", + new ZookeeperConnection( + ZookeeperConnectionType.ZOOKEEPER, + Arrays.asList( + new AddressPort("localhost", 8181), + new AddressPort("localhost2", 2181)), + "/testpath") + }, new Object[]{ + "zookeeper://127.0.0.1:8181,127.0.0.2:2181/test/path", + new ZookeeperConnection( + ZookeeperConnectionType.ZOOKEEPER, + Arrays.asList( + new AddressPort("127.0.0.1", 8181), + new AddressPort("127.0.0.2", 2181)), + "/test/path") + } + ); + } + + private final String value; + private final ZookeeperConnection expectedConnection; + + public ZookeeperConnectionTest(final String value, final ZookeeperConnection expectedConnection) { + this.value = value; + this.expectedConnection = expectedConnection; + } + + @Test + public void test() { + Assert.assertEquals( + expectedConnection, + ZookeeperConnection.valueOf(value) + ); + } +} diff --git a/src/test/java/org/zalando/nakadi/repository/TopicRepositoryHolderTest.java b/src/test/java/org/zalando/nakadi/repository/TopicRepositoryHolderTest.java index 5f3ac8039d..b763c30cce 100644 --- a/src/test/java/org/zalando/nakadi/repository/TopicRepositoryHolderTest.java +++ b/src/test/java/org/zalando/nakadi/repository/TopicRepositoryHolderTest.java @@ -3,7 +3,7 @@ import org.junit.Assert; import org.junit.Test; import org.zalando.nakadi.domain.NakadiCursor; -import org.zalando.nakadi.domain.Storage; +import org.zalando.nakadi.domain.storage.Storage; import org.zalando.nakadi.domain.Timeline; import org.zalando.nakadi.exceptions.runtime.NakadiRuntimeException; import org.zalando.nakadi.exceptions.runtime.TopicRepositoryException; diff --git a/src/test/java/org/zalando/nakadi/security/AuthenticationTest.java b/src/test/java/org/zalando/nakadi/security/AuthenticationTest.java index 8e43034745..438fc8f8ff 100644 --- a/src/test/java/org/zalando/nakadi/security/AuthenticationTest.java +++ b/src/test/java/org/zalando/nakadi/security/AuthenticationTest.java @@ -29,8 +29,8 @@ import org.springframework.web.context.WebApplicationContext; import org.zalando.nakadi.Application; import org.zalando.nakadi.config.SecuritySettings; -import org.zalando.nakadi.domain.DefaultStorage; -import org.zalando.nakadi.domain.Storage; +import org.zalando.nakadi.domain.storage.DefaultStorage; +import org.zalando.nakadi.domain.storage.Storage; import org.zalando.nakadi.metrics.EventTypeMetricRegistry; import org.zalando.nakadi.repository.EventTypeRepository; import org.zalando.nakadi.repository.TopicRepository; diff --git a/src/test/java/org/zalando/nakadi/service/CursorOperationsServiceTest.java b/src/test/java/org/zalando/nakadi/service/CursorOperationsServiceTest.java index 1743ef1638..6a906d9b1f 100644 --- a/src/test/java/org/zalando/nakadi/service/CursorOperationsServiceTest.java +++ b/src/test/java/org/zalando/nakadi/service/CursorOperationsServiceTest.java @@ -5,7 +5,7 @@ import org.zalando.nakadi.config.NakadiSettings; import org.zalando.nakadi.domain.NakadiCursor; import org.zalando.nakadi.domain.ShiftedNakadiCursor; -import org.zalando.nakadi.domain.Storage; +import org.zalando.nakadi.domain.storage.Storage; import org.zalando.nakadi.domain.Timeline; import org.zalando.nakadi.exceptions.runtime.InvalidCursorOperation; import org.zalando.nakadi.repository.TopicRepository; diff --git a/src/test/java/org/zalando/nakadi/service/NakadiCursorComparatorTest.java b/src/test/java/org/zalando/nakadi/service/NakadiCursorComparatorTest.java index f2f10c87b2..61d5670058 100644 --- a/src/test/java/org/zalando/nakadi/service/NakadiCursorComparatorTest.java +++ b/src/test/java/org/zalando/nakadi/service/NakadiCursorComparatorTest.java @@ -4,7 +4,7 @@ import org.junit.BeforeClass; import org.junit.Test; import org.zalando.nakadi.domain.NakadiCursor; -import org.zalando.nakadi.domain.Storage; +import org.zalando.nakadi.domain.storage.Storage; import org.zalando.nakadi.domain.Timeline; import org.zalando.nakadi.exceptions.runtime.InternalNakadiException; import org.zalando.nakadi.exceptions.runtime.NoSuchEventTypeException; diff --git a/src/test/java/org/zalando/nakadi/service/StorageServiceTest.java b/src/test/java/org/zalando/nakadi/service/StorageServiceTest.java index d6eaa12dd8..e5f2ff972c 100644 --- a/src/test/java/org/zalando/nakadi/service/StorageServiceTest.java +++ b/src/test/java/org/zalando/nakadi/service/StorageServiceTest.java @@ -3,8 +3,10 @@ import org.json.JSONObject; import org.junit.Before; import org.junit.Test; -import org.zalando.nakadi.domain.DefaultStorage; -import org.zalando.nakadi.domain.Storage; +import org.zalando.nakadi.domain.storage.DefaultStorage; +import org.zalando.nakadi.domain.storage.KafkaConfiguration; +import org.zalando.nakadi.domain.storage.Storage; +import org.zalando.nakadi.domain.storage.ZookeeperConnection; import org.zalando.nakadi.exceptions.runtime.NoSuchStorageException; import org.zalando.nakadi.exceptions.runtime.StorageIsUsedException; import org.zalando.nakadi.repository.db.StorageDbRepository; @@ -85,8 +87,8 @@ private Storage createTestStorage() { final Storage storage = new Storage(); storage.setType(Storage.Type.KAFKA); storage.setId("123-abc"); - final Storage.KafkaConfiguration configuration = - new Storage.KafkaConfiguration("https://localhost", 8181, "https://localhost", "/path/to/kafka"); + final KafkaConfiguration configuration = + new KafkaConfiguration(ZookeeperConnection.valueOf("exhibitor://localhost:8181/path/to/kafka")); storage.setConfiguration(configuration); return storage; } diff --git a/src/test/java/org/zalando/nakadi/service/SubscriptionTimeLagServiceTest.java b/src/test/java/org/zalando/nakadi/service/SubscriptionTimeLagServiceTest.java index 3ad0c78d46..1df9b96249 100644 --- a/src/test/java/org/zalando/nakadi/service/SubscriptionTimeLagServiceTest.java +++ b/src/test/java/org/zalando/nakadi/service/SubscriptionTimeLagServiceTest.java @@ -7,7 +7,7 @@ import org.zalando.nakadi.domain.EventTypePartition; import org.zalando.nakadi.domain.NakadiCursor; import org.zalando.nakadi.domain.PartitionEndStatistics; -import org.zalando.nakadi.domain.Storage; +import org.zalando.nakadi.domain.storage.Storage; import org.zalando.nakadi.domain.Timeline; import org.zalando.nakadi.exceptions.runtime.InvalidCursorException; import org.zalando.nakadi.repository.EventConsumer; diff --git a/src/test/java/org/zalando/nakadi/service/converter/CursorConverterImplTest.java b/src/test/java/org/zalando/nakadi/service/converter/CursorConverterImplTest.java index 55b9d9ea20..eaf4deb2bf 100644 --- a/src/test/java/org/zalando/nakadi/service/converter/CursorConverterImplTest.java +++ b/src/test/java/org/zalando/nakadi/service/converter/CursorConverterImplTest.java @@ -4,7 +4,7 @@ import org.junit.Test; import org.zalando.nakadi.domain.NakadiCursor; import org.zalando.nakadi.domain.PartitionStatistics; -import org.zalando.nakadi.domain.Storage; +import org.zalando.nakadi.domain.storage.Storage; import org.zalando.nakadi.domain.Timeline; import org.zalando.nakadi.repository.TopicRepository; import org.zalando.nakadi.repository.db.EventTypeCache; diff --git a/src/test/java/org/zalando/nakadi/service/converter/VersionOneConverterTest.java b/src/test/java/org/zalando/nakadi/service/converter/VersionOneConverterTest.java index f0de627811..2f863df8d3 100644 --- a/src/test/java/org/zalando/nakadi/service/converter/VersionOneConverterTest.java +++ b/src/test/java/org/zalando/nakadi/service/converter/VersionOneConverterTest.java @@ -6,7 +6,7 @@ import org.zalando.nakadi.domain.CursorError; import org.zalando.nakadi.domain.EventType; import org.zalando.nakadi.domain.NakadiCursor; -import org.zalando.nakadi.domain.Storage; +import org.zalando.nakadi.domain.storage.Storage; import org.zalando.nakadi.domain.Timeline; import org.zalando.nakadi.exceptions.runtime.InvalidCursorException; import org.zalando.nakadi.repository.db.EventTypeCache; diff --git a/src/test/java/org/zalando/nakadi/service/job/DiskUsageStatsJobTest.java b/src/test/java/org/zalando/nakadi/service/job/DiskUsageStatsJobTest.java index 67fd86e9ed..c17388fae8 100644 --- a/src/test/java/org/zalando/nakadi/service/job/DiskUsageStatsJobTest.java +++ b/src/test/java/org/zalando/nakadi/service/job/DiskUsageStatsJobTest.java @@ -1,7 +1,7 @@ package org.zalando.nakadi.service.job; import org.junit.Test; -import org.zalando.nakadi.domain.Storage; +import org.zalando.nakadi.domain.storage.Storage; import org.zalando.nakadi.domain.Timeline; import org.zalando.nakadi.domain.TopicPartition; import org.zalando.nakadi.repository.TopicRepository; diff --git a/src/test/java/org/zalando/nakadi/service/subscription/StartingStateTest.java b/src/test/java/org/zalando/nakadi/service/subscription/StartingStateTest.java index 44bda1784f..928762cd05 100644 --- a/src/test/java/org/zalando/nakadi/service/subscription/StartingStateTest.java +++ b/src/test/java/org/zalando/nakadi/service/subscription/StartingStateTest.java @@ -8,7 +8,7 @@ import org.junit.Test; import org.zalando.nakadi.domain.NakadiCursor; import org.zalando.nakadi.domain.PartitionStatistics; -import org.zalando.nakadi.domain.Storage; +import org.zalando.nakadi.domain.storage.Storage; import org.zalando.nakadi.domain.Subscription; import org.zalando.nakadi.domain.SubscriptionBase; import org.zalando.nakadi.domain.Timeline; diff --git a/src/test/java/org/zalando/nakadi/service/subscription/state/PartitionDataTest.java b/src/test/java/org/zalando/nakadi/service/subscription/state/PartitionDataTest.java index 055c36f819..ed31dd5147 100644 --- a/src/test/java/org/zalando/nakadi/service/subscription/state/PartitionDataTest.java +++ b/src/test/java/org/zalando/nakadi/service/subscription/state/PartitionDataTest.java @@ -4,7 +4,7 @@ import org.junit.Test; import org.zalando.nakadi.domain.ConsumedEvent; import org.zalando.nakadi.domain.NakadiCursor; -import org.zalando.nakadi.domain.Storage; +import org.zalando.nakadi.domain.storage.Storage; import org.zalando.nakadi.domain.Timeline; import org.zalando.nakadi.repository.kafka.KafkaCursor; diff --git a/src/test/java/org/zalando/nakadi/service/subscription/state/StreamingStateTest.java b/src/test/java/org/zalando/nakadi/service/subscription/state/StreamingStateTest.java index e77027e37a..59d5d8f6c6 100644 --- a/src/test/java/org/zalando/nakadi/service/subscription/state/StreamingStateTest.java +++ b/src/test/java/org/zalando/nakadi/service/subscription/state/StreamingStateTest.java @@ -8,7 +8,7 @@ import org.zalando.nakadi.domain.EventTypePartition; import org.zalando.nakadi.domain.NakadiCursor; import org.zalando.nakadi.domain.PartitionStatistics; -import org.zalando.nakadi.domain.Storage; +import org.zalando.nakadi.domain.storage.Storage; import org.zalando.nakadi.domain.Subscription; import org.zalando.nakadi.domain.Timeline; import org.zalando.nakadi.exceptions.runtime.InternalNakadiException; diff --git a/src/test/java/org/zalando/nakadi/service/timeline/TimelineServiceTest.java b/src/test/java/org/zalando/nakadi/service/timeline/TimelineServiceTest.java index b8cda6117e..62b43237f4 100644 --- a/src/test/java/org/zalando/nakadi/service/timeline/TimelineServiceTest.java +++ b/src/test/java/org/zalando/nakadi/service/timeline/TimelineServiceTest.java @@ -8,9 +8,9 @@ import org.springframework.transaction.support.TransactionTemplate; import org.zalando.nakadi.config.NakadiSettings; import org.zalando.nakadi.domain.CleanupPolicy; -import org.zalando.nakadi.domain.DefaultStorage; +import org.zalando.nakadi.domain.storage.DefaultStorage; import org.zalando.nakadi.domain.EventType; -import org.zalando.nakadi.domain.Storage; +import org.zalando.nakadi.domain.storage.Storage; import org.zalando.nakadi.domain.Timeline; import org.zalando.nakadi.exceptions.runtime.InconsistentStateException; import org.zalando.nakadi.exceptions.runtime.InternalNakadiException; diff --git a/src/test/java/org/zalando/nakadi/utils/TestUtils.java b/src/test/java/org/zalando/nakadi/utils/TestUtils.java index a72da43b16..0fb78d6973 100644 --- a/src/test/java/org/zalando/nakadi/utils/TestUtils.java +++ b/src/test/java/org/zalando/nakadi/utils/TestUtils.java @@ -19,7 +19,7 @@ import org.zalando.nakadi.domain.BatchFactory; import org.zalando.nakadi.domain.BatchItem; import org.zalando.nakadi.domain.EventType; -import org.zalando.nakadi.domain.Storage; +import org.zalando.nakadi.domain.storage.Storage; import org.zalando.nakadi.domain.Subscription; import org.zalando.nakadi.domain.Timeline; import org.zalando.nakadi.exceptions.runtime.AccessDeniedException;