Skip to content
This repository has been archived by the owner on Jun 7, 2024. It is now read-only.

Commit

Permalink
Merge pull request #1067 from zalando/ARUHA-2324
Browse files Browse the repository at this point in the history
ARUHA-2324 Use connection string to connect to zookeeper
  • Loading branch information
antban authored Jun 26, 2019
2 parents 753677f + dee6535 commit 8796b12
Show file tree
Hide file tree
Showing 47 changed files with 439 additions and 220 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import org.mockito.Mockito;
import org.zalando.nakadi.config.RepositoriesConfig;
import org.zalando.nakadi.domain.EventType;
import org.zalando.nakadi.domain.Storage;
import org.zalando.nakadi.domain.storage.Storage;
import org.zalando.nakadi.domain.Timeline;
import org.zalando.nakadi.repository.EventTypeRepository;
import org.zalando.nakadi.repository.zookeeper.ZooKeeperHolder;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@
import org.junit.Before;
import org.junit.Test;
import org.zalando.nakadi.domain.EventType;
import org.zalando.nakadi.domain.Storage;
import org.zalando.nakadi.domain.storage.KafkaConfiguration;
import org.zalando.nakadi.domain.storage.Storage;
import org.zalando.nakadi.domain.Timeline;
import org.zalando.nakadi.domain.storage.ZookeeperConnection;
import org.zalando.nakadi.exceptions.runtime.NoSuchStorageException;
import org.zalando.nakadi.exceptions.runtime.StorageIsUsedException;
import org.zalando.nakadi.utils.TestUtils;
Expand Down Expand Up @@ -35,13 +37,8 @@ public void setUp() throws Exception {
eventTypeDbRepository = new EventTypeDbRepository(template, TestUtils.OBJECT_MAPPER);
}

