diff --git a/src/main/java/org/zalando/nakadi/config/NakadiConfig.java b/src/main/java/org/zalando/nakadi/config/NakadiConfig.java index 4eca62d6c1..130106b154 100644 --- a/src/main/java/org/zalando/nakadi/config/NakadiConfig.java +++ b/src/main/java/org/zalando/nakadi/config/NakadiConfig.java @@ -3,17 +3,23 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.BeanCreationException; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.ApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.core.env.Environment; import org.springframework.core.io.DefaultResourceLoader; import org.springframework.core.task.SimpleAsyncTaskExecutor; import org.springframework.core.task.TaskExecutor; import org.springframework.scheduling.annotation.EnableScheduling; +import org.zalando.nakadi.domain.Storage; +import org.zalando.nakadi.exceptions.DuplicatedStorageIdException; +import org.zalando.nakadi.exceptions.InternalNakadiException; import org.zalando.nakadi.plugin.api.ApplicationService; import org.zalando.nakadi.plugin.api.ApplicationServiceFactory; import org.zalando.nakadi.plugin.api.SystemProperties; +import org.zalando.nakadi.repository.db.StorageDbRepository; import org.zalando.nakadi.repository.zookeeper.ZooKeeperHolder; import org.zalando.nakadi.repository.zookeeper.ZooKeeperLockFactory; import org.zalando.nakadi.service.subscription.zk.ZkSubscriptionClientFactory; @@ -60,4 +66,24 @@ public ApplicationService applicationService(@Value("${nakadi.auth.plugin.factor } } + @Bean + @Qualifier("default_storage") + public Storage defaultStorage(final StorageDbRepository storageDbRepository, + final Environment environment) throws InternalNakadiException { + final Storage storage = new Storage(); + storage.setId("default"); + storage.setType(Storage.Type.KAFKA); + storage.setConfiguration(new Storage.KafkaConfiguration( + environment.getProperty("nakadi.zookeeper.exhibitor.brokers"), + Integer.valueOf(environment.getProperty("nakadi.zookeeper.exhibitor.port", "0")), + environment.getProperty("nakadi.zookeeper.brokers"), + environment.getProperty("nakadi.zookeeper.kafkaNamespace", ""))); + try { + storageDbRepository.createStorage(storage); + } catch (final DuplicatedStorageIdException e) { + LOGGER.info("Creation of default storage failed: {}", e.getMessage()); + } + return storage; + } + } diff --git a/src/main/java/org/zalando/nakadi/repository/tool/DefaultStorage.java b/src/main/java/org/zalando/nakadi/repository/tool/DefaultStorage.java deleted file mode 100644 index 7c68cb49ef..0000000000 --- a/src/main/java/org/zalando/nakadi/repository/tool/DefaultStorage.java +++ /dev/null @@ -1,46 +0,0 @@ -package org.zalando.nakadi.repository.tool; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.ApplicationArguments; -import org.springframework.boot.ApplicationRunner; -import org.springframework.core.env.Environment; -import org.springframework.stereotype.Service; -import org.zalando.nakadi.domain.Storage; -import org.zalando.nakadi.exceptions.DuplicatedStorageIdException; -import org.zalando.nakadi.exceptions.InternalNakadiException; -import org.zalando.nakadi.repository.db.StorageDbRepository; - -@Service -public class DefaultStorage implements ApplicationRunner { - - private static final Logger LOG = LoggerFactory.getLogger(DefaultStorage.class); - private static final String DEFAULT_STORAGE = "default"; - private final Environment environment; - private final StorageDbRepository storageDbRepository; - - @Autowired - public DefaultStorage(final StorageDbRepository storageDbRepository, - final Environment environment) { - this.storageDbRepository = storageDbRepository; - this.environment = environment; - } - - @Override - public void run(final ApplicationArguments args) throws InternalNakadiException { - final Storage storage = new Storage(); - storage.setId(DEFAULT_STORAGE); - storage.setType(Storage.Type.KAFKA); - storage.setConfiguration(new Storage.KafkaConfiguration( - environment.getProperty("nakadi.zookeeper.exhibitor.brokers"), - Integer.valueOf(environment.getProperty("nakadi.zookeeper.exhibitor.port", "0")), - environment.getProperty("nakadi.zookeeper.brokers"), - environment.getProperty("nakadi.zookeeper.kafkaNamespace", ""))); - try { - storageDbRepository.createStorage(storage); - } catch (final DuplicatedStorageIdException e) { - LOG.info("Creation of default storage failed: {}", e.getMessage()); - } - } -} diff --git a/src/main/java/org/zalando/nakadi/service/timeline/TimelineService.java b/src/main/java/org/zalando/nakadi/service/timeline/TimelineService.java index 01d02e9c56..affc53d6cf 100644 --- a/src/main/java/org/zalando/nakadi/service/timeline/TimelineService.java +++ b/src/main/java/org/zalando/nakadi/service/timeline/TimelineService.java @@ -3,6 +3,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.stereotype.Service; import org.springframework.transaction.TransactionException; import org.springframework.transaction.support.TransactionTemplate; @@ -42,7 +43,6 @@ public class TimelineService { private static final Logger LOG = LoggerFactory.getLogger(TimelineService.class); - private static final String DEFAULT_STORAGE = "default"; private final SecuritySettings securitySettings; private final EventTypeCache eventTypeCache; @@ -53,6 +53,7 @@ public class TimelineService { private final TopicRepositoryHolder topicRepositoryHolder; private final TransactionTemplate transactionTemplate; private final UUIDGenerator uuidGenerator; + private final Storage defaultStorage; @Autowired public TimelineService(final SecuritySettings securitySettings, @@ -63,7 +64,8 @@ public TimelineService(final SecuritySettings securitySettings, final TimelineDbRepository timelineDbRepository, final TopicRepositoryHolder topicRepositoryHolder, final TransactionTemplate transactionTemplate, - final UUIDGenerator uuidGenerator) { + final UUIDGenerator uuidGenerator, + @Qualifier("default_storage") final Storage defaultStorage) { this.securitySettings = securitySettings; this.eventTypeCache = eventTypeCache; this.storageDbRepository = storageDbRepository; @@ -73,6 +75,7 @@ public TimelineService(final SecuritySettings securitySettings, this.topicRepositoryHolder = topicRepositoryHolder; this.transactionTemplate = transactionTemplate; this.uuidGenerator = uuidGenerator; + this.defaultStorage = defaultStorage; } public void createTimeline(final String eventTypeName, final String storageId, final Client client) @@ -119,10 +122,7 @@ public Timeline getTimeline(final EventTypeBase eventType) throws TimelineExcept return activeTimeline.get(); } - final Storage storage = storageDbRepository.getStorage(DEFAULT_STORAGE) - .orElseThrow(() -> new UnableProcessException("Fake timeline creation failed for event type " + - eventType.getName() + ".No default storage defined")); - return Timeline.createFakeTimeline(eventType, storage); + return Timeline.createFakeTimeline(eventType, defaultStorage); } catch (final NakadiException e) { LOG.error("Failed to get timeline for event type {}", eventType.getName(), e); throw new TimelineException("Failed to get timeline", e); @@ -136,14 +136,7 @@ public TopicRepository getTopicRepository(final EventTypeBase eventType) } public TopicRepository getDefaultTopicRepository() throws TopicRepositoryException { - try { - final Storage storage = storageDbRepository.getStorage(DEFAULT_STORAGE) - .orElseThrow(() -> new UnableProcessException("No default storage defined")); - return topicRepositoryHolder.getTopicRepository(storage); - } catch (final InternalNakadiException e) { - LOG.error("Failed to get default topic repository", e); - throw new TopicRepositoryException("Failed to get timeline", e); - } + return topicRepositoryHolder.getTopicRepository(defaultStorage); } private void switchTimelines(final Timeline activeTimeline, final Timeline nextTimeline) { diff --git a/src/test/java/org/zalando/nakadi/security/AuthenticationTest.java b/src/test/java/org/zalando/nakadi/security/AuthenticationTest.java index 14ba122f2d..3e9c52060a 100644 --- a/src/test/java/org/zalando/nakadi/security/AuthenticationTest.java +++ b/src/test/java/org/zalando/nakadi/security/AuthenticationTest.java @@ -8,6 +8,7 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.test.SpringApplicationConfiguration; import org.springframework.boot.test.WebIntegrationTest; @@ -29,7 +30,6 @@ import org.zalando.nakadi.Application; import org.zalando.nakadi.config.SecuritySettings; import org.zalando.nakadi.domain.Storage; -import org.zalando.nakadi.exceptions.InternalNakadiException; import org.zalando.nakadi.metrics.EventTypeMetricRegistry; import org.zalando.nakadi.repository.EventTypeRepository; import org.zalando.nakadi.repository.TopicRepository; @@ -39,7 +39,6 @@ import org.zalando.nakadi.repository.db.StorageDbRepository; import org.zalando.nakadi.repository.db.SubscriptionDbRepository; import org.zalando.nakadi.repository.kafka.KafkaLocationManager; -import org.zalando.nakadi.repository.tool.DefaultStorage; import org.zalando.nakadi.repository.zookeeper.ZooKeeperHolder; import org.zalando.nakadi.service.CursorsService; import org.zalando.nakadi.service.EventPublisher; @@ -52,7 +51,6 @@ import javax.annotation.PostConstruct; import javax.servlet.Filter; import java.util.List; -import java.util.Optional; import java.util.Set; import static org.hamcrest.Matchers.isOneOf; @@ -218,11 +216,14 @@ public TopicRepositoryHolder topicRepositoryHolder() { } @Bean - public DefaultStorage defaultStorage() throws InternalNakadiException { - final StorageDbRepository storageDbRepository = mock(StorageDbRepository.class); - when(storageDbRepository.getStorage("default")).thenReturn(Optional.of(new Storage())); - final DefaultStorage defaultStorage = new DefaultStorage(storageDbRepository, environment); - return defaultStorage; + @Qualifier("default_storage") + public Storage defaultStorage() { + return mock(Storage.class); + } + + @Bean + public StorageDbRepository storageDbRepository() { + return mock(StorageDbRepository.class); } } diff --git a/src/test/java/org/zalando/nakadi/service/timeline/TimelineServiceTest.java b/src/test/java/org/zalando/nakadi/service/timeline/TimelineServiceTest.java index 2b4c33e95a..415e0193b2 100644 --- a/src/test/java/org/zalando/nakadi/service/timeline/TimelineServiceTest.java +++ b/src/test/java/org/zalando/nakadi/service/timeline/TimelineServiceTest.java @@ -15,7 +15,6 @@ import org.zalando.nakadi.exceptions.NoSuchEventTypeException; import org.zalando.nakadi.exceptions.NotFoundException; import org.zalando.nakadi.exceptions.TimelineException; -import org.zalando.nakadi.exceptions.UnableProcessException; import org.zalando.nakadi.repository.TopicRepositoryHolder; import org.zalando.nakadi.repository.db.EventTypeCache; import org.zalando.nakadi.repository.db.StorageDbRepository; @@ -36,7 +35,8 @@ public class TimelineServiceTest { private final TimelineService timelineService = new TimelineService(securitySettings, eventTypeCache, storageDbRepository, Mockito.mock(TimelineSync.class), Mockito.mock(NakadiSettings.class), Mockito.mock(TimelineDbRepository.class), Mockito.mock(TopicRepositoryHolder.class), - new TransactionTemplate(Mockito.mock(PlatformTransactionManager.class)), new UUIDGenerator()); + new TransactionTemplate(Mockito.mock(PlatformTransactionManager.class)), new UUIDGenerator(), + new Storage()); @Test(expected = NotFoundException.class) public void testGetTimelinesNotFound() throws Exception { @@ -75,13 +75,4 @@ public void testGetFakeTimeline() throws Exception { Assert.assertTrue(actualTimeline.isFake()); } - @Test(expected = UnableProcessException.class) - public void testGetTimelineUnableProcessException() throws Exception { - final EventType eventType = EventTypeTestBuilder.builder().build(); - Mockito.when(eventTypeCache.getActiveTimeline(Matchers.any())).thenReturn(Optional.empty()); - Mockito.when(storageDbRepository.getStorage("default")).thenReturn(Optional.empty()); - - timelineService.getTimeline(eventType); - } - } \ No newline at end of file