Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(openapi-v3): generic entities scroll #11564

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import com.linkedin.metadata.utils.metrics.MetricUtils;
import io.datahubproject.metadata.context.OperationContext;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -213,7 +214,7 @@ public SearchResult searchAcrossEntities(
* @return some entities to search
*/
public List<String> getEntitiesToSearch(
@Nonnull OperationContext opContext, @Nonnull List<String> inputEntities, int size) {
@Nonnull OperationContext opContext, @Nonnull Collection<String> inputEntities, int size) {
List<String> nonEmptyEntities;
List<String> lowercaseEntities =
inputEntities.stream().map(String::toLowerCase).collect(Collectors.toList());
Expand Down Expand Up @@ -247,7 +248,7 @@ public List<String> getEntitiesToSearch(
@Nonnull
public ScrollResult scrollAcrossEntities(
@Nonnull OperationContext opContext,
@Nonnull List<String> entities,
@Nonnull Collection<String> entities,
@Nonnull String input,
@Nullable Filter postFilters,
List<SortCriterion> sortCriteria,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package io.datahubproject.openapi.v3.models;

import java.util.Set;
import javax.annotation.Nullable;
import lombok.Builder;
import lombok.Data;
import lombok.extern.jackson.Jacksonized;

@Data
@Jacksonized
@Builder
public class GenericEntityAspectsBodyV3 {
@Nullable private Set<String> entities;
@Nullable private Set<String> aspects;
}
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ public abstract class GenericEntitiesController<
* @param aspectNames the aspect names present
* @param withSystemMetadata whether to include system metadata in the result
* @param scrollId the pagination token
* @param expandEmpty whether to expand an empty aspects list to all aspects
* @return result containing entities/aspects
* @throws URISyntaxException parsing error
*/
Expand All @@ -105,14 +106,16 @@ protected abstract S buildScrollResult(
SearchEntityArray searchEntities,
Set<String> aspectNames,
boolean withSystemMetadata,
@Nullable String scrollId)
@Nullable String scrollId,
boolean expandEmpty)
throws URISyntaxException;

protected List<E> buildEntityList(
@Nonnull OperationContext opContext,
List<Urn> urns,
Set<String> aspectNames,
boolean withSystemMetadata)
@Nullable Set<String> aspectNames,
boolean withSystemMetadata,
boolean expandEmpty)
throws URISyntaxException {

LinkedHashMap<Urn, Map<String, Long>> versionMap =
Expand All @@ -122,7 +125,7 @@ protected List<E> buildEntityList(
urn ->
Map.entry(
urn,
aspectNames.stream()
Optional.ofNullable(aspectNames).orElse(Set.of()).stream()
.map(aspectName -> Map.entry(aspectName, 0L))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))))
.collect(
Expand All @@ -133,14 +136,30 @@ protected List<E> buildEntityList(
throw new IllegalStateException("Duplicate key");
},
LinkedHashMap::new)),
0L);
return buildEntityVersionedAspectList(opContext, versionMap, withSystemMetadata);
0L,
expandEmpty);

return buildEntityVersionedAspectList(
opContext, urns, versionMap, withSystemMetadata, expandEmpty);
}

/**
* Build a list of entities for an API response
*
* @param opContext the operation context
* @param requestedUrns list of urns requested
* @param fetchUrnAspectVersions the map of urn to aspect name and version to fetch
* @param withSystemMetadata whether to include system metadata in the response entity
* @param expandEmpty whether to expand an empty aspects list to all aspects
* @return entity responses
* @throws URISyntaxException urn parsing error
*/
protected abstract List<E> buildEntityVersionedAspectList(
@Nonnull OperationContext opContext,
LinkedHashMap<Urn, Map<String, Long>> urnAspectVersions,
boolean withSystemMetadata)
Collection<Urn> requestedUrns,
LinkedHashMap<Urn, Map<String, Long>> fetchUrnAspectVersions,
boolean withSystemMetadata,
boolean expandEmpty)
throws URISyntaxException;

protected abstract List<E> buildEntityList(
Expand Down Expand Up @@ -225,13 +244,17 @@ public ResponseEntity<S> getEntities(
authentication.getActor().toUrnStr() + " is unauthorized to " + READ + " entities.");
}

Set<String> mergedAspects =
ImmutableSet.<String>builder().addAll(aspects1).addAll(aspects2).build();

return ResponseEntity.ok(
buildScrollResult(
opContext,
result.getEntities(),
ImmutableSet.<String>builder().addAll(aspects1).addAll(aspects2).build(),
mergedAspects,
withSystemMetadata,
result.getScrollId()));
result.getScrollId(),
true));
}

