Skip to content

Commit

Permalink
<perf>(xatransaction): Improve the xaTransaction list interface (#596)
Browse files Browse the repository at this point in the history
* Improve the xaTransaction list interface

* feat:add annotation

* feat:Formatted code

* feat:Version changed to 1.0

---------

Co-authored-by: ylxiao5 <[email protected]>
  • Loading branch information
yinghuochongfly and ylxiao5 authored Dec 15, 2023
1 parent 8d3b6b3 commit 6def3f7
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ public void setPaths(Set<String> paths) {

public static class ListXATransactionsRequest {
private int size;
private String path;
private Map<String, Long> offsets = Collections.synchronizedMap(new HashMap<>());

public int getSize() {
Expand All @@ -75,6 +76,14 @@ public Map<String, Long> getOffsets() {
public void setOffsets(Map<String, Long> offsets) {
this.offsets = offsets;
}

public String getPath() {
return path;
}

public void setPath(String path) {
this.path = path;
}
}

@Override
Expand Down Expand Up @@ -229,6 +238,7 @@ public void handle(
host.getAccountManager().getAdminUA(),
xaRequest.getData().getOffsets(),
xaRequest.getData().getSize(),
xaRequest.getData().getPath(),
(exception, xaTransactionListResponse) -> {
if (logger.isDebugEnabled()) {
logger.debug(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -576,6 +576,44 @@ public void asyncGetXATransaction(
}
}

private ListXAReduceCallback getListXACallback(
int count, Map<String, Long> offsets, int size, ListXATransactionsCallback callback) {
List<XAResponse.ChainErrorMessage> errors =
Collections.synchronizedList(new LinkedList<>());
Map<String, Long> nextOffsets = new ConcurrentHashMap<>(offsets);

return (chain, listXAResponse) -> {

// first time to list, reset offsets
if (nextOffsets.get(chain) == -1) {
nextOffsets.put(chain, listXAResponse.getTotal() - 1);
}

XATransactionListResponse response = new XATransactionListResponse();
if (Objects.nonNull(listXAResponse.getChainErrorMessage())) {
errors.add(listXAResponse.getChainErrorMessage());
if (!errors.isEmpty()) {
XAResponse xaResponse = new XAResponse();
xaResponse.setStatus(-1);
xaResponse.setChainErrorMessages(errors);
response.setXaResponse(xaResponse);
}
} else {
response.setXaList(listXAResponse.getXaTransactions());
// update offsets
Long nextOffset =
nextOffsets.get(chain) - listXAResponse.getXaTransactions().size();
nextOffsets.put(chain, nextOffset);
response.setNextOffsets(nextOffsets);
if (nextOffsets.get(chain) == -1) {
response.setFinished(true);
}
response.recoverUsername(accountManager);
}
callback.onResponse(null, response);
};
}

public interface ListXATransactionsCallback {
void onResponse(WeCrossException e, XATransactionListResponse xaTransactionListResponse);
}
Expand Down Expand Up @@ -728,6 +766,7 @@ public void asyncListXATransactions(
UniversalAccount ua,
Map<String, Long> offsets,
int size,
String chainPath,
ListXATransactionsCallback callback) {

try {
Expand All @@ -742,7 +781,17 @@ public void asyncListXATransactions(

XATransactionListResponse response = new XATransactionListResponse();

List<Path> chainPaths = setToList(zoneManager.getAllChainsInfo(false).keySet());
ListXAReduceCallback reduceCallback = null;
List<Path> chainPaths = new ArrayList<>();
if (Objects.isNull(chainPath) || chainPath.isEmpty()) {
chainPaths = setToList(zoneManager.getAllChainsInfo(false).keySet());
// has sort operation callback
reduceCallback = getListXAReduceCallback(offsets.size(), offsets, size, callback);
} else {
chainPaths.add(Path.decode(chainPath));
// Remove sort operation callback
reduceCallback = getListXACallback(offsets.size(), offsets, size, callback);
}

int chainNum = chainPaths.size();
if (chainNum == 0) {
Expand All @@ -761,8 +810,6 @@ public void asyncListXATransactions(
}
}

ListXAReduceCallback reduceCallback =
getListXAReduceCallback(offsets.size(), offsets, size, callback);
for (String chain : offsets.keySet()) {
if (!requireIgnore || offsets.get(chain) != -1L) {
asyncListXATransactions(
Expand Down

0 comments on commit 6def3f7

Please sign in to comment.