Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix #17723: Generate Incremental Change Events even when consolidation of events applied #19550

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1048,8 +1048,12 @@ public final PutResponse<T> update(UriInfo uriInfo, T original, T updated) {
// Update the attributes and relationships of an entity
EntityUpdater entityUpdater = getUpdater(original, updated, Operation.PUT);
entityUpdater.update();
EventType change = entityUpdater.fieldsChanged() ? EventType.ENTITY_UPDATED : ENTITY_NO_CHANGE;
EventType change =
entityUpdater.incrementalFieldsChanged() ? EventType.ENTITY_UPDATED : ENTITY_NO_CHANGE;
setInheritedFields(updated, new Fields(allowedFields));
if (change == ENTITY_UPDATED) {
updated.setChangeDescription(entityUpdater.getIncrementalChangeDescription());
}
return new PutResponse<>(Status.OK, withHref(uriInfo, updated), change);
}

Expand All @@ -1060,29 +1064,7 @@ public final PatchResponse<T> patch(UriInfo uriInfo, UUID id, String user, JsonP
setInheritedFields(original, patchFields);

// Apply JSON patch to the original entity to get the updated entity
T updated = JsonUtils.applyPatch(original, patch, entityClass);
updated.setUpdatedBy(user);
updated.setUpdatedAt(System.currentTimeMillis());

prepareInternal(updated, true);
// Validate and populate owners
List<EntityReference> validatedOwners = getValidatedOwners(updated.getOwners());
updated.setOwners(validatedOwners);

restorePatchAttributes(original, updated);

// Update the attributes and relationships of an entity
EntityUpdater entityUpdater = getUpdater(original, updated, Operation.PATCH);
entityUpdater.update();

entityRelationshipReindex(original, updated);

EventType change = ENTITY_NO_CHANGE;
if (entityUpdater.fieldsChanged()) {
change = EventType.ENTITY_UPDATED;
setInheritedFields(updated, patchFields); // Restore inherited fields after a change
}
return new PatchResponse<>(Status.OK, withHref(uriInfo, updated), change);
return patchCommon(original, patch, user, uriInfo);
}