@Tag(name = "Generic Entities")
Expand Down Expand Up @@ -269,7 +292,8 @@ public ResponseEntity<E> getEntity(
opContext,
List.of(urn),
ImmutableSet.<String>builder().addAll(aspects1).addAll(aspects2).build(),
withSystemMetadata)
withSystemMetadata,
true)
.stream()
.findFirst()
.map(ResponseEntity::ok)
Expand Down Expand Up @@ -344,13 +368,16 @@ public ResponseEntity<A> getAspect(

final List<E> resultList;
if (version == 0) {
resultList = buildEntityList(opContext, List.of(urn), Set.of(aspectName), withSystemMetadata);
resultList =
buildEntityList(opContext, List.of(urn), Set.of(aspectName), withSystemMetadata, true);
} else {
resultList =
buildEntityVersionedAspectList(
opContext,
List.of(urn),
new LinkedHashMap<>(Map.of(urn, Map.of(aspectName, version))),
withSystemMetadata);
withSystemMetadata,
true);
}

return resultList.stream()
Expand Down Expand Up @@ -395,9 +422,10 @@ public ResponseEntity<Object> headAspect(
authentication.getActor().toUrnStr() + " is unauthorized to " + EXISTS + " entities.");
}

return exists(opContext, urn, lookupAspectSpec(urn, aspectName).getName(), includeSoftDelete)
? ResponseEntity.noContent().build()
: ResponseEntity.notFound().build();
return lookupAspectSpec(urn, aspectName)
.filter(aspectSpec -> exists(opContext, urn, aspectSpec.getName(), includeSoftDelete))
.map(aspectSpec -> ResponseEntity.noContent().build())
.orElse(ResponseEntity.notFound().build());
}

@Tag(name = "Generic Entities")
Expand Down Expand Up @@ -443,7 +471,7 @@ public void deleteEntity(
entityService.deleteUrn(opContext, urn);
} else {
aspects.stream()
.map(aspectName -> lookupAspectSpec(urn, aspectName).getName())
.map(aspectName -> lookupAspectSpec(urn, aspectName).get().getName())
.forEach(
aspectName ->
entityService.deleteAspect(opContext, entityUrn, aspectName, Map.of(), true));
Expand Down Expand Up @@ -515,8 +543,11 @@ public void deleteAspect(
authentication.getActor().toUrnStr() + " is unauthorized to " + DELETE + " entities.");
}

entityService.deleteAspect(
opContext, entityUrn, lookupAspectSpec(urn, aspectName).getName(), Map.of(), true);
lookupAspectSpec(urn, aspectName)
.ifPresent(
aspectSpec ->
entityService.deleteAspect(
opContext, entityUrn, aspectSpec.getName(), Map.of(), true));
}

@Tag(name = "Generic Aspects")
Expand Down Expand Up @@ -554,7 +585,7 @@ public ResponseEntity<E> createAspect(
authentication.getActor().toUrnStr() + " is unauthorized to " + CREATE + " entities.");
}

AspectSpec aspectSpec = lookupAspectSpec(entitySpec, aspectName);
AspectSpec aspectSpec = lookupAspectSpec(entitySpec, aspectName).get();
ChangeMCP upsert =
toUpsertItem(
opContext.getRetrieverContext().get().getAspectRetriever(),
Expand Down Expand Up @@ -618,7 +649,7 @@ public ResponseEntity<E> patchAspect(
authentication.getActor().toUrnStr() + " is unauthorized to " + UPDATE + " entities.");
}

AspectSpec aspectSpec = lookupAspectSpec(entitySpec, aspectName);
AspectSpec aspectSpec = lookupAspectSpec(entitySpec, aspectName).get();
RecordTemplate currentValue = entityService.getAspect(opContext, urn, aspectSpec.getName(), 0);

