Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE: #40]Finish producer and consumer features in client, 2 sql filter features and a tag filter feature. And fix some bugs #41

Merged
merged 16 commits into from
Jun 9, 2023
Merged
199 changes: 199 additions & 0 deletions bdd/src/main/java/org/apache/rocketmq/ClientInitStepdefs.java
Original file line number Diff line number Diff line change
Expand Up @@ -218,4 +218,203 @@ public void setKeyValue(String arg0, String arg1) {
@And("Set messageProperty {string} to {string} and {string} to {string}")
public void setMessagePropertyToAndTo(String arg0, String arg1, String arg2, String arg3) {
}

@When("Create a PushConsumer, set the Endpoint\\({string}), ConsumerGroup\\({string}), Topic\\({string}), filterExpression\\(SubExpression:{string}, FilterExpressionType:{string}), and MessageListener\\({string})")
public void createAPushConsumerSetTheEndpointConsumerGroupTopicFilterExpressionSubExpressionFilterExpressionTypeAndMessageListener(String arg0, String arg1, String arg2, String arg3, String arg4, String arg5) {

}

@Then("Create a message, including the Topic\\({string}), Tag\\({string}), Key\\({string}), Body\\({string}), and msgProps\\(regionId:{string}, price:{string})")
public void createAMessageIncludingTheTopicTagKeyBodyAndMsgPropsRegionIdPrice(String arg0, String arg1, String arg2, String arg3, String arg4, String arg5) {

}

@And("Send {string} messages with msgProps\\(price:{string}) {string}")
public void sendMessagesWithMsgPropsPrice(String arg0, String arg1, String arg2) {

}

@Then("Check only all messages with msgProps\\(price:{string}) are consumed")
public void checkOnlyAllMessagesWithMsgPropsPriceAreConsumed(String arg0) {

}

@And("Check PushConsumer consumes {int} messages")
public void checkPushConsumerConsumesMessages(int arg0) {

}

@Then("Create a message, including the Topic\\({string}), Body\\({string}), messageGroup\\({string}), and msgProps\\(regionId:{string}, price:{string})")
public void createAMessageIncludingTheTopicBodyMessageGroupAndMsgPropsRegionIdPrice(String arg0, String arg1, String arg2, String arg3, String arg4) {

}

@And("Check the order of received messages consistent with the order of pre-sent messages")
public void checkTheOrderOfReceivedMessagesConsistentWithTheOrderOfPreSentMessages() {

}

@When("Create a PushConsumer, set the Endpoint\\({string}), ConsumerGroup\\({string}), Topic\\({string}), filterExpression\\({string}), and MessageListener\\({string})")
public void createAPushConsumerSetTheEndpointConsumerGroupTopicFilterExpressionAndMessageListener(String arg0, String arg1, String arg2, String arg3, String arg4) {

}

@And("Send {string} messages with Tag\\({string}) {string}")
public void sendMessagesWithTag(String arg0, String arg1, String arg2) {

}

@And("Create a SimpleConsumer, set the Endpoint\\({string}), Topic\\({string}), ConsumerGroup\\({string}), FilterExpressions\\({string}), Duration\\({string})")
public void createASimpleConsumerSetTheEndpointTopicConsumerGroupFilterExpressionsDuration(String arg0, String arg1, String arg2, String arg3, String arg4) {

}

@Then("Check SimpleConsumer pull a message once")
public void checkSimpleConsumerPullAMessageOnce() {

}

@And("SimpleConsumer invokes receive method {string} and returns acks {string}")
public void simpleconsumerInvokesReceiveMethodAndReturnsAcks(String arg0, String arg1) {

}

@Then("Check all messages are pulled by SimpleConsumer {string}")
public void checkAllMessagesArePulledBySimpleConsumer(String arg0) {

}

@When("Create a {string}, set the ClientConfiguration\\(Endpoint:{string}), ConsumerGroup\\({string}), AwaitDuration\\({string}), SubscriptionExpressions\\(NULL)")
public void createASetTheClientConfigurationEndpointConsumerGroupAwaitDurationSubscriptionExpressionsNULL(String arg0, String arg1, String arg2, String arg3) {

}

@When("Create a {string}, set the ClientConfiguration\\(Endpoint:{string}), ConsumerGroup\\({string}), AwaitDuration\\({string}), SubscriptionExpressions\\({string}, {string})")
public void createASetTheClientConfigurationEndpointConsumerGroupAwaitDurationSubscriptionExpressions(String arg0, String arg1, String arg2, String arg3, String arg4, String arg5) {

}

@Then("Consumer invoke receive\\(maxMessageNum:{int}, invisibleDuration:{int}s)")
public void consumerInvokeReceiveMaxMessageNumInvisibleDurationS(int arg0, int arg1) {

}

@Then("Check {string} receive messages successfully")
public void checkReceiveMessagesSuccessfully(String arg0) {

}

@And("Check the message request return duration between {int}s and {int}s")
public void checkTheMessageRequestReturnDurationBetweenSAndS(int arg0, int arg1) {

}

@Then("Check build {string} successfully")
public void checkBuildSuccessfully(String arg0) {

}

@When("Create a {string}, set the ConsumerGroup\\({string}), AwaitDuration\\({string}), SubscriptionExpressions\\({string}, {string})")
public void createASetTheConsumerGroupAwaitDurationSubscriptionExpressions(String arg0, String arg1, String arg2, String arg3, String arg4) {

}

@When("Create a {string}, set the ClientConfiguration\\(Endpoint:{string}), AwaitDuration\\({string}), SubscriptionExpressions\\({string}, {string})")
public void createASetTheClientConfigurationEndpointAwaitDurationSubscriptionExpressions(String arg0, String arg1, String arg2, String arg3, String arg4) {

}

@When("Create a {string}, set the ClientConfiguration\\(Endpoint:{string}), ConsumerGroup\\({string}), AwaitDuration\\({string})")
public void createASetTheClientConfigurationEndpointConsumerGroupAwaitDuration(String arg0, String arg1, String arg2, String arg3) {

}

@When("Create a {string}, set the ClientConfiguration\\(Endpoint:{string}), ConsumerGroup\\({string}), SubscriptionExpressions\\({string}, {string})")
public void createASetTheClientConfigurationEndpointConsumerGroupSubscriptionExpressions(String arg0, String arg1, String arg2, String arg3, String arg4) {

}

@Given("Create a {string}, set the ClientConfiguration\\(Endpoint:{string}), ConsumerGroup\\({string}), SubscriptionExpressions\\({string}, {string}), ConsumptionThreadCount\\({int}), MaxCacheMessageCount\\({int}), MaxCacheMessageSizeInBytes\\({int}M), MessageListener\\({string})")
public void createASetTheClientConfigurationEndpointConsumerGroupSubscriptionExpressionsConsumptionThreadCountMaxCacheMessageCountMaxCacheMessageSizeInBytesMMessageListener(String arg0, String arg1, String arg2, String arg3, String arg4, int arg5, int arg6, int arg7, String arg8) {

}

@When("Create a {string}, set the ConsumerGroup\\({string}), SubscriptionExpressions\\({string}, {string}), MessageListener\\({string})")
public void createASetTheConsumerGroupSubscriptionExpressionsMessageListener(String arg0, String arg1, String arg2, String arg3, String arg4) {

}

@And("Set {string} ClientConfiguration\\(Endpoint:{string}, AccessKey:{string}, SecretKey:{string})")
public void setClientConfigurationEndpointAccessKeySecretKey(String arg0, String arg1, String arg2, String arg3) {

}

@When("Create a {string}, set the ClientConfiguration\\(Endpoint:{string}), ConsumerGroup\\({string}), SubscriptionExpressions\\({string}, {string}), MessageListener\\({string})")
public void createASetTheClientConfigurationEndpointConsumerGroupSubscriptionExpressionsMessageListener(String arg0, String arg1, String arg2, String arg3, String arg4, String arg5) {

}

@When("Create a {string}, set the ClientConfiguration\\(Endpoint:{string}), SubscriptionExpressions\\({string}, {string}), MessageListener\\({string})")
public void createASetTheClientConfigurationEndpointSubscriptionExpressionsMessageListener(String arg0, String arg1, String arg2, String arg3, String arg4) {

}

@When("Create a {string}, set the ClientConfiguration\\(Endpoint:{string}), ConsumerGroup\\({string}), MessageListener\\({string})")
public void createASetTheClientConfigurationEndpointConsumerGroupMessageListener(String arg0, String arg1, String arg2, String arg3) {

}

@When("Create a {string}, set the ClientConfiguration\\(Endpoint:{string}), ConsumerGroup\\({string}), SubscriptionExpressions\\(NULL), MessageListener\\({string})")
public void createASetTheClientConfigurationEndpointConsumerGroupSubscriptionExpressionsNULLMessageListener(String arg0, String arg1, String arg2, String arg3) {

}

@Given("Create a {string} ConsumerGroup if not exist")
public void createAConsumerGroupIfNotExist(String arg0) {

}

@When("Create a {string}, set the Endpoint\\({string}), ConsumerGroup\\({string}), AwaitDuration\\({string}), SubscriptionExpressions\\({string}, {string})")
public void createASetTheEndpointConsumerGroupAwaitDurationSubscriptionExpressions(String arg0, String arg1, String arg2, String arg3, String arg4, String arg5) {

}

@When("Create a {string}, set the ClientConfiguration\\(Endpoint:{string}), Topics\\({string}), and MaxAttempts\\({int})")
public void createASetTheClientConfigurationEndpointTopicsAndMaxAttempts(String arg0, String arg1, String arg2, int arg3) {

}

@When("Create a {string}, set the Topics\\({string})")
public void createASetTheTopics(String arg0, String arg1) {

}

@And("Set {string} ClientConfiguration\\(Endpoint:{string}, AccessKey:NULL, SecretKey:{string})")
public void setClientConfigurationEndpointAccessKeyNULLSecretKey(String arg0, String arg1, String arg2) {

}

@And("Set {string} ClientConfiguration\\(Endpoint:{string}, AccessKey:{string}, SecretKey:NULL)")
public void setClientConfigurationEndpointAccessKeySecretKeyNULL(String arg0, String arg1, String arg2) {

}

@When("Create a {string}, set the ClientConfiguration\\(NULL), Topics\\({string}), and MaxAttempts\\({int})")
public void createASetTheClientConfigurationNULLTopicsAndMaxAttempts(String arg0, String arg1, int arg2) {

}

@When("Create a {string}, set the ClientConfiguration\\(Endpoint:{string}), Topics\\({string}, {string}), and MaxAttempts\\({int})")
public void createASetTheClientConfigurationEndpointTopicsAndMaxAttempts(String arg0, String arg1, String arg2, String arg3, int arg4) {

}

@When("Create a {string}, set the Topics\\({string}), and MaxAttempts\\({int})")
public void createASetTheTopicsAndMaxAttempts(String arg0, String arg1, int arg2) {

}

@When("Create a {string}, set the ClientConfiguration\\(Endpoint:{string}), and MaxAttempts\\({int})")
public void createASetTheClientConfigurationEndpointAndMaxAttempts(String arg0, String arg1, int arg2) {
}
}
38 changes: 38 additions & 0 deletions bdd/src/main/resources/client/consumer/ConsumerGroup.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

