Skip to content

Commit

Permalink
Update tars client
Browse files Browse the repository at this point in the history
  • Loading branch information
morebtcg committed Sep 13, 2023
1 parent 2df3285 commit 21017b0
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 47 deletions.
107 changes: 65 additions & 42 deletions src/main/java/org/fisco/bcos/sdk/v3/client/TarsClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.fisco.bcos.sdk.tars.ConcurrentQueue;
import org.fisco.bcos.sdk.tars.ConcurrentQueueCallback;
import org.fisco.bcos.sdk.tars.Callback;
import org.fisco.bcos.sdk.tars.Config;
import org.fisco.bcos.sdk.tars.CryptoSuite;
import org.fisco.bcos.sdk.tars.LogEntry;
Expand All @@ -35,22 +34,45 @@ public class TarsClient extends ClientImpl implements Client {
private static Logger logger = LoggerFactory.getLogger(TarsClient.class);
private RPCClient tarsRPCClient;
private TransactionFactoryImpl transactionFactory;
private Thread queueThread;
private ThreadPoolExecutor asyncThreadPool;
private Callback callback;

static final int queueSize = 10 * 10000;
static final String libFileName = System.mapLibraryName("bcos_swig_java");
private static final int queueSize = 10 * 10000;
private static final String libFileName = System.mapLibraryName("bcos_swig_java");

private class CallbackContent {
public SendTransaction sendTransaction;
TransactionCallback callback;
Transaction transaction;
private static class Content {
private SendTransaction sendTransaction;
private Transaction transaction;
private TransactionCallback callback;

public SendTransaction getSendTransaction() {
return sendTransaction;
}

public void setSendTransaction(SendTransaction sendTransaction) {
this.sendTransaction = sendTransaction;
}

public Transaction getTransaction() {
return transaction;
}

public void setTransaction(Transaction transaction) {
this.transaction = transaction;
}

public TransactionCallback getCallback() {
return callback;
}

public void setCallback(TransactionCallback callback) {
this.callback = callback;
}
};

ConcurrentQueue concurrentQueue = new ConcurrentQueue();
ConcurrentHashMap<Integer, CallbackContent> callbackMap =
new ConcurrentHashMap<Integer, CallbackContent>();
AtomicInteger callbackSeq = new AtomicInteger(0);
ConcurrentHashMap<Integer, Content> callbackMap =
new ConcurrentHashMap<Integer, TarsClient.Content>();
AtomicInteger currentSeq = new AtomicInteger();

public RPCClient getTarsRPCClient() {
return tarsRPCClient;
Expand Down Expand Up @@ -78,38 +100,37 @@ protected TarsClient(String groupID, ConfigOption configOption, long nativePoint
Config config = new Config();
config.setConnectionString(connectionString);
config.setSendQueueSize(queueSize);
config.setTimeoutMs(configOption.getNetworkConfig().getTimeout() * 1000);
config.setTimeoutMs(60 * 1000);
tarsRPCClient = new RPCClient(config);

CryptoSuite cryptoSuite =
bcos.newCryptoSuite(configOption.getCryptoMaterialConfig().getUseSmCrypto());
transactionFactory = new TransactionFactoryImpl(cryptoSuite);
queueThread =
new Thread(
() -> {
while (true) {
int seq = concurrentQueue.pop();
logger.debug("Receive queue message...", seq);
asyncThreadPool.submit(
() -> {
CallbackContent content = callbackMap.remove(seq);
if (content != null) {
TransactionReceipt receipt =
content.sendTransaction.get();
content.callback.onResponse(
toJSONTransactionReceipt(
receipt, content.transaction));
}
});
}
});
asyncThreadPool =
new ThreadPoolExecutor(
1,
configOption.getThreadPoolConfig().getThreadPoolSize(),
0,
TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(queueSize));
callback =
new Callback() {
public void onMessage(int seq) {
asyncThreadPool.submit(
() -> {
logger.debug("Receive seq: {}", seq);
Content content = callbackMap.remove(seq);
if (content != null) {
TransactionReceipt receipt =
content.getSendTransaction().get();
content.getCallback()
.onResponse(
toJSONTransactionReceipt(
receipt, content.getTransaction()));
}
});
}
};
}

public static void loadLibrary() {
Expand Down Expand Up @@ -152,7 +173,7 @@ public void sendTransactionAsync(
String signedTransactionData,
boolean withProof,
TransactionCallback callback) {
logger.debug("sendTransactionAsync...", node, withProof);
logger.debug("sendTransactionAsync... {} {}", node, withProof);
if (withProof) {
super.sendTransactionAsync(node, signedTransactionData, withProof, callback);
return;
Expand All @@ -163,15 +184,17 @@ public void sendTransactionAsync(
}

public void sendTransactionAsync(Transaction transaction, TransactionCallback callback) {
SendTransaction sendTransaction = new SendTransaction(tarsRPCClient);
int seq = currentSeq.addAndGet(1);

int seq = callbackSeq.addAndGet(1);
CallbackContent callbackContent = new CallbackContent();
callbackContent.sendTransaction = sendTransaction;
callbackContent.callback = callback;
callbackContent.transaction = transaction;
callbackMap.put(seq, callbackContent);
sendTransaction.setCallback(new ConcurrentQueueCallback(concurrentQueue, seq));
SendTransaction sendTransaction = new SendTransaction(tarsRPCClient);
sendTransaction.setCallback(this.callback);
sendTransaction.setSeq(seq);

Content content = new Content();
content.setSendTransaction(sendTransaction);
content.setTransaction(transaction);
content.setCallback(callback);
callbackMap.put(seq, content);

sendTransaction.send(transaction);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,7 @@ public String sendTransactionAsync(
String extraData = client.getExtraData();

String hexPrivateKey = cryptoKeyPair.getHexPrivateKey();
SWIGTYPE_p_bcos__bytesConstRef hexPrivateKeyRef = bcos.toBytesConstRef(hexPrivateKey);
SWIGTYPE_p_std__vectorT_unsigned_char_t privateKey = bcos.fromHex(hexPrivateKeyRef);
SWIGTYPE_p_std__vectorT_unsigned_char_t privateKey = bcos.fromHex(hexPrivateKey);
SWIGTYPE_p_bcos__bytesConstRef privateKeyRef = bcos.toBytesConstRef(privateKey);
SWIGTYPE_p_std__shared_ptrT_KeyInterface_t key =
tarsClient
Expand All @@ -46,7 +45,6 @@ public String sendTransactionAsync(
SWIGTYPE_p_std__unique_ptrT_bcos__crypto__KeyPairInterface_t sharedKeyPair =
tarsClient.getTransactionFactory().cryptoSuite().signatureImpl().createKeyPair(key);
KeyPairInterface keyPair = bcos.pointerToReference(sharedKeyPair);

SWIGTYPE_p_std__vectorT_unsigned_char_t input = bcos.toBytes(data);

Transaction transaction =
Expand All @@ -57,15 +55,14 @@ public String sendTransactionAsync(
to,
input,
tarsClient.getTarsRPCClient().generateNonce(),
500,
client.getBlockLimit().longValue(),
client.getChainId(),
client.getGroup(),
0,
keyPair,
"");
transaction.setExtraData(extraData);
transaction.setAttribute(txAttribute);

tarsClient.sendTransactionAsync(transaction, callback);

return bcos.toHex(transaction.hash());
Expand Down

0 comments on commit 21017b0

Please sign in to comment.