Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix #17723: Generate Incremental Change Events even when consolidation of events applied #19550

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 43 additions & 10 deletions ingestion/tests/integration/ometa/test_ometa_role_policy_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,17 +214,37 @@ def tearDownClass(cls) -> None:
policies = cls.metadata.list_entities(entity=Policy)
for policy in policies.entities:
if model_str(policy.name).startswith(model_str(cls.policy_entity.name)):
cls.metadata.delete(entity=Policy, entity_id=model_str(policy.id))

cls.metadata.delete(entity=Policy, entity_id=model_str(cls.role_policy_1.id))
cls.metadata.delete(entity=Policy, entity_id=model_str(cls.role_policy_2.id))
cls.metadata.delete(
entity=Policy,
entity_id=model_str(policy.id),
hard_delete=True,
recursive=True,
)

cls.metadata.delete(
entity=Policy,
entity_id=model_str(cls.role_policy_1.id),
hard_delete=True,
recursive=True,
)
cls.metadata.delete(
entity=Policy,
entity_id=model_str(cls.role_policy_2.id),
hard_delete=True,
recursive=True,
)

roles = cls.metadata.list_entities(entity=Role)
for role in roles.entities:
if model_str(role.name.root).startswith(
model_str(cls.role_entity.name.root)
):
cls.metadata.delete(entity=Role, entity_id=model_str(role.id))
cls.metadata.delete(
entity=Role,
entity_id=model_str(role.id),
hard_delete=True,
recursive=True,
)

def test_policy_create(self):
"""
Expand Down Expand Up @@ -328,7 +348,12 @@ def test_policy_delete(self):
res_id = self.metadata.get_by_id(entity=Policy, entity_id=res_name.id)

# Delete
self.metadata.delete(entity=Policy, entity_id=model_str(res_id.id))
self.metadata.delete(
entity=Policy,
entity_id=model_str(res_id.id),
hard_delete=True,
recursive=True,
)

# Then we should not find it
res = self.metadata.list_entities(entity=Policy)
Expand Down Expand Up @@ -547,7 +572,9 @@ def test_role_delete(self):
res_id = self.metadata.get_by_id(entity=Role, entity_id=res_name.id)

# Delete
self.metadata.delete(entity=Role, entity_id=str(res_id.id.root))
self.metadata.delete(
entity=Role, entity_id=str(res_id.id.root), hard_delete=True, recursive=True
)

# Then we should not find it
res = self.metadata.list_entities(entity=Role)
Expand Down Expand Up @@ -626,7 +653,9 @@ def test_role_add_user(self):
)
assert res.users.root[0].id == user.id

self.metadata.delete(entity=User, entity_id=user.id)
self.metadata.delete(
entity=User, entity_id=user.id, hard_delete=True, recursive=True
)

def test_role_add_team(self):
"""
Expand Down Expand Up @@ -657,8 +686,12 @@ def test_role_add_team(self):
)
assert res.teams.root[0].id == team.id

self.metadata.delete(entity=Team, entity_id=team.id)
self.metadata.delete(entity=User, entity_id=user.id)
self.metadata.delete(
entity=Team, entity_id=team.id, hard_delete=True, recursive=True
)
self.metadata.delete(
entity=User, entity_id=user.id, hard_delete=True, recursive=True
)

