Skip to content
This repository has been archived by the owner on Jun 7, 2024. It is now read-only.

Commit

Permalink
Merge pull request #584 from zalando/bugfix/aruha-683-1
Browse files Browse the repository at this point in the history
aruha-683: cache default storage
  • Loading branch information
adyach authored Mar 7, 2017
2 parents 6b2e65b + abaed57 commit 250c679
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 79 deletions.
26 changes: 26 additions & 0 deletions src/main/java/org/zalando/nakadi/config/NakadiConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,23 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.BeanCreationException;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;
import org.springframework.core.io.DefaultResourceLoader;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.zalando.nakadi.domain.Storage;
import org.zalando.nakadi.exceptions.DuplicatedStorageIdException;
import org.zalando.nakadi.exceptions.InternalNakadiException;
import org.zalando.nakadi.plugin.api.ApplicationService;
import org.zalando.nakadi.plugin.api.ApplicationServiceFactory;
import org.zalando.nakadi.plugin.api.SystemProperties;
import org.zalando.nakadi.repository.db.StorageDbRepository;
import org.zalando.nakadi.repository.zookeeper.ZooKeeperHolder;
import org.zalando.nakadi.repository.zookeeper.ZooKeeperLockFactory;
import org.zalando.nakadi.service.subscription.zk.ZkSubscriptionClientFactory;
Expand Down Expand Up @@ -60,4 +66,24 @@ public ApplicationService applicationService(@Value("${nakadi.auth.plugin.factor
}
}