GenericPatchTemplate<? extends RecordTemplate> genericPatchTemplate =
Expand Down Expand Up @@ -672,15 +703,18 @@ protected Boolean exists(
*
* @param requestedAspectNames requested aspects
* @param <T> map values
* @param expandEmpty whether to expand empty aspect names to all aspect names
* @return updated map
*/
protected <T> LinkedHashMap<Urn, Map<String, T>> resolveAspectNames(
LinkedHashMap<Urn, Map<String, T>> requestedAspectNames, @Nonnull T defaultValue) {
LinkedHashMap<Urn, Map<String, T>> requestedAspectNames,
@Nonnull T defaultValue,
boolean expandEmpty) {
return requestedAspectNames.entrySet().stream()
.map(
entry -> {
final Urn urn = entry.getKey();
if (entry.getValue().isEmpty() || entry.getValue().containsKey("")) {
if (expandEmpty && (entry.getValue().isEmpty() || entry.getValue().containsKey(""))) {
// All aspects specified
Set<String> allNames =
entityRegistry.getEntitySpec(urn.getEntityType()).getAspectSpecs().stream()
Expand All @@ -694,15 +728,16 @@ protected <T> LinkedHashMap<Urn, Map<String, T>> resolveAspectNames(
Map.entry(
aspectName, entry.getValue().getOrDefault("", defaultValue)))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
} else {
} else if (!entry.getValue().keySet().isEmpty()) {
final Map<String, String> normalizedNames =
entry.getValue().keySet().stream()
.map(
requestAspectName ->
Map.entry(
requestAspectName,
lookupAspectSpec(urn, requestAspectName).getName()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
requestAspectName, lookupAspectSpec(urn, requestAspectName)))
.filter(aspectSpecEntry -> aspectSpecEntry.getValue().isPresent())
.collect(
Collectors.toMap(Map.Entry::getKey, e -> e.getValue().get().getName()));
return Map.entry(
urn,
entry.getValue().entrySet().stream()
Expand All @@ -712,8 +747,11 @@ protected <T> LinkedHashMap<Urn, Map<String, T>> resolveAspectNames(
Map.entry(
normalizedNames.get(reqEntry.getKey()), reqEntry.getValue()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
} else {
return (Map.Entry<Urn, Map<String, T>>) null;
}
})
.filter(Objects::nonNull)
.collect(
Collectors.toMap(
Map.Entry::getKey,
Expand All @@ -732,12 +770,12 @@ protected Map<String, Pair<RecordTemplate, SystemMetadata>> toAspectMap(
Map.entry(
a.getName(),
Pair.of(
toRecordTemplate(lookupAspectSpec(urn, a.getName()), a),
toRecordTemplate(lookupAspectSpec(urn, a.getName()).get(), a),
withSystemMetadata ? a.getSystemMetadata() : null)))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}

protected AspectSpec lookupAspectSpec(Urn urn, String aspectName) {
protected Optional<AspectSpec> lookupAspectSpec(Urn urn, String aspectName) {
return lookupAspectSpec(entityRegistry.getEntitySpec(urn.getEntityType()), aspectName);
}

Expand Down Expand Up @@ -777,13 +815,16 @@ protected ChangeMCP toUpsertItem(
*
* @return
*/
protected static AspectSpec lookupAspectSpec(EntitySpec entitySpec, String aspectName) {
protected static Optional<AspectSpec> lookupAspectSpec(EntitySpec entitySpec, String aspectName) {
if (entitySpec == null) {
return Optional.empty();
}

return entitySpec.getAspectSpec(aspectName) != null
? entitySpec.getAspectSpec(aspectName)
? Optional.of(entitySpec.getAspectSpec(aspectName))
: entitySpec.getAspectSpecs().stream()
.filter(aspec -> aspec.getName().toLowerCase().equals(aspectName))
.findFirst()
.get();
.findFirst();
}

protected static Urn validatedUrn(String urn) throws InvalidUrnException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
Expand Down Expand Up @@ -114,7 +115,8 @@ public ResponseEntity<BatchGetUrnResponseV2<GenericAspectV2, GenericEntityV2>> g
opContext,
urns,
new HashSet<>(request.getAspectNames()),
request.isWithSystemMetadata())))
request.isWithSystemMetadata(),
true)))
.build()));
}

Expand All @@ -124,10 +126,12 @@ public GenericEntityScrollResultV2 buildScrollResult(
SearchEntityArray searchEntities,
Set<String> aspectNames,
boolean withSystemMetadata,
@Nullable String scrollId)
@Nullable String scrollId,
boolean expandEmpty)
throws URISyntaxException {
return GenericEntityScrollResultV2.builder()
.results(toRecordTemplates(opContext, searchEntities, aspectNames, withSystemMetadata))
.results(
toRecordTemplates(opContext, searchEntities, aspectNames, withSystemMetadata, true))
.scrollId(scrollId)
.build();
}
Expand Down Expand Up @@ -155,7 +159,7 @@ protected AspectsBatch toMCPBatch(
while (aspectItr.hasNext()) {
Map.Entry<String, JsonNode> aspect = aspectItr.next();

AspectSpec aspectSpec = lookupAspectSpec(entityUrn, aspect.getKey());
AspectSpec aspectSpec = lookupAspectSpec(entityUrn, aspect.getKey()).get();

if (aspectSpec != null) {
ChangeItemImpl.ChangeItemImplBuilder builder =
Expand Down Expand Up @@ -192,12 +196,14 @@ protected AspectsBatch toMCPBatch(
@Override
protected List<GenericEntityV2> buildEntityVersionedAspectList(
@Nonnull OperationContext opContext,
Collection<Urn> requestedUrns,
LinkedHashMap<Urn, Map<String, Long>> urnAspectVersions,
boolean withSystemMetadata)
boolean withSystemMetadata,
boolean expandEmpty)
throws URISyntaxException {
Map<Urn, List<EnvelopedAspect>> aspects =
entityService.getEnvelopedVersionedAspects(
opContext, resolveAspectNames(urnAspectVersions, 0L), true);
opContext, resolveAspectNames(urnAspectVersions, 0L, true), true);

return urnAspectVersions.keySet().stream()
.map(
Expand Down Expand Up @@ -230,13 +236,15 @@ private List<GenericEntityV2> toRecordTemplates(
@Nonnull OperationContext opContext,
SearchEntityArray searchEntities,
Set<String> aspectNames,
boolean withSystemMetadata)
boolean withSystemMetadata,
boolean expandEmpty)
throws URISyntaxException {
return buildEntityList(
opContext,
searchEntities.stream().map(SearchEntity::getEntity).collect(Collectors.toList()),
aspectNames,
withSystemMetadata);
withSystemMetadata,
true);
}

@Override
Expand Down
Loading
Loading