diff --git a/src/acceptance-test/java/org/zalando/nakadi/repository/kafka/KafkaRepositoryAT.java b/src/acceptance-test/java/org/zalando/nakadi/repository/kafka/KafkaRepositoryAT.java index b910242595..2052432b78 100644 --- a/src/acceptance-test/java/org/zalando/nakadi/repository/kafka/KafkaRepositoryAT.java +++ b/src/acceptance-test/java/org/zalando/nakadi/repository/kafka/KafkaRepositoryAT.java @@ -67,6 +67,8 @@ public class KafkaRepositoryAT extends BaseAT { private static final String DEFAULT_ADMIN_VALUE = "nakadi"; private static final String DEFAULT_WARN_ALL_DATA_ACCESS_MESSAGE = ""; private static final String DEFAULT_WARN_LOG_COMPACTION_MESSAGE = ""; + private static final String DEFAULT_EVENT_TYPE_DELETABLE_SUBSCRIPTION_OWNING_APPLICATION = "nakadi_archiver"; + private static final String DEFAULT_EVENT_TYPE_DELETABLE_SUBSCRIPTION_CONSUMER_GROUP = "nakadi_to_s3"; private NakadiSettings nakadiSettings; private KafkaSettings kafkaSettings; @@ -93,7 +95,9 @@ public void setup() { DEFAULT_ADMIN_DATA_TYPE, DEFAULT_ADMIN_VALUE, DEFAULT_WARN_ALL_DATA_ACCESS_MESSAGE, - DEFAULT_WARN_LOG_COMPACTION_MESSAGE); + DEFAULT_WARN_LOG_COMPACTION_MESSAGE, + DEFAULT_EVENT_TYPE_DELETABLE_SUBSCRIPTION_OWNING_APPLICATION, + DEFAULT_EVENT_TYPE_DELETABLE_SUBSCRIPTION_CONSUMER_GROUP); kafkaSettings = new KafkaSettings(KAFKA_REQUEST_TIMEOUT, KAFKA_BATCH_SIZE, KAFKA_BUFFER_MEMORY, KAFKA_LINGER_MS, KAFKA_ENABLE_AUTO_COMMIT, KAFKA_MAX_REQUEST_SIZE, diff --git a/src/main/java/org/zalando/nakadi/config/NakadiSettings.java b/src/main/java/org/zalando/nakadi/config/NakadiSettings.java index 142870192d..343cc070ed 100644 --- a/src/main/java/org/zalando/nakadi/config/NakadiSettings.java +++ b/src/main/java/org/zalando/nakadi/config/NakadiSettings.java @@ -23,6 +23,8 @@ public class NakadiSettings { private final AuthorizationAttribute defaultAdmin; private final String warnAllDataAccessMessage; private final String logCompactionWarnMessage; + private final String deletableSubscriptionOwningApplication; + private final String deletableSubscriptionConsumerGroup; @Autowired public NakadiSettings(@Value("${nakadi.topic.max.partitionNum}") final int maxTopicPartitionCount, @@ -39,7 +41,11 @@ public NakadiSettings(@Value("${nakadi.topic.max.partitionNum}") final int maxTo @Value("${nakadi.admin.default.dataType}") final String defaultAdminDataType, @Value("${nakadi.admin.default.value}") final String defaultAdminValue, @Value("${nakadi.authz.warnAllDataAccessMessage}") final String warnAllDataAccessMessage, - @Value("${nakadi.topic.compacted.warnMessage}") final String logCompactionWarnMessage) { + @Value("${nakadi.topic.compacted.warnMessage}") final String logCompactionWarnMessage, + @Value("${nakadi.eventType.deletableSubscription.owningApplication}") + final String deletableSubscriptionOwningApplication, + @Value("${nakadi.eventType.deletableSubscription.consumerGroup}") + final String deletableSubscriptionConsumerGroup) { this.maxTopicPartitionCount = maxTopicPartitionCount; this.defaultTopicPartitionCount = defaultTopicPartitionCount; this.defaultTopicReplicaFactor = defaultTopicReplicaFactor; @@ -54,6 +60,8 @@ public NakadiSettings(@Value("${nakadi.topic.max.partitionNum}") final int maxTo this.defaultAdmin = new ResourceAuthorizationAttribute(defaultAdminDataType, defaultAdminValue); this.warnAllDataAccessMessage = warnAllDataAccessMessage; this.logCompactionWarnMessage = logCompactionWarnMessage; + this.deletableSubscriptionOwningApplication = deletableSubscriptionOwningApplication; + this.deletableSubscriptionConsumerGroup = deletableSubscriptionConsumerGroup; } public int getDefaultTopicPartitionCount() { @@ -111,4 +119,12 @@ public String getWarnAllDataAccessMessage() { public String getLogCompactionWarnMessage() { return logCompactionWarnMessage; } + + public String getDeletableSubscriptionOwningApplication() { + return deletableSubscriptionOwningApplication; + } + + public String getDeletableSubscriptionConsumerGroup() { + return deletableSubscriptionConsumerGroup; + } } diff --git a/src/main/java/org/zalando/nakadi/service/EventTypeService.java b/src/main/java/org/zalando/nakadi/service/EventTypeService.java index 7fc8c25496..b4dba1a16c 100644 --- a/src/main/java/org/zalando/nakadi/service/EventTypeService.java +++ b/src/main/java/org/zalando/nakadi/service/EventTypeService.java @@ -71,6 +71,7 @@ import java.util.stream.Collectors; import static org.zalando.nakadi.service.FeatureToggleService.Feature.DELETE_EVENT_TYPE_WITH_SUBSCRIPTIONS; +import static org.zalando.nakadi.service.FeatureToggleService.Feature.EVENT_TYPE_DELETION_ONLY_ADMINS; import static org.zalando.nakadi.service.FeatureToggleService.Feature.FORCE_EVENT_TYPE_AUTHZ; @Component @@ -249,7 +250,14 @@ public void delete(final String eventTypeName) throws EventTypeDeletionException authorizationValidator.authorizeEventTypeView(eventType); authorizationValidator.authorizeEventTypeAdmin(eventType); - if (featureToggleService.isFeatureEnabled(DELETE_EVENT_TYPE_WITH_SUBSCRIPTIONS)) { + if (featureToggleService.isFeatureEnabled(EVENT_TYPE_DELETION_ONLY_ADMINS)) { + if (eventType.getAuthorization() == null || hasNonDeletableSubscriptions(eventType.getName())) { + throw new AccessDeniedException(eventType.asResource()); + } + } + + if (featureToggleService.isFeatureEnabled(DELETE_EVENT_TYPE_WITH_SUBSCRIPTIONS) + || featureToggleService.isFeatureEnabled(EVENT_TYPE_DELETION_ONLY_ADMINS)) { topicsToDelete = deleteEventTypeWithSubscriptions(eventTypeName); } else { topicsToDelete = deleteEventTypeIfNoSubscriptions(eventTypeName); @@ -333,6 +341,25 @@ private boolean hasSubscriptions(final String eventTypeName) { return !subs.isEmpty(); } + private boolean hasNonDeletableSubscriptions(final String eventTypeName) { + int offset = 0; + List subs = subscriptionRepository.listSubscriptions( + ImmutableSet.of(eventTypeName), Optional.empty(), offset, 20); + while (!subs.isEmpty()) { + for (final Subscription sub : subs) { + if (!sub.getConsumerGroup().equals(nakadiSettings.getDeletableSubscriptionConsumerGroup()) + || !sub.getOwningApplication() + .equals(nakadiSettings.getDeletableSubscriptionOwningApplication())) { + return true; + } + } + offset += 20; + subs = subscriptionRepository.listSubscriptions( + ImmutableSet.of(eventTypeName), Optional.empty(), offset, 20); + } + return false; + } + public void update(final String eventTypeName, final EventTypeBase eventTypeBase) throws TopicConfigException, diff --git a/src/main/java/org/zalando/nakadi/service/FeatureToggleService.java b/src/main/java/org/zalando/nakadi/service/FeatureToggleService.java index 503eb87878..18428a4a84 100644 --- a/src/main/java/org/zalando/nakadi/service/FeatureToggleService.java +++ b/src/main/java/org/zalando/nakadi/service/FeatureToggleService.java @@ -34,6 +34,7 @@ enum Feature { DISABLE_EVENT_TYPE_CREATION("disable_event_type_creation"), DISABLE_EVENT_TYPE_DELETION("disable_event_type_deletion"), DELETE_EVENT_TYPE_WITH_SUBSCRIPTIONS("delete_event_type_with_subscriptions"), + EVENT_TYPE_DELETION_ONLY_ADMINS("event_type_deletion_only_admins"), DISABLE_SUBSCRIPTION_CREATION("disable_subscription_creation"), REMOTE_TOKENINFO("remote_tokeninfo"), KPI_COLLECTION("kpi_collection"), diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 0ee3b0ae70..3ca4fdae6d 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -37,6 +37,9 @@ nakadi: dataType: service value: stups_nakadi authz.warnAllDataAccessMessage: "Data access warning" + eventType.deletableSubscription: + owningApplication: "nakadi_archiver" + consumerGroup: "nakadi_to_s3" topic: min: retentionMs: 10800000 # 3 hours diff --git a/src/test/java/org/zalando/nakadi/controller/EventTypeControllerTestCase.java b/src/test/java/org/zalando/nakadi/controller/EventTypeControllerTestCase.java index 4636328441..f5a4491db5 100644 --- a/src/test/java/org/zalando/nakadi/controller/EventTypeControllerTestCase.java +++ b/src/test/java/org/zalando/nakadi/controller/EventTypeControllerTestCase.java @@ -100,7 +100,7 @@ public void init() throws Exception { final NakadiSettings nakadiSettings = new NakadiSettings(0, 0, 0, TOPIC_RETENTION_TIME_MS, 0, 60, NAKADI_POLL_TIMEOUT, NAKADI_SEND_TIMEOUT, 0, NAKADI_EVENT_MAX_BYTES, NAKADI_SUBSCRIPTION_MAX_PARTITIONS, "service", "nakadi", "I am warning you", - "I am warning you, even more"); + "I am warning you, even more", "nakadi_archiver", "nakadi_to_s3"); final PartitionsCalculator partitionsCalculator = new KafkaConfig().createPartitionsCalculator( "t2.large", TestUtils.OBJECT_MAPPER, nakadiSettings); when(timelineService.getTopicRepository((Timeline) any())).thenReturn(topicRepository); diff --git a/src/test/java/org/zalando/nakadi/service/EventPublisherTest.java b/src/test/java/org/zalando/nakadi/service/EventPublisherTest.java index 1dc49a4d90..72a593e4eb 100644 --- a/src/test/java/org/zalando/nakadi/service/EventPublisherTest.java +++ b/src/test/java/org/zalando/nakadi/service/EventPublisherTest.java @@ -75,7 +75,7 @@ public class EventPublisherTest { private final AuthorizationValidator authzValidator = mock(AuthorizationValidator.class); private final NakadiSettings nakadiSettings = new NakadiSettings(0, 0, 0, TOPIC_RETENTION_TIME_MS, 0, 60, NAKADI_POLL_TIMEOUT, NAKADI_SEND_TIMEOUT, TIMELINE_WAIT_TIMEOUT_MS, NAKADI_EVENT_MAX_BYTES, - NAKADI_SUBSCRIPTION_MAX_PARTITIONS, "service", "nakadi", "", ""); + NAKADI_SUBSCRIPTION_MAX_PARTITIONS, "service", "nakadi", "", "", "nakadi_archiver", "nakadi_to_s3"); private final EventPublisher publisher; public EventPublisherTest() { diff --git a/src/test/java/org/zalando/nakadi/service/EventTypeServiceTest.java b/src/test/java/org/zalando/nakadi/service/EventTypeServiceTest.java index 23ebff96cc..c441591d1a 100644 --- a/src/test/java/org/zalando/nakadi/service/EventTypeServiceTest.java +++ b/src/test/java/org/zalando/nakadi/service/EventTypeServiceTest.java @@ -3,6 +3,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Multimap; +import org.assertj.core.util.Lists; import org.json.JSONObject; import org.junit.Before; import org.junit.Test; @@ -13,6 +14,7 @@ import org.zalando.nakadi.domain.EventType; import org.zalando.nakadi.domain.Subscription; import org.zalando.nakadi.enrichment.Enrichment; +import org.zalando.nakadi.exceptions.runtime.AccessDeniedException; import org.zalando.nakadi.exceptions.runtime.ConflictException; import org.zalando.nakadi.exceptions.runtime.EventTypeDeletionException; import org.zalando.nakadi.exceptions.runtime.FeatureNotAvailableException; @@ -43,7 +45,9 @@ import static org.mockito.Mockito.verifyZeroInteractions; import static org.mockito.Mockito.when; import static org.zalando.nakadi.utils.TestUtils.buildDefaultEventType; +import static org.zalando.nakadi.utils.TestUtils.buildResourceAuthorization; import static org.zalando.nakadi.utils.TestUtils.checkKPIEventSubmitted; +import static org.zalando.nakadi.utils.TestUtils.createSubscription; public class EventTypeServiceTest { @@ -134,6 +138,82 @@ public void testFeatureToggleAllowsDeleteEventTypeWithSubscriptions() throws Exc // no exception should be thrown } + @Test + public void testFeatureToggleAllowsDeletEventTypeWithAuthzSectionAndDeletableSubscription() throws Exception { + final EventType eventType = buildDefaultEventType(); + eventType.setAuthorization(buildResourceAuthorization()); + + doReturn(Optional.of(eventType)).when(eventTypeRepository).findByNameO(eventType.getName()); + doReturn(ImmutableList.of(createSubscription("nakadi_archiver", "nakadi_to_s3"))) + .when(subscriptionDbRepository) + .listSubscriptions(ImmutableSet.of(eventType.getName()), Optional.empty(), 0, 20); + doReturn(ImmutableList.of(createSubscription("nakadi_archiver", "nakadi_to_s3"))) + .when(subscriptionDbRepository) + .listSubscriptions(ImmutableSet.of(eventType.getName()), Optional.empty(), 0, 1); + doReturn(Lists.emptyList()) + .when(subscriptionDbRepository) + .listSubscriptions(ImmutableSet.of(eventType.getName()), Optional.empty(), 20, 20); + doReturn("nakadi_archiver").when(nakadiSettings).getDeletableSubscriptionOwningApplication(); + doReturn("nakadi_to_s3").when(nakadiSettings).getDeletableSubscriptionConsumerGroup(); + + when(featureToggleService.isFeatureEnabled(FeatureToggleService.Feature.EVENT_TYPE_DELETION_ONLY_ADMINS)) + .thenReturn(true); + + eventTypeService.delete(eventType.getName()); + // no exception should be thrown + } + + @Test + public void testFeatureToggleForbidsDeleteEventTypeWithoutAuthzSection() throws Exception { + final EventType eventType = buildDefaultEventType(); + + doReturn(Optional.of(eventType)).when(eventTypeRepository).findByNameO(eventType.getName()); + doReturn(ImmutableList.of(createSubscription("nakadi_archiver", "nakadi_to_s3"))) + .when(subscriptionDbRepository) + .listSubscriptions(ImmutableSet.of(eventType.getName()), Optional.empty(), 0, 20); + doReturn(Lists.emptyList()) + .when(subscriptionDbRepository) + .listSubscriptions(ImmutableSet.of(eventType.getName()), Optional.empty(), 20, 20); + doReturn("nakadi_archiver").when(nakadiSettings).getDeletableSubscriptionOwningApplication(); + doReturn("nakadi_to_s3").when(nakadiSettings).getDeletableSubscriptionConsumerGroup(); + + when(featureToggleService.isFeatureEnabled(FeatureToggleService.Feature.EVENT_TYPE_DELETION_ONLY_ADMINS)) + .thenReturn(true); + + try { + eventTypeService.delete(eventType.getName()); + } catch (AccessDeniedException e) { + return; + } + fail("Should throw AccessDeniedException"); + } + + @Test + public void testFeatureToggleForbidsDeleteEventTypeWithNonDeletableSubscription() throws Exception { + final EventType eventType = buildDefaultEventType(); + eventType.setAuthorization(buildResourceAuthorization()); + + doReturn(Optional.of(eventType)).when(eventTypeRepository).findByNameO(eventType.getName()); + doReturn(ImmutableList.of(createSubscription("someone", "something"))) + .when(subscriptionDbRepository) + .listSubscriptions(ImmutableSet.of(eventType.getName()), Optional.empty(), 0, 20); + doReturn(Lists.emptyList()) + .when(subscriptionDbRepository) + .listSubscriptions(ImmutableSet.of(eventType.getName()), Optional.empty(), 20, 20); + doReturn("nakadi_archiver").when(nakadiSettings).getDeletableSubscriptionOwningApplication(); + doReturn("nakadi_to_s3").when(nakadiSettings).getDeletableSubscriptionConsumerGroup(); + + when(featureToggleService.isFeatureEnabled(FeatureToggleService.Feature.EVENT_TYPE_DELETION_ONLY_ADMINS)) + .thenReturn(true); + + try { + eventTypeService.delete(eventType.getName()); + } catch (AccessDeniedException e) { + return; + } + fail("Should throw AccessDeniedException"); + } + @Test(expected = FeatureNotAvailableException.class) public void testFeatureToggleDisableLogCompaction() { final EventType eventType = buildDefaultEventType(); diff --git a/src/test/java/org/zalando/nakadi/utils/TestUtils.java b/src/test/java/org/zalando/nakadi/utils/TestUtils.java index 0fb78d6973..a5d23355a7 100644 --- a/src/test/java/org/zalando/nakadi/utils/TestUtils.java +++ b/src/test/java/org/zalando/nakadi/utils/TestUtils.java @@ -19,10 +19,12 @@ import org.zalando.nakadi.domain.BatchFactory; import org.zalando.nakadi.domain.BatchItem; import org.zalando.nakadi.domain.EventType; -import org.zalando.nakadi.domain.storage.Storage; +import org.zalando.nakadi.domain.ResourceAuthorization; import org.zalando.nakadi.domain.Subscription; import org.zalando.nakadi.domain.Timeline; +import org.zalando.nakadi.domain.storage.Storage; import org.zalando.nakadi.exceptions.runtime.AccessDeniedException; +import org.zalando.nakadi.plugin.api.authz.AuthorizationAttribute; import org.zalando.nakadi.plugin.api.authz.AuthorizationService; import org.zalando.nakadi.plugin.api.authz.Resource; import org.zalando.nakadi.problem.ValidationProblem; @@ -30,6 +32,7 @@ import org.zalando.problem.Problem; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; import java.util.Date; import java.util.List; @@ -141,6 +144,13 @@ public static EventType buildDefaultEventType() { return EventTypeTestBuilder.builder().build(); } + public static ResourceAuthorization buildResourceAuthorization() { + final List admins = new ArrayList<>(); + final List readers = new ArrayList<>(); + final List writers = new ArrayList<>(); + return new ResourceAuthorization(admins, readers, writers); + } + public static AccessDeniedException mockAccessDeniedException() { final Resource resource = mock(Resource.class); when(resource.getName()).thenReturn("some-name"); @@ -224,6 +234,10 @@ public static List createRandomSubscriptions(final int count) { return createRandomSubscriptions(count, randomTextString()); } + public static Subscription createSubscription(final String owningApp, final String consumerGroup) { + return builder().withConsumerGroup(consumerGroup).withOwningApplication(owningApp).build(); + } + public static Timeline buildTimeline(final String etName) { return new Timeline(etName, 0, new Storage("ccc", Storage.Type.KAFKA), randomUUID(), new Date()); }