Skip to content

Commit

Permalink
feat(openapi-v3): generic entities scroll
Browse files Browse the repository at this point in the history
* added generic cross-entity scroll endpoint
  • Loading branch information
david-leifker committed Oct 8, 2024
1 parent 6fd36ef commit 8362e9d
Show file tree
Hide file tree
Showing 6 changed files with 492 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import com.linkedin.metadata.utils.metrics.MetricUtils;
import io.datahubproject.metadata.context.OperationContext;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -213,7 +214,7 @@ public SearchResult searchAcrossEntities(
* @return some entities to search
*/
public List<String> getEntitiesToSearch(
@Nonnull OperationContext opContext, @Nonnull List<String> inputEntities, int size) {
@Nonnull OperationContext opContext, @Nonnull Collection<String> inputEntities, int size) {
List<String> nonEmptyEntities;
List<String> lowercaseEntities =
inputEntities.stream().map(String::toLowerCase).collect(Collectors.toList());
Expand Down Expand Up @@ -247,7 +248,7 @@ public List<String> getEntitiesToSearch(
@Nonnull
public ScrollResult scrollAcrossEntities(
@Nonnull OperationContext opContext,
@Nonnull List<String> entities,
@Nonnull Collection<String> entities,
@Nonnull String input,
@Nullable Filter postFilters,
List<SortCriterion> sortCriteria,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package io.datahubproject.openapi.v3.models;

import java.util.Set;
import javax.annotation.Nullable;
import lombok.Builder;
import lombok.Data;
import lombok.extern.jackson.Jacksonized;

@Data
@Jacksonized
@Builder
public class GenericEntityAspectsBodyV3 {
@Nullable private Set<String> entities;
@Nullable private Set<String> aspects;
}
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ public abstract class GenericEntitiesController<
* @param aspectNames the aspect names present
* @param withSystemMetadata whether to include system metadata in the result
* @param scrollId the pagination token
* @param expandEmpty whether to expand an empty aspects list to all entities
* @return result containing entities/aspects
* @throws URISyntaxException parsing error
*/
Expand All @@ -105,14 +106,16 @@ protected abstract S buildScrollResult(
SearchEntityArray searchEntities,
Set<String> aspectNames,
boolean withSystemMetadata,
@Nullable String scrollId)
@Nullable String scrollId,
boolean expandEmpty)
throws URISyntaxException;

protected List<E> buildEntityList(
@Nonnull OperationContext opContext,
List<Urn> urns,
Set<String> aspectNames,
boolean withSystemMetadata)
@Nullable Set<String> aspectNames,
boolean withSystemMetadata,
boolean expandEmpty)
throws URISyntaxException {

LinkedHashMap<Urn, Map<String, Long>> versionMap =
Expand All @@ -122,7 +125,7 @@ protected List<E> buildEntityList(
urn ->
Map.entry(
urn,
aspectNames.stream()
Optional.ofNullable(aspectNames).orElse(Set.of()).stream()
.map(aspectName -> Map.entry(aspectName, 0L))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))))
.collect(
Expand All @@ -133,14 +136,30 @@ protected List<E> buildEntityList(
throw new IllegalStateException("Duplicate key");
},
LinkedHashMap::new)),
0L);
return buildEntityVersionedAspectList(opContext, versionMap, withSystemMetadata);
0L,
expandEmpty);

return buildEntityVersionedAspectList(
opContext, urns, versionMap, withSystemMetadata, expandEmpty);
}

/**
* Build a list of entities for an API response
*
* @param opContext the operation context
* @param requestedUrns list of urns requested
* @param fetchUrnAspectVersions the map of urn to aspect name and version to fetch
* @param withSystemMetadata whether to include system metadata in the response entity
* @param expandEmpty whether to expand an empty aspects list to all aspects
* @return entity responses
* @throws URISyntaxException urn parsing error
*/
protected abstract List<E> buildEntityVersionedAspectList(
@Nonnull OperationContext opContext,
LinkedHashMap<Urn, Map<String, Long>> urnAspectVersions,
boolean withSystemMetadata)
Collection<Urn> requestedUrns,
LinkedHashMap<Urn, Map<String, Long>> fetchUrnAspectVersions,
boolean withSystemMetadata,
boolean expandEmpty)
throws URISyntaxException;