def test_role_patch_policies(self):
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ public APICollectionRepository() {
"",
"");
supportsSearch = true;
parent = true;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,5 @@ public APIServiceRepository() {
"",
ServiceType.API);
supportsSearch = true;
parent = true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ public ClassificationRepository() {
quoteFqn = true;
supportsSearch = true;
renameAllowed = true;
parent = true;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,5 @@ public DashboardServiceRepository() {
"",
ServiceType.DASHBOARD);
supportsSearch = true;
parent = true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ public DatabaseRepository() {
"",
"");
supportsSearch = true;
parent = true;
fieldFetchers.put("name", this::fetchAndSetService);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ public DatabaseSchemaRepository() {
"",
"");
supportsSearch = true;
parent = true;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ public DatabaseServiceRepository() {
"",
ServiceType.DATABASE);
supportsSearch = true;
parent = true;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ public DomainRepository() {
UPDATE_FIELDS,
UPDATE_FIELDS);
supportsSearch = true;
parent = true;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,6 @@ public record EntityHistoryWithOffset(EntityHistory entityHistory, int nextOffse
@Getter protected final Fields putFields;

protected boolean supportsSearch = false;
@Getter protected boolean parent = false;
protected final Map<String, BiConsumer<List<T>, Fields>> fieldFetchers = new HashMap<>();

protected EntityRepository(
Expand Down Expand Up @@ -1048,8 +1047,12 @@ public final PutResponse<T> update(UriInfo uriInfo, T original, T updated) {
// Update the attributes and relationships of an entity
EntityUpdater entityUpdater = getUpdater(original, updated, Operation.PUT);
entityUpdater.update();
EventType change = entityUpdater.fieldsChanged() ? EventType.ENTITY_UPDATED : ENTITY_NO_CHANGE;
EventType change =
entityUpdater.incrementalFieldsChanged() ? EventType.ENTITY_UPDATED : ENTITY_NO_CHANGE;
setInheritedFields(updated, new Fields(allowedFields));
if (change == ENTITY_UPDATED) {
updated.setChangeDescription(entityUpdater.getIncrementalChangeDescription());
}
return new PutResponse<>(Status.OK, withHref(uriInfo, updated), change);
}

Expand All @@ -1060,29 +1063,7 @@ public final PatchResponse<T> patch(UriInfo uriInfo, UUID id, String user, JsonP
setInheritedFields(original, patchFields);

// Apply JSON patch to the original entity to get the updated entity
T updated = JsonUtils.applyPatch(original, patch, entityClass);
updated.setUpdatedBy(user);
updated.setUpdatedAt(System.currentTimeMillis());

prepareInternal(updated, true);
// Validate and populate owners
List<EntityReference> validatedOwners = getValidatedOwners(updated.getOwners());
updated.setOwners(validatedOwners);

restorePatchAttributes(original, updated);

// Update the attributes and relationships of an entity
EntityUpdater entityUpdater = getUpdater(original, updated, Operation.PATCH);
entityUpdater.update();

entityRelationshipReindex(original, updated);

EventType change = ENTITY_NO_CHANGE;
if (entityUpdater.fieldsChanged()) {
change = EventType.ENTITY_UPDATED;
setInheritedFields(updated, patchFields); // Restore inherited fields after a change
}
return new PatchResponse<>(Status.OK, withHref(uriInfo, updated), change);
return patchCommon(original, patch, user, uriInfo);
}

/**
Expand All @@ -1095,6 +1076,10 @@ public final PatchResponse<T> patch(UriInfo uriInfo, String fqn, String user, Js
setInheritedFields(original, patchFields);

// Apply JSON patch to the original entity to get the updated entity
return patchCommon(original, patch, user, uriInfo);
}

private PatchResponse<T> patchCommon(T original, JsonPatch patch, String user, UriInfo uriInfo) {
T updated = JsonUtils.applyPatch(original, patch, entityClass);
updated.setUpdatedBy(user);
updated.setUpdatedAt(System.currentTimeMillis());
Expand All @@ -1108,12 +1093,14 @@ public final PatchResponse<T> patch(UriInfo uriInfo, String fqn, String user, Js
// Update the attributes and relationships of an entity
EntityUpdater entityUpdater = getUpdater(original, updated, Operation.PATCH);
entityUpdater.update();
EventType change = ENTITY_NO_CHANGE;
if (entityUpdater.fieldsChanged()) {
change = EventType.ENTITY_UPDATED;
setInheritedFields(updated, patchFields); // Restore inherited fields after a change
}
return new PatchResponse<>(Status.OK, withHref(uriInfo, updated), change);
updated.setChangeDescription(entityUpdater.getIncrementalChangeDescription());
if (entityUpdater.incrementalFieldsChanged()) {
return new PatchResponse<>(Status.OK, withHref(uriInfo, updated), ENTITY_UPDATED);
}
return new PatchResponse<>(Status.OK, withHref(uriInfo, updated), ENTITY_NO_CHANGE);
}

@Transaction
Expand Down Expand Up @@ -2486,6 +2473,35 @@ private ChangeEvent getChangeEvent(
.withPreviousVersion(prevVersion);
}

protected void createAndInsertChangeEvent(
harshach marked this conversation as resolved.
Show resolved Hide resolved
T original, T updated, ChangeDescription changeDescription, EventType eventType) {
if (changeDescription == null) {
return;
}

if (changeDescription.getPreviousVersion() == null) {
changeDescription.withPreviousVersion(original.getVersion());
}

ChangeEvent changeEvent =
new ChangeEvent()
.withId(UUID.randomUUID())
.withEventType(eventType)
.withEntityType(entityType)
.withEntityId(updated.getId())
.withEntityFullyQualifiedName(updated.getFullyQualifiedName())
.withUserName(updated.getUpdatedBy())
.withTimestamp(System.currentTimeMillis())
.withCurrentVersion(updated.getVersion())
.withPreviousVersion(changeDescription.getPreviousVersion())
.withChangeDescription(changeDescription)
.withEntity(updated);

daoCollection.changeEventDAO().insert(JsonUtils.pojoToJson(changeEvent));
LOG.debug(
"Inserted incremental ChangeEvent for {} version {}", entityType, updated.getVersion());
}

/** Remove owner relationship for a given entity */
@Transaction
private void removeOwners(T entity, List<EntityReference> owners) {
Expand Down Expand Up @@ -2786,6 +2802,7 @@ public class EntityUpdater {
protected boolean majorVersionChange = false;
protected final User updatingUser;
private boolean entityChanged = false;
@Getter protected ChangeDescription incrementalChangeDescription = null;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@harshach we will need to introduce the same 'incrementalChangeDescription' field in EntityInterface also

ChangeDescription getIncrementalChangeDescription(); and
void setIncrementalChangeDescription(ChangeDescription incrementalChangeDescription);

this is because when we are making a call to postUpdate and updating the search, we are using

if (entity.getChangeDescription() != null && Objects.equals(entity.getVersion(), entity.getChangeDescription().getPreviousVersion())) {

this needs to be replaced with incrementalChangeDescription as we have removed the consolidated changes for parent.

before making the same changes in pr, i want to confirm if we can go ahead and implement the same in EntityInterface.


public EntityUpdater(T original, T updated, Operation operation) {
this.original = original;
Expand All @@ -2801,20 +2818,24 @@ public EntityUpdater(T original, T updated, Operation operation) {
@Transaction
public final void update() {
boolean consolidateChanges = consolidateChanges(original, updated, operation);
// Revert the changes previously made by the user with in a session and consolidate all the
// changes
incrementalChange();
if (consolidateChanges) {
revert();
}
// Now updated from previous/original to updated one
changeDescription = new ChangeDescription();
updateInternal();

// Store the updated entity
storeUpdate();
postUpdate(original, updated);
}

private void incrementalChange() {
changeDescription = new ChangeDescription();
updateInternal(false);
incrementalChangeDescription = changeDescription;
incrementalChangeDescription.setPreviousVersion(original.getVersion());
}

@Transaction
private void revert() {
// Revert from current version to previous version to go back to the previous version
Expand All @@ -2825,6 +2846,7 @@ private void revert() {
LOG.debug(
"In session change consolidation. Reverting to previous version {}",
previous.getVersion());
changeDescription = new ChangeDescription();
updated = previous;
updateInternal(true);
LOG.info(
Expand Down Expand Up @@ -3262,6 +3284,15 @@ public final boolean fieldsChanged() {
|| !changeDescription.getFieldsDeleted().isEmpty();
}

public final boolean incrementalFieldsChanged() {
if (incrementalChangeDescription == null) {
return false;
}
return !incrementalChangeDescription.getFieldsAdded().isEmpty()
|| !incrementalChangeDescription.getFieldsUpdated().isEmpty()
|| !incrementalChangeDescription.getFieldsDeleted().isEmpty();
}

public final <K> boolean recordChange(String field, K orig, K updated) {
return recordChange(field, orig, updated, false, objectMatch, true);
}
Expand Down Expand Up @@ -3519,9 +3550,7 @@ public static void setSessionTimeout(long timeout) {

private boolean consolidateChanges(T original, T updated, Operation operation) {
// If user is the same and the new update is with in the user session timeout
return !parent // Parent entity shouldn't consolidate changes, as we need ChangeDescription to
// propagate to children
&& original.getVersion() > 0.1 // First update on an entity that
return original.getVersion() > 0.1 // First update on an entity that
&& operation == Operation.PATCH
&& !Boolean.TRUE.equals(original.getDeleted()) // Entity is not soft deleted
&& !operation.isDelete() // Operation must be an update
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,6 @@ public GlossaryRepository() {
quoteFqn = true;
supportsSearch = true;
renameAllowed = true;
parent = true;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,5 @@ public MessagingServiceRepository() {
UPDATE_FIELDS,
ServiceType.MESSAGING);
supportsSearch = true;
parent = true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,5 @@ public MetadataServiceRepository() {
UPDATE_FIELDS,
ServiceType.METADATA);
supportsSearch = true;
parent = true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,5 @@ public MlModelServiceRepository() {
UPDATE_FIELDS,
ServiceType.ML_MODEL);
supportsSearch = true;
parent = true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,5 @@ public PipelineServiceRepository() {
"",
ServiceType.PIPELINE);
supportsSearch = true;
parent = true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,5 @@ public SearchServiceRepository() {
"",
ServiceType.SEARCH);
supportsSearch = true;
parent = true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,5 @@ public StorageServiceRepository() {
"",
ServiceType.STORAGE);
supportsSearch = true;
parent = true;
}
}
Loading
Loading