Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MESH-184 Validations for domainGUIDs de-norm attribute #3442

Merged
merged 21 commits into from
Sep 11, 2024
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,7 @@ public final class Constants {
public static final String INDEX_SEARCH_TAGS_MAX_QUERY_STR_LENGTH = "atlas.graph.index.search.tags.max-query-str-length";
public static final String INDEX_SEARCH_VERTEX_PREFIX_PROPERTY = "atlas.graph.index.search.vertex.prefix";
public static final String INDEX_SEARCH_VERTEX_PREFIX_DEFAULT = "$v$";
public static final String DOMAIN_GUIDS = "domainGUIDs";

public static final String ATTR_TENANT_ID = "tenantId";
public static final String DEFAULT_TENANT_ID = "default";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.apache.atlas.repository.store.graph.v1.DeleteHandlerDelegate;
import org.apache.atlas.repository.store.graph.v1.RestoreHandlerV1;
import org.apache.atlas.repository.store.graph.v2.AtlasEntityComparator.AtlasEntityDiffResult;
import org.apache.atlas.repository.store.graph.v2.preprocessor.AssetPreProcessor;
import org.apache.atlas.repository.store.graph.v2.preprocessor.AuthPolicyPreProcessor;
import org.apache.atlas.repository.store.graph.v2.preprocessor.ConnectionPreProcessor;
import org.apache.atlas.repository.store.graph.v2.preprocessor.PreProcessor;
Expand Down Expand Up @@ -1564,25 +1565,23 @@ private EntityMutationResponse createOrUpdate(EntityStream entityStream, boolean

private void executePreProcessor(EntityMutationContext context) throws AtlasBaseException {
AtlasEntityType entityType;
PreProcessor preProcessor;
List<PreProcessor> preProcessors;

List<AtlasEntity> copyOfCreated = new ArrayList<>(context.getCreatedEntities());
for (AtlasEntity entity : copyOfCreated) {
entityType = context.getType(entity.getGuid());
preProcessor = getPreProcessor(entityType.getTypeName());

if (preProcessor != null) {
preProcessor.processAttributes(entity, context, CREATE);
preProcessors = getPreProcessor(entityType.getTypeName());
for(PreProcessor processor : preProcessors){
processor.processAttributes(entity, context, CREATE);
}
}

List<AtlasEntity> copyOfUpdated = new ArrayList<>(context.getUpdatedEntities());
for (AtlasEntity entity: copyOfUpdated) {
entityType = context.getType(entity.getGuid());
preProcessor = getPreProcessor(entityType.getTypeName());

if (preProcessor != null) {
preProcessor.processAttributes(entity, context, UPDATE);
preProcessors = getPreProcessor(entityType.getTypeName());
for(PreProcessor processor : preProcessors){
processor.processAttributes(entity, context, UPDATE);
}
}
}
Expand Down Expand Up @@ -1854,80 +1853,83 @@ private void createQualifiedNameHierarchyField(AtlasEntity entity, AtlasVertex v
}


public PreProcessor getPreProcessor(String typeName) {
PreProcessor preProcessor = null;
public List<PreProcessor> getPreProcessor(String typeName) {
List<PreProcessor> preProcessors = new ArrayList<>();

switch (typeName) {
case ATLAS_GLOSSARY_ENTITY_TYPE:
preProcessor = new GlossaryPreProcessor(typeRegistry, entityRetriever);
preProcessors.add(new GlossaryPreProcessor(typeRegistry, entityRetriever));
break;

case ATLAS_GLOSSARY_TERM_ENTITY_TYPE:
preProcessor = new TermPreProcessor(typeRegistry, entityRetriever, graph, taskManagement);
preProcessors.add(new TermPreProcessor(typeRegistry, entityRetriever, graph, taskManagement));
break;

case ATLAS_GLOSSARY_CATEGORY_ENTITY_TYPE:
preProcessor = new CategoryPreProcessor(typeRegistry, entityRetriever, graph, taskManagement, entityGraphMapper);
preProcessors.add(new CategoryPreProcessor(typeRegistry, entityRetriever, graph, taskManagement, entityGraphMapper));
break;

case DATA_DOMAIN_ENTITY_TYPE:
preProcessor = new DataDomainPreProcessor(typeRegistry, entityRetriever, graph);
preProcessors.add(new DataDomainPreProcessor(typeRegistry, entityRetriever, graph));
break;

case DATA_PRODUCT_ENTITY_TYPE:
preProcessor = new DataProductPreProcessor(typeRegistry, entityRetriever, graph, this);
preProcessors.add(new DataProductPreProcessor(typeRegistry, entityRetriever, graph, this));
break;

case QUERY_ENTITY_TYPE:
preProcessor = new QueryPreProcessor(typeRegistry, entityRetriever);
preProcessors.add(new QueryPreProcessor(typeRegistry, entityRetriever));
break;

case QUERY_FOLDER_ENTITY_TYPE:
preProcessor = new QueryFolderPreProcessor(typeRegistry, entityRetriever);
preProcessors.add(new QueryFolderPreProcessor(typeRegistry, entityRetriever));
break;

case QUERY_COLLECTION_ENTITY_TYPE:
preProcessor = new QueryCollectionPreProcessor(typeRegistry, discovery, entityRetriever, featureFlagStore, this);
preProcessors.add(new QueryCollectionPreProcessor(typeRegistry, discovery, entityRetriever, featureFlagStore, this));
break;

case PERSONA_ENTITY_TYPE:
preProcessor = new PersonaPreProcessor(graph, typeRegistry, entityRetriever, this);
preProcessors.add(new PersonaPreProcessor(graph, typeRegistry, entityRetriever, this));
break;

case PURPOSE_ENTITY_TYPE:
preProcessor = new PurposePreProcessor(graph, typeRegistry, entityRetriever, this);
preProcessors.add(new PurposePreProcessor(graph, typeRegistry, entityRetriever, this));
break;

case POLICY_ENTITY_TYPE:
preProcessor = new AuthPolicyPreProcessor(graph, typeRegistry, entityRetriever);
preProcessors.add(new AuthPolicyPreProcessor(graph, typeRegistry, entityRetriever));
break;

case STAKEHOLDER_ENTITY_TYPE:
preProcessor = new StakeholderPreProcessor(graph, typeRegistry, entityRetriever, this);
preProcessors.add(new StakeholderPreProcessor(graph, typeRegistry, entityRetriever, this));
break;

case CONNECTION_ENTITY_TYPE:
preProcessor = new ConnectionPreProcessor(graph, discovery, entityRetriever, featureFlagStore, deleteDelegate, this);
preProcessors.add(new ConnectionPreProcessor(graph, discovery, entityRetriever, featureFlagStore, deleteDelegate, this));
break;

case LINK_ENTITY_TYPE:
preProcessor = new LinkPreProcessor(typeRegistry, entityRetriever);
preProcessors.add(new LinkPreProcessor(typeRegistry, entityRetriever));
break;

case README_ENTITY_TYPE:
preProcessor = new ReadmePreProcessor(typeRegistry, entityRetriever);
preProcessors.add(new ReadmePreProcessor(typeRegistry, entityRetriever));
break;

case CONTRACT_ENTITY_TYPE:
preProcessor = new ContractPreProcessor(graph, typeRegistry, entityRetriever, storeDifferentialAudits, discovery);
preProcessors.add(new ContractPreProcessor(graph, typeRegistry, entityRetriever, storeDifferentialAudits, discovery));
break;

case STAKEHOLDER_TITLE_ENTITY_TYPE:
preProcessor = new StakeholderTitlePreProcessor(graph, typeRegistry, entityRetriever);
preProcessors.add(new StakeholderTitlePreProcessor(graph, typeRegistry, entityRetriever));
break;
}

return preProcessor;
// The default global pre-processor for all AssetTypes
preProcessors.add(new AssetPreProcessor(typeRegistry, entityRetriever));

return preProcessors;
}

private AtlasVertex getResolvedEntityVertex(EntityGraphDiscoveryContext context, AtlasEntity entity) throws AtlasBaseException {
Expand Down Expand Up @@ -1973,9 +1975,9 @@ private EntityMutationResponse deleteVertices(Collection<AtlasVertex> deletionCa
for (AtlasVertex vertex : deletionCandidates) {
String typeName = getTypeName(vertex);

PreProcessor preProcessor = getPreProcessor(typeName);
if (preProcessor != null) {
preProcessor.processDelete(vertex);
List<PreProcessor> preProcessors = getPreProcessor(typeName);
for(PreProcessor processor : preProcessors){
processor.processDelete(vertex);
}

if (ATLAS_GLOSSARY_CATEGORY_ENTITY_TYPE.equals(typeName)) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package org.apache.atlas.repository.store.graph.v2.preprocessor ;

import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.RequestContext;
import org.apache.atlas.authorize.AtlasAuthorizationUtils;
import org.apache.atlas.authorize.AtlasEntityAccessRequest;
import org.apache.atlas.authorize.AtlasPrivilege;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.instance.*;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever;
import org.apache.atlas.repository.store.graph.v2.EntityMutationContext;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.utils.AtlasPerfMetrics;
import org.apache.commons.collections4.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;

import static org.apache.atlas.repository.Constants.*;

public class AssetPreProcessor implements PreProcessor {
private static final Logger LOG = LoggerFactory.getLogger(AssetPreProcessor.class);

private EntityMutationContext context;
private AtlasTypeRegistry typeRegistry;
private EntityGraphRetriever entityRetriever;

private static final Set<String> excludedTypes = new HashSet<>(Arrays.asList(ATLAS_GLOSSARY_ENTITY_TYPE, ATLAS_GLOSSARY_TERM_ENTITY_TYPE, ATLAS_GLOSSARY_CATEGORY_ENTITY_TYPE, DATA_PRODUCT_ENTITY_TYPE, DATA_DOMAIN_ENTITY_TYPE));

public AssetPreProcessor(AtlasTypeRegistry typeRegistry, EntityGraphRetriever entityRetriever) {
this.typeRegistry = typeRegistry;
this.entityRetriever = entityRetriever;
}

@Override
public void processAttributes(AtlasStruct entityStruct, EntityMutationContext context,
EntityMutations.EntityOperation operation) throws AtlasBaseException {
if (LOG.isDebugEnabled()) {
LOG.debug("AssetPreProcessor.processAttributes: pre processing {}, {}",
entityStruct.getAttribute(QUALIFIED_NAME), operation);
}
this.context = context;

AtlasEntity entity = (AtlasEntity) entityStruct;

AtlasVertex vertex = context.getVertex(entity.getGuid());

switch (operation) {
case CREATE:
processCreateAsset(entity, vertex);
break;
case UPDATE:
processUpdateAsset(entity, vertex);
break;
}
}

private void processCreateAsset(AtlasEntity entity, AtlasVertex vertex) throws AtlasBaseException {
AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("processCreateAsset");

processDomainLinkAttribute(entity);

RequestContext.get().endMetricRecord(metricRecorder);
}


private void processUpdateAsset(AtlasEntity entity, AtlasVertex vertex) throws AtlasBaseException {
AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("processUpdateAsset");

processDomainLinkAttribute(entity);

RequestContext.get().endMetricRecord(metricRecorder);

}

private void processDomainLinkAttribute(AtlasEntity entity) throws AtlasBaseException {
if(entity.hasAttribute(DOMAIN_GUIDS)){
validateDomainAssetLinks(entity);
isAuthorized(entity);
}
}

private void validateDomainAssetLinks(AtlasEntity entity) throws AtlasBaseException {
List<String> domainGuids = ( List<String>) entity.getAttribute(DOMAIN_GUIDS);
PRATHAM2002-DS marked this conversation as resolved.
Show resolved Hide resolved

if(CollectionUtils.isNotEmpty(domainGuids)){
if(domainGuids.size() > 1) {
throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "Asset can be linked to only one domain");
}

if (excludedTypes.contains(entity.getTypeName())) {
throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "This AssetType is not allowed to link with Domain", entity.getTypeName());
}

for(String domainGuid : domainGuids) {
PRATHAM2002-DS marked this conversation as resolved.
Show resolved Hide resolved
AtlasVertex domainVertex = entityRetriever.getEntityVertex(domainGuid);
if(domainVertex == null) {
throw new AtlasBaseException(AtlasErrorCode.INSTANCE_GUID_NOT_FOUND, domainGuid);
}

String domainEntityType = domainVertex.getProperty(TYPE_NAME_PROPERTY_KEY, String.class);

if (!Objects.equals(domainEntityType, DATA_DOMAIN_ENTITY_TYPE)){
throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "Asset can be linked to only domain");
}
}
}
}

private void isAuthorized(AtlasEntity entity) throws AtlasBaseException {
AtlasEntityHeader sourceEntity = new AtlasEntityHeader(entity);

// source -> UPDATE + READ
AtlasAuthorizationUtils.verifyAccess(new AtlasEntityAccessRequest(typeRegistry, AtlasPrivilege.ENTITY_UPDATE, sourceEntity),
PRATHAM2002-DS marked this conversation as resolved.
Show resolved Hide resolved
"update on source Entity, link/unlink operation denied: ", sourceEntity.getAttribute(NAME));

AtlasAuthorizationUtils.verifyAccess(new AtlasEntityAccessRequest(typeRegistry, AtlasPrivilege.ENTITY_READ, sourceEntity),
"read on source Entity, link/unlink operation denied: ", sourceEntity.getAttribute(NAME));

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.atlas.authorize.AtlasPrivilege;
import org.apache.atlas.discovery.EntityDiscoveryService;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.discovery.IndexSearchParams;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntityHeader;
import org.apache.atlas.model.instance.AtlasObjectId;
Expand Down Expand Up @@ -69,6 +70,7 @@ public abstract class AbstractDomainPreProcessor implements PreProcessor {

private static final Set<String> POLICY_ATTRIBUTES_FOR_SEARCH = new HashSet<>(Arrays.asList(ATTR_POLICY_RESOURCES));
private static final Set<String> STAKEHOLDER_ATTRIBUTES_FOR_SEARCH = new HashSet<>(Arrays.asList(ATTR_DOMAIN_QUALIFIED_NAMES, ATTR_DOMAIN_QUALIFIED_NAME));
private static final Set<String> DOMAIN_GUID_ATTR = new HashSet<>(Arrays.asList(DOMAIN_GUIDS));

static final Set<String> PARENT_ATTRIBUTES = new HashSet<>(Arrays.asList(SUPER_DOMAIN_QN_ATTR, PARENT_DOMAIN_QN_ATTR));

Expand Down Expand Up @@ -285,6 +287,47 @@ protected List<AtlasEntityHeader> getPolicies(Set<String> resources) throws Atla
}
}

protected Boolean hasLinkedAssets(String domainGuid) throws AtlasBaseException {
AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("isAssetLinked");
boolean exists = false;
try {
List<Map<String, Object>> mustClauseList = new ArrayList<>();
mustClauseList.add(mapOf("term", mapOf(DOMAIN_GUIDS, domainGuid)));

Map<String, Object> bool = new HashMap<>();
bool.put("must", mustClauseList);

Map<String, Object> dsl = mapOf("query", mapOf("bool", bool));

boolean hasLinkedAsset = fetchLinkedAssets(dsl, DOMAIN_GUID_ATTR, this.discovery);
if (hasLinkedAsset) {
PRATHAM2002-DS marked this conversation as resolved.
Show resolved Hide resolved
exists = true;
}

return exists;

} finally {
RequestContext.get().endMetricRecord(metricRecorder);
}
}

protected static Boolean fetchLinkedAssets(Map<String, Object> dsl, Set<String> attributes, EntityDiscoveryService discovery) throws AtlasBaseException {
PRATHAM2002-DS marked this conversation as resolved.
Show resolved Hide resolved
IndexSearchParams searchParams = new IndexSearchParams();
boolean exists = false;

searchParams.setAttributes(attributes);
dsl.put("from", 0);
dsl.put("size", 1);
searchParams.setDsl(dsl);

List<AtlasEntityHeader> headers = discovery.directIndexSearch(searchParams).getEntities();

if (CollectionUtils.isNotEmpty(headers)) {
exists = true;
}
return exists;
}

protected List<AtlasEntityHeader> getStakeholderTitlesAndStakeholders(Set<String> qualifiedNames) throws AtlasBaseException {
AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("getStakeholderTitlesAndStakeholders");
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,19 @@ private void validateStakeholderRelationship(AtlasEntity entity) throws AtlasBas
throw new AtlasBaseException(AtlasErrorCode.OPERATION_NOT_SUPPORTED, "Managing Stakeholders while creating/updating a domain");
}
}

@Override
public void processDelete(AtlasVertex vertex) throws AtlasBaseException {
String domainGuid = GraphHelper.getGuid(vertex);

if(LOG.isDebugEnabled()) {
LOG.debug("DataDomainPreProcessor.processDelete: pre processing {}", domainGuid);
}

if (hasLinkedAssets(domainGuid)) {
throw new AtlasBaseException(AtlasErrorCode.OPERATION_NOT_SUPPORTED, "Domain cannot be deleted because some assets are linked to this domain");
}
}
}


Loading