Skip to content

Commit

Permalink
Faster topic removal in MultiDC setup (#1884)
Browse files Browse the repository at this point in the history
  • Loading branch information
moscicky authored Aug 5, 2024
1 parent 4292174 commit a3cf6fd
Show file tree
Hide file tree
Showing 12 changed files with 166 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.transaction.CuratorTransactionFinal;
import org.apache.curator.utils.ZKPaths;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -18,6 +20,7 @@
import java.util.List;
import java.util.Optional;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;

public abstract class ZookeeperBasedRepository {

Expand Down Expand Up @@ -75,6 +78,13 @@ protected List<String> childrenOf(String path) {
}
}

protected List<String> childrenPathsOf(String path) {
List<String> childNodes = childrenOf(path);
return childNodes.stream()
.map(child -> ZKPaths.makePath(path, child))
.collect(Collectors.toList());
}

@SuppressWarnings("unchecked")
protected byte[] readFrom(String path) {
return readWithStatFrom(path, bytes -> bytes, (t, stat) -> {}, false).get();
Expand Down Expand Up @@ -156,6 +166,20 @@ protected void createInTransaction(String path, Object value, String childPath)
.commit();
}

protected void deleteInTransaction(List<String> paths) throws Exception {
if (paths.isEmpty()) {
throw new InternalProcessingException("Attempting to remove empty set of paths from ZK");
}
ensureConnected();
CuratorTransactionFinal transaction = zookeeper.inTransaction().delete().forPath(paths.get(0)).and();

for (int i = 1; i < paths.size(); i++) {
transaction = transaction.delete().forPath(paths.get(i)).and();
}

transaction.commit();
}

protected void create(String path, Object value) throws Exception {
ensureConnected();
zookeeper.create().forPath(path, mapper.writeValueAsBytes(value));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.allegro.tech.hermes.api.Group;
import pl.allegro.tech.hermes.api.TopicName;
import pl.allegro.tech.hermes.common.exception.InternalProcessingException;
import pl.allegro.tech.hermes.domain.group.GroupAlreadyExistsException;
import pl.allegro.tech.hermes.domain.group.GroupNotEmptyException;
Expand Down Expand Up @@ -65,14 +66,23 @@ public void updateGroup(Group group) {
}
}

/**
* Atomic removal of <code>group</code> and <code>group/topics</code>
* nodes is required to prevent lengthy loop during removal, see:
* {@link pl.allegro.tech.hermes.infrastructure.zookeeper.ZookeeperTopicRepository#removeTopic(TopicName)}.
*/
@Override
public void removeGroup(String groupName) {
ensureGroupExists(groupName);
ensureGroupIsEmpty(groupName);

logger.info("Removing group: {}", groupName);
List<String> pathsToDelete = List.of(
paths.topicsPath(groupName),
paths.groupPath(groupName)
);
try {
remove(paths.groupPath(groupName));
deleteInTransaction(pathsToDelete);
} catch (Exception e) {
throw new InternalProcessingException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public ZookeeperMessagePreviewRepository(CuratorFramework zookeeper, ObjectMappe
@Override
public List<MessagePreview> loadPreview(TopicName topicName) {
try {
return Optional.of(paths.topicPath(topicName, ZookeeperPaths.PREVIEW_PATH))
return Optional.of(paths.topicPreviewPath(topicName))
.filter(this::pathExists)
.flatMap(p -> readFrom(p, new TypeReference<List<MessagePreview>>() {}, true))
.orElseGet(ArrayList::new);
Expand All @@ -50,7 +50,7 @@ private void persistMessage(TopicName topic, List<MessagePreview> messages) {
logger.debug("Persisting {} messages for preview of topic: {}", messages.size(), topic.qualifiedName());
try {
if (pathExists(paths.topicPath(topic))) {
String previewPath = paths.topicPath(topic, ZookeeperPaths.PREVIEW_PATH);
String previewPath = paths.topicPreviewPath(topic);
ensurePathExists(previewPath);
overwrite(previewPath, messages);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,14 @@ public String topicPath(TopicName topicName, String... tail) {
return Joiner.on(URL_SEPARATOR).join(topicsPath(topicName.getGroupName()), topicName.getName(), (Object[]) tail);
}

public String topicPreviewPath(TopicName topicName) {
return topicPath(topicName, ZookeeperPaths.PREVIEW_PATH);
}

public String topicMetricsPath(TopicName topicName) {
return topicPath(topicName, METRICS_PATH);
}

public String subscriptionPath(TopicName topicName, String subscriptionName, String... tail) {
return Joiner.on(URL_SEPARATOR).join(subscriptionsPath(topicName), subscriptionName, (Object[]) tail);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import pl.allegro.tech.hermes.domain.topic.TopicNotExistsException;
import pl.allegro.tech.hermes.domain.topic.TopicRepository;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
Expand Down Expand Up @@ -77,12 +78,67 @@ public void createTopic(Topic topic) {
}
}

/**
* To remove topic node, we must remove topic node and its children. The tree looks like this:
* <ul>
* <li>- topic
* <li>----- /subscriptions (required)
* <li>----- /preview (optional)
* <li>----- /metrics (optional)
* <li>--------------- /volume
* <li>--------------- /published
* </ul>
*
* <p>One way to remove the whole tree for topic that would be to use <code>deletingChildrenIfNeeded()</code>:
* e.g. <code>zookeeper.delete().deletingChildrenIfNeeded().forPath(topicPath)</code>.
* However, <code>deletingChildrenIfNeeded</code> is not atomic. It first tries to remove the node <code>topic</code>
* and upon receiving <code>KeeperException.NotEmptyException</code> it tries to remove children recursively
* and then retries the node removal. This means that there is a potentially large time gap between
* removal of <code>topic/subscriptions</code> node and <code>topic</code> node, especially when topic removal is being done
* in remote DC.
*
* <p>It turns out that <code>PathChildrenCache</code> used by <code>HierarchicalCacheLevel</code> in
* Consumers and Frontend listens for <code>topics/subscriptions</code> changes and recreates that node when deleted.
* If the recreation happens between the <code>topic/subscriptions</code> and <code>topic</code> node removal
* than the whole removal process must be repeated resulting in a lengthy loop that may even result in <code>StackOverflowException</code>.
* Example of that scenario would be
* <ol>
* <li> DELETE <code>topic</code> - issued by management, fails with KeeperException.NotEmptyException
* <li> DELETE <code>topic/subscriptions</code> - issued by management, succeeds
* <li> CREATE <code>topic/subscriptions</code> - issued by frontend, succeeds
* <li> DELETE <code>topic</code> - issued by management, fails with KeeperException.NotEmptyException
* <li> [...]
* </ol>
*
* <p>To solve this we must remove <code>topic</code> and <code>topic/subscriptions</code> atomically. However, we must also remove
* other <code>topic</code> children. Transaction API does not allow for optional deletes so we:
* <ol>
* <li> find all children paths
* <li> delete all children in one transaction
* </ol>
*/
@Override
public void removeTopic(TopicName topicName) {
ensureTopicExists(topicName);
logger.info("Removing topic: " + topicName);

List<String> pathsForRemoval = new ArrayList<>();
String topicMetricsPath = paths.topicMetricsPath(topicName);
if (pathExists(topicMetricsPath)) {
pathsForRemoval.addAll(childrenPathsOf(topicMetricsPath));
pathsForRemoval.add(topicMetricsPath);
}

String topicPreviewPath = paths.topicPreviewPath(topicName);
if (pathExists(topicPreviewPath)) {
pathsForRemoval.add(topicPreviewPath);
}

pathsForRemoval.add(paths.subscriptionsPath(topicName));
pathsForRemoval.add(paths.topicPath(topicName));

try {
remove(paths.topicPath(topicName));
deleteInTransaction(pathsForRemoval);
} catch (Exception e) {
throw new InternalProcessingException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,17 +185,16 @@ class ZookeeperTopicRepositoryTest extends IntegrationTest {
!repository.topicExists(new TopicName(GROUP, 'remove'))
}

def "should remove topic with metrics but without subscriptions"() {
def "should remove topic with metrics and without preview"() {
given:
def topicName = "topicWithMetrics"

repository.createTopic(topic(GROUP, topicName).build())
wait.untilTopicCreated(GROUP, topicName)

def path = pathsCompiler.compile(BASE_ZOOKEEPER_PATH + ZookeeperCounterStorage.SUBSCRIPTION_DELIVERED, pathContext()
def path = pathsCompiler.compile(BASE_ZOOKEEPER_PATH + ZookeeperCounterStorage.TOPIC_VOLUME_COUNTER, pathContext()
.withGroup(GROUP)
.withTopic(topicName)
.withSubscription("sample")
.build())
zookeeper().create().creatingParentsIfNeeded().forPath(path, '1'.bytes)
wait.untilZookeeperPathIsCreated(path)
Expand All @@ -207,6 +206,29 @@ class ZookeeperTopicRepositoryTest extends IntegrationTest {
!repository.topicExists(new TopicName(GROUP, topicName))
}

def "should remove topic with metrics and preview"() {
given: "a topic"
Topic topic = topic(GROUP, "topicWithMetricsAndPreview").build()
repository.createTopic(topic)
wait.untilTopicCreated(GROUP, topic.getName().getName())

and: "volume metric in zk for that topic"
String metricPath = paths.topicMetricPath(topic.getName(), "volume")
zookeeper().create().creatingParentsIfNeeded().forPath(metricPath, '1'.bytes)
wait.untilZookeeperPathIsCreated(metricPath)

and: "preview in zk for that topic"
String previewPath = paths.topicPreviewPath(topic.getName())
zookeeper().create().creatingParentsIfNeeded().forPath(previewPath , '1'.bytes)
wait.untilZookeeperPathIsCreated(previewPath)

when:
repository.removeTopic(topic.getName())

then:
!repository.topicExists(topic.getName())
}

def "should not throw exception on malformed topic when reading list of all topics"() {
given:
zookeeper().create().forPath(paths.topicPath(new TopicName(GROUP, 'malformed')), ''.bytes)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ MultiDCAwareService multiDCAwareService(KafkaNamesMappers kafkaNamesMappers, Sch
AdminClient brokerAdminClient = brokerAdminClient(kafkaProperties);
BrokerStorage storage = brokersStorage(brokerAdminClient);
BrokerTopicManagement brokerTopicManagement =
new KafkaBrokerTopicManagement(topicProperties, brokerAdminClient, kafkaNamesMapper);
new KafkaBrokerTopicManagement(topicProperties, brokerAdminClient, kafkaNamesMapper, kafkaProperties.getDatacenter());
KafkaConsumerPool consumerPool = kafkaConsumersPool(kafkaProperties, storage, kafkaProperties.getBrokerList());
KafkaRawMessageReader kafkaRawMessageReader =
new KafkaRawMessageReader(consumerPool, kafkaProperties.getKafkaConsumer().getPollTimeoutMillis());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,12 @@ private <T> void execute(RepositoryCommand<T> command, boolean isRollbackEnabled
List<DatacenterBoundRepositoryHolder<T>> executedRepoHolders = new ArrayList<>();

for (DatacenterBoundRepositoryHolder<T> repoHolder : repoHolders) {
long start = System.currentTimeMillis();
try {
executedRepoHolders.add(repoHolder);
logger.info("Executing repository command: {} in ZK dc: {}", command, repoHolder.getDatacenterName());
command.execute(repoHolder);
logger.info("Successfully executed repository command: {} in ZK dc: {} in: {} ms", command, repoHolder.getDatacenterName(), System.currentTimeMillis() - start);
} catch (RepositoryNotAvailableException e) {
logger.warn("Execute failed with an RepositoryNotAvailableException error", e);
if (isRollbackEnabled) {
Expand All @@ -58,7 +61,7 @@ private <T> void execute(RepositoryCommand<T> command, boolean isRollbackEnabled
throw ExceptionWrapper.wrapInInternalProcessingExceptionIfNeeded(e, command.toString(), repoHolder.getDatacenterName());
}
} catch (Exception e) {
logger.warn("Execute failed with an error", e);
logger.warn("Failed to execute repository command: {} in ZK dc: {} in: {} ms", command, repoHolder.getDatacenterName(), System.currentTimeMillis() - start, e);
if (isRollbackEnabled) {
rollback(executedRepoHolders, command);
}
Expand All @@ -68,9 +71,12 @@ private <T> void execute(RepositoryCommand<T> command, boolean isRollbackEnabled
}

private <T> void rollback(List<DatacenterBoundRepositoryHolder<T>> repoHolders, RepositoryCommand<T> command) {
long start = System.currentTimeMillis();
for (DatacenterBoundRepositoryHolder<T> repoHolder : repoHolders) {
logger.info("Executing rollback of repository command: {} in ZK dc: {}", command, repoHolder.getDatacenterName());
try {
command.rollback(repoHolder);
logger.info("Successfully executed rollback of repository command: {} in ZK dc: {} in: {} ms", command, repoHolder.getDatacenterName(), System.currentTimeMillis() - start);
} catch (Exception e) {
logger.error("Rollback procedure failed for command {} on DC {}", command, repoHolder.getDatacenterName(), e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,11 @@ public void removeSubscription(TopicName topicName, String subscriptionName, Req

public void removeSubscriptionRelatedToTopic(Topic topic, RequestUser removedBy) {
List<Subscription> subscriptions = subscriptionRepository.listSubscriptions(topic.getName());

ensureSubscriptionsHaveAutoRemove(subscriptions, topic.getName());
logger.info("Removing subscriptions of topic: {}, subscriptions: {}", topic.getName(), subscriptions);
long start = System.currentTimeMillis();
subscriptions.forEach(sub -> removeSubscription(topic.getName(), sub.getName(), removedBy));
logger.info("Removed subscriptions of topic: {} in {} ms", topic.getName(), System.currentTimeMillis() - start);
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -382,8 +382,14 @@ private void removeSchema(Topic topic) {
}

private void removeTopic(Topic topic, RequestUser removedBy) {
logger.info("Removing topic: {} from ZK clusters", topic.getQualifiedName());
long start = System.currentTimeMillis();
multiDcExecutor.executeByUser(new RemoveTopicRepositoryCommand(topic.getName()), removedBy);
logger.info("Removed topic: {} from ZK clusters in: {} ms", topic.getQualifiedName(), System.currentTimeMillis() - start);
logger.info("Removing topic: {} from Kafka clusters", topic.getQualifiedName());
start = System.currentTimeMillis();
multiDCAwareService.manageTopic(brokerTopicManagement -> brokerTopicManagement.removeTopic(topic));
logger.info("Removed topic: {} from Kafka clusters in: {} ms", topic.getQualifiedName(), System.currentTimeMillis() - start);
auditor.objectRemoved(removedBy.getUsername(), topic);
topicOwnerCache.onRemovedTopic(topic);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package pl.allegro.tech.hermes.management.domain.topic.schema;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import pl.allegro.tech.hermes.api.RawSchema;
Expand All @@ -24,6 +26,8 @@ public class SchemaService {
private final SchemaValidatorProvider validatorProvider;
private final TopicProperties topicProperties;

private static final Logger logger = LoggerFactory.getLogger(SchemaService.class);

@Autowired
public SchemaService(RawSchemaClient rawSchemaClient,
SchemaValidatorProvider validatorProvider,
Expand Down Expand Up @@ -68,7 +72,10 @@ public void deleteAllSchemaVersions(String qualifiedTopicName) {
if (!topicProperties.isRemoveSchema()) {
throw new SchemaRemovalDisabledException();
}
logger.info("Removing all schema versions for topic: {}", qualifiedTopicName);
long start = System.currentTimeMillis();
rawSchemaClient.deleteAllSchemaVersions(fromQualifiedName(qualifiedTopicName));
logger.info("Removed all schema versions for topic: {} in {} ms", qualifiedTopicName, System.currentTimeMillis() - start);
}

public void validateSchema(Topic topic, String schema) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.TopicConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.allegro.tech.hermes.api.Topic;
import pl.allegro.tech.hermes.common.kafka.KafkaNamesMapper;
import pl.allegro.tech.hermes.common.kafka.KafkaTopic;
Expand All @@ -32,10 +34,16 @@ public class KafkaBrokerTopicManagement implements BrokerTopicManagement {

private final KafkaNamesMapper kafkaNamesMapper;

public KafkaBrokerTopicManagement(TopicProperties topicProperties, AdminClient kafkaAdminClient, KafkaNamesMapper kafkaNamesMapper) {
private final String datacenterName;

private static final Logger logger = LoggerFactory.getLogger(KafkaBrokerTopicManagement.class);


public KafkaBrokerTopicManagement(TopicProperties topicProperties, AdminClient kafkaAdminClient, KafkaNamesMapper kafkaNamesMapper, String datacenterName) {
this.topicProperties = topicProperties;
this.kafkaAdminClient = kafkaAdminClient;
this.kafkaNamesMapper = kafkaNamesMapper;
this.datacenterName = datacenterName;
}

@Override
Expand All @@ -59,7 +67,12 @@ public void removeTopic(Topic topic) {
kafkaNamesMapper.toKafkaTopics(topic).stream()
.map(k -> kafkaAdminClient.deleteTopics(Collections.singletonList(k.name().asString())))
.map(DeleteTopicsResult::all)
.forEach(this::waitForKafkaFuture);
.forEach(future -> {
logger.info("Removing topic: {} from Kafka dc: {}", topic, datacenterName);
long start = System.currentTimeMillis();
waitForKafkaFuture(future);
logger.info("Removed topic: {} from Kafka dc: {} in {} ms", topic, datacenterName, System.currentTimeMillis() - start);
});
}

@Override
Expand Down

0 comments on commit a3cf6fd

Please sign in to comment.