static Storage createStorage(final String name,
final String exhibitorAddress,
final int exhibitorPort,
final String zkAddress,
final String zkPath) {
final Storage.KafkaConfiguration config =
new Storage.KafkaConfiguration(exhibitorAddress, exhibitorPort, zkAddress, zkPath);
static Storage createStorage(final String name, final ZookeeperConnection zkConnection) {
final KafkaConfiguration config = new KafkaConfiguration(zkConnection);
final Storage storage = new Storage();
storage.setId(name);
storage.setType(Storage.Type.KAFKA);
Expand All @@ -50,9 +47,9 @@ static Storage createStorage(final String name,
}

@Test
public void testStorageCreated() throws Exception {
public void testStorageCreated() {
final String name = randomUUID();
final Storage storage = createStorage(name, "exaddress", 8181, "address", "path");
final Storage storage = createStorage(name, ZookeeperConnection.valueOf("exhibitor://exaddress:8181/path"));

repository.createStorage(storage);

Expand All @@ -64,15 +61,15 @@ public void testStorageCreated() throws Exception {
}

@Test
public void testStorageOrdered() throws Exception {
public void testStorageOrdered() {
final String namePrefix = randomValidStringOfLength(31);

final Storage storage2 = repository.createStorage(
createStorage(namePrefix + "2", "exaddress", 8181, "address1", "path3"));
createStorage(namePrefix + "2", ZookeeperConnection.valueOf("exhibitor://exaddress1:8181/path3")));
final Storage storage1 = repository.createStorage(
createStorage(namePrefix + "1", "exaddress", 8181, "address2", "path2"));
createStorage(namePrefix + "1", ZookeeperConnection.valueOf("exhibitor://exaddress2:8181/path2")));
final Storage storage3 = repository.createStorage(
createStorage(namePrefix + "3", "exaddress", 8181, "address3", "path1"));
createStorage(namePrefix + "3", ZookeeperConnection.valueOf("exhibitor://exaddress3:8181/path1")));

final List<Storage> storages = repository.listStorages().stream()
.filter(st -> st.getId() != null)
Expand All @@ -86,23 +83,25 @@ public void testStorageOrdered() throws Exception {
}

@Test
public void testStorageDeleted() throws Exception {
public void testStorageDeleted() {
final String name = randomUUID();
final Storage storage = repository.createStorage(createStorage(name, "exaddress", 8181, "address2", "path2"));
final Storage storage = repository.createStorage(
createStorage(name, ZookeeperConnection.valueOf("exhibitor://exaddress:8181/path")));
assertEquals(storage, repository.getStorage(storage.getId()).get());
repository.deleteStorage(storage.getId());
assertFalse(repository.getStorage(storage.getId()).isPresent());
}

@Test(expected = NoSuchStorageException.class)
public void testDeleteNoneExistingStorage() throws Exception {
public void testDeleteNoneExistingStorage() {
repository.deleteStorage(randomUUID());
}

@Test(expected = StorageIsUsedException.class)
public void testDeleteUsedStorage() throws Exception {
public void testDeleteUsedStorage() {
final String name = randomUUID();
final Storage storage = repository.createStorage(createStorage(name, "exaddress", 8181, "address", "path"));
final Storage storage = repository.createStorage(
createStorage(name, ZookeeperConnection.valueOf("exhibitor://exaddress:8181/path")));

final EventType eventType = eventTypeDbRepository.saveEventType(TestUtils.buildDefaultEventType());
final Timeline timeline = TimelineDbRepositoryTest.createTimeline(storage, UUID.randomUUID(), 0, "topic",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@
import org.junit.Before;
import org.junit.Test;
import org.zalando.nakadi.domain.EventType;
import org.zalando.nakadi.domain.Storage;
import org.zalando.nakadi.domain.storage.Storage;
import org.zalando.nakadi.domain.Timeline;
import org.zalando.nakadi.domain.storage.ZookeeperConnection;
import org.zalando.nakadi.exceptions.runtime.DuplicatedTimelineException;
import org.zalando.nakadi.utils.TestUtils;

Expand All @@ -32,8 +33,8 @@ public void setUp() throws Exception {
final StorageDbRepository sRepository = new StorageDbRepository(template, TestUtils.OBJECT_MAPPER);
final EventTypeDbRepository eRepository = new EventTypeDbRepository(template, TestUtils.OBJECT_MAPPER);

storage = sRepository.createStorage(
StorageDbRepositoryTest.createStorage(randomUUID(), "localhost", 8181, "test", "path"));
storage = sRepository.createStorage(StorageDbRepositoryTest.createStorage(
randomUUID(), ZookeeperConnection.valueOf("zookeeper://localhost:8181/test")));
testEt = eRepository.saveEventType(TestUtils.buildDefaultEventType());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.datasource.DriverManagerDataSource;
import org.zalando.nakadi.config.JsonConfig;
import org.zalando.nakadi.domain.Storage;
import org.zalando.nakadi.domain.storage.KafkaConfiguration;
import org.zalando.nakadi.domain.storage.Storage;
import org.zalando.nakadi.domain.storage.ZookeeperConnection;
import org.zalando.nakadi.exceptions.runtime.DuplicatedStorageException;
import org.zalando.nakadi.repository.db.StorageDbRepository;
import org.zalando.nakadi.repository.db.TimelineDbRepository;
Expand All @@ -25,6 +27,8 @@ public abstract class BaseAT {
public static final String URL = "http://localhost:" + PORT;

protected static final String ZOOKEEPER_URL = "localhost:2181";
protected static final ZookeeperConnection ZOOKEEPER_CONNECTION =
ZookeeperConnection.valueOf("zookeeper://" + ZOOKEEPER_URL);
protected static final String KAFKA_URL = "localhost:29092";

private static final JdbcTemplate JDBC_TEMPLATE = new JdbcTemplate(
Expand All @@ -47,7 +51,7 @@ public static void createDefaultStorage() {
final Storage storage = new Storage();
storage.setId("default");
storage.setType(Storage.Type.KAFKA);
storage.setConfiguration(new Storage.KafkaConfiguration(null, null, ZOOKEEPER_URL, ""));
storage.setConfiguration(new KafkaConfiguration(ZOOKEEPER_CONNECTION));
try {
STORAGE_DB_REPOSITORY.createStorage(storage);
} catch (final DuplicatedStorageException ignore) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,14 @@ public class StorageControllerAT extends BaseAT {
@Test
public void shouldChangeDefaultStorageWhenRequested() throws Exception {
given()
.body("{\"id\": \"default-test\",\"kafka_configuration\": {\"exhibitor_address\": null," +
"\"exhibitor_port\": 0,\"zk_address\": \"zookeeper:2181\",\"zk_path\": \"\"}," +
"\"storage_type\": \"kafka\"}")
.body("{" +
"\"id\": \"default-test\"," +
"\"kafka_configuration\": {" +
"\"zookeeper_connection\":{" +
"\"type\": \"zookeeper\"," +
"\"addresses\":[{\"address\":\"zookeeper\", \"port\":2181}]" +
"}" +
"},\"storage_type\": \"kafka\"}")
.contentType(JSON).post("/storages");

NakadiTestUtils.createEventTypeInNakadi(EventTypeTestBuilder.builder().name("event_a").build());
Expand Down
13 changes: 6 additions & 7 deletions src/main/java/org/zalando/nakadi/config/NakadiConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.zalando.nakadi.domain.DefaultStorage;
import org.zalando.nakadi.domain.Storage;
import org.zalando.nakadi.domain.storage.DefaultStorage;
import org.zalando.nakadi.domain.storage.KafkaConfiguration;
import org.zalando.nakadi.domain.storage.Storage;
import org.zalando.nakadi.domain.storage.ZookeeperConnection;
import org.zalando.nakadi.exceptions.runtime.DuplicatedStorageException;
import org.zalando.nakadi.exceptions.runtime.InternalNakadiException;
import org.zalando.nakadi.repository.db.StorageDbRepository;
Expand Down Expand Up @@ -46,11 +48,8 @@ public DefaultStorage defaultStorage(final StorageDbRepository storageDbReposito
final Storage storage = new Storage();
storage.setId(storageId);
storage.setType(Storage.Type.KAFKA);
storage.setConfiguration(new Storage.KafkaConfiguration(
environment.getProperty("nakadi.zookeeper.exhibitor.brokers"),
Integer.valueOf(environment.getProperty("nakadi.zookeeper.exhibitor.port", "0")),
environment.getProperty("nakadi.zookeeper.brokers"),
environment.getProperty("nakadi.zookeeper.kafkaNamespace", "")));
storage.setConfiguration(new KafkaConfiguration(
ZookeeperConnection.valueOf(environment.getProperty("nakadi.zookeeper.connectionString"))));
try {
storageDbRepository.createStorage(storage);
} catch (final DuplicatedStorageException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import org.zalando.nakadi.domain.EventType;
import org.zalando.nakadi.domain.NakadiCursor;
import org.zalando.nakadi.domain.PartitionStatistics;
import org.zalando.nakadi.domain.Storage;
import org.zalando.nakadi.domain.storage.Storage;
import org.zalando.nakadi.domain.Timeline;
import org.zalando.nakadi.exceptions.runtime.AccessDeniedException;
import org.zalando.nakadi.exceptions.runtime.InternalNakadiException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.context.request.NativeWebRequest;
import org.zalando.nakadi.domain.Storage;
import org.zalando.nakadi.domain.storage.Storage;
import org.zalando.nakadi.exceptions.runtime.DbWriteOperationsBlockedException;
import org.zalando.nakadi.exceptions.runtime.DuplicatedStorageException;
import org.zalando.nakadi.exceptions.runtime.ForbiddenOperationException;
Expand Down
1 change: 1 addition & 0 deletions src/main/java/org/zalando/nakadi/domain/Timeline.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.zalando.nakadi.domain;

import org.zalando.nakadi.domain.storage.Storage;
import org.zalando.nakadi.repository.kafka.KafkaCursor;
import org.zalando.nakadi.util.UUIDGenerator;

Expand Down
55 changes: 55 additions & 0 deletions src/main/java/org/zalando/nakadi/domain/storage/AddressPort.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package org.zalando.nakadi.domain.storage;

import com.fasterxml.jackson.annotation.JsonProperty;

import java.util.Objects;

public class AddressPort {
private final String address;
private final int port;

public AddressPort(
@JsonProperty("address") final String address,
@JsonProperty("port") final int port) {
this.address = address;
this.port = port;
}

public String getAddress() {
return address;
}

public int getPort() {
return port;
}

public String asAddressPort() {
return address + ":" + port;
}

@Override
public String toString() {
return "AddressPort{" +
"address='" + address + '\'' +
", port=" + port +
'}';
}

@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final AddressPort that = (AddressPort) o;
return port == that.port &&
Objects.equals(address, that.address);
}

@Override
public int hashCode() {
return Objects.hash(address, port);
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package org.zalando.nakadi.domain;
package org.zalando.nakadi.domain.storage;

public class DefaultStorage {

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package org.zalando.nakadi.domain.storage;

import com.fasterxml.jackson.annotation.JsonProperty;

import java.util.Objects;

public class KafkaConfiguration {
private final ZookeeperConnection zookeeperConnection;

public KafkaConfiguration(
@JsonProperty(value = "zookeeper_connection") final ZookeeperConnection zookeeperConnection) {
this.zookeeperConnection = zookeeperConnection;
}

public ZookeeperConnection getZookeeperConnection() {
return zookeeperConnection;
}

@Override
public String toString() {
return "KafkaConfiguration{" + zookeeperConnection + '}';
}

@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final KafkaConfiguration that = (KafkaConfiguration) o;
return Objects.equals(zookeeperConnection, that.zookeeperConnection);
}

@Override
public int hashCode() {
return Objects.hash(zookeeperConnection);
}
}
Loading

0 comments on commit 8796b12

Please sign in to comment.