Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DG-1442 Modification of the PreProcessor for Custom Sort - Bulk Update Edge Cases #3265

Open
wants to merge 19 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
7354911
Init commit, added lexorank generation util lib and logic for append…
hr2904 May 22, 2024
22326c8
Added method to append lexicographicSortOrder attribute to any Glossa…
hr2904 May 22, 2024
7af1a45
Added LexoRank Validation method, with tentative rebalancing trigger …
hr2904 May 28, 2024
9ca21c5
Corrected a method rename
hr2904 May 28, 2024
a51e3ba
Added a check in validateLexoRank which checks if another entity with…
hr2904 May 28, 2024
4303ae7
Added a check , such that if bulk request is from migration, it will …
hr2904 May 29, 2024
d0ee0bc
Modified ES query.
hr2904 Jun 2, 2024
de5cead
Modified an edge statement such that, when a new lexorank for cat is …
hr2904 Jun 3, 2024
14ab6e6
Reverted the check, instead added padded new offset for terms.
hr2904 Jun 3, 2024
f35949f
fixed the caching logic, by adding the bifurcation for terms and cate…
hr2904 Jun 4, 2024
42588b3
Fixed PR comments
hr2904 Jun 12, 2024
27da407
Added caching for same ranks in same request, for prevention of dupli…
hr2904 Jun 12, 2024
c541587
Fixed a minor bug where an empty lexo attribute in update call was re…
hr2904 Jun 14, 2024
80335ab
Fixed PR comments
hr2904 Jun 18, 2024
ab3082f
Reverted the AbstractGlossaryPreProcessor file , previously edited.
hr2904 Jun 18, 2024
7217dcb
Fixed PR comments.
hr2904 Jun 20, 2024
6c4aa10
fixed attribute name in dsl query
hr2904 Jun 25, 2024
c4e1cbb
Fixed a caching logic while checking for duplicate ranks.
hr2904 Jun 28, 2024
5ee4f53
Removed term-category bifurcation as we are now allowing term-categor…
hr2904 Jul 5, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions intg/src/main/java/org/apache/atlas/type/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,12 @@ public final class Constants {
public static final String GLOSSARY_PROPERTY_KEY = encodePropertyKey(INTERNAL_PROPERTY_KEY_PREFIX + "glossary");
public static final String CATEGORIES_PROPERTY_KEY = encodePropertyKey(INTERNAL_PROPERTY_KEY_PREFIX + "categories");
public static final String CATEGORIES_PARENT_PROPERTY_KEY = encodePropertyKey(INTERNAL_PROPERTY_KEY_PREFIX + "parentCategory");

public static final String MEANINGS_TEXT_PROPERTY_KEY = encodePropertyKey(INTERNAL_PROPERTY_KEY_PREFIX + "meaningsText");
public static final String MEANING_NAMES_PROPERTY_KEY = encodePropertyKey(INTERNAL_PROPERTY_KEY_PREFIX + "meaningNames");
public static final String HAS_LINEAGE = encodePropertyKey(INTERNAL_PROPERTY_KEY_PREFIX + "hasLineage");
public static final String HAS_LINEAGE_VALID = encodePropertyKey(INTERNAL_PROPERTY_KEY_PREFIX + "hasLineageValid");
public static final String LEXICOGRAPHICAL_SORT_ORDER = "lexicographicalSortOrder";

//Classification-Only System Attributes
public static final String CLASSIFICATION_ENTITY_STATUS_PROPERTY_KEY = encodePropertyKey(INTERNAL_PROPERTY_KEY_PREFIX + "entityStatus");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ public abstract class GlossaryUtils {
public static final String TERM_ASSIGNMENT_ATTR_SOURCE = "source";

static final String ATLAS_GLOSSARY_TYPENAME = "AtlasGlossary";
static final String ATLAS_GLOSSARY_TERM_TYPENAME = "AtlasGlossaryTerm";
static final String ATLAS_GLOSSARY_CATEGORY_TYPENAME = "AtlasGlossaryCategory";
public static final String ATLAS_GLOSSARY_TERM_TYPENAME = "AtlasGlossaryTerm";
public static final String ATLAS_GLOSSARY_CATEGORY_TYPENAME = "AtlasGlossaryCategory";

public static final String NAME = "name";
public static final String QUALIFIED_NAME = "qualifiedName";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1821,7 +1821,7 @@ public PreProcessor getPreProcessor(String typeName) {

switch (typeName) {
case ATLAS_GLOSSARY_ENTITY_TYPE:
preProcessor = new GlossaryPreProcessor(typeRegistry, entityRetriever);
preProcessor = new GlossaryPreProcessor(typeRegistry, entityRetriever, graph);
break;

case ATLAS_GLOSSARY_TERM_ENTITY_TYPE:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.apache.atlas.repository.store.graph.v2.preprocessor;
nikhilbonte21 marked this conversation as resolved.
Show resolved Hide resolved

import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.RequestContext;
import org.apache.atlas.discovery.EntityDiscoveryService;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.discovery.IndexSearchParams;
Expand All @@ -14,18 +15,22 @@
import org.apache.atlas.type.AtlasStructType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.util.NanoIdUtils;
import org.apache.atlas.util.lexoRank.LexoRank;
import org.apache.atlas.utils.AtlasEntityUtil;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import static org.apache.atlas.repository.Constants.QUERY_COLLECTION_ENTITY_TYPE;
import static org.apache.atlas.repository.Constants.QUALIFIED_NAME;
import static org.apache.atlas.repository.Constants.ENTITY_TYPE_PROPERTY_KEY;
import static org.apache.atlas.glossary.GlossaryUtils.ATLAS_GLOSSARY_CATEGORY_TYPENAME;
import static org.apache.atlas.glossary.GlossaryUtils.ATLAS_GLOSSARY_TERM_TYPENAME;
import static org.apache.atlas.repository.Constants.*;
import static org.apache.atlas.repository.util.AtlasEntityUtils.mapOf;
import static org.apache.atlas.type.Constants.LEXICOGRAPHICAL_SORT_ORDER;

public class PreProcessorUtils {
private static final Logger LOG = LoggerFactory.getLogger(PreProcessorUtils.class);
Expand All @@ -39,6 +44,8 @@ public class PreProcessorUtils {
public static final String CATEGORY_CHILDREN = "childrenCategories";
public static final String GLOSSARY_TERM_REL_TYPE = "AtlasGlossaryTermAnchor";
public static final String GLOSSARY_CATEGORY_REL_TYPE = "AtlasGlossaryCategoryAnchor";
// Starting rank used by assignNewLexicographicalSortOrder for non-term entities when no previous rank is available.
public static final String INIT_LEXORANK_OFFSET = "0|100000:";
// Starting rank used by assignNewLexicographicalSortOrder for AtlasGlossaryTerm entities when no previous rank is available.
public static final String INIT_TERM_LEXORANK_OFFSET = "0|500000:";

//DataMesh models constants
public static final String PARENT_DOMAIN_REL_TYPE = "parentDomain";
Expand All @@ -52,6 +59,8 @@ public class PreProcessorUtils {

public static final String DATA_PRODUCT_EDGE_LABEL = "__DataDomain.dataProducts";
public static final String DOMAIN_PARENT_EDGE_LABEL = "__DataDomain.subDomains";
public static final String STAKEHOLDER_EDGE_LABEL = "__DataDomain.stakeholders";


public static final String PARENT_DOMAIN_QN_ATTR = "parentDomainQualifiedName";
public static final String SUPER_DOMAIN_QN_ATTR = "superDomainQualifiedName";
Expand Down Expand Up @@ -83,6 +92,13 @@ public enum MigrationStatus {

public static final String CHILDREN_QUERIES = "__Namespace.childrenQueries";
public static final String CHILDREN_FOLDERS = "__Namespace.childrenFolders";
// Length threshold for the part of a rank after ':'. In the visible code it is only referenced
// from the commented-out rebalancing TODO in isValidLexoRank.
public static final int REBALANCING_TRIGGER = 119;
// Number of characters in the fixed rank prefix matched by LEXORANK_VALID_REGEX: "0|" + 6 characters + ":".
public static final int PRE_DELIMITER_LENGTH = 9;
// Maximum number of characters allowed after ':' (256 - PRE_DELIMITER_LENGTH = 247); kept as a String
// because it is concatenated straight into the regex below.
public static final String LEXORANK_HARD_LIMIT = "" + (256 - PRE_DELIMITER_LENGTH);
// Accepts "0|", six characters from [0-9a-z], ':' and then up to LEXORANK_HARD_LIMIT characters from [0-9a-z].
public static final String LEXORANK_VALID_REGEX = "^0\\|[0-9a-z]{6}:(?:[0-9a-z]{0," + LEXORANK_HARD_LIMIT + "})?$";
// Attribute names requested from the index search in assignNewLexicographicalSortOrder.
public static final Set<String> ATTRIBUTES = new HashSet<>(Arrays.asList("lexicographicalSortOrder"));

// Compiled once and shared; used by isValidLexoRank to validate caller-supplied ranks.
public static final Pattern LEXORANK_VALIDITY_PATTERN = Pattern.compile(LEXORANK_VALID_REGEX);

public static String getUUID(){
return NanoIdUtils.randomNanoId();
Expand Down Expand Up @@ -200,4 +216,173 @@ public static void verifyDuplicateAssetByName(String typeName, String assetName,
throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, errorMessage);
}
}

public static void isValidLexoRank(String entityType, String inputLexorank, String glossaryQualifiedName, String parentQualifiedName, EntityDiscoveryService discovery) throws AtlasBaseException {

Matcher matcher = LEXORANK_VALIDITY_PATTERN.matcher(inputLexorank);

if(!matcher.matches() || StringUtils.isEmpty(inputLexorank)){
throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, "Invalid value for lexicographicalSortOrder attribute");
}
// TODO : Need to discuss either to remove this after migration is successful on all tenants and custom-sort is successfully GA or keep it for re-balancing WF
Boolean requestFromMigration = RequestContext.get().getRequestContextHeaders().getOrDefault("x-atlan-request-id", "").contains("custom-sort-migration");
if(requestFromMigration) {
return;
}
Map<String, String> lexoRankCache = RequestContext.get().getLexoRankCache();
if(Objects.isNull(lexoRankCache)) {
lexoRankCache = new HashMap<>();
}
String cacheKey = entityType + "-" + glossaryQualifiedName + "-" + parentQualifiedName;
if(lexoRankCache.containsKey(cacheKey)){
hr2904 marked this conversation as resolved.
Show resolved Hide resolved
throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, "Duplicate value for the attribute :" + LEXICOGRAPHICAL_SORT_ORDER +" found");
}
Map<String, Object> dslQuery = createDSLforCheckingPreExistingLexoRank(entityType.equals(ATLAS_GLOSSARY_TERM_TYPENAME), inputLexorank, glossaryQualifiedName, parentQualifiedName);
List<AtlasEntityHeader> assetsWithDuplicateRank = new ArrayList<>();
try {
IndexSearchParams searchParams = new IndexSearchParams();
searchParams.setDsl(dslQuery);
assetsWithDuplicateRank = discovery.directIndexSearch(searchParams).getEntities();
} catch (AtlasBaseException e) {
LOG.error("IndexSearch Error Occured : " + e.getMessage());
throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, "Something went wrong with IndexSearch");
}

if (!CollectionUtils.isEmpty(assetsWithDuplicateRank)) {
throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, "Duplicate Lexorank found");
}

lexoRankCache.put(cacheKey, inputLexorank);
RequestContext.get().setLexoRankCache(lexoRankCache);
// TODO : Add the rebalancing logic here
// int colonIndex = inputLexorank.indexOf(":");
// if (colonIndex != -1 && inputLexorank.substring(colonIndex + 1).length() >= REBALANCING_TRIGGER) {
// Rebalancing trigger
// }
}

/**
 * Generates a lexicographicalSortOrder for an entity that was submitted without one and sets
 * it on the entity.
 *
 * The previous rank is taken from the request-level lexoRank cache when the
 * typeName/glossary/parent key is present; otherwise it is read from the first hit of the
 * index query built by {@link #generateDSLQueryForLastChild} (sorted descending by rank).
 * When no rank is found, the initial offset for terms or non-terms is used. The new rank is
 * stored on the entity and written back to the cache under the same key.
 *
 * @param entity                entity that receives the generated rank
 * @param glossaryQualifiedName qualified name of the owning glossary (may be empty)
 * @param parentQualifiedName   qualified name of the parent category (may be empty)
 * @param discovery             discovery service used to run the index search
 * @throws AtlasBaseException when the index search fails
 */
public static void assignNewLexicographicalSortOrder(AtlasEntity entity, String glossaryQualifiedName, String parentQualifiedName, EntityDiscoveryService discovery) throws AtlasBaseException{
    Map<String, String> lexoRankCache = RequestContext.get().getLexoRankCache();

    if (Objects.isNull(lexoRankCache)) {
        lexoRankCache = new HashMap<>();
    }

    boolean isTerm = ATLAS_GLOSSARY_TERM_TYPENAME.equals(entity.getTypeName());
    String cacheKey = entity.getTypeName() + "-" + glossaryQualifiedName + "-" + parentQualifiedName;
    String lastLexoRank;

    if (lexoRankCache.containsKey(cacheKey)) {
        lastLexoRank = lexoRankCache.get(cacheKey);
    } else {
        List<AtlasEntityHeader> lastChildren;
        Map<String, Object> dslQuery = generateDSLQueryForLastChild(glossaryQualifiedName, parentQualifiedName, isTerm);
        try {
            IndexSearchParams searchParams = new IndexSearchParams();
            searchParams.setAttributes(ATTRIBUTES);
            searchParams.setDsl(dslQuery);
            lastChildren = discovery.directIndexSearch(searchParams).getEntities();
        } catch (AtlasBaseException e) {
            // Use the class logger instead of printStackTrace and keep the original failure as the cause.
            LOG.error("IndexSearch failed while assigning {} : {}", LEXICOGRAPHICAL_SORT_ORDER, e.getMessage(), e);
            throw new AtlasBaseException("Something went wrong in assigning lexicographicalSortOrder", e);
        }

        String existingRank = null;
        if (CollectionUtils.isNotEmpty(lastChildren)) {
            existingRank = (String) lastChildren.get(0).getAttribute(LEXICOGRAPHICAL_SORT_ORDER);
        }

        if (StringUtils.isNotEmpty(existingRank)) {
            lastLexoRank = existingRank;
        } else {
            // No sibling found, or the sibling has no rank yet: start from the type-specific offset.
            lastLexoRank = isTerm ? INIT_TERM_LEXORANK_OFFSET : INIT_LEXORANK_OFFSET;
        }
    }

    // Advances two steps past the previous rank.
    // NOTE(review): presumably this leaves a gap for later insertions - confirm with LexoRank semantics.
    String lexoRank = LexoRank.parse(lastLexoRank).genNext().genNext().toString();

    entity.setAttribute(LEXICOGRAPHICAL_SORT_ORDER, lexoRank);
    lexoRankCache.put(cacheKey, lexoRank);
    RequestContext.get().setLexoRankCache(lexoRankCache);
}

/**
 * Builds the index-search request used to look for an existing entity carrying the given rank.
 *
 * The request asks for a single hit ("from" 0, "size" 1) and wraps the bool filter produced by
 * buildBoolQueryDuplicateLexoRank under "query" -> "bool".
 *
 * @param isTerm                whether the entity being checked is a glossary term
 * @param lexoRank              rank to look for
 * @param glossaryQualifiedName qualified name of the owning glossary (may be empty)
 * @param parentQualifiedName   qualified name of the parent category (may be empty)
 * @return the DSL request as a nested map
 */
public static Map<String, Object> createDSLforCheckingPreExistingLexoRank(boolean isTerm, String lexoRank, String glossaryQualifiedName, String parentQualifiedName) {
    Map<String, Object> duplicateFilter = buildBoolQueryDuplicateLexoRank(isTerm, lexoRank, glossaryQualifiedName, parentQualifiedName);
    Map<String, Object> queryClause = mapOf("bool", duplicateFilter);

    Map<String, Object> request = new HashMap<>();
    request.put("query", queryClause);
    request.put("size", 1);
    request.put("from", 0);
    return request;
}

/**
 * Builds the bool filter that matches ACTIVE entities already holding the given rank.
 *
 * With a glossary qualified name, the filter covers terms and categories of that glossary and
 * is narrowed by parent: either the parent field equals parentQualifiedName, or (when no
 * parent is given) the parent field must not exist. Without a glossary qualified name, the
 * filter matches the glossary entity type instead.
 *
 * @param isTerm                selects "__categories" (true) or "__parentCategory" (false) as the parent field
 * @param lexoRank              rank to look for
 * @param glossaryQualifiedName qualified name of the owning glossary (may be empty)
 * @param parentQualifiedName   qualified name of the parent category (may be empty)
 * @return the bool clause contents ("must" and, optionally, "must_not")
 */
private static Map<String, Object> buildBoolQueryDuplicateLexoRank(boolean isTerm, String lexoRank, String glossaryQualifiedName, String parentQualifiedName) {
    List<Map<String, Object>> mustClauses = new ArrayList<>();
    Map<String, Object> boolClause = new HashMap<>();

    mustClauses.add(mapOf("term", mapOf("__state", "ACTIVE")));
    mustClauses.add(mapOf("term", mapOf(LEXICOGRAPHICAL_SORT_ORDER, lexoRank)));

    if (StringUtils.isEmpty(glossaryQualifiedName)) {
        // No owning glossary supplied: match on the glossary entity type itself.
        mustClauses.add(mapOf("terms", mapOf("__typeName.keyword", Arrays.asList(ATLAS_GLOSSARY_ENTITY_TYPE))));
    } else {
        mustClauses.add(mapOf("terms", mapOf("__typeName.keyword", Arrays.asList(ATLAS_GLOSSARY_TERM_TYPENAME, ATLAS_GLOSSARY_CATEGORY_TYPENAME))));
        mustClauses.add(mapOf("term", mapOf("__glossary", glossaryQualifiedName)));

        final String parentField;
        if (isTerm) {
            parentField = "__categories";
        } else {
            parentField = "__parentCategory";
        }

        if (StringUtils.isNotEmpty(parentQualifiedName)) {
            mustClauses.add(mapOf("term", mapOf(parentField, parentQualifiedName)));
        } else {
            // No parent given: only entities without the parent field are considered.
            boolClause.put("must_not", Arrays.asList(mapOf("exists", mapOf("field", parentField))));
        }
    }

    boolClause.put("must", mustClauses);
    return boolClause;
}

public static Map<String, Object> generateDSLQueryForLastChild(String glossaryQualifiedName, String parentQualifiedName, boolean isTerm) {
hr2904 marked this conversation as resolved.
Show resolved Hide resolved

Map<String, Object> sortKeyOrder = mapOf(LEXICOGRAPHICAL_SORT_ORDER, mapOf("order", "desc"));

Object[] sortArray = {sortKeyOrder};

Map<String, Object> boolMap = buildBoolQuery(glossaryQualifiedName, parentQualifiedName, isTerm);

Map<String, Object> dsl = new HashMap<>();
dsl.put("from", 0);
dsl.put("size", 1);
dsl.put("sort", sortArray);
dsl.put("query", mapOf("bool", boolMap));

return dsl;
}

/**
 * Builds the bool filter selecting the ACTIVE siblings among which the last rank is looked up.
 *
 * With a glossary qualified name, the filter matches terms or categories (per isTerm) of that
 * glossary, narrowed by parent: either the parent field equals parentQualifiedName, or (when
 * no parent is given) the parent field must not exist. Without a glossary qualified name, the
 * filter matches the "AtlasGlossary" type instead.
 *
 * @param glossaryQualifiedName qualified name of the owning glossary (may be empty)
 * @param parentQualifiedName   qualified name of the parent category (may be empty)
 * @param isTerm                selects the term type and "__categories" (true) or the category
 *                              type and "__parentCategory" (false)
 * @return the bool clause contents ("must" and, optionally, "must_not")
 */
private static Map<String, Object> buildBoolQuery(String glossaryQualifiedName, String parentQualifiedName, boolean isTerm) {
    List<Map<String, Object>> mustClauses = new ArrayList<>();
    Map<String, Object> boolClause = new HashMap<>();

    mustClauses.add(mapOf("term", mapOf("__state", "ACTIVE")));

    if (StringUtils.isEmpty(glossaryQualifiedName)) {
        // No owning glossary supplied: match on the glossary type itself.
        mustClauses.add(mapOf("terms", mapOf("__typeName.keyword", Arrays.asList("AtlasGlossary"))));
    } else {
        final String siblingType;
        final String parentField;
        if (isTerm) {
            siblingType = ATLAS_GLOSSARY_TERM_TYPENAME;
            parentField = "__categories";
        } else {
            siblingType = ATLAS_GLOSSARY_CATEGORY_TYPENAME;
            parentField = "__parentCategory";
        }

        mustClauses.add(mapOf("term", mapOf("__typeName.keyword", siblingType)));
        mustClauses.add(mapOf("term", mapOf("__glossary", glossaryQualifiedName)));

        if (StringUtils.isNotEmpty(parentQualifiedName)) {
            mustClauses.add(mapOf("term", mapOf(parentField, parentQualifiedName)));
        } else {
            // No parent given: only entities without the parent field are considered.
            boolClause.put("must_not", Arrays.asList(mapOf("exists", mapOf("field", parentField))));
        }
    }

    boolClause.put("must", mustClauses);
    return boolClause;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
Expand All @@ -70,9 +71,7 @@
import static org.apache.atlas.repository.store.graph.v2.preprocessor.PreProcessorUtils.*;
import static org.apache.atlas.repository.store.graph.v2.tasks.MeaningsTaskFactory.UPDATE_ENTITY_MEANINGS_ON_TERM_UPDATE;
import static org.apache.atlas.repository.util.AtlasEntityUtils.mapOf;
import static org.apache.atlas.type.Constants.CATEGORIES_PARENT_PROPERTY_KEY;
import static org.apache.atlas.type.Constants.CATEGORIES_PROPERTY_KEY;
import static org.apache.atlas.type.Constants.GLOSSARY_PROPERTY_KEY;
import static org.apache.atlas.type.Constants.*;

public class CategoryPreProcessor extends AbstractGlossaryPreProcessor {
private static final Logger LOG = LoggerFactory.getLogger(CategoryPreProcessor.class);
Expand Down Expand Up @@ -117,6 +116,7 @@ public void processAttributes(AtlasStruct entityStruct, EntityMutationContext co
private void processCreateCategory(AtlasEntity entity, AtlasVertex vertex) throws AtlasBaseException {
AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("processCreateCategory");
String catName = (String) entity.getAttribute(NAME);
String parentQname = null;

if (StringUtils.isEmpty(catName) || isNameInvalid(catName)) {
throw new AtlasBaseException(AtlasErrorCode.INVALID_DISPLAY_NAME);
Expand All @@ -126,6 +126,16 @@ private void processCreateCategory(AtlasEntity entity, AtlasVertex vertex) throw
categoryExists(catName, glossaryQualifiedName);
validateParent(glossaryQualifiedName);

if (parentCategory != null) {
parentQname = (String) parentCategory.getAttribute(QUALIFIED_NAME);
}
String lexicographicalSortOrder = (String) entity.getAttribute(LEXICOGRAPHICAL_SORT_ORDER);
if(StringUtils.isEmpty(lexicographicalSortOrder)){
assignNewLexicographicalSortOrder(entity,glossaryQualifiedName, parentQname, this.discovery);
} else {
isValidLexoRank(entity.getTypeName(), lexicographicalSortOrder, glossaryQualifiedName, parentQname, this.discovery);
}

entity.setAttribute(QUALIFIED_NAME, createQualifiedName(vertex));
AtlasAuthorizationUtils.verifyAccess(new AtlasEntityAccessRequest(typeRegistry, AtlasPrivilege.ENTITY_CREATE, new AtlasEntityHeader(entity)),
"create entity: type=", entity.getTypeName());
Expand All @@ -151,6 +161,17 @@ private void processUpdateCategory(AtlasEntity entity, AtlasVertex vertex) throw

String newGlossaryQualifiedName = (String) anchor.getAttribute(QUALIFIED_NAME);

String lexicographicalSortOrder = (String) entity.getAttribute(LEXICOGRAPHICAL_SORT_ORDER);
String parentQname = "";
if(Objects.nonNull(parentCategory)) {
parentQname = (String) parentCategory.getAttribute(QUALIFIED_NAME);
}
if(StringUtils.isNotEmpty(lexicographicalSortOrder)) {
isValidLexoRank(entity.getTypeName(), lexicographicalSortOrder, newGlossaryQualifiedName, parentQname, this.discovery);
} else {
entity.removeAttribute(LEXICOGRAPHICAL_SORT_ORDER);
}

if (!currentGlossaryQualifiedName.equals(newGlossaryQualifiedName)){
//Auth check
isAuthorized(currentGlossaryHeader, anchor);
Expand Down Expand Up @@ -489,4 +510,5 @@ private String createQualifiedName(AtlasVertex vertex) {

return getUUID() + "@" + anchor.getAttribute(QUALIFIED_NAME);
}

}
Loading
Loading