diff --git a/src/main/java/com/webank/wecross/network/rpc/handler/XATransactionHandler.java b/src/main/java/com/webank/wecross/network/rpc/handler/XATransactionHandler.java index 4e5ddb560..f5c0f2078 100644 --- a/src/main/java/com/webank/wecross/network/rpc/handler/XATransactionHandler.java +++ b/src/main/java/com/webank/wecross/network/rpc/handler/XATransactionHandler.java @@ -58,6 +58,7 @@ public void setPaths(Set paths) { public static class ListXATransactionsRequest { private int size; + private String path; private Map offsets = Collections.synchronizedMap(new HashMap<>()); public int getSize() { @@ -75,6 +76,14 @@ public Map getOffsets() { public void setOffsets(Map offsets) { this.offsets = offsets; } + + public String getPath() { + return path; + } + + public void setPath(String path) { + this.path = path; + } } @Override @@ -229,6 +238,7 @@ public void handle( host.getAccountManager().getAdminUA(), xaRequest.getData().getOffsets(), xaRequest.getData().getSize(), + xaRequest.getData().getPath(), (exception, xaTransactionListResponse) -> { if (logger.isDebugEnabled()) { logger.debug( diff --git a/src/main/java/com/webank/wecross/routine/xa/XATransactionManager.java b/src/main/java/com/webank/wecross/routine/xa/XATransactionManager.java index 875fffd11..988010774 100644 --- a/src/main/java/com/webank/wecross/routine/xa/XATransactionManager.java +++ b/src/main/java/com/webank/wecross/routine/xa/XATransactionManager.java @@ -576,6 +576,44 @@ public void asyncGetXATransaction( } } + private ListXAReduceCallback getListXACallback( + int count, Map offsets, int size, ListXATransactionsCallback callback) { + List errors = + Collections.synchronizedList(new LinkedList<>()); + Map nextOffsets = new ConcurrentHashMap<>(offsets); + + return (chain, listXAResponse) -> { + + // first time to list, reset offsets + if (nextOffsets.get(chain) == -1) { + nextOffsets.put(chain, listXAResponse.getTotal() - 1); + } + + XATransactionListResponse response = new XATransactionListResponse(); + if (Objects.nonNull(listXAResponse.getChainErrorMessage())) { + errors.add(listXAResponse.getChainErrorMessage()); + if (!errors.isEmpty()) { + XAResponse xaResponse = new XAResponse(); + xaResponse.setStatus(-1); + xaResponse.setChainErrorMessages(errors); + response.setXaResponse(xaResponse); + } + } else { + response.setXaList(listXAResponse.getXaTransactions()); + // update offsets + Long nextOffset = + nextOffsets.get(chain) - listXAResponse.getXaTransactions().size(); + nextOffsets.put(chain, nextOffset); + response.setNextOffsets(nextOffsets); + if (nextOffsets.get(chain) == -1) { + response.setFinished(true); + } + response.recoverUsername(accountManager); + } + callback.onResponse(null, response); + }; + } + public interface ListXATransactionsCallback { void onResponse(WeCrossException e, XATransactionListResponse xaTransactionListResponse); } @@ -728,6 +766,7 @@ public void asyncListXATransactions( UniversalAccount ua, Map offsets, int size, + String chainPath, ListXATransactionsCallback callback) { try { @@ -742,7 +781,17 @@ public void asyncListXATransactions( XATransactionListResponse response = new XATransactionListResponse(); - List chainPaths = setToList(zoneManager.getAllChainsInfo(false).keySet()); + ListXAReduceCallback reduceCallback = null; + List chainPaths = new ArrayList<>(); + if (Objects.isNull(chainPath) || chainPath.isEmpty()) { + chainPaths = setToList(zoneManager.getAllChainsInfo(false).keySet()); + // has sort operation callback + reduceCallback = getListXAReduceCallback(offsets.size(), offsets, size, callback); + } else { + chainPaths.add(Path.decode(chainPath)); + // Remove sort operation callback + reduceCallback = getListXACallback(offsets.size(), offsets, size, callback); + } int chainNum = chainPaths.size(); if (chainNum == 0) { @@ -761,8 +810,6 @@ public void asyncListXATransactions( } } - ListXAReduceCallback reduceCallback = - getListXAReduceCallback(offsets.size(), offsets, size, callback); for (String chain : offsets.keySet()) { if (!requireIgnore || offsets.get(chain) != -1L) { asyncListXATransactions(