Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DG-1746 Support custom relationshipDef for any Assets linking #3479

Open
wants to merge 13 commits into
base: master
Choose a base branch
from
39 changes: 39 additions & 0 deletions addons/policies/bootstrap_relationship_policies.json
Original file line number Diff line number Diff line change
Expand Up @@ -836,6 +836,45 @@
"remove-relationship"
]
}
},

{
"typeName": "AuthPolicy",
"attributes":
{
"name": "LINK_ASSETS_USER_DEFINED_RELATIONSHIP",
"qualifiedName": "LINK_ASSETS_USER_DEFINED_RELATIONSHIP",
"policyCategory": "bootstrap",
"policySubCategory": "default",
"policyServiceName": "atlas",
"policyType": "allow",
"policyUsers":
[],
"policyGroups":
[],
"policyRoles":
[
"$admin",
"$api-token-default-access"
],
"policyResourceCategory": "RELATIONSHIP",
"policyResources":
[
"relationship-type:UserDefRelationship",
"end-one-entity-type:Referenceable",
"end-two-entity-type:Referenceable",
"end-one-entity-classification:*",
"end-two-entity-classification:*",
"end-one-entity:*",
"end-two-entity:*"
],
"policyActions":
[
"add-relationship",
"update-relationship",
"remove-relationship"
]
}
}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,10 @@ public final class Constants {
public static final String INPUT_PORT_PRODUCT_EDGE_LABEL = "__Asset.inputPortDataProducts";
public static final String OUTPUT_PORT_PRODUCT_EDGE_LABEL = "__Asset.outputPortDataProducts";

public static final String UD_RELATIONSHIP_EDGE_LABEL = "__Referenceable.userDefRelationshipTo";
public static final String UD_RELATIONSHIP_END_NAME_FROM = "userDefRelationshipFrom";
public static final String UD_RELATIONSHIP_END_NAME_TO = "userDefRelationshipTo";

/**
* SQL property keys.
*/
Expand Down Expand Up @@ -260,6 +264,7 @@ public final class Constants {
public static final String INDEX_PREFIX = "janusgraph_";

public static final String VERTEX_INDEX_NAME = INDEX_PREFIX + VERTEX_INDEX;
public static final String EDGE_INDEX_NAME = INDEX_PREFIX + EDGE_INDEX;

public static final String NAME = "name";
public static final String QUALIFIED_NAME = "qualifiedName";
Expand Down
4 changes: 3 additions & 1 deletion intg/src/main/java/org/apache/atlas/AtlasConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,9 @@ public enum AtlasConfiguration {

INDEXSEARCH_ASYNC_SEARCH_KEEP_ALIVE_TIME_IN_SECONDS("atlas.indexsearch.async.search.keep.alive.time.in.seconds", 300),

ATLAS_MAINTENANCE_MODE("atlas.maintenance.mode", false);
ATLAS_MAINTENANCE_MODE("atlas.maintenance.mode", false),

ATLAS_UD_RELATIONSHIPS_MAX_COUNT("atlas.ud.relationship.max.count", 100);


private static final Configuration APPLICATION_PROPERTIES;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ public class IndexSearchParams extends SearchParams {
* */
private boolean allowDeletedRelations;
private boolean accessControlExclusive;
private boolean includeRelationshipAttributes;

@Override
public String getQuery() {
Expand Down Expand Up @@ -79,6 +80,14 @@ public void setRelationAttributes(Set<String> relationAttributes) {
this.relationAttributes = relationAttributes;
}

public boolean isIncludeRelationshipAttributes() {
return includeRelationshipAttributes;
}

public void setIncludeRelationshipAttributes(boolean includeRelationshipAttributes) {
this.includeRelationshipAttributes = includeRelationshipAttributes;
}

@Override
public String toString() {
return "IndexSearchParams{" +
Expand All @@ -88,6 +97,7 @@ public String toString() {
", queryString='" + queryString + '\'' +
", allowDeletedRelations=" + allowDeletedRelations +
", accessControlExclusive=" + accessControlExclusive +
", includeRelationshipAttributes=" + includeRelationshipAttributes +
", utmTags="+ getUtmTags() +
'}';
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -820,6 +820,10 @@ public AtlasObjectId getNormalizedValue(Object obj) {
private boolean isValidMap(Map map) {
Object guid = map.get(AtlasObjectId.KEY_GUID);

if (map.containsKey(AtlasRelatedObjectId.KEY_RELATIONSHIP_ATTRIBUTES) && !map.containsKey(AtlasRelatedObjectId.KEY_RELATIONSHIP_TYPE)) {
return false;
}

if (guid != null && StringUtils.isNotEmpty(guid.toString())) {
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sample

"appendRelationshipAttributes": {
                "userDefRelationshipTo": [
                    {
                        "guid": "6549985b-b6cc-430f-9042-cbb515847536",
                        "typeName": "Table",
                        "relationshipType": "UserDefRelationship",
                        "relationshipAttributes": {
                            "attributes": {
                                "toType": "Classifiedby",
                                "fromType": "Isa"
                            }
                        }
                    }
                ],
}

return true;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,14 @@ AtlasSearchResult searchUsingBasicQuery(String query, String type, String classi
*/
AtlasSearchResult directIndexSearch(SearchParams searchParams) throws AtlasBaseException;

/**
* Search for direct ES query in janusgraph_edge_index
* @param searchParams Search criteria
* @return Matching entities
* @throws AtlasBaseException
*/
AtlasSearchResult directRelationshipIndexSearch(SearchParams searchParams) throws AtlasBaseException;

/**
* Search for direct ES query on search logs index
* @param searchParams Search criteria
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@
import org.apache.atlas.repository.userprofile.UserProfileService;
import org.apache.atlas.repository.util.AccessControlUtils;
import org.apache.atlas.searchlog.ESSearchLogger;
import org.apache.atlas.service.FeatureFlagStore;
import org.apache.atlas.stats.StatsClient;
import org.apache.atlas.type.*;
import org.apache.atlas.type.AtlasBuiltInTypes.AtlasObjectIdType;
Expand Down Expand Up @@ -976,6 +975,7 @@ public AtlasSearchResult directIndexSearch(SearchParams searchParams) throws Atl
IndexSearchParams params = (IndexSearchParams) searchParams;
RequestContext.get().setRelationAttrsForSearch(params.getRelationAttributes());
RequestContext.get().setAllowDeletedRelationsIndexsearch(params.isAllowDeletedRelations());
RequestContext.get().setIncludeRelationshipAttributes(params.isIncludeRelationshipAttributes());

AtlasSearchResult ret = new AtlasSearchResult();
AtlasIndexQuery indexQuery;
Expand Down Expand Up @@ -1013,6 +1013,38 @@ public AtlasSearchResult directIndexSearch(SearchParams searchParams) throws Atl
return ret;
}

@Override
public AtlasSearchResult directRelationshipIndexSearch(SearchParams searchParams) throws AtlasBaseException {
AtlasSearchResult ret = new AtlasSearchResult();
AtlasIndexQuery indexQuery;

ret.setSearchParameters(searchParams);
ret.setQueryType(AtlasQueryType.INDEX);

try {
if(LOG.isDebugEnabled()){
LOG.debug("Performing ES relationship search for the params ({})", searchParams);
}

indexQuery = graph.elasticsearchQuery(EDGE_INDEX_NAME);
AtlasPerfMetrics.MetricRecorder elasticSearchQueryMetric = RequestContext.get().startMetricRecord("elasticSearchQueryEdge");
DirectIndexQueryResult indexQueryResult = indexQuery.vertices(searchParams);
if (indexQueryResult == null) {
return null;
}
RequestContext.get().endMetricRecord(elasticSearchQueryMetric);

//Note: AtlasSearchResult.entities are not supported yet

ret.setAggregations(indexQueryResult.getAggregationMap());
ret.setApproximateCount(indexQuery.vertexTotals());
} catch (Exception e) {
LOG.error("Error while performing direct relationship search for the params ({}), {}", searchParams, e.getMessage());
throw e;
}
return ret;
}

@Override
public SearchLogSearchResult searchLogs(SearchLogSearchParams searchParams) throws AtlasBaseException {
SearchLogSearchResult ret = new SearchLogSearchResult();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -415,9 +415,9 @@ private void initialize(AtlasGraph graph) throws RepositoryException, IndexExcep
createVertexCentricIndex(management, CLASSIFICATION_LABEL, AtlasEdgeDirection.BOTH, Arrays.asList(CLASSIFICATION_EDGE_NAME_PROPERTY_KEY, CLASSIFICATION_EDGE_IS_PROPAGATED_PROPERTY_KEY));

// create edge indexes
createEdgeIndex(management, RELATIONSHIP_GUID_PROPERTY_KEY, String.class, SINGLE, true);
createEdgeIndex(management, EDGE_ID_IN_IMPORT_KEY, String.class, SINGLE, true);
createEdgeIndex(management, ATTRIBUTE_INDEX_PROPERTY_KEY, Integer.class, SINGLE, true);
createEdgeIndex(management, RELATIONSHIP_GUID_PROPERTY_KEY, String.class, SINGLE, true, false);
sumandas0 marked this conversation as resolved.
Show resolved Hide resolved
createEdgeIndex(management, EDGE_ID_IN_IMPORT_KEY, String.class, SINGLE, true, false);
createEdgeIndex(management, ATTRIBUTE_INDEX_PROPERTY_KEY, Integer.class, SINGLE, true, false);

// create fulltext indexes
createFullTextIndex(management, ENTITY_TEXT_PROPERTY_KEY, String.class, SINGLE);
Expand Down Expand Up @@ -647,21 +647,22 @@ private void createIndexForAttribute(AtlasGraphManagement management, AtlasStruc
createEdgeLabel(management, propertyName);

} else if (isBuiltInType || isArrayOfPrimitiveType || isArrayOfEnum) {
if (isRelationshipType(atlasType)) {
createEdgeIndex(management, propertyName, getPrimitiveClass(attribTypeName), cardinality, false);
Class primitiveClassType;
boolean isStringField = false;

if (isArrayOfEnum) {
primitiveClassType = String.class;
} else {
Class primitiveClassType;
boolean isStringField = false;
primitiveClassType = isArrayOfPrimitiveType ? getPrimitiveClass(arrayElementType.getTypeName()): getPrimitiveClass(attribTypeName);
}

if (isArrayOfEnum) {
primitiveClassType = String.class;
} else {
primitiveClassType = isArrayOfPrimitiveType ? getPrimitiveClass(arrayElementType.getTypeName()): getPrimitiveClass(attribTypeName);
}
if(primitiveClassType == String.class) {
isStringField = AtlasAttributeDef.IndexType.STRING.equals(indexType);
}

if(primitiveClassType == String.class) {
isStringField = AtlasAttributeDef.IndexType.STRING.equals(indexType);
}
if (isRelationshipType(atlasType)) {
createEdgeIndex(management, propertyName, getPrimitiveClass(attribTypeName), cardinality, false, isStringField);
} else {

createVertexIndex(management, propertyName, UniqueKind.NONE, primitiveClassType, cardinality, isIndexable, false, isStringField, indexTypeESConfig, indexTypeESFields);

Expand All @@ -671,10 +672,10 @@ private void createIndexForAttribute(AtlasGraphManagement management, AtlasStruc

}
} else if (isEnumType(attributeType)) {
boolean isStringField = AtlasAttributeDef.IndexType.STRING.equals(indexType);
if (isRelationshipType(atlasType)) {
createEdgeIndex(management, propertyName, String.class, cardinality, false);
createEdgeIndex(management, propertyName, String.class, cardinality, false, isStringField);
} else {
boolean isStringField = AtlasAttributeDef.IndexType.STRING.equals(indexType);
createVertexIndex(management, propertyName, UniqueKind.NONE, String.class, cardinality, isIndexable, false, isStringField, indexTypeESConfig, indexTypeESFields);

if (uniqPropName != null) {
Expand Down Expand Up @@ -908,21 +909,32 @@ private void createVertexCentricIndex(AtlasGraphManagement management, String ed


private void createEdgeIndex(AtlasGraphManagement management, String propertyName, Class propertyClass,
AtlasCardinality cardinality, boolean createCompositeIndex) {
AtlasCardinality cardinality, boolean createCompositeIndex, boolean isStringField) {
if (propertyName != null) {
AtlasPropertyKey propertyKey = management.getPropertyKey(propertyName);
boolean enforceMixedIndex = false;
//management.getGraphIndex("edge_index").getFieldKeys().stream().map(x -> x.getName()).collect(Collectors.toSet());

if (propertyKey == null) {
propertyKey = management.makePropertyKey(propertyName, propertyClass, cardinality);
if (propertyKey != null) {
// validate property present for EDGE_INDEX or not
// there are properties like __typeName, __state which might have been added as MixedIndex for VERTEX_INDEX but not for EDGE_INDEX
// in such case, enforceMixedIndex will ensure creation of nixed index for EDDGE_INDEX for the property
enforceMixedIndex = !management.getGraphIndex(EDGE_INDEX).getFieldKeys().contains(propertyKey);
}

if (propertyKey == null || enforceMixedIndex) {
if (propertyKey == null) {
propertyKey = management.makePropertyKey(propertyName, propertyClass, cardinality);
}

if (isIndexApplicable(propertyClass, cardinality)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Creating backing index for edge property {} of type {} ", propertyName, propertyClass.getName());
}

management.addMixedIndex(EDGE_INDEX, propertyKey, false);
management.addMixedIndex(EDGE_INDEX, propertyKey, isStringField);

LOG.info("Created backing index for edge property {} of type {} ", propertyName, propertyClass.getName());
LOG.info("Created backing index for edge property {} of type {} on index {}", propertyName, propertyClass.getName(), EDGE_INDEX);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ public void deleteRelationships(Collection<AtlasEdge> edges, final boolean force
}
continue;
}
deleteEdge(edge, isInternal || forceDelete);
deleteEdge(edge, isInternal || forceDelete || isCustomRelationship(edge));
}
}

Expand Down Expand Up @@ -381,7 +381,7 @@ public boolean deleteEdgeReference(AtlasEdge edge, TypeCategory typeCategory, bo
// for relationship edges, inverse vertex's relationship attribute doesn't need to be updated.
// only delete the reference relationship edge
if (GraphHelper.isRelationshipEdge(edge)) {
deleteEdge(edge, isInternalType);
deleteEdge(edge, isInternalType || isCustomRelationship(edge));
AtlasVertex referencedVertex = entityRetriever.getReferencedEntityVertex(edge, relationshipDirection, entityVertex);

if (referencedVertex != null) {
Expand All @@ -398,7 +398,7 @@ public boolean deleteEdgeReference(AtlasEdge edge, TypeCategory typeCategory, bo
//legacy case - not a relationship edge
//If deleting just the edge, reverse attribute should be updated for any references
//For example, for the department type system, if the person's manager edge is deleted, subordinates of manager should be updated
deleteEdge(edge, true, isInternalType);
deleteEdge(edge, true, isInternalType || isCustomRelationship(edge));
}
}

Expand Down Expand Up @@ -976,7 +976,8 @@ protected void deleteEdgeBetweenVertices(AtlasVertex outVertex, AtlasVertex inVe
}

if (edge != null) {
deleteEdge(edge, isInternalType(inVertex) && isInternalType(outVertex));
boolean isInternal = isInternalType(inVertex) && isInternalType(outVertex);
deleteEdge(edge, isInternal || isCustomRelationship(edge));

final RequestContext requestContext = RequestContext.get();
final String outId = GraphHelper.getGuid(outVertex);
Expand Down Expand Up @@ -1059,6 +1060,10 @@ private boolean isInternalType(final AtlasVertex instanceVertex) {
return Objects.nonNull(entityType) && entityType.isInternalType();
}

private boolean isCustomRelationship(final AtlasEdge edge) {
return edge.getLabel().equals(UD_RELATIONSHIP_EDGE_LABEL);
}

private void addToPropagatedClassificationNames(AtlasVertex entityVertex, String classificationName) {
if (LOG.isDebugEnabled()) {
LOG.debug("Adding property {} = \"{}\" to vertex {}", PROPAGATED_TRAIT_NAMES_PROPERTY_KEY, classificationName, string(entityVertex));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@
import org.apache.atlas.repository.graphdb.janus.JanusUtils;
import org.apache.atlas.repository.store.graph.AtlasRelationshipStore;
import org.apache.atlas.repository.store.graph.v1.DeleteHandlerDelegate;
import org.apache.atlas.repository.store.graph.v1.SoftDeleteHandlerV1;
import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasRelationshipType;
import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
Expand Down Expand Up @@ -100,6 +99,8 @@ public class AtlasRelationshipStoreV2 implements AtlasRelationshipStore {
private static final String END_2_DOC_ID_KEY = "end2DocId";
private static final String ES_DOC_ID_MAP_KEY = "esDocIdMap";

private static final String UD_RELATIONSHIP_TYPE_NAME = "UserDefRelationship";

private static Set<String> EXCLUDE_MUTATION_REL_TYPE_NAMES = new HashSet<String>() {{
add(REL_DOMAIN_TO_DOMAINS);
add(REL_DOMAIN_TO_PRODUCTS);
Expand Down Expand Up @@ -140,6 +141,10 @@ public AtlasRelationship create(AtlasRelationship relationship) throws AtlasBase
AtlasVertex end1Vertex = getVertexFromEndPoint(relationship.getEnd1());
AtlasVertex end2Vertex = getVertexFromEndPoint(relationship.getEnd2());

if (relationship.getTypeName().equals(UD_RELATIONSHIP_TYPE_NAME)) {
EntityGraphMapper.validateCustomRelationship(end1Vertex, end2Vertex);
}

AtlasEdge edge = createRelationship(end1Vertex, end2Vertex, relationship);

AtlasRelationship ret = edge != null ? entityRetriever.mapEdgeToAtlasRelationship(edge) : null;
Expand Down Expand Up @@ -676,6 +681,9 @@ private void validateRelationship(AtlasVertex end1Vertex, AtlasVertex end2Vertex
}

relationshipType.getNormalizedValue(relationship);

Map<String, Object> relAttrs = relationship.getAttributes();
EntityGraphMapper.validateCustomRelationshipAttributeValueCase(relAttrs);
}


Expand Down
Loading
Loading