Feature: Test System inner ConsumerGroup

Scenario Outline: Use the built-in ConsumerGroup to consume messages and expect consume failed
Given Create a "Concurrently" ConsumerGroup if not exist
When Create a "SimpleConsumer", set the Endpoint("127.0.0.1:9876"), ConsumerGroup("<RmqSystemGroupConstant>"), AwaitDuration("10s"), SubscriptionExpressions("random-topic", "random-filterExpression")
Then Consumer invoke receive(maxMessageNum:32, invisibleDuration:10s)
Then Check exceptions can be thrown
And Shutdown the producer and consumer if they are started

Examples:
| RmqSystemGroupConstant |
| GROUP_DEFAULT_CONSUMER |
| GROUP_TOOLS_CONSUMER |
| GROUP_FILTERSRV_CONSUMER |
| GROUP_MONITOR_CONSUMER |
| GROUP_CLIENT_INNER_PRODUCER |
| GROUP_SELF_TEST_P_GROUP |
| GROUP_SELF_TEST_C_GROUP |
| GROUP_CID_ONS_HTTP_PROXY |
| GROUP_CID_ONSAPI_PERMISSION |
| GROUP_CID_ONSAPI_OWNER |
| GROUP_CID_ONSAPI_PULL |
| GROUP_CID_RMQ_SYS_TRANS |
73 changes: 73 additions & 0 deletions bdd/src/main/resources/client/consumer/PushConsumerInit.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