protected abstract List<E> buildEntityList(
Expand Down Expand Up @@ -225,13 +244,17 @@ public ResponseEntity<S> getEntities(
authentication.getActor().toUrnStr() + " is unauthorized to " + READ + " entities.");
}

Set<String> mergedAspects =
ImmutableSet.<String>builder().addAll(aspects1).addAll(aspects2).build();

return ResponseEntity.ok(
buildScrollResult(
opContext,
result.getEntities(),
ImmutableSet.<String>builder().addAll(aspects1).addAll(aspects2).build(),
mergedAspects,
withSystemMetadata,
result.getScrollId()));
result.getScrollId(),
true));
}

@Tag(name = "Generic Entities")
Expand Down Expand Up @@ -269,7 +292,8 @@ public ResponseEntity<E> getEntity(
opContext,
List.of(urn),
ImmutableSet.<String>builder().addAll(aspects1).addAll(aspects2).build(),
withSystemMetadata)
withSystemMetadata,
true)
.stream()
.findFirst()
.map(ResponseEntity::ok)
Expand Down Expand Up @@ -344,13 +368,16 @@ public ResponseEntity<A> getAspect(

final List<E> resultList;
if (version == 0) {
resultList = buildEntityList(opContext, List.of(urn), Set.of(aspectName), withSystemMetadata);
resultList =
buildEntityList(opContext, List.of(urn), Set.of(aspectName), withSystemMetadata, true);
} else {
resultList =
buildEntityVersionedAspectList(
opContext,
List.of(urn),
new LinkedHashMap<>(Map.of(urn, Map.of(aspectName, version))),
withSystemMetadata);
withSystemMetadata,
true);
}

return resultList.stream()
Expand Down Expand Up @@ -395,9 +422,10 @@ public ResponseEntity<Object> headAspect(
authentication.getActor().toUrnStr() + " is unauthorized to " + EXISTS + " entities.");
}

return exists(opContext, urn, lookupAspectSpec(urn, aspectName).getName(), includeSoftDelete)
? ResponseEntity.noContent().build()
: ResponseEntity.notFound().build();
return lookupAspectSpec(urn, aspectName)
.filter(aspectSpec -> exists(opContext, urn, aspectSpec.getName(), includeSoftDelete))
.map(aspectSpec -> ResponseEntity.noContent().build())
.orElse(ResponseEntity.notFound().build());
}

@Tag(name = "Generic Entities")
Expand Down Expand Up @@ -443,7 +471,7 @@ public void deleteEntity(
entityService.deleteUrn(opContext, urn);
} else {
aspects.stream()
.map(aspectName -> lookupAspectSpec(urn, aspectName).getName())
.map(aspectName -> lookupAspectSpec(urn, aspectName).get().getName())
.forEach(
aspectName ->
entityService.deleteAspect(opContext, entityUrn, aspectName, Map.of(), true));
Expand Down Expand Up @@ -515,8 +543,11 @@ public void deleteAspect(
authentication.getActor().toUrnStr() + " is unauthorized to " + DELETE + " entities.");
}

entityService.deleteAspect(
opContext, entityUrn, lookupAspectSpec(urn, aspectName).getName(), Map.of(), true);
lookupAspectSpec(urn, aspectName)
.ifPresent(
aspectSpec ->
entityService.deleteAspect(
opContext, entityUrn, aspectSpec.getName(), Map.of(), true));
}

@Tag(name = "Generic Aspects")
Expand Down Expand Up @@ -554,7 +585,7 @@ public ResponseEntity<E> createAspect(
authentication.getActor().toUrnStr() + " is unauthorized to " + CREATE + " entities.");
}

