Skip to content
This repository has been archived by the owner on Jun 7, 2024. It is now read-only.

Commit

Permalink
Merge branch 'master' into ARUHA-473-check-size-of-events
Browse files Browse the repository at this point in the history
  • Loading branch information
Lionel Montrieux committed Jan 23, 2017
2 parents 3f2a2ac + 02a6fa1 commit 57e67d3
Show file tree
Hide file tree
Showing 18 changed files with 839 additions and 26 deletions.
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,4 @@ ADD build/libs/nakadi.jar nakadi.jar

EXPOSE 8080

ENTRYPOINT java -Djava.security.egd=file:/dev/./urandom -jar nakadi.jar
ENTRYPOINT exec java -Djava.security.egd=file:/dev/./urandom -jar nakadi.jar
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -190,8 +190,8 @@ curl -v -XPOST http://localhost:8080/event-types -H "Content-type: application/j
"default_statistic": {
"messages_per_minute": 1000,
"message_size": 5,
"read_parallelism": 10,
"write_parallelism": 5
"read_parallelism": 1,
"write_parallelism": 1
},
"schema": {
"type": "json_schema",
Expand Down
23 changes: 23 additions & 0 deletions database/migration/aruha-522-timelines.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
SET ROLE zalando_nakadi_data_owner;

CREATE TABLE zn_data.storage (
st_id VARCHAR(36) NOT NULL PRIMARY KEY,
st_type VARCHAR(32) NOT NULL,
st_configuration JSONB NOT NULL
);


CREATE TABLE zn_data.timeline (
tl_id UUID NOT NULL PRIMARY KEY,
et_name VARCHAR(255) NOT NULL REFERENCES zn_data.event_type (et_name),
tl_order INT NOT NULL,
st_id VARCHAR(36) NOT NULL REFERENCES zn_data.storage (st_id),
tl_topic VARCHAR(255) NOT NULL,
tl_created_at TIMESTAMP WITH TIME ZONE NOT NULL,
tl_switched_at TIMESTAMP WITH TIME ZONE DEFAULT NULL,
tl_cleanup_at TIMESTAMP WITH TIME ZONE DEFAULT NULL,
tl_latest_position JSONB DEFAULT NULL,
UNIQUE (et_name, tl_order)
);

CREATE INDEX ON zn_data.timeline (et_name);
5 changes: 5 additions & 0 deletions database/nakadi/10_data/04_tables/04_storage.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
CREATE TABLE zn_data.storage (
st_id VARCHAR(36) NOT NULL PRIMARY KEY,
st_type VARCHAR(32) NOT NULL,
st_configuration JSONB NOT NULL
);
14 changes: 14 additions & 0 deletions database/nakadi/10_data/04_tables/05_timeline.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
CREATE TABLE zn_data.timeline (
tl_id UUID NOT NULL PRIMARY KEY,
et_name VARCHAR(255) NOT NULL REFERENCES zn_data.event_type (et_name),
tl_order INT NOT NULL,
st_id VARCHAR(36) NOT NULL REFERENCES zn_data.storage (st_id),
tl_topic VARCHAR(255) NOT NULL,
tl_created_at TIMESTAMP WITH TIME ZONE NOT NULL,
tl_switched_at TIMESTAMP WITH TIME ZONE DEFAULT NULL,
tl_cleanup_at TIMESTAMP WITH TIME ZONE DEFAULT NULL,
tl_latest_position JSONB DEFAULT NULL,
UNIQUE (et_name, tl_order)
);

CREATE INDEX ON zn_data.timeline (et_name);
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
package org.zalando.nakadi.repository.db;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.zalando.nakadi.config.JsonConfig;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.stream.Stream;
import javax.sql.DataSource;
import org.junit.After;
import org.junit.Before;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.datasource.DriverManagerDataSource;

import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.SQLException;

import org.zalando.nakadi.config.JsonConfig;
import static org.zalando.nakadi.webservice.BaseAT.POSTGRES_PWD;
import static org.zalando.nakadi.webservice.BaseAT.POSTGRES_URL;
import static org.zalando.nakadi.webservice.BaseAT.POSTGRES_USER;
Expand All @@ -20,10 +19,10 @@ public abstract class AbstractDbRepositoryTest {
protected JdbcTemplate template;
protected Connection connection;
protected ObjectMapper mapper;
protected String[] dependencyTables;
protected String[] repositoryTables;

public AbstractDbRepositoryTest(final String[] dependencyTables) {
this.dependencyTables = dependencyTables;
public AbstractDbRepositoryTest(final String... repositoryTables) {
this.repositoryTables = repositoryTables;
}

@Before
Expand All @@ -33,21 +32,21 @@ public void setUp() {
final DataSource datasource = new DriverManagerDataSource(POSTGRES_URL, POSTGRES_USER, POSTGRES_PWD);
template = new JdbcTemplate(datasource);
connection = datasource.getConnection();
clearRepositoryTable();
} catch (SQLException e) {
clearRepositoryTables();
} catch (final SQLException e) {
e.printStackTrace();
}
}

@After
public void tearDown() throws SQLException {
clearRepositoryTable();
clearRepositoryTables();
connection.close();
}

private void clearRepositoryTable() {
for (final String table : dependencyTables) {
template.execute("DELETE FROM " + table);
}
private void clearRepositoryTables() {
Stream.of(repositoryTables)
.map(table -> "DELETE FROM " + table)
.forEach(template::execute);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public class EventTypeDbRepositoryTest extends AbstractDbRepositoryTest {
private EventTypeRepository repository;

public EventTypeDbRepositoryTest() {
super(new String[]{ "zn_data.event_type_schema", "zn_data.event_type" });
super("zn_data.event_type_schema", "zn_data.event_type");
}

@Before
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package org.zalando.nakadi.repository.db;

import java.util.List;
import java.util.Optional;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.zalando.nakadi.domain.Storage;

public class StorageDbRepositoryTest extends AbstractDbRepositoryTest {
private StorageDbRepository repository;

public StorageDbRepositoryTest() {
super("zn_data.storage");
}

@Before
public void setUp() {
super.setUp();
repository = new StorageDbRepository(template, mapper);
}

static Storage createStorage(final String name, final String zkAddress, final String zkPath) {
final Storage.KafkaConfiguration config = new Storage.KafkaConfiguration();
config.setZkAddress(zkAddress);
config.setZkPath(zkPath);
final Storage storage = new Storage();
storage.setId(name);
storage.setType(Storage.Type.KAFKA);
storage.setConfiguration(config);
return storage;
}

@Test
public void testStorageCreated() {
final Storage storage = createStorage("default", "address", "path");

repository.createStorage(storage);

final Optional<Storage> createdCopy = repository.getStorage(storage.getId());
Assert.assertTrue(createdCopy.isPresent());
Assert.assertFalse(createdCopy.get() == storage);

Assert.assertEquals(storage, createdCopy.get());
}

@Test
public void testStorageOrdered() {
final Storage storage2 = repository.createStorage(createStorage("2", "address1", "path3"));
final Storage storage1 = repository.createStorage(createStorage("1", "address2", "path2"));
final Storage storage3 = repository.createStorage(createStorage("3", "address3", "path1"));

final List<Storage> storages = repository.listStorages();
Assert.assertEquals(3, storages.size());
Assert.assertEquals(storage1, storages.get(0));
Assert.assertEquals(storage2, storages.get(1));
Assert.assertEquals(storage3, storages.get(2));
}

@Test
public void testStorageDeleted() {
final Storage storage = repository.createStorage(createStorage("1", "address2", "path2"));
Assert.assertEquals(storage, repository.getStorage(storage.getId()).get());
repository.deleteStorage(storage.getId());
Assert.assertFalse(repository.getStorage(storage.getId()).isPresent());
Assert.assertEquals(0, repository.listStorages().size());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
package org.zalando.nakadi.repository.db;

import java.util.Date;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.springframework.dao.DuplicateKeyException;
import org.zalando.nakadi.domain.EventType;
import org.zalando.nakadi.domain.Storage;
import org.zalando.nakadi.domain.Timeline;
import org.zalando.nakadi.exceptions.DuplicatedEventTypeNameException;
import org.zalando.nakadi.exceptions.InternalNakadiException;
import org.zalando.nakadi.utils.TestUtils;

public class TimelineDbRepositoryTest extends AbstractDbRepositoryTest {

private TimelineDbRepository tRepository;
private StorageDbRepository sRepository;
private EventTypeDbRepository eRepository;

public TimelineDbRepositoryTest() {
super("zn_data.timeline", "zn_data.storage", "zn_data.event_type_schema", "zn_data.event_type");
}

@Before
public void setUp() {
super.setUp();
this.tRepository = new TimelineDbRepository(template, mapper);
this.sRepository = new StorageDbRepository(template, mapper);
this.eRepository = new EventTypeDbRepository(template, mapper);
}

@Test
public void testTimelineCreated() throws InternalNakadiException, DuplicatedEventTypeNameException {
final Storage storage = sRepository.createStorage(
StorageDbRepositoryTest.createStorage("default", "test", "path"));
final EventType testEt = eRepository.saveEventType(TestUtils.buildDefaultEventType());

final Timeline timeline = createTimeline(
storage, UUID.randomUUID(), 0, "test_topic", testEt.getName(),
new Date(), null, null, null);

tRepository.createTimeline(timeline);

final Optional<Timeline> fromDb = tRepository.getTimeline(timeline.getId());
Assert.assertTrue(fromDb.isPresent());
Assert.assertFalse(fromDb.get() == timeline);
Assert.assertEquals(timeline, fromDb.get());
}

@Test
public void testTimelineUpdate() throws InternalNakadiException, DuplicatedEventTypeNameException {
final Storage storage = sRepository.createStorage(
StorageDbRepositoryTest.createStorage("default", "test", "path"));
final EventType testEt = eRepository.saveEventType(TestUtils.buildDefaultEventType());

final Timeline initial = tRepository.createTimeline(createTimeline(
storage, UUID.randomUUID(), 0, "test_topic", testEt.getName(),
new Date(), null, null, null));

final Timeline modified = tRepository.getTimeline(initial.getId()).get();
modified.setCreatedAt(new Date());
modified.setCleanupAt(new Date());
modified.setSwitchedAt(new Date());
final Timeline.KafkaStoragePosition pos = new Timeline.KafkaStoragePosition();
pos.setOffsets(LongStream.range(0L, 10L).mapToObj(Long::new).collect(Collectors.toList()));
modified.setLatestPosition(pos);

tRepository.updateTimelime(modified);
final Timeline result = tRepository.getTimeline(modified.getId()).get();

Assert.assertEquals(modified, result);
Assert.assertNotEquals(initial, result);
}

@Test(expected = DuplicateKeyException.class)
public void testDuplicateOrderNotAllowed() throws InternalNakadiException, DuplicatedEventTypeNameException {
final Storage storage = sRepository.createStorage(
StorageDbRepositoryTest.createStorage("default", "test", "path"));
final EventType testEt = eRepository.saveEventType(TestUtils.buildDefaultEventType());

tRepository.createTimeline(createTimeline(
storage, UUID.randomUUID(), 0, "test_topic", testEt.getName(),
new Date(), null, null, null));
tRepository.createTimeline(createTimeline(
storage, UUID.randomUUID(), 0, "test_topic", testEt.getName(),
new Date(), null, null, null));
}

@Test
public void testListTimelinesOrdered() throws InternalNakadiException, DuplicatedEventTypeNameException {
final Storage storage = sRepository.createStorage(
StorageDbRepositoryTest.createStorage("default", "test", "path"));
final EventType testEt = eRepository.saveEventType(TestUtils.buildDefaultEventType());
final Timeline t1 = tRepository.createTimeline(createTimeline(
storage, UUID.randomUUID(), 1, "test_topic", testEt.getName(),
new Date(), null, null, null));
final Timeline t0 = tRepository.createTimeline(createTimeline(
storage, UUID.randomUUID(), 0, "test_topic", testEt.getName(),
new Date(), null, null, null));

final List<Timeline> testTimelines = tRepository.listTimelines(testEt.getName());
Assert.assertEquals(2, testTimelines.size());
Assert.assertEquals(t0, testTimelines.get(0));
Assert.assertEquals(t1, testTimelines.get(1));
}

@Test
public void testTimelineDeleted() throws InternalNakadiException, DuplicatedEventTypeNameException {
final Storage storage = sRepository.createStorage(
StorageDbRepositoryTest.createStorage("default", "test", "path"));
final EventType testEt = eRepository.saveEventType(TestUtils.buildDefaultEventType());
final Timeline t1 = tRepository.createTimeline(createTimeline(
storage, UUID.randomUUID(), 1, "test_topic", testEt.getName(),
new Date(), null, null, null));
Assert.assertEquals(1, tRepository.listTimelines(testEt.getName()).size());
tRepository.deleteTimeline(t1.getId());
Assert.assertEquals(0, tRepository.listTimelines(testEt.getName()).size());
}

private static Timeline createTimeline(
final Storage storage,
final UUID id,
final int order,
final String topic,
final String eventType,
final Date createdAt,
final Date switchedAt,
final Date cleanupAt,
final Timeline.StoragePosition latestPosition) {
final Timeline timeline = new Timeline(eventType, order, storage, topic, createdAt);
timeline.setId(id);
timeline.setSwitchedAt(switchedAt);
timeline.setCleanupAt(cleanupAt);
timeline.setLatestPosition(latestPosition);
return timeline;
}
}
Loading

0 comments on commit 57e67d3

Please sign in to comment.