From 21017b04afc4c40564d9b3c5e671e629ccf12b34 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=8E=AB=E6=A5=A0?= <651932351@qq.com> Date: Wed, 13 Sep 2023 10:41:01 +0800 Subject: [PATCH] Update tars client --- .../fisco/bcos/sdk/v3/client/TarsClient.java | 107 +++++++++++------- .../manager/TarsTransactionProcessor.java | 7 +- 2 files changed, 67 insertions(+), 47 deletions(-) diff --git a/src/main/java/org/fisco/bcos/sdk/v3/client/TarsClient.java b/src/main/java/org/fisco/bcos/sdk/v3/client/TarsClient.java index f9a9b29bf..d5bdfd2f6 100644 --- a/src/main/java/org/fisco/bcos/sdk/v3/client/TarsClient.java +++ b/src/main/java/org/fisco/bcos/sdk/v3/client/TarsClient.java @@ -9,8 +9,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; -import org.fisco.bcos.sdk.tars.ConcurrentQueue; -import org.fisco.bcos.sdk.tars.ConcurrentQueueCallback; +import org.fisco.bcos.sdk.tars.Callback; import org.fisco.bcos.sdk.tars.Config; import org.fisco.bcos.sdk.tars.CryptoSuite; import org.fisco.bcos.sdk.tars.LogEntry; @@ -35,22 +34,45 @@ public class TarsClient extends ClientImpl implements Client { private static Logger logger = LoggerFactory.getLogger(TarsClient.class); private RPCClient tarsRPCClient; private TransactionFactoryImpl transactionFactory; - private Thread queueThread; private ThreadPoolExecutor asyncThreadPool; + private Callback callback; - static final int queueSize = 10 * 10000; - static final String libFileName = System.mapLibraryName("bcos_swig_java"); + private static final int queueSize = 10 * 10000; + private static final String libFileName = System.mapLibraryName("bcos_swig_java"); - private class CallbackContent { - public SendTransaction sendTransaction; - TransactionCallback callback; - Transaction transaction; + private static class Content { + private SendTransaction sendTransaction; + private Transaction transaction; + private TransactionCallback callback; + + public SendTransaction getSendTransaction() { + return sendTransaction; + } + + public void setSendTransaction(SendTransaction sendTransaction) { + this.sendTransaction = sendTransaction; + } + + public Transaction getTransaction() { + return transaction; + } + + public void setTransaction(Transaction transaction) { + this.transaction = transaction; + } + + public TransactionCallback getCallback() { + return callback; + } + + public void setCallback(TransactionCallback callback) { + this.callback = callback; + } }; - ConcurrentQueue concurrentQueue = new ConcurrentQueue(); - ConcurrentHashMap callbackMap = - new ConcurrentHashMap(); - AtomicInteger callbackSeq = new AtomicInteger(0); + ConcurrentHashMap callbackMap = + new ConcurrentHashMap(); + AtomicInteger currentSeq = new AtomicInteger(); public RPCClient getTarsRPCClient() { return tarsRPCClient; @@ -78,31 +100,12 @@ protected TarsClient(String groupID, ConfigOption configOption, long nativePoint Config config = new Config(); config.setConnectionString(connectionString); config.setSendQueueSize(queueSize); - config.setTimeoutMs(configOption.getNetworkConfig().getTimeout() * 1000); + config.setTimeoutMs(60 * 1000); tarsRPCClient = new RPCClient(config); CryptoSuite cryptoSuite = bcos.newCryptoSuite(configOption.getCryptoMaterialConfig().getUseSmCrypto()); transactionFactory = new TransactionFactoryImpl(cryptoSuite); - queueThread = - new Thread( - () -> { - while (true) { - int seq = concurrentQueue.pop(); - logger.debug("Receive queue message...", seq); - asyncThreadPool.submit( - () -> { - CallbackContent content = callbackMap.remove(seq); - if (content != null) { - TransactionReceipt receipt = - content.sendTransaction.get(); - content.callback.onResponse( - toJSONTransactionReceipt( - receipt, content.transaction)); - } - }); - } - }); asyncThreadPool = new ThreadPoolExecutor( 1, @@ -110,6 +113,24 @@ protected TarsClient(String groupID, ConfigOption configOption, long nativePoint 0, TimeUnit.SECONDS, new ArrayBlockingQueue(queueSize)); + callback = + new Callback() { + public void onMessage(int seq) { + asyncThreadPool.submit( + () -> { + logger.debug("Receive seq: {}", seq); + Content content = callbackMap.remove(seq); + if (content != null) { + TransactionReceipt receipt = + content.getSendTransaction().get(); + content.getCallback() + .onResponse( + toJSONTransactionReceipt( + receipt, content.getTransaction())); + } + }); + } + }; } public static void loadLibrary() { @@ -152,7 +173,7 @@ public void sendTransactionAsync( String signedTransactionData, boolean withProof, TransactionCallback callback) { - logger.debug("sendTransactionAsync...", node, withProof); + logger.debug("sendTransactionAsync... {} {}", node, withProof); if (withProof) { super.sendTransactionAsync(node, signedTransactionData, withProof, callback); return; @@ -163,15 +184,17 @@ public void sendTransactionAsync( } public void sendTransactionAsync(Transaction transaction, TransactionCallback callback) { - SendTransaction sendTransaction = new SendTransaction(tarsRPCClient); + int seq = currentSeq.addAndGet(1); - int seq = callbackSeq.addAndGet(1); - CallbackContent callbackContent = new CallbackContent(); - callbackContent.sendTransaction = sendTransaction; - callbackContent.callback = callback; - callbackContent.transaction = transaction; - callbackMap.put(seq, callbackContent); - sendTransaction.setCallback(new ConcurrentQueueCallback(concurrentQueue, seq)); + SendTransaction sendTransaction = new SendTransaction(tarsRPCClient); + sendTransaction.setCallback(this.callback); + sendTransaction.setSeq(seq); + + Content content = new Content(); + content.setSendTransaction(sendTransaction); + content.setTransaction(transaction); + content.setCallback(callback); + callbackMap.put(seq, content); sendTransaction.send(transaction); } diff --git a/src/main/java/org/fisco/bcos/sdk/v3/transaction/manager/TarsTransactionProcessor.java b/src/main/java/org/fisco/bcos/sdk/v3/transaction/manager/TarsTransactionProcessor.java index 4da8adb0f..76be4d6b7 100644 --- a/src/main/java/org/fisco/bcos/sdk/v3/transaction/manager/TarsTransactionProcessor.java +++ b/src/main/java/org/fisco/bcos/sdk/v3/transaction/manager/TarsTransactionProcessor.java @@ -34,8 +34,7 @@ public String sendTransactionAsync( String extraData = client.getExtraData(); String hexPrivateKey = cryptoKeyPair.getHexPrivateKey(); - SWIGTYPE_p_bcos__bytesConstRef hexPrivateKeyRef = bcos.toBytesConstRef(hexPrivateKey); - SWIGTYPE_p_std__vectorT_unsigned_char_t privateKey = bcos.fromHex(hexPrivateKeyRef); + SWIGTYPE_p_std__vectorT_unsigned_char_t privateKey = bcos.fromHex(hexPrivateKey); SWIGTYPE_p_bcos__bytesConstRef privateKeyRef = bcos.toBytesConstRef(privateKey); SWIGTYPE_p_std__shared_ptrT_KeyInterface_t key = tarsClient @@ -46,7 +45,6 @@ public String sendTransactionAsync( SWIGTYPE_p_std__unique_ptrT_bcos__crypto__KeyPairInterface_t sharedKeyPair = tarsClient.getTransactionFactory().cryptoSuite().signatureImpl().createKeyPair(key); KeyPairInterface keyPair = bcos.pointerToReference(sharedKeyPair); - SWIGTYPE_p_std__vectorT_unsigned_char_t input = bcos.toBytes(data); Transaction transaction = @@ -57,7 +55,7 @@ public String sendTransactionAsync( to, input, tarsClient.getTarsRPCClient().generateNonce(), - 500, + client.getBlockLimit().longValue(), client.getChainId(), client.getGroup(), 0, @@ -65,7 +63,6 @@ public String sendTransactionAsync( ""); transaction.setExtraData(extraData); transaction.setAttribute(txAttribute); - tarsClient.sendTransactionAsync(transaction, callback); return bcos.toHex(transaction.hash());