@Bean
@Qualifier("default_storage")
public Storage defaultStorage(final StorageDbRepository storageDbRepository,
final Environment environment) throws InternalNakadiException {
final Storage storage = new Storage();
storage.setId("default");
storage.setType(Storage.Type.KAFKA);
storage.setConfiguration(new Storage.KafkaConfiguration(
environment.getProperty("nakadi.zookeeper.exhibitor.brokers"),
Integer.valueOf(environment.getProperty("nakadi.zookeeper.exhibitor.port", "0")),
environment.getProperty("nakadi.zookeeper.brokers"),
environment.getProperty("nakadi.zookeeper.kafkaNamespace", "")));
try {
storageDbRepository.createStorage(storage);
} catch (final DuplicatedStorageIdException e) {
LOGGER.info("Creation of default storage failed: {}", e.getMessage());
}
return storage;
}

}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;
import org.springframework.transaction.TransactionException;
import org.springframework.transaction.support.TransactionTemplate;
Expand Down Expand Up @@ -42,7 +43,6 @@
public class TimelineService {

private static final Logger LOG = LoggerFactory.getLogger(TimelineService.class);
private static final String DEFAULT_STORAGE = "default";

private final SecuritySettings securitySettings;
private final EventTypeCache eventTypeCache;
Expand All @@ -53,6 +53,7 @@ public class TimelineService {
private final TopicRepositoryHolder topicRepositoryHolder;
private final TransactionTemplate transactionTemplate;
private final UUIDGenerator uuidGenerator;
private final Storage defaultStorage;

@Autowired
public TimelineService(final SecuritySettings securitySettings,
Expand All @@ -63,7 +64,8 @@ public TimelineService(final SecuritySettings securitySettings,
final TimelineDbRepository timelineDbRepository,
final TopicRepositoryHolder topicRepositoryHolder,
final TransactionTemplate transactionTemplate,
final UUIDGenerator uuidGenerator) {
final UUIDGenerator uuidGenerator,
@Qualifier("default_storage") final Storage defaultStorage) {
this.securitySettings = securitySettings;
this.eventTypeCache = eventTypeCache;
this.storageDbRepository = storageDbRepository;
Expand All @@ -73,6 +75,7 @@ public TimelineService(final SecuritySettings securitySettings,
this.topicRepositoryHolder = topicRepositoryHolder;
this.transactionTemplate = transactionTemplate;
this.uuidGenerator = uuidGenerator;
this.defaultStorage = defaultStorage;
}

public void createTimeline(final String eventTypeName, final String storageId, final Client client)
Expand Down Expand Up @@ -119,10 +122,7 @@ public Timeline getTimeline(final EventTypeBase eventType) throws TimelineExcept
return activeTimeline.get();
}

final Storage storage = storageDbRepository.getStorage(DEFAULT_STORAGE)
.orElseThrow(() -> new UnableProcessException("Fake timeline creation failed for event type " +
eventType.getName() + ".No default storage defined"));
return Timeline.createFakeTimeline(eventType, storage);
return Timeline.createFakeTimeline(eventType, defaultStorage);
} catch (final NakadiException e) {
LOG.error("Failed to get timeline for event type {}", eventType.getName(), e);
throw new TimelineException("Failed to get timeline", e);
Expand All @@ -136,14 +136,7 @@ public TopicRepository getTopicRepository(final EventTypeBase eventType)
}

public TopicRepository getDefaultTopicRepository() throws TopicRepositoryException {
try {
final Storage storage = storageDbRepository.getStorage(DEFAULT_STORAGE)
.orElseThrow(() -> new UnableProcessException("No default storage defined"));
return topicRepositoryHolder.getTopicRepository(storage);
} catch (final InternalNakadiException e) {
LOG.error("Failed to get default topic repository", e);
throw new TopicRepositoryException("Failed to get timeline", e);
}
return topicRepositoryHolder.getTopicRepository(defaultStorage);
}

private void switchTimelines(final Timeline activeTimeline, final Timeline nextTimeline) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.test.SpringApplicationConfiguration;
import org.springframework.boot.test.WebIntegrationTest;
Expand All @@ -29,7 +30,6 @@
import org.zalando.nakadi.Application;
import org.zalando.nakadi.config.SecuritySettings;
import org.zalando.nakadi.domain.Storage;
import org.zalando.nakadi.exceptions.InternalNakadiException;
import org.zalando.nakadi.metrics.EventTypeMetricRegistry;
import org.zalando.nakadi.repository.EventTypeRepository;
import org.zalando.nakadi.repository.TopicRepository;
Expand All @@ -39,7 +39,6 @@
import org.zalando.nakadi.repository.db.StorageDbRepository;
import org.zalando.nakadi.repository.db.SubscriptionDbRepository;
import org.zalando.nakadi.repository.kafka.KafkaLocationManager;
import org.zalando.nakadi.repository.tool.DefaultStorage;
import org.zalando.nakadi.repository.zookeeper.ZooKeeperHolder;
import org.zalando.nakadi.service.CursorsService;
import org.zalando.nakadi.service.EventPublisher;
Expand All @@ -52,7 +51,6 @@
import javax.annotation.PostConstruct;
import javax.servlet.Filter;
import java.util.List;
import java.util.Optional;
import java.util.Set;

import static org.hamcrest.Matchers.isOneOf;
Expand Down Expand Up @@ -218,11 +216,14 @@ public TopicRepositoryHolder topicRepositoryHolder() {
}

@Bean
public DefaultStorage defaultStorage() throws InternalNakadiException {
final StorageDbRepository storageDbRepository = mock(StorageDbRepository.class);
when(storageDbRepository.getStorage("default")).thenReturn(Optional.of(new Storage()));
final DefaultStorage defaultStorage = new DefaultStorage(storageDbRepository, environment);
return defaultStorage;
@Qualifier("default_storage")
public Storage defaultStorage() {
return mock(Storage.class);
}

@Bean
public StorageDbRepository storageDbRepository() {
return mock(StorageDbRepository.class);
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import org.zalando.nakadi.exceptions.NoSuchEventTypeException;
import org.zalando.nakadi.exceptions.NotFoundException;
import org.zalando.nakadi.exceptions.TimelineException;
import org.zalando.nakadi.exceptions.UnableProcessException;
import org.zalando.nakadi.repository.TopicRepositoryHolder;
import org.zalando.nakadi.repository.db.EventTypeCache;
import org.zalando.nakadi.repository.db.StorageDbRepository;
Expand All @@ -36,7 +35,8 @@ public class TimelineServiceTest {
private final TimelineService timelineService = new TimelineService(securitySettings, eventTypeCache,
storageDbRepository, Mockito.mock(TimelineSync.class), Mockito.mock(NakadiSettings.class),
Mockito.mock(TimelineDbRepository.class), Mockito.mock(TopicRepositoryHolder.class),
new TransactionTemplate(Mockito.mock(PlatformTransactionManager.class)), new UUIDGenerator());
new TransactionTemplate(Mockito.mock(PlatformTransactionManager.class)), new UUIDGenerator(),
new Storage());

@Test(expected = NotFoundException.class)
public void testGetTimelinesNotFound() throws Exception {
Expand Down Expand Up @@ -75,13 +75,4 @@ public void testGetFakeTimeline() throws Exception {
Assert.assertTrue(actualTimeline.isFake());
}

@Test(expected = UnableProcessException.class)
public void testGetTimelineUnableProcessException() throws Exception {
final EventType eventType = EventTypeTestBuilder.builder().build();
Mockito.when(eventTypeCache.getActiveTimeline(Matchers.any())).thenReturn(Optional.empty());
Mockito.when(storageDbRepository.getStorage("default")).thenReturn(Optional.empty());

timelineService.getTimeline(eventType);
}

}

0 comments on commit 250c679

Please sign in to comment.