/**
Expand All @@ -1095,6 +1077,10 @@ public final PatchResponse<T> patch(UriInfo uriInfo, String fqn, String user, Js
setInheritedFields(original, patchFields);

// Apply JSON patch to the original entity to get the updated entity
return patchCommon(original, patch, user, uriInfo);
}

private PatchResponse<T> patchCommon(T original, JsonPatch patch, String user, UriInfo uriInfo) {
T updated = JsonUtils.applyPatch(original, patch, entityClass);
updated.setUpdatedBy(user);
updated.setUpdatedAt(System.currentTimeMillis());
Expand All @@ -1108,12 +1094,15 @@ public final PatchResponse<T> patch(UriInfo uriInfo, String fqn, String user, Js
// Update the attributes and relationships of an entity
EntityUpdater entityUpdater = getUpdater(original, updated, Operation.PATCH);
entityUpdater.update();
EventType change = ENTITY_NO_CHANGE;
if (entityUpdater.fieldsChanged()) {
change = EventType.ENTITY_UPDATED;
setInheritedFields(updated, patchFields); // Restore inherited fields after a change
}
return new PatchResponse<>(Status.OK, withHref(uriInfo, updated), change);

if (entityUpdater.incrementalFieldsChanged()) {
updated.setChangeDescription(entityUpdater.getIncrementalChangeDescription());
return new PatchResponse<>(Status.OK, withHref(uriInfo, updated), ENTITY_UPDATED);
}
return new PatchResponse<>(Status.OK, withHref(uriInfo, updated), ENTITY_NO_CHANGE);
}

@Transaction
Expand Down Expand Up @@ -2486,6 +2475,35 @@ private ChangeEvent getChangeEvent(
.withPreviousVersion(prevVersion);
}

protected void createAndInsertChangeEvent(
harshach marked this conversation as resolved.
Show resolved Hide resolved
T original, T updated, ChangeDescription changeDescription, EventType eventType) {
if (changeDescription == null) {
return;
}

if (changeDescription.getPreviousVersion() == null) {
changeDescription.withPreviousVersion(original.getVersion());
}

ChangeEvent changeEvent =
new ChangeEvent()
.withId(UUID.randomUUID())
.withEventType(eventType)
.withEntityType(entityType)
.withEntityId(updated.getId())
.withEntityFullyQualifiedName(updated.getFullyQualifiedName())
.withUserName(updated.getUpdatedBy())
.withTimestamp(System.currentTimeMillis())
.withCurrentVersion(updated.getVersion())
.withPreviousVersion(changeDescription.getPreviousVersion())
.withChangeDescription(changeDescription)
.withEntity(updated);

daoCollection.changeEventDAO().insert(JsonUtils.pojoToJson(changeEvent));
LOG.debug(
"Inserted incremental ChangeEvent for {} version {}", entityType, updated.getVersion());
}

/** Remove owner relationship for a given entity */
@Transaction
private void removeOwners(T entity, List<EntityReference> owners) {
Expand Down Expand Up @@ -2786,6 +2804,7 @@ public class EntityUpdater {
protected boolean majorVersionChange = false;
protected final User updatingUser;
private boolean entityChanged = false;
@Getter protected ChangeDescription incrementalChangeDescription = null;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@harshach we will need to introduce the same 'incrementalChangeDescription' field in EntityInterface also

ChangeDescription getIncrementalChangeDescription(); and
void setIncrementalChangeDescription(ChangeDescription incrementalChangeDescription);

this is because when we are making a call to postUpdate and updating the search, we are using

if (entity.getChangeDescription() != null && Objects.equals(entity.getVersion(), entity.getChangeDescription().getPreviousVersion())) {

this needs to be replaced with incrementalChangeDescription as we have removed the consolidated changes for parent.

before making the same changes in pr, i want to confirm if we can go ahead and implement the same in EntityInterface.


public EntityUpdater(T original, T updated, Operation operation) {
this.original = original;
Expand All @@ -2800,6 +2819,7 @@ public EntityUpdater(T original, T updated, Operation operation) {
/** Compare original and updated entities and perform updates. Update the entity version and track changes. */
@Transaction
public final void update() {
incrementalChange();
boolean consolidateChanges = consolidateChanges(original, updated, operation);
// Revert the changes previously made by the user with in a session and consolidate all the
// changes
Expand All @@ -2809,12 +2829,16 @@ public final void update() {
// Now updated from previous/original to updated one
changeDescription = new ChangeDescription();
updateInternal();

// Store the updated entity
storeUpdate();
postUpdate(original, updated);
}

private void incrementalChange() {
changeDescription = new ChangeDescription();
updateInternal(false);
incrementalChangeDescription = changeDescription;
}

@Transaction
private void revert() {
// Revert from current version to previous version to go back to the previous version
Expand All @@ -2825,6 +2849,7 @@ private void revert() {
LOG.debug(
"In session change consolidation. Reverting to previous version {}",
previous.getVersion());
changeDescription = new ChangeDescription();
updated = previous;
updateInternal(true);
LOG.info(
Expand Down Expand Up @@ -3262,6 +3287,15 @@ public final boolean fieldsChanged() {
|| !changeDescription.getFieldsDeleted().isEmpty();
}

public final boolean incrementalFieldsChanged() {
if (incrementalChangeDescription == null) {
return false;
}
return !incrementalChangeDescription.getFieldsAdded().isEmpty()
|| !incrementalChangeDescription.getFieldsUpdated().isEmpty()
|| !incrementalChangeDescription.getFieldsDeleted().isEmpty();
}

public final <K> boolean recordChange(String field, K orig, K updated) {
return recordChange(field, orig, updated, false, objectMatch, true);
}
Expand Down
Loading