Skip to content
This repository has been archived by the owner on Dec 14, 2022. It is now read-only.

Commit

Permalink
fix #366: stream failure if the topic doesn't exist with Pulsar 2.8.0 (
Browse files Browse the repository at this point in the history
…#369)

(cherry picked from commit 9cd690b)
  • Loading branch information
shibd authored and jianyun8023 committed Jul 30, 2021
1 parent d057c4a commit 059de98
Show file tree
Hide file tree
Showing 3 changed files with 124 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import org.apache.pulsar.client.impl.schema.BytesSchema;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.PartitionedTopicInternalStats;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.policies.data.TopicStats;
Expand Down Expand Up @@ -178,19 +180,21 @@ public List<String> getTopics(String ns) throws PulsarAdminException {
}

public boolean topicExists(String topicName) throws PulsarAdminException {
int partitionNum = admin.topics().getPartitionedTopicMetadata(topicName).partitions;
if (partitionNum > 0) {
return true;
} else {
admin.topics().getStats(topicName);
try {
PartitionedTopicMetadata partitionedTopicMetadata = admin.topics().getPartitionedTopicMetadata(topicName);
if (partitionedTopicMetadata.partitions > 0) {
return true;
}
} catch (PulsarAdminException.NotFoundException e) {
}
return true;
return false;
}

public void deleteTopic(String topicName) throws PulsarAdminException {
int partitionNum = admin.topics().getPartitionedTopicMetadata(topicName).partitions;
if (partitionNum > 0) {
final Optional<PersistentTopicInternalStats> any = admin.topics().getPartitionedInternalStats(topicName).partitions.entrySet()

try {
PartitionedTopicInternalStats partitionedInternalStats = admin.topics().getPartitionedInternalStats(topicName);
final Optional<PersistentTopicInternalStats> any = partitionedInternalStats.partitions.entrySet()
.stream()
.map(Map.Entry::getValue)
.filter(p -> !p.cursors.isEmpty())
Expand All @@ -199,10 +203,8 @@ public void deleteTopic(String topicName) throws PulsarAdminException {
throw new IllegalStateException(String.format("The topic[%s] cannot be deleted because there are subscribers", topicName));
}
admin.topics().deletePartitionedTopic(topicName, true);
} else {
if (!admin.topics().getInternalStats(topicName).cursors.isEmpty()) {
throw new IllegalStateException(String.format("The topic[%s] cannot be deleted because there are subscribers", topicName));
}
} catch (PulsarAdminException.NotFoundException e) {
log.warn("topic<{}> is not exit, try delete force it", topicName);
admin.topics().delete(topicName, true);
}
}
Expand Down Expand Up @@ -357,17 +359,34 @@ public Set<TopicRange> getTopicPartitions() throws PulsarAdminException {
.collect(Collectors.toSet());
}

/**
* Get topic partitions all, If the topic does not exist, it is created automatically ont partition to topic.
*
* @return allTopicPartitions
* @throws PulsarAdminException pulsarAdminException
*/
public Set<TopicRange> getTopicPartitionsAll() throws PulsarAdminException {
List<TopicRange> topics = getTopics();
HashSet<TopicRange> allTopics = new HashSet<>();
for (TopicRange topic : topics) {
int partNum = admin.topics().getPartitionedTopicMetadata(topic.getTopic()).partitions;
int partNum = 1;
try {
partNum = admin.topics().getPartitionedTopicMetadata(topic.getTopic()).partitions;
} catch (PulsarAdminException.NotFoundException e) {
log.info("topic<{}> is not exit, auto create <{}> partition to <{}>", topic.getTopic(), partNum, topic.getTopic());
try {
createTopic(topic.getTopic(), partNum);
} catch (PulsarAdminException.ConflictException conflictException) {
// multi thread may cause concurrent creation
}
}
// pulsar still has the situation of getting 0 partitions, non-partitions topic.
if (partNum == 0) {
allTopics.add(topic);
} else {
for (int i = 0; i < partNum; i++) {
final TopicRange topicRange =
new TopicRange(topic.getTopic() + PulsarOptions.PARTITION_SUFFIX + i, topic.getPulsarRange());
new TopicRange(topic.getTopic() + PulsarOptions.PARTITION_SUFFIX + i, topic.getPulsarRange());
allTopics.add(topicRange);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,6 @@ public void testCaseSensitiveReaderConf() throws Exception {
List<Integer> messages =
IntStream.range(0, 50).mapToObj(t -> Integer.valueOf(t)).collect(Collectors.toList());

sendTypedMessages(tp, SchemaType.INT32, messages, Optional.empty());
StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();

Properties props = new Properties() {{ put(TOPIC_SINGLE_OPTION_KEY, tp); }};
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.connectors.pulsar.internal;

import org.apache.flink.streaming.connectors.pulsar.PulsarTestBase;

import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.naming.TopicName;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

public class PulsarMetadataReaderTest extends PulsarTestBase {

private PulsarMetadataReader pulsarMetadataReader;

private String nonPersistTopic = TopicName.get("non-persistent", "public", "default", "NON-PERSIST-TOPIC").toString();
private String nonPartitionTopic = TopicName.get("NON-P-TOPIC").toString();
private String onePartitionTopic = TopicName.get("ONE-P-TOPIC").toString();

@Before
public void init() throws PulsarClientException {
Map<String, String> caseInsensitiveParams = new HashMap<>();
caseInsensitiveParams.put(PulsarOptions.TOPIC_MULTI_OPTION_KEY, nonPartitionTopic + "," + onePartitionTopic);
pulsarMetadataReader = new PulsarMetadataReader(adminUrl, clientConfigurationData, "subscribeName", caseInsensitiveParams, 0, 0);
}

@Test
public void getTopicPartitionsAll() throws PulsarAdminException {

createNonPartitionTopic(nonPartitionTopic);

Set<TopicRange> topicPartitionsAll = pulsarMetadataReader.getTopicPartitionsAll();
List<TopicRange> topicRanges = topicPartitionsAll.stream().collect(Collectors.toList());
for (TopicRange topicRange : topicRanges) {
if (topicRange.getTopic().contains(nonPartitionTopic)) {
Assert.assertEquals(topicRange.getTopic(), nonPartitionTopic);
} else {
Assert.assertEquals(topicRange.getTopic(), onePartitionTopic + PulsarOptions.PARTITION_SUFFIX + 0);
}
}

Assert.assertFalse(pulsarMetadataReader.topicExists(nonPartitionTopic));
Assert.assertTrue(pulsarMetadataReader.topicExists(onePartitionTopic));
}

@Test
public void topicExists() throws PulsarAdminException {
Assert.assertFalse(pulsarMetadataReader.topicExists(nonPartitionTopic));

// non-partitioned topic it doesn't exist
createNonPartitionTopic(nonPartitionTopic);
Assert.assertFalse(pulsarMetadataReader.topicExists(nonPartitionTopic));

// non-persist topic it exit
getPulsarAdmin().topics().createPartitionedTopic(nonPersistTopic, 1);
Assert.assertTrue(pulsarMetadataReader.topicExists(nonPersistTopic));
}

@After
public void clearTopic() throws PulsarAdminException {
pulsarMetadataReader.deleteTopic(nonPartitionTopic);
pulsarMetadataReader.deleteTopic(onePartitionTopic);
pulsarMetadataReader.deleteTopic(nonPersistTopic);
}

private void createNonPartitionTopic(String topicName) throws PulsarAdminException {
getPulsarAdmin().topics().createNonPartitionedTopic(topicName);
}
}

0 comments on commit 059de98

Please sign in to comment.