Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix] [broker] Topics failed to delete after remove cluster from replicated clusters set and caused OOM (#23360) #342

Closed
wants to merge 18 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions .github/actions/clean-disk/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,36 @@ runs:
if [[ "${{ inputs.mode }}" == "full" ]]; then
# remove these directories only when mode is 'full'
directories+=(/usr/share/dotnet /opt/hostedtoolcache/CodeQL)

# with help of https://github.com/apache/flink/blob/master/tools/azure-pipelines/free_disk_space.sh
echo "Listing 100 largest packages"
dpkg-query -Wf '${Installed-Size}\t${Package}\n' | sort -n | tail -n 100
df -h
echo "Removing large packages"
sudo apt-get remove -y '^dotnet-.*'
sudo apt-get remove -y '^llvm-.*'
sudo apt-get remove -y 'php.*'
sudo apt-get remove -y '^mongodb-.*'
sudo apt-get remove -y '^mysql-.*'
sudo apt-get remove -y '^microsoft-.*'
sudo apt-get remove -y '^aspnetcore-.*'
sudo apt-get remove -y azure-cli google-cloud-sdk google-chrome-stable firefox powershell mono-devel libgl1-mesa-dri
sudo apt-get autoremove -y
sudo apt-get clean
df -h
echo "Removing large directories"

sudo rm -rf /usr/share/dotnet/
sudo rm -rf /usr/local/graalvm/
sudo rm -rf /usr/local/.ghcup/
sudo rm -rf /usr/local/share/powershell
sudo rm -rf /usr/local/share/chromium
sudo rm -rf /usr/local/lib/android
sudo rm -rf /usr/local/lib/node_modules
sudo rm -rf /opt/microsoft
sudo rm -rf /opt/ghc
sudo docker image prune --all --force
df -h
fi
emptydir=/tmp/empty$$/
mkdir $emptydir
Expand All @@ -53,5 +83,7 @@ runs:
fi
echo "::group::Available diskspace"
time df -BM / /mnt
#echo "200 largest directories of depth up to 3"
#du -h -d 3 / 2>/dev/null | sort -h -r 2>/dev/null | head -n 200
echo "::endgroup::"
shell: bash
15 changes: 12 additions & 3 deletions .github/workflows/pulsar-ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ jobs:
COLLECT_COVERAGE: "${{ needs.preconditions.outputs.collect_coverage }}"
DEVELOCITY_ACCESS_KEY: ${{ secrets.GE_ACCESS_TOKEN }}
runs-on: ubuntu-22.04
timeout-minutes: ${{ matrix.timeout || 60 }}
timeout-minutes: ${{ matrix.timeout || 90 }}
needs: ['preconditions', 'build-and-license-check']
if: ${{ needs.preconditions.outputs.docs_only != 'true' }}
strategy:
Expand All @@ -183,7 +183,6 @@ jobs:
include:
- name: Other
group: OTHER
timeout: 75
- name: Brokers - Broker Group 1
group: BROKER_GROUP_1
- name: Brokers - Broker Group 2
Expand All @@ -200,7 +199,6 @@ jobs:
group: PROXY
- name: Pulsar IO
group: PULSAR_IO
timeout: 75
- name: Pulsar IO - Elastic Search
group: PULSAR_IO_ELASTIC
- name: Pulsar IO - Kafka Connect Adaptor
Expand Down Expand Up @@ -240,6 +238,11 @@ jobs:
distribution: 'temurin'
java-version: ${{ matrix.jdk || '17' }}

- name: Clean Disk
uses: ./.github/actions/clean-disk
with:
mode: full

- name: Install gh-actions-artifact-client.js
uses: apache/pulsar-test-infra/gh-actions-artifact-client/dist@master

Expand All @@ -257,6 +260,12 @@ jobs:
run: |
CHANGED_TESTS="${{ needs.preconditions.outputs.tests_files }}" ./build/run_unit_group.sh ${{ matrix.group }}

- name: Disk space usage
if: ${{ always() }}
run: |
echo "Free space:"
df -h

- name: Upload coverage to build artifacts
if: ${{ needs.preconditions.outputs.collect_coverage == 'true' }}
run: $GITHUB_WORKSPACE/build/pulsar_ci_tool.sh upload_unittest_coverage_files ${{ matrix.group }}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,16 @@ public CompletableFuture<Void> deleteTopicPoliciesAsync(TopicName topicName) {
if (NamespaceService.isHeartbeatNamespace(topicName.getNamespaceObject())) {
return CompletableFuture.completedFuture(null);
}
return sendTopicPolicyEvent(topicName, ActionType.DELETE, null);
TopicName changeEvents = NamespaceEventsSystemTopicFactory.getEventsTopicName(topicName.getNamespaceObject());
return pulsarService.getNamespaceService().checkTopicExists(changeEvents).thenCompose(topicExistsInfo -> {
// If the system topic named "__change_events" has been deleted, it means all the data in the topic have
// been deleted, so we do not need to delete the message that we want to delete again.
if (!topicExistsInfo.isExists()) {
log.info("Skip delete topic-level policies because {} has been removed before", changeEvents);
return CompletableFuture.completedFuture(null);
}
return sendTopicPolicyEvent(topicName, ActionType.DELETE, null);
});
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,16 @@ public NamespaceEventsSystemTopicFactory(PulsarClient client) {
}

public TopicPoliciesSystemTopicClient createTopicPoliciesSystemTopicClient(NamespaceName namespaceName) {
TopicName topicName = TopicName.get(TopicDomain.persistent.value(), namespaceName,
SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME);
TopicName topicName = getEventsTopicName(namespaceName);
log.info("Create topic policies system topic client {}", topicName.toString());
return new TopicPoliciesSystemTopicClient(client, topicName);
}

public static TopicName getEventsTopicName(NamespaceName namespaceName) {
return TopicName.get(TopicDomain.persistent.value(), namespaceName,
SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME);
}

public <T> TransactionBufferSnapshotBaseSystemTopicClient<T> createTransactionBufferSystemTopicClient(
TopicName systemTopicName, SystemTopicTxnBufferSnapshotService<T>
systemTopicTxnBufferSnapshotService, Class<T> schemaType) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,25 @@
package org.apache.pulsar.broker.service;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import java.time.Duration;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.awaitility.Awaitility;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
Expand Down Expand Up @@ -173,4 +181,42 @@ public void testDifferentTopicCreationRule(ReplicationMode replicationMode) thro
public void testReplicationCountMetrics() throws Exception {
super.testReplicationCountMetrics();
}

@Test
public void testRemoveCluster() throws Exception {
// Initialize.
final String ns1 = defaultTenant + "/" + "ns_73b1a31afce34671a5ddc48fe5ad7fc8";
final String topic = "persistent://" + ns1 + "/___tp-5dd50794-7af8-4a34-8a0b-06188052c66a";
final String topicChangeEvents = "persistent://" + ns1 + "/__change_events";
admin1.namespaces().createNamespace(ns1);
admin1.namespaces().setNamespaceReplicationClusters(ns1, new HashSet<>(Arrays.asList(cluster1, cluster2)));
admin1.topics().createNonPartitionedTopic(topic);

// Wait for loading topic up.
Producer<String> p = client1.newProducer(Schema.STRING).topic(topic).create();
Awaitility.await().untilAsserted(() -> {
ConcurrentOpenHashMap<String, CompletableFuture<Optional<Topic>>> tps
= pulsar1.getBrokerService().getTopics();
assertTrue(tps.containsKey(topic));
assertTrue(tps.containsKey(topicChangeEvents));
});

// The topics under the namespace of the cluster-1 will be deleted.
// Verify the result.
admin1.namespaces().setNamespaceReplicationClusters(ns1, new HashSet<>(Arrays.asList(cluster2)));
Awaitility.await().atMost(Duration.ofSeconds(120)).untilAsserted(() -> {
ConcurrentOpenHashMap<String, CompletableFuture<Optional<Topic>>> tps
= pulsar1.getBrokerService().getTopics();
assertFalse(tps.containsKey(topic));
assertFalse(tps.containsKey(topicChangeEvents));
assertFalse(pulsar1.getNamespaceService().checkTopicExists(TopicName.get(topic)).join().isExists());
assertFalse(pulsar1.getNamespaceService()
.checkTopicExists(TopicName.get(topicChangeEvents)).join().isExists());
});

// cleanup.
p.close();
admin2.topics().delete(topic);
admin2.namespaces().deleteNamespace(ns1);
}
}
Loading