AspectSpec aspectSpec = lookupAspectSpec(entitySpec, aspectName);
AspectSpec aspectSpec = lookupAspectSpec(entitySpec, aspectName).get();
ChangeMCP upsert =
toUpsertItem(
opContext.getRetrieverContext().get().getAspectRetriever(),
Expand Down Expand Up @@ -618,7 +649,7 @@ public ResponseEntity<E> patchAspect(
authentication.getActor().toUrnStr() + " is unauthorized to " + UPDATE + " entities.");
}

AspectSpec aspectSpec = lookupAspectSpec(entitySpec, aspectName);
AspectSpec aspectSpec = lookupAspectSpec(entitySpec, aspectName).get();
RecordTemplate currentValue = entityService.getAspect(opContext, urn, aspectSpec.getName(), 0);

GenericPatchTemplate<? extends RecordTemplate> genericPatchTemplate =
Expand Down Expand Up @@ -672,15 +703,18 @@ protected Boolean exists(
*
* @param requestedAspectNames requested aspects
* @param <T> map values
* @param expandEmpty whether to expand empty aspect names to all aspect names
* @return updated map
*/
protected <T> LinkedHashMap<Urn, Map<String, T>> resolveAspectNames(
LinkedHashMap<Urn, Map<String, T>> requestedAspectNames, @Nonnull T defaultValue) {
LinkedHashMap<Urn, Map<String, T>> requestedAspectNames,
@Nonnull T defaultValue,
boolean expandEmpty) {
return requestedAspectNames.entrySet().stream()
.map(
entry -> {
final Urn urn = entry.getKey();
if (entry.getValue().isEmpty() || entry.getValue().containsKey("")) {
if (expandEmpty && (entry.getValue().isEmpty() || entry.getValue().containsKey(""))) {
// All aspects specified
Set<String> allNames =
entityRegistry.getEntitySpec(urn.getEntityType()).getAspectSpecs().stream()
Expand All @@ -694,15 +728,16 @@ protected <T> LinkedHashMap<Urn, Map<String, T>> resolveAspectNames(
Map.entry(
aspectName, entry.getValue().getOrDefault("", defaultValue)))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
} else {
} else if (!entry.getValue().keySet().isEmpty()) {
final Map<String, String> normalizedNames =
entry.getValue().keySet().stream()
.map(
requestAspectName ->
Map.entry(
requestAspectName,
lookupAspectSpec(urn, requestAspectName).getName()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
requestAspectName, lookupAspectSpec(urn, requestAspectName)))
.filter(aspectSpecEntry -> aspectSpecEntry.getValue().isPresent())
.collect(
Collectors.toMap(Map.Entry::getKey, e -> e.getValue().get().getName()));
return Map.entry(
urn,
entry.getValue().entrySet().stream()
Expand All @@ -712,8 +747,11 @@ protected <T> LinkedHashMap<Urn, Map<String, T>> resolveAspectNames(
Map.entry(
normalizedNames.get(reqEntry.getKey()), reqEntry.getValue()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
} else {
return (Map.Entry<Urn, Map<String, T>>) null;
}
})
.filter(Objects::nonNull)
.collect(
Collectors.toMap(
Map.Entry::getKey,
Expand All @@ -732,12 +770,12 @@ protected Map<String, Pair<RecordTemplate, SystemMetadata>> toAspectMap(
Map.entry(
a.getName(),
Pair.of(
toRecordTemplate(lookupAspectSpec(urn, a.getName()), a),
toRecordTemplate(lookupAspectSpec(urn, a.getName()).get(), a),
withSystemMetadata ? a.getSystemMetadata() : null)))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}

protected AspectSpec lookupAspectSpec(Urn urn, String aspectName) {
protected Optional<AspectSpec> lookupAspectSpec(Urn urn, String aspectName) {
return lookupAspectSpec(entityRegistry.getEntitySpec(urn.getEntityType()), aspectName);
}

Expand Down Expand Up @@ -777,13 +815,16 @@ protected ChangeMCP toUpsertItem(
*
* @return
*/
protected static AspectSpec lookupAspectSpec(EntitySpec entitySpec, String aspectName) {
protected static Optional<AspectSpec> lookupAspectSpec(EntitySpec entitySpec, String aspectName) {
if (entitySpec == null) {
return Optional.empty();
}

return entitySpec.getAspectSpec(aspectName) != null
? entitySpec.getAspectSpec(aspectName)
? Optional.of(entitySpec.getAspectSpec(aspectName))
: entitySpec.getAspectSpecs().stream()
.filter(aspec -> aspec.getName().toLowerCase().equals(aspectName))
.findFirst()
.get();
.findFirst();
}

protected static Urn validatedUrn(String urn) throws InvalidUrnException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
Expand Down Expand Up @@ -114,7 +115,8 @@ public ResponseEntity<BatchGetUrnResponseV2<GenericAspectV2, GenericEntityV2>> g
opContext,
urns,
new HashSet<>(request.getAspectNames()),
request.isWithSystemMetadata())))
request.isWithSystemMetadata(),
true)))
.build()));
}

Expand All @@ -124,10 +126,12 @@ public GenericEntityScrollResultV2 buildScrollResult(
SearchEntityArray searchEntities,
Set<String> aspectNames,
boolean withSystemMetadata,
@Nullable String scrollId)
@Nullable String scrollId,
boolean expandEmpty)
throws URISyntaxException {
return GenericEntityScrollResultV2.builder()
.results(toRecordTemplates(opContext, searchEntities, aspectNames, withSystemMetadata))
.results(
toRecordTemplates(opContext, searchEntities, aspectNames, withSystemMetadata, true))
.scrollId(scrollId)
.build();
}
Expand Down Expand Up @@ -155,7 +159,7 @@ protected AspectsBatch toMCPBatch(
while (aspectItr.hasNext()) {
Map.Entry<String, JsonNode> aspect = aspectItr.next();

AspectSpec aspectSpec = lookupAspectSpec(entityUrn, aspect.getKey());
AspectSpec aspectSpec = lookupAspectSpec(entityUrn, aspect.getKey()).get();

if (aspectSpec != null) {
ChangeItemImpl.ChangeItemImplBuilder builder =
Expand Down Expand Up @@ -192,12 +196,14 @@ protected AspectsBatch toMCPBatch(
@Override
protected List<GenericEntityV2> buildEntityVersionedAspectList(
@Nonnull OperationContext opContext,
Collection<Urn> requestedUrns,
LinkedHashMap<Urn, Map<String, Long>> urnAspectVersions,
boolean withSystemMetadata)
boolean withSystemMetadata,
boolean expandEmpty)
throws URISyntaxException {
Map<Urn, List<EnvelopedAspect>> aspects =
entityService.getEnvelopedVersionedAspects(
opContext, resolveAspectNames(urnAspectVersions, 0L), true);
opContext, resolveAspectNames(urnAspectVersions, 0L, true), true);

return urnAspectVersions.keySet().stream()
.map(
Expand Down Expand Up @@ -230,13 +236,15 @@ private List<GenericEntityV2> toRecordTemplates(
@Nonnull OperationContext opContext,
SearchEntityArray searchEntities,
Set<String> aspectNames,
boolean withSystemMetadata)
boolean withSystemMetadata,
boolean expandEmpty)
throws URISyntaxException {
return buildEntityList(
opContext,
searchEntities.stream().map(SearchEntity::getEntity).collect(Collectors.toList()),
aspectNames,
withSystemMetadata);
withSystemMetadata,
true);
}

@Override
Expand Down
Loading

0 comments on commit 8362e9d

Please sign in to comment.