Skip to content

Commit

Permalink
feat(graphql/lineage): Support including ghost entities (#11510)
Browse files Browse the repository at this point in the history
Co-authored-by: david-leifker <[email protected]>
  • Loading branch information
asikowitz and david-leifker authored Oct 4, 2024
1 parent 1856200 commit 81151db
Show file tree
Hide file tree
Showing 5 changed files with 193 additions and 204 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import graphql.schema.DataFetchingEnvironment;
import io.datahubproject.metadata.services.RestrictedService;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -60,14 +61,16 @@ public CompletableFuture<EntityLineageResult> get(DataFetchingEnvironment enviro
final LineageInput input = bindArgument(environment.getArgument("input"), LineageInput.class);

final LineageDirection lineageDirection = input.getDirection();
@Nullable final Integer start = input.getStart(); // Optional!
@Nullable final Integer count = input.getCount(); // Optional!
@Nullable final Boolean separateSiblings = input.getSeparateSiblings(); // Optional!
@Nullable final Long startTimeMillis = input.getStartTimeMillis(); // Optional!
// All inputs are optional
@Nullable final Integer start = input.getStart();
@Nullable final Integer count = input.getCount();
@Nullable final Boolean separateSiblings = input.getSeparateSiblings();
@Nullable final Long startTimeMillis = input.getStartTimeMillis();
@Nullable
final Long endTimeMillis =
ResolverUtils.getLineageEndTimeMillis(
input.getStartTimeMillis(), input.getEndTimeMillis()); // Optional!
ResolverUtils.getLineageEndTimeMillis(input.getStartTimeMillis(), input.getEndTimeMillis());
final Boolean includeGhostEntities =
Optional.ofNullable(input.getIncludeGhostEntities()).orElse(false);

com.linkedin.metadata.graph.LineageDirection resolvedDirection =
com.linkedin.metadata.graph.LineageDirection.valueOf(lineageDirection.toString());
Expand All @@ -80,6 +83,8 @@ public CompletableFuture<EntityLineageResult> get(DataFetchingEnvironment enviro
_siblingGraphService.getLineage(
context
.getOperationContext()
.withSearchFlags(
searchFlags -> searchFlags.setIncludeSoftDeleted(includeGhostEntities))
.withLineageFlags(
flags ->
flags
Expand All @@ -91,6 +96,7 @@ public CompletableFuture<EntityLineageResult> get(DataFetchingEnvironment enviro
count != null ? count : 100,
1,
separateSiblings != null ? input.getSeparateSiblings() : false,
input.getIncludeGhostEntities(),
new HashSet<>());

Set<Urn> restrictedUrns = new HashSet<>();
Expand Down
5 changes: 5 additions & 0 deletions datahub-graphql-core/src/main/resources/entity.graphql
Original file line number Diff line number Diff line change
Expand Up @@ -1258,6 +1258,11 @@ input LineageInput {
An optional ending time to filter on
"""
endTimeMillis: Long

"""
If enabled, include entities that do not exist or are soft deleted.
"""
includeGhostEntities: Boolean = false
}

"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,8 @@ public static LineageSearchResult validateLineageSearchResult(
public static EntityLineageResult validateEntityLineageResult(
@Nonnull OperationContext opContext,
@Nullable final EntityLineageResult entityLineageResult,
@Nonnull final EntityService<?> entityService) {
@Nonnull final EntityService<?> entityService,
boolean includeGhostEntities) {
if (entityLineageResult == null) {
return null;
}
Expand All @@ -223,8 +224,8 @@ public static EntityLineageResult validateEntityLineageResult(
entityLineageResult.getRelationships(),
LineageRelationship::getEntity,
entityService,
true,
false)
!includeGhostEntities,
includeGhostEntities)
.collect(Collectors.toCollection(LineageRelationshipArray::new));

validatedEntityLineageResult.setFiltered(
Expand Down Expand Up @@ -280,6 +281,8 @@ private static <T> Stream<T> validateSearchUrns(
boolean includeSoftDeleted) {

if (enforceSQLExistence) {
// TODO: Always set includeSoftDeleted to true once 0.3.7 OSS merge occurs, as soft deleted
// results will be filtered by graph service
Set<Urn> existingUrns =
entityService.exists(
opContext,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,29 @@ public EntityLineageResult getLineage(
int offset,
int count,
int maxHops) {
return ValidationUtils.validateEntityLineageResult(
return getLineage(opContext, entityUrn, direction, offset, count, maxHops, false, false);
}

@Nonnull
public EntityLineageResult getLineage(
@Nonnull OperationContext opContext,
@Nonnull Urn entityUrn,
@Nonnull LineageDirection direction,
int offset,
int count,
int maxHops,
boolean separateSiblings,
boolean includeGhostEntities) {
return getLineage(
opContext,
getLineage(opContext, entityUrn, direction, offset, count, maxHops, false, new HashSet<>()),
_entityService);
entityUrn,
direction,
offset,
count,
maxHops,
separateSiblings,
includeGhostEntities,
new HashSet<>());
}

/**
Expand All @@ -60,12 +79,14 @@ public EntityLineageResult getLineage(
int count,
int maxHops,
boolean separateSiblings,
boolean includeGhostEntities,
@Nonnull Set<Urn> visitedUrns) {
if (separateSiblings) {
return ValidationUtils.validateEntityLineageResult(
opContext,
_graphService.getLineage(opContext, entityUrn, direction, offset, count, maxHops),
_entityService);
_entityService,
includeGhostEntities);
}

if (maxHops > 1) {
Expand All @@ -89,7 +110,7 @@ public EntityLineageResult getLineage(
// remove your siblings from your lineage
entityLineage =
filterLineageResultFromSiblings(
opContext, entityUrn, allSiblingsInGroup, entityLineage, null);
opContext, entityUrn, allSiblingsInGroup, entityLineage, null, includeGhostEntities);

// Update offset and count to fetch the correct number of edges from the next sibling node
offset = Math.max(0, offset - entityLineage.getTotal());
Expand All @@ -109,8 +130,17 @@ public EntityLineageResult getLineage(
siblingUrn,
allSiblingsInGroup,
getLineage(
opContext, siblingUrn, direction, offset, count, maxHops, false, visitedUrns),
entityLineage);
opContext,
siblingUrn,
direction,
offset,
count,
maxHops,
false,
includeGhostEntities,
visitedUrns),
entityLineage,
includeGhostEntities);

// Update offset and count to fetch the correct number of edges from the next sibling node
offset = Math.max(0, offset - nextEntityLineage.getTotal());
Expand All @@ -122,7 +152,8 @@ public EntityLineageResult getLineage(
;
}

return ValidationUtils.validateEntityLineageResult(opContext, entityLineage, _entityService);
return ValidationUtils.validateEntityLineageResult(
opContext, entityLineage, _entityService, includeGhostEntities);
}

private int getFiltered(@Nullable EntityLineageResult entityLineageResult) {
Expand All @@ -138,7 +169,8 @@ private EntityLineageResult filterLineageResultFromSiblings(
@Nonnull final Urn urn,
@Nonnull final Set<Urn> allSiblingsInGroup,
@Nonnull final EntityLineageResult entityLineageResult,
@Nullable final EntityLineageResult existingResult) {
@Nullable final EntityLineageResult existingResult,
boolean includeGhostEntities) {
int numFiltered = 0;

// 1) remove the source entities siblings from this entity's downstreams
Expand Down Expand Up @@ -231,6 +263,6 @@ private EntityLineageResult filterLineageResultFromSiblings(
combinedLineageResult.setFiltered(
numFiltered + getFiltered(existingResult) + getFiltered(entityLineageResult));
return ValidationUtils.validateEntityLineageResult(
opContext, combinedLineageResult, _entityService);
opContext, combinedLineageResult, _entityService, includeGhostEntities);
}
}
Loading

0 comments on commit 81151db

Please sign in to comment.