Skip to content
This repository has been archived by the owner on Jun 7, 2024. It is now read-only.

Commit

Permalink
Merge pull request #562 from zalando/feature/aruha-527
Browse files Browse the repository at this point in the history
aruha-527: CRUD timelines
  • Loading branch information
adyach authored Mar 6, 2017
2 parents 8c750b1 + 8e7e8c5 commit 6b0fbb0
Show file tree
Hide file tree
Showing 65 changed files with 1,810 additions and 351 deletions.
138 changes: 135 additions & 3 deletions api/nakadi-event-bus-api.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1245,6 +1245,135 @@ paths:
schema:
$ref: '#/definitions/Problem'

/event-types/{name}/timelines:
post:
tags:
- timelines-api
description: |
Creates a new timeline for an event type and makes it active.
The oauth resource owner username has to be equal to 'nakadi.oauth2.adminClientId' property
to be able to access this endpoint.
parameters:
- name: name
in: path
description: Name of the EventType
type: string
required: true
- name: timeline_request
in: body
schema:
description: Storage id to be used for timeline creation
type: object
properties:
storage_id:
type: string
required:
- storage_id
required: true
responses:
'201':
description: New timeline is created and in use
'404':
description: No such event type
schema:
$ref: '#/definitions/Problem'
'422':
description: Unprocessable entity due to non existing storage
schema:
$ref: '#/definitions/Problem'
'403':
description: Access forbidden
schema:
$ref: '#/definitions/Problem'
get:
tags:
- timelines-api
description: |
List timelines for a given event type.
The oauth resource owner username has to be equal to 'nakadi.oauth2.adminClientId' property
to be able to access this endpoint.
parameters:
- name: name
in: path
description: Name of the EventType to list timelines for.
type: string
required: true
responses:
'200':
description: OK
schema:
type: array
description: list of timelines.
items:
type: object
properties:
id:
type: string
format: uuid
event_type:
type: string
order:
type: integer
storage_id:
type: string
topic:
type: string
created_at:
type: string
format: date-time
switched_at:
type: string
format: date-time
cleaned_up_at:
type: string
format: date-time
latest_position:
type: object
'404':
description: No such event type
schema:
$ref: '#/definitions/Problem'
'403':
description: Access forbidden
schema:
$ref: '#/definitions/Problem'

/event-types/{name}/timelines/{id}:
delete:
tags:
- timelines-api
description: |
Deletes a timeline if there is only one timeline.
The oauth resource owner username has to be equal to 'nakadi.oauth2.adminClientId' property
to be able to access this endpoint.
parameters:
- name: name
in: path
description: Name of the EventType.
type: string
required: true
- name: id
in: path
description: timeline id
type: string
format: uuid
required: true
responses:
'200':
description: Timeline was deleted
'404':
description: No such event type or timeline id
schema:
$ref: '#/definitions/Problem'
'422':
description: could not delete timeline, because there is more than one timeline
schema:
$ref: '#/definitions/Problem'
'403':
description: Access forbidden
schema:
$ref: '#/definitions/Problem'

# ################################### #
# #
# Definitions #
Expand All @@ -1269,15 +1398,18 @@ definitions:
description: configuration settings for kafka storage. Only necessary if the storage type is 'kafka'
type: object
properties:
exhibitor_address:
description: the Zookeeper address
type: string
exhibitor_port:
description: the Zookeeper path
type: string
zk_address:
description: the Zookeeper address
type: string
zk_path:
description: the Zookeeper path
type: string
required:
- zk_address
- zk_path
required:
- id
- storage_type
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,13 @@ public void setUp() {
eventTypeDbRepository = new EventTypeDbRepository(template, mapper);
}

