From 1d844bb331e9c5f52aa443afa271f26b8f1a0884 Mon Sep 17 00:00:00 2001 From: ksrinath Date: Thu, 22 Aug 2024 17:53:46 +0530 Subject: [PATCH 01/13] fix(gms): filter out runs of a dataJob without any run-events --- .../resolvers/jobs/DataJobRunsResolver.java | 30 +++++++++++++++++-- 1 file changed, 27 insertions(+), 3 deletions(-) diff --git a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/jobs/DataJobRunsResolver.java b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/jobs/DataJobRunsResolver.java index 09039e530631d0..dc637a4028ee6e 100644 --- a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/jobs/DataJobRunsResolver.java +++ b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/jobs/DataJobRunsResolver.java @@ -1,12 +1,12 @@ package com.linkedin.datahub.graphql.resolvers.jobs; +import static com.linkedin.metadata.Constants.DATA_PROCESS_INSTANCE_RUN_EVENT_ASPECT_NAME; + import com.google.common.collect.ImmutableList; import com.linkedin.common.urn.Urn; import com.linkedin.datahub.graphql.QueryContext; import com.linkedin.datahub.graphql.concurrency.GraphQLConcurrencyUtils; -import com.linkedin.datahub.graphql.generated.DataProcessInstance; -import com.linkedin.datahub.graphql.generated.DataProcessInstanceResult; -import com.linkedin.datahub.graphql.generated.Entity; +import com.linkedin.datahub.graphql.generated.*; import com.linkedin.datahub.graphql.types.dataprocessinst.mappers.DataProcessInstanceMapper; import com.linkedin.entity.EntityResponse; import com.linkedin.entity.client.EntityClient; @@ -33,6 +33,8 @@ import java.util.Objects; import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** GraphQL Resolver used for fetching a list of Task Runs associated with a Data Job */ public class DataJobRunsResolver @@ -40,6 +42,7 @@ public class DataJobRunsResolver private static final String PARENT_TEMPLATE_URN_SEARCH_INDEX_FIELD_NAME = "parentTemplate"; private static final String CREATED_TIME_SEARCH_INDEX_FIELD_NAME = "created"; + private static final Logger log = LoggerFactory.getLogger(DataJobRunsResolver.class); private final EntityClient _entityClient; @@ -93,6 +96,27 @@ public CompletableFuture get(DataFetchingEnvironment gmsResults.stream() .filter(Objects::nonNull) .map(p -> DataProcessInstanceMapper.map(context, p)) + .filter( + p -> { + try { + // check that at least one run-event exists + return !_entityClient + .getTimeseriesAspectValues( + context.getOperationContext(), + p.getUrn(), + "dataProcessInstance", + DATA_PROCESS_INSTANCE_RUN_EVENT_ASPECT_NAME, + null, + null, + 1, + null, + null) + .isEmpty(); + } catch (RemoteInvocationException e) { + log.error("Could not fetch runEvent aspect of {}", p.getUrn()); + return false; + } + }) .collect(Collectors.toList()); // Step 4: Package and return result From 827fee467dda010b502f095d036ea00a8b44aa87 Mon Sep 17 00:00:00 2001 From: ksrinath Date: Fri, 23 Aug 2024 16:47:12 +0530 Subject: [PATCH 02/13] perf improvement, comments on pagination behavior --- .../graphql/resolvers/jobs/DataJobRunsResolver.java | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/jobs/DataJobRunsResolver.java b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/jobs/DataJobRunsResolver.java index dc637a4028ee6e..aec67e0c57fb46 100644 --- 
a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/jobs/DataJobRunsResolver.java +++ b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/jobs/DataJobRunsResolver.java @@ -95,7 +95,6 @@ public CompletableFuture get(DataFetchingEnvironment final List dataProcessInstances = gmsResults.stream() .filter(Objects::nonNull) - .map(p -> DataProcessInstanceMapper.map(context, p)) .filter( p -> { try { @@ -103,7 +102,7 @@ public CompletableFuture get(DataFetchingEnvironment return !_entityClient .getTimeseriesAspectValues( context.getOperationContext(), - p.getUrn(), + p.getUrn().toString(), "dataProcessInstance", DATA_PROCESS_INSTANCE_RUN_EVENT_ASPECT_NAME, null, @@ -117,12 +116,19 @@ public CompletableFuture get(DataFetchingEnvironment return false; } }) + .map(p -> DataProcessInstanceMapper.map(context, p)) .collect(Collectors.toList()); // Step 4: Package and return result final DataProcessInstanceResult result = new DataProcessInstanceResult(); + + /* Note: the number of dataProcessInstance records may be less than the "count" due to the above filtering. + Pagination on a UI would work correctly, except that some pages may have less than "count" records. + */ result.setCount(gmsResult.getPageSize()); result.setStart(gmsResult.getFrom()); + + // Note: "total" could be an inexact upper bound due to the above filtering. result.setTotal(gmsResult.getNumEntities()); result.setRuns(dataProcessInstances); return result; From 93b261f20c397beaca15113f865716fe73252f8b Mon Sep 17 00:00:00 2001 From: ksrinath Date: Wed, 28 Aug 2024 17:10:12 +0530 Subject: [PATCH 03/13] updated implementation, filtering on a new hasRunEvents field --- .../resolvers/jobs/DataJobRunsResolver.java | 35 ++----- .../hook/DataProcessInstanceRunEventHook.java | 91 +++++++++++++++++++ 2 files changed, 98 insertions(+), 28 deletions(-) create mode 100644 metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/DataProcessInstanceRunEventHook.java diff --git a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/jobs/DataJobRunsResolver.java b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/jobs/DataJobRunsResolver.java index aec67e0c57fb46..4338d0e0066273 100644 --- a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/jobs/DataJobRunsResolver.java +++ b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/jobs/DataJobRunsResolver.java @@ -42,6 +42,7 @@ public class DataJobRunsResolver private static final String PARENT_TEMPLATE_URN_SEARCH_INDEX_FIELD_NAME = "parentTemplate"; private static final String CREATED_TIME_SEARCH_INDEX_FIELD_NAME = "created"; + private static final String HAS_RUN_EVENTS_FIELD_NAME = "hasRunEvents"; private static final Logger log = LoggerFactory.getLogger(DataJobRunsResolver.class); private final EntityClient _entityClient; @@ -95,40 +96,13 @@ public CompletableFuture get(DataFetchingEnvironment final List dataProcessInstances = gmsResults.stream() .filter(Objects::nonNull) - .filter( - p -> { - try { - // check that at least one run-event exists - return !_entityClient - .getTimeseriesAspectValues( - context.getOperationContext(), - p.getUrn().toString(), - "dataProcessInstance", - DATA_PROCESS_INSTANCE_RUN_EVENT_ASPECT_NAME, - null, - null, - 1, - null, - null) - .isEmpty(); - } catch (RemoteInvocationException e) { - log.error("Could not fetch runEvent aspect of {}", p.getUrn()); - return false; - } - }) .map(p -> 
DataProcessInstanceMapper.map(context, p)) .collect(Collectors.toList()); // Step 4: Package and return result final DataProcessInstanceResult result = new DataProcessInstanceResult(); - - /* Note: the number of dataProcessInstance records may be less than the "count" due to the above filtering. - Pagination on a UI would work correctly, except that some pages may have less than "count" records. - */ result.setCount(gmsResult.getPageSize()); result.setStart(gmsResult.getFrom()); - - // Note: "total" could be an inexact upper bound due to the above filtering. result.setTotal(gmsResult.getNumEntities()); result.setRuns(dataProcessInstances); return result; @@ -147,7 +121,12 @@ private Filter buildTaskRunsEntityFilter(final String entityUrn) { new Criterion() .setField(PARENT_TEMPLATE_URN_SEARCH_INDEX_FIELD_NAME) .setCondition(Condition.EQUAL) - .setValue(entityUrn))); + .setValue(entityUrn), + new Criterion() + .setField(HAS_RUN_EVENTS_FIELD_NAME) + .setCondition(Condition.EQUAL) + .setValue(Boolean.TRUE.toString()))); + final Filter filter = new Filter(); filter.setOr( new ConjunctiveCriterionArray(ImmutableList.of(new ConjunctiveCriterion().setAnd(array)))); diff --git a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/DataProcessInstanceRunEventHook.java b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/DataProcessInstanceRunEventHook.java new file mode 100644 index 00000000000000..687171e645201d --- /dev/null +++ b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/DataProcessInstanceRunEventHook.java @@ -0,0 +1,91 @@ +package com.linkedin.metadata.kafka.hook; + +import static com.linkedin.metadata.Constants.*; + +import com.google.common.annotations.VisibleForTesting; +import com.linkedin.gms.factory.common.GraphServiceFactory; +import com.linkedin.gms.factory.common.SystemMetadataServiceFactory; +import com.linkedin.gms.factory.entityregistry.EntityRegistryFactory; +import com.linkedin.gms.factory.search.EntitySearchServiceFactory; +import com.linkedin.gms.factory.search.SearchDocumentTransformerFactory; +import com.linkedin.gms.factory.timeseries.TimeseriesAspectServiceFactory; +import com.linkedin.metadata.search.elasticsearch.ElasticSearchService; +import com.linkedin.metadata.search.utils.SearchUtils; +import com.linkedin.mxe.MetadataChangeLog; +import io.datahubproject.metadata.context.OperationContext; + +import java.util.Objects; +import java.util.Optional; +import javax.annotation.Nonnull; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.codehaus.jackson.node.JsonNodeFactory; +import org.codehaus.jackson.node.ObjectNode; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Import; +import org.springframework.stereotype.Component; + +@Slf4j +@Component + +// TODO check required imports +@Import({ + GraphServiceFactory.class, + EntitySearchServiceFactory.class, + TimeseriesAspectServiceFactory.class, + EntityRegistryFactory.class, + SystemMetadataServiceFactory.class, + SearchDocumentTransformerFactory.class +}) +public class DataProcessInstanceRunEventHook implements MetadataChangeLogHook { + + private final boolean isEnabled; + private OperationContext systemOperationContext; + private final ElasticSearchService elasticSearchService; + @Getter private final String consumerGroupSuffix; + + @Autowired + public DataProcessInstanceRunEventHook( + ElasticSearchService 
elasticSearchService, + @Nonnull @Value("${myhook.enabled:true}") Boolean isEnabled, + @Nonnull @Value("${updateIndices.consumerGroupSuffix}") String consumerGroupSuffix) { + this.elasticSearchService = elasticSearchService; + this.isEnabled = isEnabled; + this.consumerGroupSuffix = consumerGroupSuffix; + } + + @VisibleForTesting + public DataProcessInstanceRunEventHook( + ElasticSearchService elasticSearchService, + @Nonnull Boolean isEnabled, + @Nonnull Boolean reprocessUIEvents) { + this(elasticSearchService, isEnabled, ""); + } + + @Override + public boolean isEnabled() { + return isEnabled; + } + + @Override + public DataProcessInstanceRunEventHook init(@Nonnull OperationContext systemOperationContext) { + this.systemOperationContext = systemOperationContext; + return this; + } + + @Override + public void invoke(@Nonnull final MetadataChangeLog event) { + if (!Objects.equals(event.getAspectName(), DATA_PROCESS_INSTANCE_RUN_EVENT_ASPECT_NAME)) { + return; + } + Optional docId = SearchUtils.getDocId(event.getEntityUrn()); + if (docId.isEmpty()) { + return; + } + ObjectNode json = JsonNodeFactory.instance.objectNode(); + json.put("hasRunEvents", true); + elasticSearchService.upsertDocument( + systemOperationContext, DATA_PROCESS_INSTANCE_ENTITY_NAME, json.toString(), docId.get()); + } +} From bb10b0200707f371bd66a0ac48002d3da38af7e2 Mon Sep 17 00:00:00 2001 From: ksrinath Date: Wed, 28 Aug 2024 17:26:25 +0530 Subject: [PATCH 04/13] spotless --- .../graphql/resolvers/jobs/DataJobRunsResolver.java | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/jobs/DataJobRunsResolver.java b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/jobs/DataJobRunsResolver.java index 4338d0e0066273..d7c76c0235dcc0 100644 --- a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/jobs/DataJobRunsResolver.java +++ b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/jobs/DataJobRunsResolver.java @@ -1,7 +1,5 @@ package com.linkedin.datahub.graphql.resolvers.jobs; -import static com.linkedin.metadata.Constants.DATA_PROCESS_INSTANCE_RUN_EVENT_ASPECT_NAME; - import com.google.common.collect.ImmutableList; import com.linkedin.common.urn.Urn; import com.linkedin.datahub.graphql.QueryContext; @@ -123,9 +121,9 @@ private Filter buildTaskRunsEntityFilter(final String entityUrn) { .setCondition(Condition.EQUAL) .setValue(entityUrn), new Criterion() - .setField(HAS_RUN_EVENTS_FIELD_NAME) - .setCondition(Condition.EQUAL) - .setValue(Boolean.TRUE.toString()))); + .setField(HAS_RUN_EVENTS_FIELD_NAME) + .setCondition(Condition.EQUAL) + .setValue(Boolean.TRUE.toString()))); final Filter filter = new Filter(); filter.setOr( From cfcd688ea4cf088eb032bea403c7ea3fcb640f79 Mon Sep 17 00:00:00 2001 From: ksrinath Date: Wed, 28 Aug 2024 17:37:19 +0530 Subject: [PATCH 05/13] spotless --- .../metadata/kafka/hook/DataProcessInstanceRunEventHook.java | 1 - 1 file changed, 1 deletion(-) diff --git a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/DataProcessInstanceRunEventHook.java b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/DataProcessInstanceRunEventHook.java index 687171e645201d..a826466ca9e770 100644 --- a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/DataProcessInstanceRunEventHook.java +++ 
b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/DataProcessInstanceRunEventHook.java @@ -13,7 +13,6 @@ import com.linkedin.metadata.search.utils.SearchUtils; import com.linkedin.mxe.MetadataChangeLog; import io.datahubproject.metadata.context.OperationContext; - import java.util.Objects; import java.util.Optional; import javax.annotation.Nonnull; From 42b54cdeb7b947a6e0e6d46051830d7f0496baf5 Mon Sep 17 00:00:00 2001 From: ksrinath Date: Wed, 28 Aug 2024 19:53:36 +0530 Subject: [PATCH 06/13] simply add searchable annotation in pdl instead of creating a new hook --- .../hook/DataProcessInstanceRunEventHook.java | 90 ------------------- .../DataProcessInstanceRunEvent.pdl | 4 + 2 files changed, 4 insertions(+), 90 deletions(-) delete mode 100644 metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/DataProcessInstanceRunEventHook.java diff --git a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/DataProcessInstanceRunEventHook.java b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/DataProcessInstanceRunEventHook.java deleted file mode 100644 index a826466ca9e770..00000000000000 --- a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/DataProcessInstanceRunEventHook.java +++ /dev/null @@ -1,90 +0,0 @@ -package com.linkedin.metadata.kafka.hook; - -import static com.linkedin.metadata.Constants.*; - -import com.google.common.annotations.VisibleForTesting; -import com.linkedin.gms.factory.common.GraphServiceFactory; -import com.linkedin.gms.factory.common.SystemMetadataServiceFactory; -import com.linkedin.gms.factory.entityregistry.EntityRegistryFactory; -import com.linkedin.gms.factory.search.EntitySearchServiceFactory; -import com.linkedin.gms.factory.search.SearchDocumentTransformerFactory; -import com.linkedin.gms.factory.timeseries.TimeseriesAspectServiceFactory; -import com.linkedin.metadata.search.elasticsearch.ElasticSearchService; -import com.linkedin.metadata.search.utils.SearchUtils; -import com.linkedin.mxe.MetadataChangeLog; -import io.datahubproject.metadata.context.OperationContext; -import java.util.Objects; -import java.util.Optional; -import javax.annotation.Nonnull; -import lombok.Getter; -import lombok.extern.slf4j.Slf4j; -import org.codehaus.jackson.node.JsonNodeFactory; -import org.codehaus.jackson.node.ObjectNode; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.context.annotation.Import; -import org.springframework.stereotype.Component; - -@Slf4j -@Component - -// TODO check required imports -@Import({ - GraphServiceFactory.class, - EntitySearchServiceFactory.class, - TimeseriesAspectServiceFactory.class, - EntityRegistryFactory.class, - SystemMetadataServiceFactory.class, - SearchDocumentTransformerFactory.class -}) -public class DataProcessInstanceRunEventHook implements MetadataChangeLogHook { - - private final boolean isEnabled; - private OperationContext systemOperationContext; - private final ElasticSearchService elasticSearchService; - @Getter private final String consumerGroupSuffix; - - @Autowired - public DataProcessInstanceRunEventHook( - ElasticSearchService elasticSearchService, - @Nonnull @Value("${myhook.enabled:true}") Boolean isEnabled, - @Nonnull @Value("${updateIndices.consumerGroupSuffix}") String consumerGroupSuffix) { - this.elasticSearchService = elasticSearchService; - this.isEnabled = isEnabled; - this.consumerGroupSuffix = 
consumerGroupSuffix; - } - - @VisibleForTesting - public DataProcessInstanceRunEventHook( - ElasticSearchService elasticSearchService, - @Nonnull Boolean isEnabled, - @Nonnull Boolean reprocessUIEvents) { - this(elasticSearchService, isEnabled, ""); - } - - @Override - public boolean isEnabled() { - return isEnabled; - } - - @Override - public DataProcessInstanceRunEventHook init(@Nonnull OperationContext systemOperationContext) { - this.systemOperationContext = systemOperationContext; - return this; - } - - @Override - public void invoke(@Nonnull final MetadataChangeLog event) { - if (!Objects.equals(event.getAspectName(), DATA_PROCESS_INSTANCE_RUN_EVENT_ASPECT_NAME)) { - return; - } - Optional docId = SearchUtils.getDocId(event.getEntityUrn()); - if (docId.isEmpty()) { - return; - } - ObjectNode json = JsonNodeFactory.instance.objectNode(); - json.put("hasRunEvents", true); - elasticSearchService.upsertDocument( - systemOperationContext, DATA_PROCESS_INSTANCE_ENTITY_NAME, json.toString(), docId.get()); - } -} diff --git a/metadata-models/src/main/pegasus/com/linkedin/dataprocess/DataProcessInstanceRunEvent.pdl b/metadata-models/src/main/pegasus/com/linkedin/dataprocess/DataProcessInstanceRunEvent.pdl index d9850c82442bf6..42179db0554bdb 100644 --- a/metadata-models/src/main/pegasus/com/linkedin/dataprocess/DataProcessInstanceRunEvent.pdl +++ b/metadata-models/src/main/pegasus/com/linkedin/dataprocess/DataProcessInstanceRunEvent.pdl @@ -15,6 +15,10 @@ import com.linkedin.common.Urn record DataProcessInstanceRunEvent includes TimeseriesAspectBase, ExternalReference { @TimeseriesField = {} + @Searchable = { + "fieldType": "TEXT", + "hasValuesFieldName": "hasRunEvents" + } status: enum DataProcessRunStatus { /** * The status where the Data processing run is in. 
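Note on the mechanism so far: the @Searchable "hasValuesFieldName" annotation above causes the
dataProcessInstance search document to carry a boolean hasRunEvents field that is set once the
instance has at least one run event, which is exactly what the filter criterion added in PATCH
03 matches on. As a rough sketch of the read path (the dataJob urn below is a made-up
placeholder, and the runs(start, count) field with its start/count/total/runs sub-fields is
assumed from the result object the resolver populates), a client query would look like:

    query {
      dataJob(urn: "urn:li:dataJob:(urn:li:dataFlow:(airflow,example_dag,PROD),example_task)") {
        runs(start: 0, count: 20) {
          start
          count
          total
          runs {
            urn
          }
        }
      }
    }

With this in place, instances that never emitted a run event are excluded inside the search
query itself, instead of being post-filtered with one timeseries lookup per instance as in
PATCH 01.
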
From 4fb5e8cb202f5d878ecdf3e2b5d341033f7d6500 Mon Sep 17 00:00:00 2001 From: ksrinath Date: Wed, 28 Aug 2024 21:07:04 +0530 Subject: [PATCH 07/13] misc fix --- .../com/linkedin/dataprocess/DataProcessInstanceRunEvent.pdl | 1 - 1 file changed, 1 deletion(-) diff --git a/metadata-models/src/main/pegasus/com/linkedin/dataprocess/DataProcessInstanceRunEvent.pdl b/metadata-models/src/main/pegasus/com/linkedin/dataprocess/DataProcessInstanceRunEvent.pdl index 42179db0554bdb..c18a4168a2a76a 100644 --- a/metadata-models/src/main/pegasus/com/linkedin/dataprocess/DataProcessInstanceRunEvent.pdl +++ b/metadata-models/src/main/pegasus/com/linkedin/dataprocess/DataProcessInstanceRunEvent.pdl @@ -16,7 +16,6 @@ record DataProcessInstanceRunEvent includes TimeseriesAspectBase, ExternalRefere @TimeseriesField = {} @Searchable = { - "fieldType": "TEXT", "hasValuesFieldName": "hasRunEvents" } status: enum DataProcessRunStatus { From f158490c8ba98bff1b48244ff1cfaa904242a504 Mon Sep 17 00:00:00 2001 From: ksrinath Date: Fri, 30 Aug 2024 13:04:09 +0530 Subject: [PATCH 08/13] WIP backfilling dataProcessInstances --- .../BackfillDataProcessInstancesConfig.java | 29 +++ .../BackfillDataProcessInstances.java | 42 +++++ ...lDataProcessInstancesHasRunEventsStep.java | 177 ++++++++++++++++++ .../src/main/resources/application.yaml | 3 + 4 files changed, 251 insertions(+) create mode 100644 datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/BackfillDataProcessInstancesConfig.java create mode 100644 datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/dataprocessinstances/BackfillDataProcessInstances.java create mode 100644 datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/dataprocessinstances/BackfillDataProcessInstancesHasRunEventsStep.java diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/BackfillDataProcessInstancesConfig.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/BackfillDataProcessInstancesConfig.java new file mode 100644 index 00000000000000..36286b47251616 --- /dev/null +++ b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/BackfillDataProcessInstancesConfig.java @@ -0,0 +1,29 @@ +package com.linkedin.datahub.upgrade.config; + +import com.linkedin.datahub.upgrade.system.NonBlockingSystemUpgrade; +import com.linkedin.datahub.upgrade.system.dataprocessinstances.BackfillDataProcessInstances; +import com.linkedin.metadata.entity.EntityService; +import com.linkedin.metadata.search.elasticsearch.ElasticSearchService; +import io.datahubproject.metadata.context.OperationContext; +import org.opensearch.client.RestHighLevelClient; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Conditional; +import org.springframework.context.annotation.Configuration; + +@Configuration +@Conditional(SystemUpdateCondition.NonBlockingSystemUpdateCondition.class) +public class BackfillDataProcessInstancesConfig { + + @Bean + public NonBlockingSystemUpgrade backfillProcessInstancesHasRunEvents( + final OperationContext opContext, + EntityService entityService, + ElasticSearchService elasticSearchService, + RestHighLevelClient restHighLevelClient, + @Value("${systemUpdate.processInstanceHasRunEvents.enabled}") final boolean enabled, + @Value("${systemUpdate.processInstanceHasRunEvents.batchSize}") final Integer batchSize) { + return new BackfillDataProcessInstances( + opContext, entityService, elasticSearchService, 
restHighLevelClient, enabled, batchSize); + } +} diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/dataprocessinstances/BackfillDataProcessInstances.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/dataprocessinstances/BackfillDataProcessInstances.java new file mode 100644 index 00000000000000..edc12a077d8298 --- /dev/null +++ b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/dataprocessinstances/BackfillDataProcessInstances.java @@ -0,0 +1,42 @@ +package com.linkedin.datahub.upgrade.system.dataprocessinstances; + +import com.google.common.collect.ImmutableList; +import com.linkedin.datahub.upgrade.UpgradeStep; +import com.linkedin.datahub.upgrade.system.NonBlockingSystemUpgrade; +import com.linkedin.metadata.entity.EntityService; +import com.linkedin.metadata.search.elasticsearch.ElasticSearchService; +import io.datahubproject.metadata.context.OperationContext; +import java.util.List; +import org.opensearch.client.RestHighLevelClient; + +public class BackfillDataProcessInstances implements NonBlockingSystemUpgrade { + + private final List _steps; + + public BackfillDataProcessInstances( + OperationContext opContext, + EntityService entityService, + ElasticSearchService elasticSearchService, + RestHighLevelClient restHighLevelClient, + boolean enabled, + Integer batchSize) { + if (enabled) { + _steps = + ImmutableList.of( + new BackfillDataProcessInstancesHasRunEventsStep( + opContext, entityService, elasticSearchService, restHighLevelClient, batchSize)); + } else { + _steps = ImmutableList.of(); + } + } + + @Override + public String id() { + return "BackfillDataProcessInstanceHasRunEvents"; + } + + @Override + public List steps() { + return _steps; + } +} diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/dataprocessinstances/BackfillDataProcessInstancesHasRunEventsStep.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/dataprocessinstances/BackfillDataProcessInstancesHasRunEventsStep.java new file mode 100644 index 00000000000000..875de545dcd2ec --- /dev/null +++ b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/dataprocessinstances/BackfillDataProcessInstancesHasRunEventsStep.java @@ -0,0 +1,177 @@ +package com.linkedin.datahub.upgrade.system.dataprocessinstances; + +import static com.linkedin.metadata.Constants.DATA_PROCESS_INSTANCE_ENTITY_NAME; +import static com.linkedin.metadata.Constants.DATA_PROCESS_INSTANCE_RUN_EVENT_ASPECT_NAME; + +import com.google.common.base.Throwables; +import com.linkedin.common.AuditStamp; +import com.linkedin.common.urn.Urn; +import com.linkedin.common.urn.UrnUtils; +import com.linkedin.datahub.upgrade.UpgradeContext; +import com.linkedin.datahub.upgrade.UpgradeStep; +import com.linkedin.datahub.upgrade.UpgradeStepResult; +import com.linkedin.datahub.upgrade.impl.DefaultUpgradeStepResult; +import com.linkedin.metadata.Constants; +import com.linkedin.metadata.boot.BootstrapStep; +import com.linkedin.metadata.entity.EntityService; +import com.linkedin.metadata.search.elasticsearch.ElasticSearchService; +import com.linkedin.metadata.search.utils.SearchUtils; +import io.datahubproject.metadata.context.OperationContext; +import java.io.IOException; +import java.net.URISyntaxException; +import java.util.HashSet; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.function.Function; +import lombok.extern.slf4j.Slf4j; +import org.codehaus.jackson.node.JsonNodeFactory; +import 
org.codehaus.jackson.node.ObjectNode; +import org.opensearch.action.search.SearchRequest; +import org.opensearch.action.search.SearchResponse; +import org.opensearch.client.RequestOptions; +import org.opensearch.client.RestHighLevelClient; +import org.opensearch.search.aggregations.Aggregation; +import org.opensearch.search.aggregations.AggregationBuilders; +import org.opensearch.search.aggregations.bucket.composite.CompositeAggregation; +import org.opensearch.search.aggregations.bucket.composite.CompositeAggregationBuilder; +import org.opensearch.search.aggregations.bucket.composite.TermsValuesSourceBuilder; +import org.opensearch.search.builder.SearchSourceBuilder; + +@Slf4j +public class BackfillDataProcessInstancesHasRunEventsStep implements UpgradeStep { + + private static final String UPGRADE_ID = "BackfillDataProcessInstanceHasRunEvents"; + private static final Urn UPGRADE_ID_URN = BootstrapStep.getUpgradeUrn(UPGRADE_ID); + + private final OperationContext opContext; + private final EntityService entityService; + private final ElasticSearchService elasticSearchService; + private final RestHighLevelClient restHighLevelClient; + + private final Integer batchSize; + + public BackfillDataProcessInstancesHasRunEventsStep( + OperationContext opContext, + EntityService entityService, + ElasticSearchService elasticSearchService, + RestHighLevelClient restHighLevelClient, + Integer batchSize) { + this.opContext = opContext; + this.entityService = entityService; + this.elasticSearchService = elasticSearchService; + this.restHighLevelClient = restHighLevelClient; + this.batchSize = batchSize; + } + + @Override + public Function executable() { + return (context) -> { + final AuditStamp auditStamp = + new AuditStamp() + .setActor(UrnUtils.getUrn(Constants.SYSTEM_ACTOR)) + .setTime(System.currentTimeMillis()); + + TermsValuesSourceBuilder termsValuesSourceBuilder = + new TermsValuesSourceBuilder("urn").field("urn"); + + CompositeAggregationBuilder aggregationBuilder = + AggregationBuilders.composite("aggs", List.of(termsValuesSourceBuilder)).size(batchSize); + ObjectNode json = JsonNodeFactory.instance.objectNode(); + json.put("hasRunEvents", true); + + String runEventsIndexName = + opContext + .getSearchContext() + .getIndexConvention() + .getTimeseriesAspectIndexName( + DATA_PROCESS_INSTANCE_ENTITY_NAME, DATA_PROCESS_INSTANCE_RUN_EVENT_ASPECT_NAME); + + UpgradeStepResult.Result result = UpgradeStepResult.Result.SUCCEEDED; + + while (true) { + SearchRequest searchRequest = new SearchRequest(runEventsIndexName); + searchRequest.source(new SearchSourceBuilder().size(0).aggregation(aggregationBuilder)); + SearchResponse response; + + try { + response = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT); + } catch (IOException e) { + log.error(Throwables.getStackTraceAsString(e)); + log.error("Error querying index {}", runEventsIndexName); + result = UpgradeStepResult.Result.FAILED; + break; + } + List aggregations = response.getAggregations().asList(); + if (aggregations.isEmpty()) { + break; + } + CompositeAggregation aggregation = (CompositeAggregation) aggregations.get(0); + Set urns = new HashSet<>(); + for (CompositeAggregation.Bucket bucket : aggregation.getBuckets()) { + for (Object value : bucket.getKey().values()) { + try { + urns.add(Urn.createFromString(String.valueOf(value))); + } catch (URISyntaxException e) { + log.warn("Ignoring invalid urn {}", value); + } + } + } + if (!urns.isEmpty()) { + urns = entityService.exists(opContext, urns); + urns.forEach( + urn -> { + 
Optional docId = SearchUtils.getDocId(urn); + if (docId.isEmpty()) { + return; + } + elasticSearchService.upsertDocument( + opContext, DATA_PROCESS_INSTANCE_ENTITY_NAME, json.toString(), docId.get()); + }); + } + if (aggregation.afterKey() == null) { + break; + } + aggregationBuilder.aggregateAfter(aggregation.afterKey()); + } + BootstrapStep.setUpgradeResult(context.opContext(), UPGRADE_ID_URN, entityService); + return new DefaultUpgradeStepResult(id(), result); + }; + } + + @Override + public String id() { + return UPGRADE_ID; + } + + /** + * Returns whether the upgrade should proceed if the step fails after exceeding the maximum + * retries. + */ + @Override + public boolean isOptional() { + return true; + } + + /** Returns whether the upgrade should be skipped. */ + @Override + public boolean skip(UpgradeContext context) { + return false; + + /*boolean envEnabled = + Boolean.parseBoolean(System.getenv("BACKFILL_PROCESS_INSTANCES_HAS_RUN_EVENTS")); + + if (envEnabled) { + return false; + } + + boolean previouslyRun = + entityService.exists( + context.opContext(), UPGRADE_ID_URN, DATA_HUB_UPGRADE_RESULT_ASPECT_NAME, true); + if (previouslyRun) { + log.info("{} was already run. Skipping.", id()); + return true; + } + return false;*/ + } +} diff --git a/metadata-service/configuration/src/main/resources/application.yaml b/metadata-service/configuration/src/main/resources/application.yaml index 5b3673ddca52c6..fba548a79d207d 100644 --- a/metadata-service/configuration/src/main/resources/application.yaml +++ b/metadata-service/configuration/src/main/resources/application.yaml @@ -350,6 +350,9 @@ systemUpdate: batchSize: ${BOOTSTRAP_SYSTEM_UPDATE_OWNERSHIP_TYPES_BATCH_SIZE:1000} reprocess: enabled: ${BOOTSTRAP_SYSTEM_UPDATE_OWNERSHIP_TYPES_REPROCESS:false} + processInstanceHasRunEvents: + enabled: true + batchSize: 50 structuredProperties: enabled: ${ENABLE_STRUCTURED_PROPERTIES_HOOK:true} # applies structured properties mappings From df1e6e8914d9a550c0cf91f94eb8a5b85a834aac Mon Sep 17 00:00:00 2001 From: ksrinath Date: Mon, 2 Sep 2024 12:05:22 +0530 Subject: [PATCH 09/13] reprocess-flag, config parameters --- .../BackfillDataProcessInstancesConfig.java | 10 +++++++++- .../BackfillDataProcessInstances.java | 8 +++++++- ...fillDataProcessInstancesHasRunEventsStep.java | 16 ++++++---------- .../src/main/resources/application.yaml | 6 ++++-- 4 files changed, 26 insertions(+), 14 deletions(-) diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/BackfillDataProcessInstancesConfig.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/BackfillDataProcessInstancesConfig.java index 36286b47251616..f09cd65fb4422d 100644 --- a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/BackfillDataProcessInstancesConfig.java +++ b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/BackfillDataProcessInstancesConfig.java @@ -22,8 +22,16 @@ public NonBlockingSystemUpgrade backfillProcessInstancesHasRunEvents( ElasticSearchService elasticSearchService, RestHighLevelClient restHighLevelClient, @Value("${systemUpdate.processInstanceHasRunEvents.enabled}") final boolean enabled, + @Value("${systemUpdate.processInstanceHasRunEvents.reprocess.enabled}") + boolean reprocessEnabled, @Value("${systemUpdate.processInstanceHasRunEvents.batchSize}") final Integer batchSize) { return new BackfillDataProcessInstances( - opContext, entityService, elasticSearchService, restHighLevelClient, enabled, batchSize); + opContext, + entityService, + 
elasticSearchService, + restHighLevelClient, + enabled, + reprocessEnabled, + batchSize); } } diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/dataprocessinstances/BackfillDataProcessInstances.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/dataprocessinstances/BackfillDataProcessInstances.java index edc12a077d8298..35a86ec3510f51 100644 --- a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/dataprocessinstances/BackfillDataProcessInstances.java +++ b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/dataprocessinstances/BackfillDataProcessInstances.java @@ -19,12 +19,18 @@ public BackfillDataProcessInstances( ElasticSearchService elasticSearchService, RestHighLevelClient restHighLevelClient, boolean enabled, + boolean reprocessEnabled, Integer batchSize) { if (enabled) { _steps = ImmutableList.of( new BackfillDataProcessInstancesHasRunEventsStep( - opContext, entityService, elasticSearchService, restHighLevelClient, batchSize)); + opContext, + entityService, + elasticSearchService, + restHighLevelClient, + reprocessEnabled, + batchSize)); } else { _steps = ImmutableList.of(); } diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/dataprocessinstances/BackfillDataProcessInstancesHasRunEventsStep.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/dataprocessinstances/BackfillDataProcessInstancesHasRunEventsStep.java index 875de545dcd2ec..6c83e06e35ca76 100644 --- a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/dataprocessinstances/BackfillDataProcessInstancesHasRunEventsStep.java +++ b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/dataprocessinstances/BackfillDataProcessInstancesHasRunEventsStep.java @@ -1,7 +1,6 @@ package com.linkedin.datahub.upgrade.system.dataprocessinstances; -import static com.linkedin.metadata.Constants.DATA_PROCESS_INSTANCE_ENTITY_NAME; -import static com.linkedin.metadata.Constants.DATA_PROCESS_INSTANCE_RUN_EVENT_ASPECT_NAME; +import static com.linkedin.metadata.Constants.*; import com.google.common.base.Throwables; import com.linkedin.common.AuditStamp; @@ -49,6 +48,7 @@ public class BackfillDataProcessInstancesHasRunEventsStep implements UpgradeStep private final ElasticSearchService elasticSearchService; private final RestHighLevelClient restHighLevelClient; + private final boolean reprocessEnabled; private final Integer batchSize; public BackfillDataProcessInstancesHasRunEventsStep( @@ -56,11 +56,13 @@ public BackfillDataProcessInstancesHasRunEventsStep( EntityService entityService, ElasticSearchService elasticSearchService, RestHighLevelClient restHighLevelClient, + boolean reprocessEnabled, Integer batchSize) { this.opContext = opContext; this.entityService = entityService; this.elasticSearchService = elasticSearchService; this.restHighLevelClient = restHighLevelClient; + this.reprocessEnabled = reprocessEnabled; this.batchSize = batchSize; } @@ -156,12 +158,7 @@ public boolean isOptional() { /** Returns whether the upgrade should be skipped. */ @Override public boolean skip(UpgradeContext context) { - return false; - - /*boolean envEnabled = - Boolean.parseBoolean(System.getenv("BACKFILL_PROCESS_INSTANCES_HAS_RUN_EVENTS")); - - if (envEnabled) { + if (reprocessEnabled) { return false; } @@ -170,8 +167,7 @@ public boolean skip(UpgradeContext context) { context.opContext(), UPGRADE_ID_URN, DATA_HUB_UPGRADE_RESULT_ASPECT_NAME, true); if (previouslyRun) { log.info("{} was already run. 
Skipping.", id()); - return true; } - return false;*/ + return previouslyRun; } } diff --git a/metadata-service/configuration/src/main/resources/application.yaml b/metadata-service/configuration/src/main/resources/application.yaml index fba548a79d207d..e2a4ae1ce41d78 100644 --- a/metadata-service/configuration/src/main/resources/application.yaml +++ b/metadata-service/configuration/src/main/resources/application.yaml @@ -351,8 +351,10 @@ systemUpdate: reprocess: enabled: ${BOOTSTRAP_SYSTEM_UPDATE_OWNERSHIP_TYPES_REPROCESS:false} processInstanceHasRunEvents: - enabled: true - batchSize: 50 + enabled: ${BOOTSTRAP_SYSTEM_UPDATE_PROCESS_INSTANCE_HAS_RUN_EVENTS_ENABLED:true} + batchSize: ${BOOTSTRAP_SYSTEM_UPDATE_PROCESS_INSTANCE_HAS_RUN_EVENTS_BATCH_SIZE:100} + reprocess: + enabled: ${BOOTSTRAP_SYSTEM_UPDATE_PROCESS_INSTANCE_HAS_RUN_EVENTS_REPROCESS:false} structuredProperties: enabled: ${ENABLE_STRUCTURED_PROPERTIES_HOOK:true} # applies structured properties mappings From 4c41415f507be921e21413a0809d9b85839fff85 Mon Sep 17 00:00:00 2001 From: ksrinath Date: Mon, 2 Sep 2024 17:24:03 +0530 Subject: [PATCH 10/13] misc fix --- .../BackfillDataProcessInstancesHasRunEventsStep.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/dataprocessinstances/BackfillDataProcessInstancesHasRunEventsStep.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/dataprocessinstances/BackfillDataProcessInstancesHasRunEventsStep.java index 6c83e06e35ca76..39863f9625c31b 100644 --- a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/dataprocessinstances/BackfillDataProcessInstancesHasRunEventsStep.java +++ b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/dataprocessinstances/BackfillDataProcessInstancesHasRunEventsStep.java @@ -40,7 +40,7 @@ @Slf4j public class BackfillDataProcessInstancesHasRunEventsStep implements UpgradeStep { - private static final String UPGRADE_ID = "BackfillDataProcessInstanceHasRunEvents"; + private static final String UPGRADE_ID = "BackfillDataProcessInstancesHasRunEvents"; private static final Urn UPGRADE_ID_URN = BootstrapStep.getUpgradeUrn(UPGRADE_ID); private final OperationContext opContext; From 140082a804bc108e04d21cbe7d587bd09b360e56 Mon Sep 17 00:00:00 2001 From: ksrinath Date: Thu, 5 Sep 2024 14:20:21 +0530 Subject: [PATCH 11/13] fix compile errors due to upstream changes; misc. 
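
The upstream UpgradeStep contract now reports status through DataHubUpgradeState rather than
UpgradeStepResult.Result, and search document ids are obtained via
IndexConvention.getEntityDocumentId(urn) instead of SearchUtils.getDocId. Also drops an unused
AuditStamp and aligns the upgrade id with the class name.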
--- .../BackfillDataProcessInstances.java | 2 +- ...lDataProcessInstancesHasRunEventsStep.java | 33 +++++++++---------- 2 files changed, 16 insertions(+), 19 deletions(-) diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/dataprocessinstances/BackfillDataProcessInstances.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/dataprocessinstances/BackfillDataProcessInstances.java index 35a86ec3510f51..c049f0e2e8d5af 100644 --- a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/dataprocessinstances/BackfillDataProcessInstances.java +++ b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/dataprocessinstances/BackfillDataProcessInstances.java @@ -38,7 +38,7 @@ public BackfillDataProcessInstances( @Override public String id() { - return "BackfillDataProcessInstanceHasRunEvents"; + return "BackfillDataProcessInstances"; } @Override diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/dataprocessinstances/BackfillDataProcessInstancesHasRunEventsStep.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/dataprocessinstances/BackfillDataProcessInstancesHasRunEventsStep.java index 39863f9625c31b..7945f0164b7295 100644 --- a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/dataprocessinstances/BackfillDataProcessInstancesHasRunEventsStep.java +++ b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/dataprocessinstances/BackfillDataProcessInstancesHasRunEventsStep.java @@ -14,13 +14,13 @@ import com.linkedin.metadata.boot.BootstrapStep; import com.linkedin.metadata.entity.EntityService; import com.linkedin.metadata.search.elasticsearch.ElasticSearchService; -import com.linkedin.metadata.search.utils.SearchUtils; +import com.linkedin.metadata.utils.elasticsearch.IndexConvention; +import com.linkedin.upgrade.DataHubUpgradeState; import io.datahubproject.metadata.context.OperationContext; import java.io.IOException; import java.net.URISyntaxException; import java.util.HashSet; import java.util.List; -import java.util.Optional; import java.util.Set; import java.util.function.Function; import lombok.extern.slf4j.Slf4j; @@ -82,14 +82,13 @@ public Function executable() { ObjectNode json = JsonNodeFactory.instance.objectNode(); json.put("hasRunEvents", true); + IndexConvention indexConvention = opContext.getSearchContext().getIndexConvention(); + String runEventsIndexName = - opContext - .getSearchContext() - .getIndexConvention() - .getTimeseriesAspectIndexName( - DATA_PROCESS_INSTANCE_ENTITY_NAME, DATA_PROCESS_INSTANCE_RUN_EVENT_ASPECT_NAME); + indexConvention.getTimeseriesAspectIndexName( + DATA_PROCESS_INSTANCE_ENTITY_NAME, DATA_PROCESS_INSTANCE_RUN_EVENT_ASPECT_NAME); - UpgradeStepResult.Result result = UpgradeStepResult.Result.SUCCEEDED; + DataHubUpgradeState upgradeState = DataHubUpgradeState.SUCCEEDED; while (true) { SearchRequest searchRequest = new SearchRequest(runEventsIndexName); @@ -101,7 +100,7 @@ public Function executable() { } catch (IOException e) { log.error(Throwables.getStackTraceAsString(e)); log.error("Error querying index {}", runEventsIndexName); - result = UpgradeStepResult.Result.FAILED; + upgradeState = DataHubUpgradeState.FAILED; break; } List aggregations = response.getAggregations().asList(); @@ -122,14 +121,12 @@ public Function executable() { if (!urns.isEmpty()) { urns = entityService.exists(opContext, urns); urns.forEach( - urn -> { - Optional docId = SearchUtils.getDocId(urn); - if (docId.isEmpty()) { - return; - } - 
elasticSearchService.upsertDocument( - opContext, DATA_PROCESS_INSTANCE_ENTITY_NAME, json.toString(), docId.get()); - }); + urn -> + elasticSearchService.upsertDocument( + opContext, + DATA_PROCESS_INSTANCE_ENTITY_NAME, + json.toString(), + indexConvention.getEntityDocumentId(urn))); } if (aggregation.afterKey() == null) { break; @@ -137,7 +134,7 @@ public Function executable() { aggregationBuilder.aggregateAfter(aggregation.afterKey()); } BootstrapStep.setUpgradeResult(context.opContext(), UPGRADE_ID_URN, entityService); - return new DefaultUpgradeStepResult(id(), result); + return new DefaultUpgradeStepResult(id(), upgradeState); }; } From dd798f3e18241e7dfb11b1d08e07cd37d104ab23 Mon Sep 17 00:00:00 2001 From: ksrinath Date: Thu, 5 Sep 2024 14:38:19 +0530 Subject: [PATCH 12/13] add batchDelayMs to throttle aggregation query --- .../BackfillDataProcessInstancesConfig.java | 6 +++-- .../BackfillDataProcessInstances.java | 6 +++-- ...lDataProcessInstancesHasRunEventsStep.java | 22 +++++++++++-------- .../src/main/resources/application.yaml | 7 +++--- 4 files changed, 25 insertions(+), 16 deletions(-) diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/BackfillDataProcessInstancesConfig.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/BackfillDataProcessInstancesConfig.java index f09cd65fb4422d..3ffb34358b754b 100644 --- a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/BackfillDataProcessInstancesConfig.java +++ b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/BackfillDataProcessInstancesConfig.java @@ -24,7 +24,8 @@ public NonBlockingSystemUpgrade backfillProcessInstancesHasRunEvents( @Value("${systemUpdate.processInstanceHasRunEvents.enabled}") final boolean enabled, @Value("${systemUpdate.processInstanceHasRunEvents.reprocess.enabled}") boolean reprocessEnabled, - @Value("${systemUpdate.processInstanceHasRunEvents.batchSize}") final Integer batchSize) { + @Value("${systemUpdate.processInstanceHasRunEvents.batchSize}") final Integer batchSize, + @Value("${systemUpdate.processInstanceHasRunEvents.delayMs}") final Integer delayMs) { return new BackfillDataProcessInstances( opContext, entityService, @@ -32,6 +33,7 @@ public NonBlockingSystemUpgrade backfillProcessInstancesHasRunEvents( restHighLevelClient, enabled, reprocessEnabled, - batchSize); + batchSize, + delayMs); } } diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/dataprocessinstances/BackfillDataProcessInstances.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/dataprocessinstances/BackfillDataProcessInstances.java index c049f0e2e8d5af..ef4e7f2a7e8213 100644 --- a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/dataprocessinstances/BackfillDataProcessInstances.java +++ b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/dataprocessinstances/BackfillDataProcessInstances.java @@ -20,7 +20,8 @@ public BackfillDataProcessInstances( RestHighLevelClient restHighLevelClient, boolean enabled, boolean reprocessEnabled, - Integer batchSize) { + Integer batchSize, + Integer batchDelayMs) { if (enabled) { _steps = ImmutableList.of( @@ -30,7 +31,8 @@ public BackfillDataProcessInstances( elasticSearchService, restHighLevelClient, reprocessEnabled, - batchSize)); + batchSize, + batchDelayMs)); } else { _steps = ImmutableList.of(); } diff --git 
a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/dataprocessinstances/BackfillDataProcessInstancesHasRunEventsStep.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/dataprocessinstances/BackfillDataProcessInstancesHasRunEventsStep.java index 7945f0164b7295..22489fc499028c 100644 --- a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/dataprocessinstances/BackfillDataProcessInstancesHasRunEventsStep.java +++ b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/dataprocessinstances/BackfillDataProcessInstancesHasRunEventsStep.java @@ -3,14 +3,11 @@ import static com.linkedin.metadata.Constants.*; import com.google.common.base.Throwables; -import com.linkedin.common.AuditStamp; import com.linkedin.common.urn.Urn; -import com.linkedin.common.urn.UrnUtils; import com.linkedin.datahub.upgrade.UpgradeContext; import com.linkedin.datahub.upgrade.UpgradeStep; import com.linkedin.datahub.upgrade.UpgradeStepResult; import com.linkedin.datahub.upgrade.impl.DefaultUpgradeStepResult; -import com.linkedin.metadata.Constants; import com.linkedin.metadata.boot.BootstrapStep; import com.linkedin.metadata.entity.EntityService; import com.linkedin.metadata.search.elasticsearch.ElasticSearchService; @@ -50,6 +47,7 @@ public class BackfillDataProcessInstancesHasRunEventsStep implements UpgradeStep private final boolean reprocessEnabled; private final Integer batchSize; + private final Integer batchDelayMs; public BackfillDataProcessInstancesHasRunEventsStep( OperationContext opContext, @@ -57,23 +55,21 @@ public BackfillDataProcessInstancesHasRunEventsStep( ElasticSearchService elasticSearchService, RestHighLevelClient restHighLevelClient, boolean reprocessEnabled, - Integer batchSize) { + Integer batchSize, + Integer batchDelayMs) { this.opContext = opContext; this.entityService = entityService; this.elasticSearchService = elasticSearchService; this.restHighLevelClient = restHighLevelClient; this.reprocessEnabled = reprocessEnabled; this.batchSize = batchSize; + this.batchDelayMs = batchDelayMs; } + @SuppressWarnings("BusyWait") @Override public Function executable() { return (context) -> { - final AuditStamp auditStamp = - new AuditStamp() - .setActor(UrnUtils.getUrn(Constants.SYSTEM_ACTOR)) - .setTime(System.currentTimeMillis()); - TermsValuesSourceBuilder termsValuesSourceBuilder = new TermsValuesSourceBuilder("urn").field("urn"); @@ -132,6 +128,14 @@ public Function executable() { break; } aggregationBuilder.aggregateAfter(aggregation.afterKey()); + if (batchDelayMs > 0) { + log.info("Sleeping for {} ms", batchDelayMs); + try { + Thread.sleep(batchDelayMs); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } } BootstrapStep.setUpgradeResult(context.opContext(), UPGRADE_ID_URN, entityService); return new DefaultUpgradeStepResult(id(), upgradeState); diff --git a/metadata-service/configuration/src/main/resources/application.yaml b/metadata-service/configuration/src/main/resources/application.yaml index 518c18be2c2a11..32bbb00e5e9561 100644 --- a/metadata-service/configuration/src/main/resources/application.yaml +++ b/metadata-service/configuration/src/main/resources/application.yaml @@ -371,10 +371,11 @@ systemUpdate: delayMs: ${SYSTEM_UPDATE_SCHEMA_FIELDS_DOC_IDS_DELAY_MS:5000} limit: ${SYSTEM_UPDATE_SCHEMA_FIELDS_DOC_IDS_LIMIT:0} processInstanceHasRunEvents: - enabled: ${BOOTSTRAP_SYSTEM_UPDATE_PROCESS_INSTANCE_HAS_RUN_EVENTS_ENABLED:true} - batchSize: 
${BOOTSTRAP_SYSTEM_UPDATE_PROCESS_INSTANCE_HAS_RUN_EVENTS_BATCH_SIZE:100} + enabled: ${SYSTEM_UPDATE_PROCESS_INSTANCE_HAS_RUN_EVENTS_ENABLED:true} + batchSize: ${SYSTEM_UPDATE_PROCESS_INSTANCE_HAS_RUN_EVENTS_BATCH_SIZE:100} + delayMs: ${SYSTEM_UPDATE_PROCESS_INSTANCE_HAS_RUN_EVENTS_DELAY_MS:1000} reprocess: - enabled: ${BOOTSTRAP_SYSTEM_UPDATE_PROCESS_INSTANCE_HAS_RUN_EVENTS_REPROCESS:false} + enabled: ${SYSTEM_UPDATE_PROCESS_INSTANCE_HAS_RUN_EVENTS_REPROCESS:false} structuredProperties: enabled: ${ENABLE_STRUCTURED_PROPERTIES_HOOK:true} # applies structured properties mappings From 72cc63ab14486371a2bb3b22a61974c86ad6a8ad Mon Sep 17 00:00:00 2001 From: ksrinath Date: Tue, 10 Sep 2024 16:35:06 +0530 Subject: [PATCH 13/13] time-window filtering and capped history --- .../BackfillDataProcessInstancesConfig.java | 8 +- .../BackfillDataProcessInstances.java | 8 +- ...lDataProcessInstancesHasRunEventsStep.java | 131 ++++++++++++------ .../src/main/resources/application.yaml | 2 + 4 files changed, 99 insertions(+), 50 deletions(-) diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/BackfillDataProcessInstancesConfig.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/BackfillDataProcessInstancesConfig.java index 3ffb34358b754b..bc55ad38765ed5 100644 --- a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/BackfillDataProcessInstancesConfig.java +++ b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/BackfillDataProcessInstancesConfig.java @@ -25,7 +25,9 @@ public NonBlockingSystemUpgrade backfillProcessInstancesHasRunEvents( @Value("${systemUpdate.processInstanceHasRunEvents.reprocess.enabled}") boolean reprocessEnabled, @Value("${systemUpdate.processInstanceHasRunEvents.batchSize}") final Integer batchSize, - @Value("${systemUpdate.processInstanceHasRunEvents.delayMs}") final Integer delayMs) { + @Value("${systemUpdate.processInstanceHasRunEvents.delayMs}") final Integer delayMs, + @Value("${systemUpdate.processInstanceHasRunEvents.totalDays}") Integer totalDays, + @Value("${systemUpdate.processInstanceHasRunEvents.windowDays}") Integer windowDays) { return new BackfillDataProcessInstances( opContext, entityService, @@ -34,6 +36,8 @@ public NonBlockingSystemUpgrade backfillProcessInstancesHasRunEvents( enabled, reprocessEnabled, batchSize, - delayMs); + delayMs, + totalDays, + windowDays); } } diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/dataprocessinstances/BackfillDataProcessInstances.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/dataprocessinstances/BackfillDataProcessInstances.java index ef4e7f2a7e8213..643a0ff5a4ce25 100644 --- a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/dataprocessinstances/BackfillDataProcessInstances.java +++ b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/dataprocessinstances/BackfillDataProcessInstances.java @@ -21,7 +21,9 @@ public BackfillDataProcessInstances( boolean enabled, boolean reprocessEnabled, Integer batchSize, - Integer batchDelayMs) { + Integer batchDelayMs, + Integer totalDays, + Integer windowDays) { if (enabled) { _steps = ImmutableList.of( @@ -32,7 +34,9 @@ public BackfillDataProcessInstances( restHighLevelClient, reprocessEnabled, batchSize, - batchDelayMs)); + batchDelayMs, + totalDays, + windowDays)); } else { _steps = ImmutableList.of(); } diff --git 
a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/dataprocessinstances/BackfillDataProcessInstancesHasRunEventsStep.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/dataprocessinstances/BackfillDataProcessInstancesHasRunEventsStep.java index 22489fc499028c..55cdcae931ab5b 100644 --- a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/dataprocessinstances/BackfillDataProcessInstancesHasRunEventsStep.java +++ b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/dataprocessinstances/BackfillDataProcessInstancesHasRunEventsStep.java @@ -16,6 +16,8 @@ import io.datahubproject.metadata.context.OperationContext; import java.io.IOException; import java.net.URISyntaxException; +import java.time.Instant; +import java.time.temporal.ChronoUnit; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -27,6 +29,8 @@ import org.opensearch.action.search.SearchResponse; import org.opensearch.client.RequestOptions; import org.opensearch.client.RestHighLevelClient; +import org.opensearch.index.query.QueryBuilder; +import org.opensearch.index.query.QueryBuilders; import org.opensearch.search.aggregations.Aggregation; import org.opensearch.search.aggregations.AggregationBuilders; import org.opensearch.search.aggregations.bucket.composite.CompositeAggregation; @@ -49,6 +53,9 @@ public class BackfillDataProcessInstancesHasRunEventsStep implements UpgradeStep private final Integer batchSize; private final Integer batchDelayMs; + private final Integer totalDays; + private final Integer windowDays; + public BackfillDataProcessInstancesHasRunEventsStep( OperationContext opContext, EntityService entityService, @@ -56,7 +63,9 @@ public BackfillDataProcessInstancesHasRunEventsStep( RestHighLevelClient restHighLevelClient, boolean reprocessEnabled, Integer batchSize, - Integer batchDelayMs) { + Integer batchDelayMs, + Integer totalDays, + Integer windowDays) { this.opContext = opContext; this.entityService = entityService; this.elasticSearchService = elasticSearchService; @@ -64,6 +73,8 @@ public BackfillDataProcessInstancesHasRunEventsStep( this.reprocessEnabled = reprocessEnabled; this.batchSize = batchSize; this.batchDelayMs = batchDelayMs; + this.totalDays = totalDays; + this.windowDays = windowDays; } @SuppressWarnings("BusyWait") @@ -73,8 +84,6 @@ public Function executable() { TermsValuesSourceBuilder termsValuesSourceBuilder = new TermsValuesSourceBuilder("urn").field("urn"); - CompositeAggregationBuilder aggregationBuilder = - AggregationBuilders.composite("aggs", List.of(termsValuesSourceBuilder)).size(batchSize); ObjectNode json = JsonNodeFactory.instance.objectNode(); json.put("hasRunEvents", true); @@ -86,54 +95,84 @@ public Function executable() { DataHubUpgradeState upgradeState = DataHubUpgradeState.SUCCEEDED; - while (true) { - SearchRequest searchRequest = new SearchRequest(runEventsIndexName); - searchRequest.source(new SearchSourceBuilder().size(0).aggregation(aggregationBuilder)); - SearchResponse response; - - try { - response = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT); - } catch (IOException e) { - log.error(Throwables.getStackTraceAsString(e)); - log.error("Error querying index {}", runEventsIndexName); - upgradeState = DataHubUpgradeState.FAILED; + Instant now = Instant.now(); + Instant overallStart = now.minus(totalDays, ChronoUnit.DAYS); + for (int i = 0; ; i++) { + Instant windowEnd = now.minus(i * windowDays, ChronoUnit.DAYS); + if (!windowEnd.isAfter(overallStart)) { break; } 
- List aggregations = response.getAggregations().asList(); - if (aggregations.isEmpty()) { - break; + Instant windowStart = windowEnd.minus(windowDays, ChronoUnit.DAYS); + if (windowStart.isBefore(overallStart)) { + // last iteration, cap at overallStart + windowStart = overallStart; } - CompositeAggregation aggregation = (CompositeAggregation) aggregations.get(0); - Set urns = new HashSet<>(); - for (CompositeAggregation.Bucket bucket : aggregation.getBuckets()) { - for (Object value : bucket.getKey().values()) { - try { - urns.add(Urn.createFromString(String.valueOf(value))); - } catch (URISyntaxException e) { - log.warn("Ignoring invalid urn {}", value); + + QueryBuilder queryBuilder = + QueryBuilders.boolQuery() + .must( + QueryBuilders.rangeQuery("@timestamp") + .gte(windowStart.toString()) + .lt(windowEnd.toString())); + + CompositeAggregationBuilder aggregationBuilder = + AggregationBuilders.composite("aggs", List.of(termsValuesSourceBuilder)) + .size(batchSize); + + while (true) { + SearchRequest searchRequest = new SearchRequest(runEventsIndexName); + searchRequest.source( + new SearchSourceBuilder() + .size(0) + .aggregation(aggregationBuilder) + .query(queryBuilder)); + + SearchResponse response; + + try { + response = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT); + } catch (IOException e) { + log.error(Throwables.getStackTraceAsString(e)); + log.error("Error querying index {}", runEventsIndexName); + upgradeState = DataHubUpgradeState.FAILED; + break; + } + List aggregations = response.getAggregations().asList(); + if (aggregations.isEmpty()) { + break; + } + CompositeAggregation aggregation = (CompositeAggregation) aggregations.get(0); + Set urns = new HashSet<>(); + for (CompositeAggregation.Bucket bucket : aggregation.getBuckets()) { + for (Object value : bucket.getKey().values()) { + try { + urns.add(Urn.createFromString(String.valueOf(value))); + } catch (URISyntaxException e) { + log.warn("Ignoring invalid urn {}", value); + } } } - } - if (!urns.isEmpty()) { - urns = entityService.exists(opContext, urns); - urns.forEach( - urn -> - elasticSearchService.upsertDocument( - opContext, - DATA_PROCESS_INSTANCE_ENTITY_NAME, - json.toString(), - indexConvention.getEntityDocumentId(urn))); - } - if (aggregation.afterKey() == null) { - break; - } - aggregationBuilder.aggregateAfter(aggregation.afterKey()); - if (batchDelayMs > 0) { - log.info("Sleeping for {} ms", batchDelayMs); - try { - Thread.sleep(batchDelayMs); - } catch (InterruptedException e) { - throw new RuntimeException(e); + if (!urns.isEmpty()) { + urns = entityService.exists(opContext, urns); + urns.forEach( + urn -> + elasticSearchService.upsertDocument( + opContext, + DATA_PROCESS_INSTANCE_ENTITY_NAME, + json.toString(), + indexConvention.getEntityDocumentId(urn))); + } + if (aggregation.afterKey() == null) { + break; + } + aggregationBuilder.aggregateAfter(aggregation.afterKey()); + if (batchDelayMs > 0) { + log.info("Sleeping for {} ms", batchDelayMs); + try { + Thread.sleep(batchDelayMs); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } } } } diff --git a/metadata-service/configuration/src/main/resources/application.yaml b/metadata-service/configuration/src/main/resources/application.yaml index cb73f303469e57..12d0175bac854a 100644 --- a/metadata-service/configuration/src/main/resources/application.yaml +++ b/metadata-service/configuration/src/main/resources/application.yaml @@ -391,6 +391,8 @@ systemUpdate: enabled: 
${SYSTEM_UPDATE_PROCESS_INSTANCE_HAS_RUN_EVENTS_ENABLED:true} batchSize: ${SYSTEM_UPDATE_PROCESS_INSTANCE_HAS_RUN_EVENTS_BATCH_SIZE:100} delayMs: ${SYSTEM_UPDATE_PROCESS_INSTANCE_HAS_RUN_EVENTS_DELAY_MS:1000} + totalDays: ${SYSTEM_UPDATE_PROCESS_INSTANCE_HAS_RUN_EVENTS_TOTAL_DAYS:90} + windowDays: ${SYSTEM_UPDATE_PROCESS_INSTANCE_HAS_RUN_EVENTS_WINDOW_DAYS:1} reprocess: enabled: ${SYSTEM_UPDATE_PROCESS_INSTANCE_HAS_RUN_EVENTS_REPROCESS:false}
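
Deployment note: the backfill introduced in this series is controlled entirely through the
environment variables wired into application.yaml above. As an illustrative override (the
values below are arbitrary examples, not recommended defaults), a deployment that wants to
force a re-run over 30 days of history, in 7-day windows, with smaller batches and extra
throttling, could set:

    SYSTEM_UPDATE_PROCESS_INSTANCE_HAS_RUN_EVENTS_ENABLED=true
    SYSTEM_UPDATE_PROCESS_INSTANCE_HAS_RUN_EVENTS_REPROCESS=true
    SYSTEM_UPDATE_PROCESS_INSTANCE_HAS_RUN_EVENTS_TOTAL_DAYS=30
    SYSTEM_UPDATE_PROCESS_INSTANCE_HAS_RUN_EVENTS_WINDOW_DAYS=7
    SYSTEM_UPDATE_PROCESS_INSTANCE_HAS_RUN_EVENTS_BATCH_SIZE=50
    SYSTEM_UPDATE_PROCESS_INSTANCE_HAS_RUN_EVENTS_DELAY_MS=2000

With the reprocess flag set, BackfillDataProcessInstancesHasRunEventsStep.skip() returns false
even when a previous run was recorded, so the windowed aggregation sweep runs again from
scratch.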