From 7ce96e4c7ce0d16b6b8c3f8e6d19fd6550312f00 Mon Sep 17 00:00:00 2001 From: justabug Date: Sun, 3 Mar 2024 21:20:46 +0800 Subject: [PATCH] feature : RocketMQ transaction are supported (#6230) --- all/pom.xml | 5 + bom/pom.xml | 5 + changes/en-us/2.x.md | 1 + changes/zh-cn/2.x.md | 1 + .../seata/common/ConfigurationKeys.java | 5 + .../apache/seata/common/DefaultValues.java | 2 + .../seata/core/context/RootContext.java | 2 + .../seata/core/model/ResourceManager.java | 9 ++ dependencies/pom.xml | 7 + pom.xml | 1 + .../seata/rm/AbstractResourceManager.java | 15 ++ .../seata/rm/DefaultResourceManager.java | 6 + rocketmq/pom.xml | 49 ++++++ .../integration/rocketmq/SeataMQProducer.java | 149 ++++++++++++++++++ .../rocketmq/SeataMQProducerFactory.java | 63 ++++++++ .../integration/rocketmq/TCCRocketMQ.java | 79 ++++++++++ .../integration/rocketmq/TCCRocketMQImpl.java | 96 +++++++++++ .../rocketmq/SeataMQProducerFactoryTest.java | 36 +++++ .../rocketmq/SeataMQProducerTest.java | 31 ++++ rocketmq/src/test/resources/file.conf | 25 +++ rocketmq/src/test/resources/registry.conf | 34 ++++ seata-spring-boot-starter/pom.xml | 5 + .../rm/tcc/interceptor/ProxyUtilsTccTest.java | 8 + .../seata/mockserver/MockCoordinator.java | 2 +- test/pom.xml | 11 ++ .../rocketmq/SeataMQProducerSendTest.java | 139 ++++++++++++++++ 26 files changed, 785 insertions(+), 1 deletion(-) create mode 100644 rocketmq/pom.xml create mode 100644 rocketmq/src/main/java/org/apache/seata/integration/rocketmq/SeataMQProducer.java create mode 100644 rocketmq/src/main/java/org/apache/seata/integration/rocketmq/SeataMQProducerFactory.java create mode 100644 rocketmq/src/main/java/org/apache/seata/integration/rocketmq/TCCRocketMQ.java create mode 100644 rocketmq/src/main/java/org/apache/seata/integration/rocketmq/TCCRocketMQImpl.java create mode 100644 rocketmq/src/test/java/org/apache/seata/integration/rocketmq/SeataMQProducerFactoryTest.java create mode 100644 rocketmq/src/test/java/org/apache/seata/integration/rocketmq/SeataMQProducerTest.java create mode 100644 rocketmq/src/test/resources/file.conf create mode 100644 rocketmq/src/test/resources/registry.conf create mode 100644 test/src/test/java/org/apache/seata/integration/rocketmq/SeataMQProducerSendTest.java diff --git a/all/pom.xml b/all/pom.xml index d3542d1758e..97cd30bbba6 100644 --- a/all/pom.xml +++ b/all/pom.xml @@ -192,6 +192,11 @@ + + org.apache.seata + seata-rocketmq + ${project.version} + org.apache.seata seata-sqlparser-core diff --git a/bom/pom.xml b/bom/pom.xml index 224c84d1f61..514d4f048bb 100644 --- a/bom/pom.xml +++ b/bom/pom.xml @@ -191,6 +191,11 @@ seata-http ${project.version} + + org.apache.seata + seata-rocketmq + ${project.version} + org.apache.seata seata-rm diff --git a/changes/en-us/2.x.md b/changes/en-us/2.x.md index 64b2df332e8..2df25a629d2 100644 --- a/changes/en-us/2.x.md +++ b/changes/en-us/2.x.md @@ -6,6 +6,7 @@ Add changes here for all PR submitted to the 2.x branch. - [[#6370](https://github.com/seata/seata/pull/6370)] seata saga decouple spring, optimize architecture. - [[#6205](https://github.com/apache/incubator-seata/pull/6205)] mock server - [[#6169](https://github.com/apache/incubator-seata/pull/6169)] full support for states in the refactored state machine designer +- [[#6230](https://github.com/apache/incubator-seata/pull/6230)] RocketMQ transaction are supported ### bugfix: - [[#6090](https://github.com/apache/incubator-seata/pull/6090)] fix the TCC aspect exception handling process, do not wrapping the internal call exceptions diff --git a/changes/zh-cn/2.x.md b/changes/zh-cn/2.x.md index 4c5d1608415..6099f3ae339 100644 --- a/changes/zh-cn/2.x.md +++ b/changes/zh-cn/2.x.md @@ -6,6 +6,7 @@ - [[#6370](https://github.com/seata/seata/pull/6370)] seata saga spring接耦、架构优化。 - [[#6205](https://github.com/apache/incubator-seata/pull/6205)] 提供mock server - [[#6169](https://github.com/apache/incubator-seata/pull/6169)] 支持新版本状态机设计器 +- [[#6230](https://github.com/apache/incubator-seata/pull/6230)] 支持RocketMQ消息事务 ### bugfix: - [[#6090](https://github.com/apache/incubator-seata/pull/6090)] 修复tcc切面异常处理过程,不对内部调用异常做包装处理,直接向外抛出 diff --git a/common/src/main/java/org/apache/seata/common/ConfigurationKeys.java b/common/src/main/java/org/apache/seata/common/ConfigurationKeys.java index 7e6521c5ab5..e1c0f11f395 100644 --- a/common/src/main/java/org/apache/seata/common/ConfigurationKeys.java +++ b/common/src/main/java/org/apache/seata/common/ConfigurationKeys.java @@ -1006,4 +1006,9 @@ public interface ConfigurationKeys { * The constant SERVER_APPLICATION_DATA_SIZE_CHECK */ String SERVER_APPLICATION_DATA_SIZE_CHECK = SERVER_PREFIX + "applicationDataLimitCheck"; + + /** + * The constant ROCKET_MQ_MSG_TIMEOUT + */ + String ROCKET_MQ_MSG_TIMEOUT = SERVER_PREFIX + "rocketmqMsgTimeout"; } diff --git a/common/src/main/java/org/apache/seata/common/DefaultValues.java b/common/src/main/java/org/apache/seata/common/DefaultValues.java index 67e21838fbe..0c2dd0f7b89 100644 --- a/common/src/main/java/org/apache/seata/common/DefaultValues.java +++ b/common/src/main/java/org/apache/seata/common/DefaultValues.java @@ -312,4 +312,6 @@ public interface DefaultValues { * Default druid location in classpath */ String DRUID_LOCATION = "lib/sqlparser/druid.jar"; + + int DEFAULT_ROCKET_MQ_MSG_TIMEOUT = 60 * 1000; } diff --git a/core/src/main/java/org/apache/seata/core/context/RootContext.java b/core/src/main/java/org/apache/seata/core/context/RootContext.java index eb78a1bf2e6..85453afb5d1 100644 --- a/core/src/main/java/org/apache/seata/core/context/RootContext.java +++ b/core/src/main/java/org/apache/seata/core/context/RootContext.java @@ -48,6 +48,8 @@ private RootContext() { */ public static final String KEY_XID = "TX_XID"; + public static final String KEY_BRANCHID = "TX_BRANCHID"; + /** * The constant HIDDEN_KEY_XID for sofa-rpc integration. */ diff --git a/core/src/main/java/org/apache/seata/core/model/ResourceManager.java b/core/src/main/java/org/apache/seata/core/model/ResourceManager.java index ab65cdb76eb..ee590589654 100644 --- a/core/src/main/java/org/apache/seata/core/model/ResourceManager.java +++ b/core/src/main/java/org/apache/seata/core/model/ResourceManager.java @@ -51,4 +51,13 @@ public interface ResourceManager extends ResourceManagerInbound, ResourceManager * @return The BranchType of ResourceManager. */ BranchType getBranchType(); + + /** + * Get the GlobalStatus. + * + * @param branchType The BranchType of ResourceManager. + * @param xid The xid of transaction. + * @return The GlobalStatus of transaction. + */ + GlobalStatus getGlobalStatus(BranchType branchType, String xid); } diff --git a/dependencies/pom.xml b/dependencies/pom.xml index ad5b15c6abb..b317f642ecb 100644 --- a/dependencies/pom.xml +++ b/dependencies/pom.xml @@ -114,6 +114,8 @@ ${mysql.version} 8.0.27 + + 5.0.0 1.4.32 @@ -781,6 +783,11 @@ janino ${janino-version} + + org.apache.rocketmq + rocketmq-client + ${rocketmq-version} + diff --git a/pom.xml b/pom.xml index d403ecebc73..a9feab80b9e 100644 --- a/pom.xml +++ b/pom.xml @@ -57,6 +57,7 @@ integration/brpc rm rm-datasource + rocketmq spring tcc test diff --git a/rm/src/main/java/org/apache/seata/rm/AbstractResourceManager.java b/rm/src/main/java/org/apache/seata/rm/AbstractResourceManager.java index 1c82579031b..fa4e776b115 100644 --- a/rm/src/main/java/org/apache/seata/rm/AbstractResourceManager.java +++ b/rm/src/main/java/org/apache/seata/rm/AbstractResourceManager.java @@ -28,6 +28,7 @@ import org.apache.seata.core.exception.TransactionExceptionCode; import org.apache.seata.core.model.BranchStatus; import org.apache.seata.core.model.BranchType; +import org.apache.seata.core.model.GlobalStatus; import org.apache.seata.core.model.Resource; import org.apache.seata.core.model.ResourceManager; import org.apache.seata.core.protocol.ResultCode; @@ -35,6 +36,8 @@ import org.apache.seata.core.protocol.transaction.BranchRegisterResponse; import org.apache.seata.core.protocol.transaction.BranchReportRequest; import org.apache.seata.core.protocol.transaction.BranchReportResponse; +import org.apache.seata.core.protocol.transaction.GlobalStatusRequest; +import org.apache.seata.core.protocol.transaction.GlobalStatusResponse; import org.apache.seata.core.rpc.netty.RmNettyRemotingClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -140,4 +143,16 @@ public void unregisterResource(Resource resource) { public void registerResource(Resource resource) { RmNettyRemotingClient.getInstance().registerResource(resource.getResourceGroupId(), resource.getResourceId()); } + + @Override + public GlobalStatus getGlobalStatus(BranchType branchType, String xid) { + GlobalStatusRequest queryGlobalStatus = new GlobalStatusRequest(); + queryGlobalStatus.setXid(xid); + try { + GlobalStatusResponse response = (GlobalStatusResponse) RmNettyRemotingClient.getInstance().sendSyncRequest(queryGlobalStatus); + return response.getGlobalStatus(); + } catch (TimeoutException e) { + throw new RuntimeException(e); + } + } } diff --git a/rm/src/main/java/org/apache/seata/rm/DefaultResourceManager.java b/rm/src/main/java/org/apache/seata/rm/DefaultResourceManager.java index 45097b0de0c..51bbff569c9 100644 --- a/rm/src/main/java/org/apache/seata/rm/DefaultResourceManager.java +++ b/rm/src/main/java/org/apache/seata/rm/DefaultResourceManager.java @@ -27,6 +27,7 @@ import org.apache.seata.core.exception.TransactionException; import org.apache.seata.core.model.BranchStatus; import org.apache.seata.core.model.BranchType; +import org.apache.seata.core.model.GlobalStatus; import org.apache.seata.core.model.Resource; import org.apache.seata.core.model.ResourceManager; @@ -150,6 +151,11 @@ public BranchType getBranchType() { throw new FrameworkException("DefaultResourceManager isn't a real ResourceManager"); } + @Override + public GlobalStatus getGlobalStatus(BranchType branchType, String xid) { + return getResourceManager(branchType).getGlobalStatus(branchType, xid); + } + private static class SingletonHolder { private static DefaultResourceManager INSTANCE = new DefaultResourceManager(); } diff --git a/rocketmq/pom.xml b/rocketmq/pom.xml new file mode 100644 index 00000000000..23297aff44a --- /dev/null +++ b/rocketmq/pom.xml @@ -0,0 +1,49 @@ + + + + + org.apache.seata + seata-parent + ${revision} + ../pom.xml + + 4.0.0 + seata-rocketmq + jar + seata-rocketmq ${project.version} + rocketmq integration for Seata built with Maven + + + + ${project.groupId} + seata-tcc + ${project.version} + + + org.apache.rocketmq + rocketmq-client + provided + + + + + diff --git a/rocketmq/src/main/java/org/apache/seata/integration/rocketmq/SeataMQProducer.java b/rocketmq/src/main/java/org/apache/seata/integration/rocketmq/SeataMQProducer.java new file mode 100644 index 00000000000..2846d00073e --- /dev/null +++ b/rocketmq/src/main/java/org/apache/seata/integration/rocketmq/SeataMQProducer.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.integration.rocketmq; + +import org.apache.rocketmq.client.producer.SendStatus; +import org.apache.seata.common.util.StringUtils; +import org.apache.seata.core.context.RootContext; +import org.apache.seata.core.model.GlobalStatus; +import org.apache.seata.rm.DefaultResourceManager; +import org.apache.rocketmq.client.Validators; +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.producer.LocalTransactionState; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.client.producer.TransactionListener; +import org.apache.rocketmq.client.producer.TransactionMQProducer; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.common.message.MessageAccessor; +import org.apache.rocketmq.common.message.MessageConst; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.remoting.RPCHook; +import org.apache.rocketmq.remoting.exception.RemotingException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.List; + +/** + * Seata MQ Producer + **/ +public class SeataMQProducer extends TransactionMQProducer { + + private static final Logger LOGGER = LoggerFactory.getLogger(SeataMQProducer.class); + + private static final List COMMIT_STATUSES = Arrays.asList(GlobalStatus.Committed, GlobalStatus.Committing, GlobalStatus.CommitRetrying); + private static final List ROLLBACK_STATUSES = Arrays.asList(GlobalStatus.Rollbacked, GlobalStatus.Rollbacking, GlobalStatus.RollbackRetrying); + + public static String PROPERTY_SEATA_XID = RootContext.KEY_XID; + public static String PROPERTY_SEATA_BRANCHID = RootContext.KEY_BRANCHID; + private TransactionListener transactionListener; + + private TCCRocketMQ tccRocketMQ; + + SeataMQProducer(final String producerGroup) { + this(null, producerGroup, null); + } + + SeataMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook) { + super(namespace, producerGroup, rpcHook); + this.transactionListener = new TransactionListener() { + @Override + public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { + return LocalTransactionState.UNKNOW; + } + + @Override + public LocalTransactionState checkLocalTransaction(MessageExt msg) { + String xid = msg.getProperty(PROPERTY_SEATA_XID); + if (StringUtils.isBlank(xid)) { + LOGGER.error("msg has no xid, msgTransactionId: {}, msg will be rollback", msg.getTransactionId()); + return LocalTransactionState.ROLLBACK_MESSAGE; + } + GlobalStatus globalStatus = DefaultResourceManager.get().getGlobalStatus(SeataMQProducerFactory.ROCKET_BRANCH_TYPE, xid); + if (COMMIT_STATUSES.contains(globalStatus)) { + return LocalTransactionState.COMMIT_MESSAGE; + } else if (ROLLBACK_STATUSES.contains(globalStatus) || GlobalStatus.isOnePhaseTimeout(globalStatus)) { + return LocalTransactionState.ROLLBACK_MESSAGE; + } else if (GlobalStatus.Finished.equals(globalStatus)) { + LOGGER.error("global transaction finished, msg will be rollback, xid: {}", xid); + return LocalTransactionState.ROLLBACK_MESSAGE; + } + return LocalTransactionState.UNKNOW; + } + }; + } + + @Override + public SendResult send(Message msg) throws MQClientException, MQBrokerException, RemotingException, InterruptedException { + return send(msg, this.getSendMsgTimeout()); + } + + @Override + public SendResult send(Message msg, long timeout) throws MQClientException, MQBrokerException, RemotingException, InterruptedException { + if (RootContext.inGlobalTransaction()) { + if (tccRocketMQ == null) { + throw new RuntimeException("TCCRocketMQ is not initialized"); + } + return tccRocketMQ.prepare(msg, timeout); + } else { + return super.send(msg, timeout); + } + } + + public SendResult doSendMessageInTransaction(final Message msg, long timeout, String xid, long branchId) throws MQClientException { + msg.setTopic(withNamespace(msg.getTopic())); + if (msg.getDelayTimeLevel() != 0) { + MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL); + } + Validators.checkMessage(msg, this); + + SendResult sendResult = null; + MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true"); + MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.getProducerGroup()); + MessageAccessor.putProperty(msg, PROPERTY_SEATA_XID, xid); + MessageAccessor.putProperty(msg, PROPERTY_SEATA_BRANCHID, String.valueOf(branchId)); + try { + sendResult = super.send(msg, timeout); + } catch (Exception e) { + throw new MQClientException("send message Exception", e); + } + + if (SendStatus.SEND_OK != sendResult.getSendStatus()) { + throw new RuntimeException("Message send fail.status=" + sendResult.getSendStatus()); + } + if (sendResult.getTransactionId() != null) { + msg.putUserProperty("__transactionId__", sendResult.getTransactionId()); + } + String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX); + if (null != transactionId && !"".equals(transactionId)) { + msg.setTransactionId(transactionId); + } + return sendResult; + } + + + @Override + public TransactionListener getTransactionListener() { + return transactionListener; + } + + public void setTccRocketMQ(TCCRocketMQ tccRocketMQ) { + this.tccRocketMQ = tccRocketMQ; + } +} diff --git a/rocketmq/src/main/java/org/apache/seata/integration/rocketmq/SeataMQProducerFactory.java b/rocketmq/src/main/java/org/apache/seata/integration/rocketmq/SeataMQProducerFactory.java new file mode 100644 index 00000000000..63414aa1290 --- /dev/null +++ b/rocketmq/src/main/java/org/apache/seata/integration/rocketmq/SeataMQProducerFactory.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.integration.rocketmq; + +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.remoting.RPCHook; +import org.apache.seata.common.exception.NotSupportYetException; +import org.apache.seata.core.model.BranchType; +import org.apache.seata.integration.tx.api.util.ProxyUtil; + +/** + * SeataMQProducer Factory + **/ +public class SeataMQProducerFactory { + + public static final String ROCKET_TCC_NAME = "tccRocketMQ"; + public static final BranchType ROCKET_BRANCH_TYPE = BranchType.TCC; + + /** + * Default Producer, it can be replaced to Map after multi-resource is supported + */ + private static SeataMQProducer defaultProducer; + + public static SeataMQProducer createSingle(String nameServer, String producerGroup) throws MQClientException { + return createSingle(nameServer, null, producerGroup, null); + } + + public static SeataMQProducer createSingle(String nameServer, String namespace, + String groupName, RPCHook rpcHook) throws MQClientException { + if (defaultProducer == null) { + synchronized (SeataMQProducerFactory.class) { + if (defaultProducer == null) { + defaultProducer = new SeataMQProducer(namespace, groupName, rpcHook); + defaultProducer.setNamesrvAddr(nameServer); + TCCRocketMQ tccRocketMQProxy = ProxyUtil.createProxy(new TCCRocketMQImpl()); + tccRocketMQProxy.setProducer(defaultProducer); + defaultProducer.setTccRocketMQ(tccRocketMQProxy); + defaultProducer.start(); + return defaultProducer; + } + } + } + throw new NotSupportYetException("only one seata producer is permitted"); + } + + public static SeataMQProducer getProducer() { + return defaultProducer; + } +} diff --git a/rocketmq/src/main/java/org/apache/seata/integration/rocketmq/TCCRocketMQ.java b/rocketmq/src/main/java/org/apache/seata/integration/rocketmq/TCCRocketMQ.java new file mode 100644 index 00000000000..44ea5dfac34 --- /dev/null +++ b/rocketmq/src/main/java/org/apache/seata/integration/rocketmq/TCCRocketMQ.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.integration.rocketmq; + +import org.apache.seata.core.exception.TransactionException; +import org.apache.seata.rm.tcc.api.BusinessActionContext; + +import java.net.UnknownHostException; +import java.util.concurrent.TimeoutException; + +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.remoting.exception.RemotingException; + +/** + * The interface Tcc rocket mq. + */ +public interface TCCRocketMQ { + + /** + * set SeataMQProducer + * + * @param producer the producer + */ + void setProducer(SeataMQProducer producer); + + /** + * RocketMQ half send + * + * @param message the message + * @param timeout the timeout + * @return SendResult + */ + SendResult prepare(Message message, long timeout) throws MQClientException; + + /** + * RocketMQ half send commit + * + * @param context the BusinessActionContext + * @return SendResult + * @throws UnknownHostException + * @throws MQBrokerException + * @throws RemotingException + * @throws InterruptedException + */ + boolean commit(BusinessActionContext context) + throws UnknownHostException, MQBrokerException, RemotingException, InterruptedException, TransactionException, TimeoutException; + + /** + * RocketMQ half send rollback + * + * @param context the BusinessActionContext + * @return + * @throws UnknownHostException + * @throws MQBrokerException + * @throws RemotingException + * @throws InterruptedException + */ + boolean rollback(BusinessActionContext context) + throws UnknownHostException, MQBrokerException, RemotingException, InterruptedException, TransactionException; + + +} \ No newline at end of file diff --git a/rocketmq/src/main/java/org/apache/seata/integration/rocketmq/TCCRocketMQImpl.java b/rocketmq/src/main/java/org/apache/seata/integration/rocketmq/TCCRocketMQImpl.java new file mode 100644 index 00000000000..bd1a7972038 --- /dev/null +++ b/rocketmq/src/main/java/org/apache/seata/integration/rocketmq/TCCRocketMQImpl.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.integration.rocketmq; + +import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl; +import org.apache.seata.core.exception.TransactionException; +import org.apache.seata.rm.tcc.api.BusinessActionContext; +import org.apache.seata.rm.tcc.api.BusinessActionContextUtil; +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.producer.LocalTransactionState; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.remoting.exception.RemotingException; +import org.apache.seata.rm.tcc.api.LocalTCC; +import org.apache.seata.rm.tcc.api.TwoPhaseBusinessAction; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.UnknownHostException; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeoutException; + +/** + * the type TCCRocketMQImpl + */ +@LocalTCC +public class TCCRocketMQImpl implements TCCRocketMQ { + private static final Logger LOGGER = LoggerFactory.getLogger(TCCRocketMQImpl.class); + private static final String ROCKET_MSG_KEY = "ROCKET_MSG"; + private static final String ROCKET_SEND_RESULT_KEY = "ROCKET_SEND_RESULT"; + + private SeataMQProducer producer; + private DefaultMQProducerImpl producerImpl; + + @Override + public void setProducer(SeataMQProducer producer) { + this.producer = producer; + this.producerImpl = producer.getDefaultMQProducerImpl(); + } + + @Override + @TwoPhaseBusinessAction(name = SeataMQProducerFactory.ROCKET_TCC_NAME) + public SendResult prepare(Message message, long timeout) throws MQClientException { + BusinessActionContext context = BusinessActionContextUtil.getContext(); + LOGGER.info("RocketMQ message send prepare, xid = {}", context.getXid()); + Map params = new HashMap<>(8); + SendResult sendResult = producer.doSendMessageInTransaction(message, timeout, context.getXid(), context.getBranchId()); + message.setDeliverTimeMs(0); + params.put(ROCKET_MSG_KEY, message); + params.put(ROCKET_SEND_RESULT_KEY, sendResult); + BusinessActionContextUtil.addContext(params); + return sendResult; + } + + @Override + public boolean commit(BusinessActionContext context) + throws UnknownHostException, MQBrokerException, RemotingException, InterruptedException, TimeoutException, TransactionException { + Message message = context.getActionContext(ROCKET_MSG_KEY, Message.class); + SendResult sendResult = context.getActionContext(ROCKET_SEND_RESULT_KEY, SendResult.class); + if (message == null || sendResult == null) { + throw new TransactionException("TCCRocketMQ commit but cannot find message and sendResult"); + } + this.producerImpl.endTransaction(message, sendResult, LocalTransactionState.COMMIT_MESSAGE, null); + LOGGER.info("RocketMQ message send commit, xid = {}, branchId = {}", context.getXid(), context.getBranchId()); + return true; + } + + @Override + public boolean rollback(BusinessActionContext context) + throws UnknownHostException, MQBrokerException, RemotingException, InterruptedException, TransactionException { + Message message = context.getActionContext(ROCKET_MSG_KEY, Message.class); + SendResult sendResult = context.getActionContext(ROCKET_SEND_RESULT_KEY, SendResult.class); + if (message == null || sendResult == null) { + LOGGER.error("TCCRocketMQ rollback but cannot find message and sendResult"); + } + this.producerImpl.endTransaction(message, sendResult, LocalTransactionState.ROLLBACK_MESSAGE, null); + LOGGER.info("RocketMQ message send rollback, xid = {}, branchId = {}", context.getXid(), context.getBranchId()); + return true; + } +} \ No newline at end of file diff --git a/rocketmq/src/test/java/org/apache/seata/integration/rocketmq/SeataMQProducerFactoryTest.java b/rocketmq/src/test/java/org/apache/seata/integration/rocketmq/SeataMQProducerFactoryTest.java new file mode 100644 index 00000000000..975bb885d85 --- /dev/null +++ b/rocketmq/src/test/java/org/apache/seata/integration/rocketmq/SeataMQProducerFactoryTest.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.integration.rocketmq; + +import org.apache.seata.common.exception.NotSupportYetException; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +/** + * seata mq producer factory test + **/ +public class SeataMQProducerFactoryTest { + + @Test + public void testCreateSingle() throws Exception { + SeataMQProducerFactory.createSingle("127.0.0.1:9876", "test"); + Assertions.assertThrows(NotSupportYetException.class, () -> SeataMQProducerFactory.createSingle("127.0.0.1:9876", "test")); + + SeataMQProducer producer = SeataMQProducerFactory.getProducer(); + Assertions.assertNotNull(producer); + } +} diff --git a/rocketmq/src/test/java/org/apache/seata/integration/rocketmq/SeataMQProducerTest.java b/rocketmq/src/test/java/org/apache/seata/integration/rocketmq/SeataMQProducerTest.java new file mode 100644 index 00000000000..7b8ab979d58 --- /dev/null +++ b/rocketmq/src/test/java/org/apache/seata/integration/rocketmq/SeataMQProducerTest.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.integration.rocketmq; + +import org.junit.jupiter.api.Test; + +/** + * seata mq producer test + **/ +public class SeataMQProducerTest { + + @Test + public void testCreate(){ + new SeataMQProducer("testProducerGroup"); + new SeataMQProducer("testNamespace", "testProducerGroup",null); + } +} diff --git a/rocketmq/src/test/resources/file.conf b/rocketmq/src/test/resources/file.conf new file mode 100644 index 00000000000..46c3e0401cc --- /dev/null +++ b/rocketmq/src/test/resources/file.conf @@ -0,0 +1,25 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +service { + #transaction service group mapping + vgroupMapping.default_tx_group = "default" + #only support when registry.type=file, please don't set multiple addresses + default.grouplist = "127.0.0.1:8091" + #disable seata + disableGlobalTransaction = false +} \ No newline at end of file diff --git a/rocketmq/src/test/resources/registry.conf b/rocketmq/src/test/resources/registry.conf new file mode 100644 index 00000000000..5ad014bf55a --- /dev/null +++ b/rocketmq/src/test/resources/registry.conf @@ -0,0 +1,34 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +registry { + # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa + type = "file" + + file { + name = "file.conf" + } +} + +config { + # file、nacos 、apollo、zk、consul、etcd3 + type = "file" + + file { + name = "file.conf" + } +} \ No newline at end of file diff --git a/seata-spring-boot-starter/pom.xml b/seata-spring-boot-starter/pom.xml index 517923778e2..24f449166b0 100644 --- a/seata-spring-boot-starter/pom.xml +++ b/seata-spring-boot-starter/pom.xml @@ -66,6 +66,11 @@ spring-boot-configuration-processor true + + org.apache.rocketmq + rocketmq-client + true + diff --git a/tcc/src/test/java/org/apache/seata/rm/tcc/interceptor/ProxyUtilsTccTest.java b/tcc/src/test/java/org/apache/seata/rm/tcc/interceptor/ProxyUtilsTccTest.java index 48416d90e9c..9a243733e94 100644 --- a/tcc/src/test/java/org/apache/seata/rm/tcc/interceptor/ProxyUtilsTccTest.java +++ b/tcc/src/test/java/org/apache/seata/rm/tcc/interceptor/ProxyUtilsTccTest.java @@ -25,6 +25,7 @@ import org.apache.seata.core.exception.TransactionException; import org.apache.seata.core.model.BranchStatus; import org.apache.seata.core.model.BranchType; +import org.apache.seata.core.model.GlobalStatus; import org.apache.seata.core.model.Resource; import org.apache.seata.core.model.ResourceManager; import org.apache.seata.integration.tx.api.util.ProxyUtil; @@ -90,6 +91,13 @@ public Map getManagedResources() { public BranchType getBranchType() { return null; } + + @Override + public GlobalStatus getGlobalStatus(BranchType branchType, String xid) { + return null; + } + + }; diff --git a/test-mock-server/src/main/java/org/apache/seata/mockserver/MockCoordinator.java b/test-mock-server/src/main/java/org/apache/seata/mockserver/MockCoordinator.java index 727d32e824d..eba654fa48a 100644 --- a/test-mock-server/src/main/java/org/apache/seata/mockserver/MockCoordinator.java +++ b/test-mock-server/src/main/java/org/apache/seata/mockserver/MockCoordinator.java @@ -254,7 +254,7 @@ public void setExpectedRetry(String xid, int times) { } private void checkMockActionFail(String xid) throws TransactionException { - if (expectedResultMap.get(xid) == ResultCode.Failed) { + if (ResultCode.Failed == expectedResultMap.get(xid)) { throw new TransactionException(TransactionExceptionCode.Broken, "mock action expect fail"); } } diff --git a/test/pom.xml b/test/pom.xml index e9d638fe389..91d4c9f60fb 100644 --- a/test/pom.xml +++ b/test/pom.xml @@ -141,6 +141,17 @@ seata-sqlparser-druid ${project.version} + + + ${project.groupId} + seata-rocketmq + ${project.version} + + + org.apache.rocketmq + rocketmq-client + test + diff --git a/test/src/test/java/org/apache/seata/integration/rocketmq/SeataMQProducerSendTest.java b/test/src/test/java/org/apache/seata/integration/rocketmq/SeataMQProducerSendTest.java new file mode 100644 index 00000000000..e1e778f7c38 --- /dev/null +++ b/test/src/test/java/org/apache/seata/integration/rocketmq/SeataMQProducerSendTest.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.integration.rocketmq; + +import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; +import org.apache.rocketmq.client.consumer.MQPushConsumer; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; +import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.consumer.ConsumeFromWhere; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.remoting.exception.RemotingException; +import org.apache.seata.common.ConfigurationKeys; +import org.apache.seata.common.ConfigurationTestHelper; +import org.apache.seata.core.context.RootContext; +import org.apache.seata.core.exception.TransactionException; +import org.apache.seata.core.model.TransactionManager; +import org.apache.seata.core.rpc.netty.mockserver.ProtocolTestConstants; +import org.apache.seata.core.rpc.netty.mockserver.TmClientTest; +import org.apache.seata.mockserver.MockServer; +import org.apache.seata.rm.RMClient; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.charset.StandardCharsets; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +/** + * seata mq producer test + **/ +public class SeataMQProducerSendTest { + + private static final Logger LOGGER = LoggerFactory.getLogger(SeataMQProducerSendTest.class); + + + private static final String TOPIC = "seata-test"; + private static final String NAME_SERVER = "127.0.0.1:9876"; + + private static SeataMQProducer producer; + + @BeforeAll + public static void before() throws MQClientException { + ConfigurationTestHelper.putConfig(ConfigurationKeys.SERVER_SERVICE_PORT_CAMEL, String.valueOf(ProtocolTestConstants.MOCK_SERVER_PORT)); + MockServer.start(ProtocolTestConstants.MOCK_SERVER_PORT); + producer = SeataMQProducerFactory.createSingle(NAME_SERVER, "test"); + // should start mq server here + } + + @AfterAll + public static void after() { + MockServer.close(); + ConfigurationTestHelper.removeConfig(ConfigurationKeys.SERVER_SERVICE_PORT_CAMEL); + producer.shutdown(); + } + + @Test + @Disabled + public void testSendCommit() throws MQBrokerException, RemotingException, InterruptedException, MQClientException, TransactionException { + TransactionManager tm = getTmAndBegin(); + + CountDownLatch countDownLatch = new CountDownLatch(1); + MQPushConsumer consumer = startConsume(countDownLatch); + producer.send(new Message(TOPIC, "testMessage".getBytes(StandardCharsets.UTF_8))); + + tm.commit(RootContext.getXID()); + LOGGER.info("global commit"); + boolean await = countDownLatch.await(2, TimeUnit.SECONDS); + LOGGER.info("await:{}", await); + consumer.shutdown(); + } + + @Test + @Disabled + public void testSendRollback() + throws MQBrokerException, RemotingException, InterruptedException, MQClientException, TransactionException { + TransactionManager tm = getTmAndBegin(); + + CountDownLatch countDownLatch = new CountDownLatch(1); + MQPushConsumer consumer = startConsume(countDownLatch); + + producer.send(new Message(TOPIC, "testMessage".getBytes(StandardCharsets.UTF_8))); + + tm.rollback(RootContext.getXID()); + LOGGER.info("global rollback"); + try { + boolean await = countDownLatch.await(2, TimeUnit.SECONDS); + LOGGER.info("await:{}", await); + } catch (Exception e) { + Assertions.assertEquals(e.getClass(), InterruptedException.class); + } finally { + consumer.shutdown(); + } + } + + + private static MQPushConsumer startConsume(CountDownLatch countDownLatch) throws MQClientException { + DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("yourGroup"); + consumer.setNamesrvAddr(NAME_SERVER); + consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); + consumer.subscribe(TOPIC,"*"); + consumer.registerMessageListener((MessageListenerConcurrently) (msg, context) -> { + LOGGER.info("%s Receive New Messages: {} {}", Thread.currentThread().getName(), msg); + countDownLatch.countDown(); + return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; + }); + consumer.start(); + return consumer; + } + + + private static TransactionManager getTmAndBegin() throws TransactionException { + TransactionManager tm = TmClientTest.getTm(); + RMClient.init(ProtocolTestConstants.APPLICATION_ID, ProtocolTestConstants.SERVICE_GROUP); + String xid = tm.begin(ProtocolTestConstants.APPLICATION_ID, ProtocolTestConstants.SERVICE_GROUP, "testRocket", 60000); + RootContext.bind(xid); + return tm; + } +}