static Storage createStorage(final String name, final String zkAddress, final String zkPath) {
final Storage.KafkaConfiguration config = new Storage.KafkaConfiguration(zkAddress, zkPath);
static Storage createStorage(final String name,
final String exhibitorAddress,
final int exhibitorPort,
final String zkAddress,
final String zkPath) {
final Storage.KafkaConfiguration config =
new Storage.KafkaConfiguration(exhibitorAddress, exhibitorPort, zkAddress, zkPath);
final Storage storage = new Storage();
storage.setId(name);
storage.setType(Storage.Type.KAFKA);
Expand All @@ -45,7 +50,7 @@ static Storage createStorage(final String name, final String zkAddress, final St

@Test
public void testStorageCreated() throws Exception {
final Storage storage = createStorage("default", "address", "path");
final Storage storage = createStorage("default", "exaddress", 8181, "address", "path");

repository.createStorage(storage);

Expand All @@ -58,9 +63,9 @@ public void testStorageCreated() throws Exception {

@Test
public void testStorageOrdered() throws Exception {
final Storage storage2 = repository.createStorage(createStorage("2", "address1", "path3"));
final Storage storage1 = repository.createStorage(createStorage("1", "address2", "path2"));
final Storage storage3 = repository.createStorage(createStorage("3", "address3", "path1"));
final Storage storage2 = repository.createStorage(createStorage("2", "exaddress", 8181, "address1", "path3"));
final Storage storage1 = repository.createStorage(createStorage("1", "exaddress", 8181, "address2", "path2"));
final Storage storage3 = repository.createStorage(createStorage("3", "exaddress", 8181, "address3", "path1"));

final List<Storage> storages = repository.listStorages();
assertEquals(3, storages.size());
Expand All @@ -71,7 +76,7 @@ public void testStorageOrdered() throws Exception {

@Test
public void testStorageDeleted() throws Exception {
final Storage storage = repository.createStorage(createStorage("1", "address2", "path2"));
final Storage storage = repository.createStorage(createStorage("1", "exaddress", 8181, "address2", "path2"));
assertEquals(storage, repository.getStorage(storage.getId()).get());
repository.deleteStorage(storage.getId());
assertFalse(repository.getStorage(storage.getId()).isPresent());
Expand All @@ -80,13 +85,13 @@ public void testStorageDeleted() throws Exception {

@Test
public void testIsStorageUsedNo() throws Exception {
repository.createStorage(createStorage("s1", "address1", "path1"));
repository.createStorage(createStorage("s1", "exaddress", 8181, "address1", "path1"));
assertFalse(repository.isStorageUsed("s1"));
}

@Test
public void testIsStorageUsedYes() throws Exception {
final Storage storage1 = repository.createStorage(createStorage("s2", "address1", "path1"));
final Storage storage1 = repository.createStorage(createStorage("s2", "exaddress", 8181, "address1", "path1"));
final EventType testEt = eventTypeDbRepository.saveEventType(TestUtils.buildDefaultEventType());
final Timeline timeline = createTimeline(
storage1, UUID.randomUUID(), 0, "test_topic", testEt.getName(),
Expand All @@ -108,7 +113,7 @@ private static Timeline createTimeline(
final Timeline timeline = new Timeline(eventType, order, storage, topic, createdAt);
timeline.setId(id);
timeline.setSwitchedAt(switchedAt);
timeline.setCleanupAt(cleanupAt);
timeline.setCleanedUpAt(cleanupAt);
timeline.setLatestPosition(latestPosition);
return timeline;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,5 @@
package org.zalando.nakadi.repository.db;

import java.util.Date;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
Expand All @@ -18,6 +12,13 @@
import org.zalando.nakadi.exceptions.InternalNakadiException;
import org.zalando.nakadi.utils.TestUtils;

import java.util.Date;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

public class TimelineDbRepositoryTest extends AbstractDbRepositoryTest {

private TimelineDbRepository tRepository;
Expand All @@ -40,7 +41,7 @@ public void setUp() {
public void testTimelineCreated()
throws InternalNakadiException, DuplicatedEventTypeNameException, DuplicatedStorageIdException {
final Storage storage = sRepository.createStorage(
StorageDbRepositoryTest.createStorage("default", "test", "path"));
StorageDbRepositoryTest.createStorage("default", "localhost", 8181, "test", "path"));
final EventType testEt = eRepository.saveEventType(TestUtils.buildDefaultEventType());

final Timeline timeline = createTimeline(
Expand All @@ -59,7 +60,7 @@ public void testTimelineCreated()
public void testTimelineUpdate()
throws InternalNakadiException, DuplicatedEventTypeNameException, DuplicatedStorageIdException {
final Storage storage = sRepository.createStorage(
StorageDbRepositoryTest.createStorage("default", "test", "path"));
StorageDbRepositoryTest.createStorage("default", "localhost", 8181, "test", "path"));
final EventType testEt = eRepository.saveEventType(TestUtils.buildDefaultEventType());

final Timeline initial = tRepository.createTimeline(createTimeline(
Expand All @@ -68,7 +69,7 @@ public void testTimelineUpdate()

final Timeline modified = tRepository.getTimeline(initial.getId()).get();
modified.setCreatedAt(new Date());
modified.setCleanupAt(new Date());
modified.setCleanedUpAt(new Date());
modified.setSwitchedAt(new Date());
final Timeline.KafkaStoragePosition pos = new Timeline.KafkaStoragePosition();
pos.setOffsets(LongStream.range(0L, 10L).mapToObj(Long::new).collect(Collectors.toList()));
Expand All @@ -85,7 +86,7 @@ public void testTimelineUpdate()
public void testDuplicateOrderNotAllowed()
throws InternalNakadiException, DuplicatedEventTypeNameException, DuplicatedStorageIdException {
final Storage storage = sRepository.createStorage(
StorageDbRepositoryTest.createStorage("default", "test", "path"));
StorageDbRepositoryTest.createStorage("default", "localhost", 8181, "test", "path"));
final EventType testEt = eRepository.saveEventType(TestUtils.buildDefaultEventType());

tRepository.createTimeline(createTimeline(
Expand All @@ -100,7 +101,7 @@ public void testDuplicateOrderNotAllowed()
public void testListTimelinesOrdered()
throws InternalNakadiException, DuplicatedEventTypeNameException, DuplicatedStorageIdException {
final Storage storage = sRepository.createStorage(
StorageDbRepositoryTest.createStorage("default", "test", "path"));
StorageDbRepositoryTest.createStorage("default", "localhost", 8181, "test", "path"));
final EventType testEt = eRepository.saveEventType(TestUtils.buildDefaultEventType());
final Timeline t1 = tRepository.createTimeline(createTimeline(
storage, UUID.randomUUID(), 1, "test_topic", testEt.getName(),
Expand All @@ -119,7 +120,7 @@ public void testListTimelinesOrdered()
public void testTimelineDeleted()
throws InternalNakadiException, DuplicatedEventTypeNameException, DuplicatedStorageIdException {
final Storage storage = sRepository.createStorage(
StorageDbRepositoryTest.createStorage("default", "test", "path"));
StorageDbRepositoryTest.createStorage("default", "localhost", 8181, "test", "path"));
final EventType testEt = eRepository.saveEventType(TestUtils.buildDefaultEventType());
final Timeline t1 = tRepository.createTimeline(createTimeline(
storage, UUID.randomUUID(), 1, "test_topic", testEt.getName(),
Expand All @@ -142,7 +143,7 @@ private static Timeline createTimeline(
final Timeline timeline = new Timeline(eventType, order, storage, topic, createdAt);
timeline.setId(id);
timeline.setSwitchedAt(switchedAt);
timeline.setCleanupAt(cleanupAt);
timeline.setCleanedUpAt(cleanupAt);
timeline.setLatestPosition(latestPosition);
return timeline;
}
Expand Down
28 changes: 24 additions & 4 deletions src/acceptance-test/java/org/zalando/nakadi/webservice/BaseAT.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,19 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.jayway.restassured.RestAssured;
import com.jayway.restassured.parsing.Parser;
import org.junit.BeforeClass;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.datasource.DriverManagerDataSource;
import org.zalando.nakadi.config.JsonConfig;
import org.zalando.nakadi.domain.EventType;
import org.zalando.nakadi.domain.Storage;
import org.zalando.nakadi.exceptions.NoSuchEventTypeException;
import org.zalando.nakadi.repository.db.EventTypeDbRepository;
import org.junit.BeforeClass;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.datasource.DriverManagerDataSource;
import org.zalando.nakadi.repository.db.StorageDbRepository;
import org.zalando.nakadi.repository.db.TimelineDbRepository;
import org.zalando.nakadi.utils.EventTypeTestBuilder;

import java.util.Optional;

import static org.zalando.nakadi.utils.TestUtils.buildDefaultEventType;

Expand All @@ -27,20 +33,25 @@ public abstract class BaseAT {

protected static final String EVENT_TYPE_NAME = "test-event-type-name";
protected static final String TEST_TOPIC = "test-topic";
protected static final EventType EVENT_TYPE = EventTypeTestBuilder.builder()
.name(EVENT_TYPE_NAME)
.topic(TEST_TOPIC).build();
protected static final int PARTITIONS_NUM = 8;

private static final JdbcTemplate JDBC_TEMPLATE = new JdbcTemplate(
new DriverManagerDataSource(POSTGRES_URL, POSTGRES_USER, POSTGRES_PWD));
private static final ObjectMapper MAPPER = (new JsonConfig()).jacksonObjectMapper();
protected static final EventTypeDbRepository EVENT_TYPE_REPO = new EventTypeDbRepository(JDBC_TEMPLATE, MAPPER);
protected static final StorageDbRepository STORAGE_DB_REPOSITORY = new StorageDbRepository(JDBC_TEMPLATE, MAPPER);
protected static final TimelineDbRepository TIMELINE_REPOSITORY = new TimelineDbRepository(JDBC_TEMPLATE, MAPPER);

static {
RestAssured.port = PORT;
RestAssured.defaultParser = Parser.JSON;
}

@BeforeClass
public static void createTestEventType() throws Exception {
public static void initDB() throws Exception {
try {
EVENT_TYPE_REPO.findByName(EVENT_TYPE_NAME);
} catch (final NoSuchEventTypeException e) {
Expand All @@ -49,5 +60,14 @@ public static void createTestEventType() throws Exception {
eventType.setTopic(TEST_TOPIC);
EVENT_TYPE_REPO.saveEventType(eventType);
}

final Optional<Storage> defaultStorage = STORAGE_DB_REPOSITORY.getStorage("default");
if (!defaultStorage.isPresent()) {
final Storage storage = new Storage();
storage.setId("default");
storage.setType(Storage.Type.KAFKA);
storage.setConfiguration(new Storage.KafkaConfiguration(null, null, ZOOKEEPER_URL, ""));
STORAGE_DB_REPOSITORY.createStorage(storage);
}
}
}
Loading

0 comments on commit 6b0fbb0

Please sign in to comment.