Feature: Test PushConsumer Initialization Parameters

Scenario: PushConsumer all parameters are set properly, expect start successfully
Given Create a "Normal" topic:"random-topic" if not exist, a "Concurrently" group:"random-group"
Given Create a "PushConsumer", set the ClientConfiguration(Endpoint:"127.0.0.1:9876"), ConsumerGroup("random-group"), SubscriptionExpressions("random-topic", "random-filterExpression"), ConsumptionThreadCount(20), MaxCacheMessageCount(1000), MaxCacheMessageSizeInBytes(4M), MessageListener("default")
Then Check build "PushConsumer" successfully
And Shutdown the producer and consumer if they are started

Scenario Outline: Error setting the ClientConfiguration(including Endpoint and CredentialProvider), expect start failed
Given Create a "Normal" topic:"random-topic" if not exist, a "Concurrently" group:"random-group"
When Create a "PushConsumer", set the ConsumerGroup("random-group"), SubscriptionExpressions("random-topic", "random-filterExpression"), MessageListener("default")
And Set "PushConsumer" ClientConfiguration(Endpoint:"<Endpoint>", AccessKey:"<AccessKey>", SecretKey:"<SecretKey>")
Then Check exceptions can be thrown
And Shutdown the producer and consumer if they are started

Examples:
| Endpoint | AccessKey | SecretKey |
| 127.0.0.1:9876 | errorAk | accountSK |
| 127.0.0.1:9876 | accountAk | errorSK |
| https://www.aliyun.com | accountAk | accountSK |

