Skip to content

Commit

Permalink
in FederatedNodeChangeDomain.primeNodeChangeInfo, add to any existing…
Browse files Browse the repository at this point in the history
… info instead of reinit data
  • Loading branch information
Doris Lam committed Mar 31, 2024
1 parent 26300d3 commit 404ef00
Showing 1 changed file with 22 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,14 +58,14 @@ protected NodeChangeInfo createNodeChangeInfo() {
@Override
public NodeChangeInfo initInfo(CommitJson commitJson, boolean overwrite, boolean preserveTimestamps) {
commitJson.setId(UUID.randomUUID().toString());
commitJson.setDocId(commitJson.getId());
commitJson.setDocId(commitJson.getId());

NodeChangeInfo info = super.initInfo(commitJson, overwrite, preserveTimestamps);
if(info instanceof FederatedNodeChangeInfo) {
((FederatedNodeChangeInfo)info).setOldDocIds(new HashSet<>());
((FederatedNodeChangeInfo)info).setReqIndexIds(new HashSet<>());
((FederatedNodeChangeInfo)info).setToSaveNodeMap(new HashMap<>());
} else {
} else {
throw new InternalErrorException("Unexpected NodeChangeInfo type in FederatedNodeChangeDomain");
}
return info;
Expand Down Expand Up @@ -140,7 +140,7 @@ protected void processElementAddedOrUpdated(NodeChangeInfo info, ElementJson ele
String docId = UUID.randomUUID().toString();
element.setDocId(docId);

super.processElementAddedOrUpdated(info, element);
super.processElementAddedOrUpdated(info, element);
n.setDocId(element.getDocId());
n.setLastCommit(info.getCommitJson().getId());
n.setDeleted(false);
Expand Down Expand Up @@ -255,35 +255,39 @@ public FederatedNodeChangeInfo processPostJson(NodeChangeInfo info, Collection<E
}


//ToDo :: Check
@Override
public void primeNodeChangeInfo(NodeChangeInfo nodeChangeInfo, Collection<ElementJson> transactedElements) {
Set<String> elementIds = transactedElements.stream().map(BaseJson::getId).filter(id->null!=id).collect(Collectors.toSet());
List<Node> existingNodes = nodeRepository.findAllByNodeIds(elementIds);

Set<String> indexIds = new HashSet<>();
Map<String, Node> existingNodeMap = new HashMap<>();
Map<String, ElementJson> reqElementMap = new HashMap<>();
if (!(nodeChangeInfo instanceof FederatedNodeChangeInfo)) {
throw new InternalErrorException("Node processing is not using FederatedNodeChangeInfo");
}

FederatedNodeChangeInfo info = (FederatedNodeChangeInfo)nodeChangeInfo;
Set<String> indexIds = Optional.ofNullable(info.getReqIndexIds()).orElse(new HashSet<>());
Map<String, Node> existingNodeMap = Optional.ofNullable(info.getExistingNodeMap()).orElse(new HashMap<>());
Map<String, ElementJson> reqElementMap = Optional.ofNullable(info.getReqElementMap()).orElse(new HashMap<>());
Map<String, ElementJson> existingElementMap = Optional.ofNullable(info.getExistingElementMap()).orElse(new HashMap<>());

for (Node node : existingNodes) {
indexIds.add(node.getDocId());
existingNodeMap.put(node.getNodeId(), node);
}
if(!transactedElements.isEmpty()){
if (!transactedElements.isEmpty()) {
reqElementMap.putAll(convertJsonToMap(transactedElements));
}

// bulk read existing elements in elastic
List<ElementJson> existingElements = nodeIndex.findAllById(indexIds);
Map<String, ElementJson> existingElementMap = convertJsonToMap(existingElements);

nodeChangeInfo.setExistingElementMap(existingElementMap);
nodeChangeInfo.setReqElementMap(reqElementMap);
nodeChangeInfo.setRejected(new HashMap<>());
nodeChangeInfo.setActiveElementMap(new HashMap<>());
if (nodeChangeInfo instanceof FederatedNodeChangeInfo) {
((FederatedNodeChangeInfo) nodeChangeInfo).setExistingNodeMap(existingNodeMap);
((FederatedNodeChangeInfo) nodeChangeInfo).setReqIndexIds(indexIds);
}
existingElementMap.putAll(convertJsonToMap(existingElements));

info.setExistingElementMap(existingElementMap);
info.setReqElementMap(reqElementMap);
info.setRejected(Optional.ofNullable(info.getRejected()).orElse(new HashMap<>()));
info.setActiveElementMap(Optional.ofNullable(info.getActiveElementMap()).orElse(new HashMap<>()));
info.setExistingNodeMap(existingNodeMap);
info.setReqIndexIds(indexIds);
}

}

0 comments on commit 404ef00

Please sign in to comment.