Skip to content

Commit

Permalink
Update tars client impl (FISCO-BCOS#821)
Browse files Browse the repository at this point in the history
* Add TarsClient

* Update bulid.gradle, add tars-sdk

* Update fisco-bcos-tars-sdk package name

* Using google code style

* Merge origin

* Revert changes of xml

* ExtraData use string type

* Update tars client

* Add tars config and impl

* Update loadLibrary

* Use google java format

* Update library path find

* Update tars client

---------

Co-authored-by: More <More@MSI>
Co-authored-by: More <[email protected]>
  • Loading branch information
3 people authored and kyonRay committed Oct 12, 2023
1 parent 2f184e8 commit 3165b09
Show file tree
Hide file tree
Showing 2 changed files with 139 additions and 13 deletions.
79 changes: 66 additions & 13 deletions src/main/java/org/fisco/bcos/sdk/v3/client/TarsClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,13 @@
import java.net.URL;
import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.fisco.bcos.sdk.tars.Callback;
import org.fisco.bcos.sdk.tars.ConcurrentQueue;
import org.fisco.bcos.sdk.tars.ConcurrentQueueCallback;
import org.fisco.bcos.sdk.tars.Config;
import org.fisco.bcos.sdk.tars.CryptoSuite;
import org.fisco.bcos.sdk.tars.LogEntry;
Expand All @@ -32,10 +35,39 @@ public class TarsClient extends ClientImpl implements Client {
private static Logger logger = LoggerFactory.getLogger(TarsClient.class);
private RPCClient tarsRPCClient;
private TransactionFactoryImpl transactionFactory;
private Thread queueThread;
private ThreadPoolExecutor asyncThreadPool;

static final int queueSize = 10 * 10000;
static final String libFileName = System.mapLibraryName("bcos_swig_java");

private class CallbackContent {
public SendTransaction sendTransaction;
TransactionCallback callback;
Transaction transaction;
};

ConcurrentQueue concurrentQueue = new ConcurrentQueue();
ConcurrentHashMap<Integer, CallbackContent> callbackMap =
new ConcurrentHashMap<Integer, CallbackContent>();
AtomicInteger callbackSeq = new AtomicInteger(0);

public RPCClient getTarsRPCClient() {
return tarsRPCClient;
}

public void setTarsRPCClient(RPCClient tarsRPCClient) {
this.tarsRPCClient = tarsRPCClient;
}

public TransactionFactoryImpl getTransactionFactory() {
return transactionFactory;
}

public void setTransactionFactory(TransactionFactoryImpl transactionFactory) {
this.transactionFactory = transactionFactory;
}

protected TarsClient(String groupID, ConfigOption configOption, long nativePointer) {
super(groupID, configOption, nativePointer);
String connectionString =
Expand All @@ -52,6 +84,25 @@ protected TarsClient(String groupID, ConfigOption configOption, long nativePoint
CryptoSuite cryptoSuite =
bcos.newCryptoSuite(configOption.getCryptoMaterialConfig().getUseSmCrypto());
transactionFactory = new TransactionFactoryImpl(cryptoSuite);
queueThread =
new Thread(
() -> {
while (true) {
int seq = concurrentQueue.pop();
logger.debug("Receive queue message...", seq);
asyncThreadPool.submit(
() -> {
CallbackContent content = callbackMap.remove(seq);
if (content != null) {
TransactionReceipt receipt =
content.sendTransaction.get();
content.callback.onResponse(
toJSONTransactionReceipt(
receipt, content.transaction));
}
});
}
});
asyncThreadPool =
new ThreadPoolExecutor(
1,
Expand All @@ -72,7 +123,7 @@ public static void loadLibrary(String libPath) {

public static TarsClient build(String groupId, ConfigOption configOption, long nativePointer) {
logger.info(
"build, groupID: {}, configOption: {}, nativePointer: {}",
"TarsClient build, groupID: {}, configOption: {}, nativePointer: {}",
groupId,
configOption,
nativePointer);
Expand Down Expand Up @@ -101,25 +152,27 @@ public void sendTransactionAsync(
String signedTransactionData,
boolean withProof,
TransactionCallback callback) {
logger.debug("sendTransactionAsync...", node, withProof);
if (withProof) {
super.sendTransactionAsync(node, signedTransactionData, withProof, callback);
return;
}
node = Objects.isNull(node) ? "" : node;
Transaction transaction = toTransaction(signedTransactionData);
sendTransactionAsync(transaction, callback);
}

public void sendTransactionAsync(Transaction transaction, TransactionCallback callback) {
SendTransaction sendTransaction = new SendTransaction(tarsRPCClient);

sendTransaction.setCallback(
new Callback() {
public void onMessage() {
asyncThreadPool.submit(
() -> {
TransactionReceipt receipt = sendTransaction.get();
callback.onResponse(
toJSONTransactionReceipt(receipt, transaction));
});
}
});
int seq = callbackSeq.addAndGet(1);
CallbackContent callbackContent = new CallbackContent();
callbackContent.sendTransaction = sendTransaction;
callbackContent.callback = callback;
callbackContent.transaction = transaction;
callbackMap.put(seq, callbackContent);
sendTransaction.setCallback(new ConcurrentQueueCallback(concurrentQueue, seq));

sendTransaction.send(transaction);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package org.fisco.bcos.sdk.v3.transaction.manager;

import org.fisco.bcos.sdk.tars.KeyPairInterface;
import org.fisco.bcos.sdk.tars.SWIGTYPE_p_bcos__bytesConstRef;
import org.fisco.bcos.sdk.tars.SWIGTYPE_p_std__shared_ptrT_KeyInterface_t;
import org.fisco.bcos.sdk.tars.SWIGTYPE_p_std__unique_ptrT_bcos__crypto__KeyPairInterface_t;
import org.fisco.bcos.sdk.tars.SWIGTYPE_p_std__vectorT_unsigned_char_t;
import org.fisco.bcos.sdk.tars.Transaction;
import org.fisco.bcos.sdk.tars.bcos;
import org.fisco.bcos.sdk.v3.client.Client;
import org.fisco.bcos.sdk.v3.client.TarsClient;
import org.fisco.bcos.sdk.v3.crypto.keypair.CryptoKeyPair;
import org.fisco.bcos.sdk.v3.model.callback.TransactionCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TarsTransactionProcessor extends TransactionProcessor {
private TarsClient tarsClient;
private static final Logger logger = LoggerFactory.getLogger(TarsTransactionProcessor.class);

public TarsTransactionProcessor(
Client client, CryptoKeyPair cryptoKeyPair, String groupId, String chainId) {
super(client, cryptoKeyPair, groupId, chainId);
tarsClient = (TarsClient) client;
}

@Override
public String sendTransactionAsync(
String to,
byte[] data,
CryptoKeyPair cryptoKeyPair,
int txAttribute,
TransactionCallback callback) {
String extraData = client.getExtraData();

String hexPrivateKey = cryptoKeyPair.getHexPrivateKey();
SWIGTYPE_p_bcos__bytesConstRef hexPrivateKeyRef = bcos.toBytesConstRef(hexPrivateKey);
SWIGTYPE_p_std__vectorT_unsigned_char_t privateKey = bcos.fromHex(hexPrivateKeyRef);
SWIGTYPE_p_bcos__bytesConstRef privateKeyRef = bcos.toBytesConstRef(privateKey);
SWIGTYPE_p_std__shared_ptrT_KeyInterface_t key =
tarsClient
.getTransactionFactory()
.cryptoSuite()
.keyFactory()
.createKey(privateKeyRef);
SWIGTYPE_p_std__unique_ptrT_bcos__crypto__KeyPairInterface_t sharedKeyPair =
tarsClient.getTransactionFactory().cryptoSuite().signatureImpl().createKeyPair(key);
KeyPairInterface keyPair = bcos.pointerToReference(sharedKeyPair);

SWIGTYPE_p_std__vectorT_unsigned_char_t input = bcos.toBytes(data);

Transaction transaction =
tarsClient
.getTransactionFactory()
.createTransaction(
0,
to,
input,
tarsClient.getTarsRPCClient().generateNonce(),
500,
client.getChainId(),
client.getGroup(),
0,
keyPair,
"");
transaction.setExtraData(extraData);
transaction.setAttribute(txAttribute);

tarsClient.sendTransactionAsync(transaction, callback);

return bcos.toHex(transaction.hash());
}
}

0 comments on commit 3165b09

Please sign in to comment.