Scenario: Error setting the 'Topic' of the consumer client, expect start failed
Given Create a "Normal" topic:"random-topic" if not exist, a "Concurrently" group:"random-group"
When Create a "PushConsumer", set the ClientConfiguration(Endpoint:"127.0.0.1:9876"), ConsumerGroup("random-group"), SubscriptionExpressions("topicNotExist", "random-filterExpression"), MessageListener("default")
Then Check exceptions can be thrown
And Shutdown the producer and consumer if they are started

Scenario: Without setting 'ConsumerGroup' of the consumer client, expect start failed
Given Create a "Normal" topic:"random-topic" if not exist, a "Concurrently" group:"random-group"
When Create a "PushConsumer", set the ClientConfiguration(Endpoint:"127.0.0.1:9876"), SubscriptionExpressions("random-topic", "random-filterExpression"), MessageListener("default")
Then Check exceptions can be thrown
And Shutdown the producer and consumer if they are started

Scenario: Without setting 'SubscriptionExpressions' of the consumer client, expect start failed
Given Create a "Normal" topic:"random-topic" if not exist, a "Concurrently" group:"random-group"
When Create a "PushConsumer", set the ClientConfiguration(Endpoint:"127.0.0.1:9876"), ConsumerGroup("random-group"), MessageListener("default")
Then Check exceptions can be thrown
And Shutdown the producer and consumer if they are started

Scenario: Error setting an empty 'SubscriptionExpressions' of the consumer client, expect start failed
Given Create a "Normal" topic:"random-topic" if not exist, a "Concurrently" group:"random-group"
When Create a "PushConsumer", set the ClientConfiguration(Endpoint:"127.0.0.1:9876"), ConsumerGroup("random-group"), SubscriptionExpressions(NULL), MessageListener("default")
Then Check exceptions can be thrown
And Shutdown the producer and consumer if they are started

Scenario: Without setting 'ClientConfiguration' of the consumer client, expect start failed
Given Create a "Normal" topic:"random-topic" if not exist, a "Concurrently" group:"random-group"
When Create a "PushConsumer", set the ClientConfiguration(Endpoint:"127.0.0.1:9876"), SubscriptionExpressions("random-topic", "random-filterExpression"), MessageListener("default")
Then Check exceptions can be thrown
And Shutdown the producer and consumer if they are started

Scenario: Without setting 'MessageListener' of the consumer client, expect start failed
Given Create a "Normal" topic:"random-topic" if not exist, a "Concurrently" group:"random-group"
When Create a "PushConsumer", set the ClientConfiguration(Endpoint:"127.0.0.1:9876"), ConsumerGroup("random-group"), SubscriptionExpressions("random-topic", "random-filterExpression")
Then Check exceptions can be thrown
And Shutdown the producer and consumer if they are started


Loading