Skip to content

Commit

Permalink
Merge pull request #3703 from atlanhq/master-staging-sync
Browse files Browse the repository at this point in the history
Master to Staging Sync
  • Loading branch information
hr2904 authored Nov 7, 2024
2 parents 5733883 + d4ecb76 commit 273a74d
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 13 deletions.
2 changes: 1 addition & 1 deletion README.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.model.instance.AtlasRelationship;
import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2;
import org.apache.atlas.repository.store.graph.v2.TransactionInterceptHelper;
import org.apache.atlas.type.AtlasArrayType;
import org.apache.atlas.type.AtlasMapType;
import org.apache.atlas.utils.AtlasPerfMetrics;
Expand Down Expand Up @@ -359,18 +360,32 @@ public static boolean getRestrictPropagationThroughHierarchy(AtlasVertex classif
return getRestrictPropagation(classificationVertex,CLASSIFICATION_VERTEX_RESTRICT_PROPAGATE_THROUGH_HIERARCHY);
}

public static AtlasVertex getClassificationVertex(AtlasVertex entityVertex, String classificationName) {
public void repairTagVertex(AtlasEdge edge, AtlasVertex classificationVertex) {
LOG.info("Repairing corrupt tag-vertex");
removeEdge(edge);
removeVertex(classificationVertex);
}

public static AtlasVertex getClassificationVertex(GraphHelper graphHelper, AtlasVertex entityVertex, String classificationName) {
AtlasVertex ret = null;
Iterable edges = entityVertex.query().direction(AtlasEdgeDirection.OUT).label(CLASSIFICATION_LABEL)
.has(CLASSIFICATION_EDGE_IS_PROPAGATED_PROPERTY_KEY, false)
.has(CLASSIFICATION_EDGE_NAME_PROPERTY_KEY, classificationName).edges();

if (edges != null) {
Iterator<AtlasEdge> iterator = edges.iterator();

if (iterator.hasNext()) {
while (iterator.hasNext()) {
AtlasEdge edge = iterator.next();

ret = (edge != null) ? edge.getInVertex() : null;
if(Objects.nonNull(edge))
{
AtlasVertex classificationVertex = edge.getInVertex();
if(Objects.nonNull(classificationVertex) && StringUtils.isNotEmpty(classificationVertex.getProperty(TYPE_NAME_PROPERTY_KEY, String.class))) {
return edge.getInVertex();
} else if(graphHelper != null) {
graphHelper.repairTagVertex(edge, edge.getInVertex());
}
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3565,8 +3565,12 @@ public void deleteClassification(String entityGuid, String classificationName) t

validateClassificationExists(traitNames, classificationName);

AtlasVertex classificationVertex = getClassificationVertex(entityVertex, classificationName);
AtlasVertex classificationVertex = getClassificationVertex(graphHelper, entityVertex, classificationName);

if (Objects.isNull(classificationVertex)) {
LOG.error(AtlasErrorCode.CLASSIFICATION_NOT_FOUND.getFormattedErrorMessage(classificationName));
return;
}
// Get in progress task to see if there already is a propagation for this particular vertex
List<AtlasTask> inProgressTasks = taskManagement.getInProgressTasks();
for (AtlasTask task : inProgressTasks) {
Expand All @@ -3578,7 +3582,8 @@ public void deleteClassification(String entityGuid, String classificationName) t
AtlasClassification classification = entityRetriever.toAtlasClassification(classificationVertex);

if (classification == null) {
throw new AtlasBaseException(AtlasErrorCode.CLASSIFICATION_NOT_FOUND, classificationName);
LOG.error(AtlasErrorCode.CLASSIFICATION_NOT_FOUND.getFormattedErrorMessage(classificationName));
return;
}

// remove classification from propagated entities if propagation is turned on
Expand Down Expand Up @@ -3778,10 +3783,11 @@ public void updateClassifications(EntityMutationContext context, String guid, Li
throw new AtlasBaseException(AtlasErrorCode.CLASSIFICATION_UPDATE_FROM_PROPAGATED_ENTITY, classificationName);
}

AtlasVertex classificationVertex = getClassificationVertex(entityVertex, classificationName);
AtlasVertex classificationVertex = getClassificationVertex(graphHelper, entityVertex, classificationName);

if (classificationVertex == null) {
throw new AtlasBaseException(AtlasErrorCode.CLASSIFICATION_NOT_ASSOCIATED_WITH_ENTITY, classificationName);
LOG.error(AtlasErrorCode.CLASSIFICATION_NOT_FOUND.getFormattedErrorMessage(classificationName));
continue;
}

if (LOG.isDebugEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.*;

import static org.apache.atlas.repository.Constants.TASK_GUID;
import static org.apache.atlas.repository.graph.GraphHelper.getClassificationVertex;
import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.setEncodedProperty;
import static org.apache.atlas.repository.store.graph.v2.tasks.ClassificationTask.PARAM_CLASSIFICATION_VERTEX_ID;
import static org.apache.atlas.tasks.TaskRegistry.toAtlasTask;
Expand Down Expand Up @@ -185,7 +186,7 @@ private String resolveAndReturnClassificationId(String classificationName, Strin
throw new AtlasBaseException(AtlasErrorCode.INSTANCE_GUID_NOT_FOUND, entityGuid);
}

AtlasVertex classificationVertex = GraphHelper.getClassificationVertex(entityVertex, classificationName);
AtlasVertex classificationVertex = getClassificationVertex(null, entityVertex, classificationName);

if (classificationVertex != null) {
ret = classificationVertex.getIdForDisplay();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ public void add() throws AtlasBaseException {
entityStore.addClassification(Collections.singletonList(HDFS_PATH_EMPLOYEES), tagY);

AtlasVertex entityVertex = AtlasGraphUtilsV2.findByGuid(hdfs_employees.getGuid());
AtlasVertex classificationVertex = GraphHelper.getClassificationVertex(entityVertex, TAG_NAME_X);
AtlasVertex classificationVertex = GraphHelper.getClassificationVertex(null, entityVertex, TAG_NAME_X);

assertNotNull(entityVertex);
assertNotNull(classificationVertex);
Expand All @@ -183,7 +183,7 @@ public void update() throws AtlasBaseException {
entityStore.updateClassifications(hdfs_employees.getGuid(), Collections.singletonList(tagY));

AtlasVertex entityVertex = AtlasGraphUtilsV2.findByGuid(hdfs_employees.getGuid());
AtlasVertex classificationVertex = GraphHelper.getClassificationVertex(entityVertex, TAG_NAME_Y);
AtlasVertex classificationVertex = GraphHelper.getClassificationVertex(null, entityVertex, TAG_NAME_Y);

assertNotNull(RequestContext.get().getQueuedTasks());
assertTrue(RequestContext.get().getQueuedTasks().size() > 0, "No tasks were queued!");
Expand All @@ -204,7 +204,7 @@ public void delete() throws AtlasBaseException {
tagX.setPropagate(false);

AtlasVertex entityVertex = AtlasGraphUtilsV2.findByGuid(hdfs_employees.getGuid());
AtlasVertex classificationVertex = GraphHelper.getClassificationVertex(entityVertex, TAG_NAME);
AtlasVertex classificationVertex = GraphHelper.getClassificationVertex(null, entityVertex, TAG_NAME);

try {
entityStore.deleteClassification(HDFS_PATH_EMPLOYEES, tagX.getTypeName());
Expand Down

0 comments on commit 273a74d

Please sign in to comment.