From ed842e49b6265cc109776e96ca832237c4ec1d8b Mon Sep 17 00:00:00 2001 From: ritwiksahani Date: Thu, 16 Jan 2025 16:15:21 +0530 Subject: [PATCH] Validate workflow token on program status trigger. --- .../runtime/schedule/queue/JobQueueTable.java | 3 +- .../runtime/schedule/trigger/AndTrigger.java | 5 +- .../schedule/trigger/NotificationContext.java | 51 +++++++ .../runtime/schedule/trigger/OrTrigger.java | 5 +- .../schedule/trigger/PartitionTrigger.java | 4 +- .../trigger/ProgramStatusTrigger.java | 35 ++++- .../schedule/trigger/SatisfiableTrigger.java | 5 +- .../runtime/schedule/trigger/TimeTrigger.java | 4 +- .../trigger/ProgramStatusTriggerTest.java | 125 ++++++++++++++++++ 9 files changed, 217 insertions(+), 20 deletions(-) create mode 100644 cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/runtime/schedule/trigger/NotificationContext.java create mode 100644 cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/runtime/schedule/trigger/ProgramStatusTriggerTest.java diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/runtime/schedule/queue/JobQueueTable.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/runtime/schedule/queue/JobQueueTable.java index 19cc03e7669..26afbf7de3d 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/runtime/schedule/queue/JobQueueTable.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/runtime/schedule/queue/JobQueueTable.java @@ -33,6 +33,7 @@ import io.cdap.cdap.internal.app.runtime.schedule.ProgramScheduleRecord; import io.cdap.cdap.internal.app.runtime.schedule.ProgramScheduleStatus; import io.cdap.cdap.internal.app.runtime.schedule.constraint.ConstraintCodec; +import io.cdap.cdap.internal.app.runtime.schedule.trigger.NotificationContext; import io.cdap.cdap.internal.app.runtime.schedule.trigger.SatisfiableTrigger; import io.cdap.cdap.internal.app.runtime.schedule.trigger.TriggerCodec; import io.cdap.cdap.internal.app.store.AppMetadataStore; @@ -209,7 +210,7 @@ private void addNotification(Job job, Notification notification) throws IOExcept } private boolean isTriggerSatisfied(ProgramSchedule schedule, List notifications) { - return ((SatisfiableTrigger) schedule.getTrigger()).isSatisfied(schedule, notifications); + return ((SatisfiableTrigger) schedule.getTrigger()).isSatisfied(schedule, new NotificationContext(notifications,appMetadataStore)); } @Override diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/runtime/schedule/trigger/AndTrigger.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/runtime/schedule/trigger/AndTrigger.java index 331d38db8b5..8bae0dc6ea1 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/runtime/schedule/trigger/AndTrigger.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/runtime/schedule/trigger/AndTrigger.java @@ -19,7 +19,6 @@ import io.cdap.cdap.api.schedule.Trigger; import io.cdap.cdap.api.schedule.TriggerInfo; import io.cdap.cdap.internal.app.runtime.schedule.ProgramSchedule; -import io.cdap.cdap.proto.Notification; import io.cdap.cdap.proto.id.ProgramId; import java.util.ArrayList; import java.util.Arrays; @@ -41,9 +40,9 @@ public AndTrigger(List triggers) { @Override - public boolean isSatisfied(ProgramSchedule schedule, List notifications) { + public boolean isSatisfied(ProgramSchedule schedule, NotificationContext notificationContext) { for (Trigger trigger : getTriggers()) { - if (!((SatisfiableTrigger) trigger).isSatisfied(schedule, notifications)) { + if (!((SatisfiableTrigger) trigger).isSatisfied(schedule, notificationContext)) { return false; } } diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/runtime/schedule/trigger/NotificationContext.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/runtime/schedule/trigger/NotificationContext.java new file mode 100644 index 00000000000..ace67e5320d --- /dev/null +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/runtime/schedule/trigger/NotificationContext.java @@ -0,0 +1,51 @@ +/* + * Copyright © 2025 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.cdap.internal.app.runtime.schedule.trigger; + +import io.cdap.cdap.api.workflow.WorkflowToken; +import io.cdap.cdap.internal.app.store.AppMetadataStore; +import io.cdap.cdap.proto.Notification; +import io.cdap.cdap.proto.ProgramType; +import io.cdap.cdap.proto.id.ProgramId; +import io.cdap.cdap.proto.id.ProgramRunId; +import io.cdap.cdap.proto.id.WorkflowId; +import java.io.IOException; +import java.util.List; + +public class NotificationContext { + + private final List notifications; + private final AppMetadataStore appMetadataStore; + + public NotificationContext(List notifications, AppMetadataStore appMetadataStore) { + this.notifications = notifications; + this.appMetadataStore = appMetadataStore; + } + + public List getNotifications() { + return notifications; + } + + public WorkflowToken getWorkflowToken(ProgramRunId programRunId) throws IOException { + ProgramId programId = programRunId.getParent(); + if (!programId.getType().equals(ProgramType.WORKFLOW)) { + return null; + } + return appMetadataStore.getWorkflowToken(new WorkflowId(programId.getParent(), programId.getProgram()), + programRunId.getRun()); + } +} diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/runtime/schedule/trigger/OrTrigger.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/runtime/schedule/trigger/OrTrigger.java index 556194a3fde..b8eeb2655bc 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/runtime/schedule/trigger/OrTrigger.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/runtime/schedule/trigger/OrTrigger.java @@ -18,7 +18,6 @@ import io.cdap.cdap.api.schedule.TriggerInfo; import io.cdap.cdap.internal.app.runtime.schedule.ProgramSchedule; -import io.cdap.cdap.proto.Notification; import io.cdap.cdap.proto.id.ProgramId; import java.util.ArrayList; import java.util.Arrays; @@ -40,9 +39,9 @@ public OrTrigger(List triggers) { } @Override - public boolean isSatisfied(ProgramSchedule schedule, List notifications) { + public boolean isSatisfied(ProgramSchedule schedule, NotificationContext notificationContext) { for (SatisfiableTrigger trigger : getTriggers()) { - if (trigger.isSatisfied(schedule, notifications)) { + if (trigger.isSatisfied(schedule, notificationContext)) { return true; } } diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/runtime/schedule/trigger/PartitionTrigger.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/runtime/schedule/trigger/PartitionTrigger.java index 6f8baa4cacc..6ec6ce68487 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/runtime/schedule/trigger/PartitionTrigger.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/runtime/schedule/trigger/PartitionTrigger.java @@ -39,8 +39,8 @@ public PartitionTrigger(DatasetId dataset, int numPartitions) { } @Override - public boolean isSatisfied(ProgramSchedule schedule, List notifications) { - return getPartitionsCount(notifications) >= numPartitions; + public boolean isSatisfied(ProgramSchedule schedule, NotificationContext notificationContext) { + return getPartitionsCount(notificationContext.getNotifications()) >= numPartitions; } private int getPartitionsCount(List notifications) { diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/runtime/schedule/trigger/ProgramStatusTrigger.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/runtime/schedule/trigger/ProgramStatusTrigger.java index 6136e5e889e..7fad5997d6f 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/runtime/schedule/trigger/ProgramStatusTrigger.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/runtime/schedule/trigger/ProgramStatusTrigger.java @@ -24,6 +24,7 @@ import io.cdap.cdap.api.ProgramStatus; import io.cdap.cdap.api.app.ProgramType; import io.cdap.cdap.api.schedule.TriggerInfo; +import io.cdap.cdap.api.workflow.WorkflowToken; import io.cdap.cdap.common.app.RunIds; import io.cdap.cdap.internal.app.runtime.ProgramOptionConstants; import io.cdap.cdap.internal.app.runtime.schedule.ProgramSchedule; @@ -33,12 +34,15 @@ import io.cdap.cdap.proto.ProtoTrigger; import io.cdap.cdap.proto.id.ProgramId; import io.cdap.cdap.proto.id.ProgramRunId; +import java.io.IOException; import java.util.Arrays; import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * A Trigger that schedules a ProgramSchedule, when a certain status of a program has been @@ -46,8 +50,12 @@ */ public class ProgramStatusTrigger extends ProtoTrigger.ProgramStatusTrigger implements SatisfiableTrigger { + private static final String RESOLVED_PLUGIN_PROPERTIES_MAP = "resolved.plugin.properties.map"; private static final Gson GSON = new Gson(); + private static final Logger LOG = + LoggerFactory.getLogger( + io.cdap.cdap.internal.app.runtime.schedule.trigger.ProgramStatusTrigger.class); public ProgramStatusTrigger(ProgramId programId, Set programStatuses) { super(programId, programStatuses); @@ -59,13 +67,26 @@ public ProgramStatusTrigger(ProgramId programId, ProgramStatus... programStatuse } @Override - public boolean isSatisfied(ProgramSchedule schedule, List notifications) { - return getTriggerSatisfiedResult(notifications, false, new Function() { - @Override - public Boolean apply(ProgramRunInfo input) { - return true; - } - }); + public boolean isSatisfied(ProgramSchedule schedule, NotificationContext notificationContext) { + return getTriggerSatisfiedResult(notificationContext.getNotifications(), false, + new Function() { + @Override + public Boolean apply(ProgramRunInfo runInfo) { + try { + WorkflowToken workflowToken = notificationContext.getWorkflowToken( + runInfo.getProgramRunId()); + if (workflowToken != null && workflowToken.get(RESOLVED_PLUGIN_PROPERTIES_MAP) != null) { + // Return true only if workflow token has been recorded and resolved properties have + // been added. + return true; + } + + } catch (IOException e) { + LOG.error("Reading workflow token failed for runInfo {} with error:",runInfo,e); + } + return false; + } + }); } @Override diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/runtime/schedule/trigger/SatisfiableTrigger.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/runtime/schedule/trigger/SatisfiableTrigger.java index 400c2eafeef..e0327d9c178 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/runtime/schedule/trigger/SatisfiableTrigger.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/runtime/schedule/trigger/SatisfiableTrigger.java @@ -35,10 +35,11 @@ public interface SatisfiableTrigger extends Trigger { * it will remain satisfied no matter what new notifications it receives. * * @param schedule the schedule that this trigger belongs to - * @param notifications the notifications used to check whether this trigger is satisfied + * @param notificationContext that provides necessary information related to notifications + * received. * @return {@code true} if this trigger is satisfied, {@code false} otherwise */ - boolean isSatisfied(ProgramSchedule schedule, List notifications); + boolean isSatisfied(ProgramSchedule schedule, NotificationContext notificationContext); /** * Get all trigger keys which will be used to index the schedule containing this trigger, so that diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/runtime/schedule/trigger/TimeTrigger.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/runtime/schedule/trigger/TimeTrigger.java index 625a609fafb..64f7be18d48 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/runtime/schedule/trigger/TimeTrigger.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/runtime/schedule/trigger/TimeTrigger.java @@ -55,8 +55,8 @@ public void validate() { } @Override - public boolean isSatisfied(ProgramSchedule schedule, List notifications) { - for (Notification notification : notifications) { + public boolean isSatisfied(ProgramSchedule schedule, NotificationContext notificationContext) { + for (Notification notification : notificationContext.getNotifications()) { if (isSatisfied(schedule, notification)) { return true; } diff --git a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/runtime/schedule/trigger/ProgramStatusTriggerTest.java b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/runtime/schedule/trigger/ProgramStatusTriggerTest.java new file mode 100644 index 00000000000..d3bb4cb60ed --- /dev/null +++ b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/runtime/schedule/trigger/ProgramStatusTriggerTest.java @@ -0,0 +1,125 @@ +/* + * Copyright © 2025 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.cdap.internal.app.runtime.schedule.trigger; + + +import static org.mockito.Mockito.mock; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.gson.Gson; +import io.cdap.cdap.api.ProgramStatus; +import io.cdap.cdap.api.app.ProgramType; +import io.cdap.cdap.internal.app.runtime.ProgramOptionConstants; +import io.cdap.cdap.internal.app.runtime.workflow.BasicWorkflowToken; +import io.cdap.cdap.internal.app.store.AppMetadataStore; +import io.cdap.cdap.proto.Notification; +import io.cdap.cdap.proto.id.ProgramRunId; +import java.util.List; +import java.util.Map; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class ProgramStatusTriggerTest { + + private static final Gson GSON = new Gson(); + private ProgramStatusTrigger trigger; + private ProgramRunId programRunId; + @Mock + private AppMetadataStore mockMetadataStore; + + @Before + public void setUp() throws Exception { + trigger = new ProgramStatusTriggerBuilder(ProgramType.WORKFLOW.name(), "program", + ProgramStatus.COMPLETED, ProgramStatus.FAILED).build("default", "application", "123"); + programRunId = new ProgramRunId("default", "application", + io.cdap.cdap.proto.ProgramType.WORKFLOW, "program", "testRun"); + mockMetadataStore = mock(AppMetadataStore.class); + } + + @Test + public void testIsSatisfiedTrue() throws Exception { + Map properties = ImmutableMap.of(ProgramOptionConstants.PROGRAM_RUN_ID, + GSON.toJson(programRunId), + ProgramOptionConstants.PROGRAM_STATUS, ProgramStatus.COMPLETED.name()); + Notification notification = new Notification(Notification.Type.PROGRAM_STATUS, properties); + List notificationList = ImmutableList.of(notification); + BasicWorkflowToken validWorkflowToken = new BasicWorkflowToken(1); + validWorkflowToken.setCurrentNode("node"); + validWorkflowToken.put("resolved.plugin.properties.map", ""); + + Mockito.when(mockMetadataStore.getWorkflowToken(Mockito.any(), Mockito.any())) + .thenReturn(validWorkflowToken); + boolean result = trigger.isSatisfied(null, + new NotificationContext(notificationList, mockMetadataStore)); + + Assert.assertTrue(result); + } + + @Test + public void testIsSatisfiedFalseNullWorkflowToken() throws Exception { + Map properties = ImmutableMap.of(ProgramOptionConstants.PROGRAM_RUN_ID, + GSON.toJson(programRunId), + ProgramOptionConstants.PROGRAM_STATUS, ProgramStatus.COMPLETED.name()); + Notification notification = new Notification(Notification.Type.PROGRAM_STATUS, properties); + List notificationList = ImmutableList.of(notification); + + Mockito.when(mockMetadataStore.getWorkflowToken(Mockito.any(), Mockito.any())) + .thenReturn(null); + boolean result = trigger.isSatisfied(null, + new NotificationContext(notificationList, mockMetadataStore)); + + Assert.assertFalse(result); + } + + @Test + public void testIsSatisfiedFalseEmptyWorkflowToken() throws Exception { + Map properties = ImmutableMap.of(ProgramOptionConstants.PROGRAM_RUN_ID, + GSON.toJson(programRunId), + ProgramOptionConstants.PROGRAM_STATUS, ProgramStatus.COMPLETED.name()); + Notification notification = new Notification(Notification.Type.PROGRAM_STATUS, properties); + List notificationList = ImmutableList.of(notification); + BasicWorkflowToken emptyWorkflowToken = new BasicWorkflowToken(0); + + Mockito.when(mockMetadataStore.getWorkflowToken(Mockito.any(), Mockito.any())) + .thenReturn(emptyWorkflowToken); + boolean result = trigger.isSatisfied(null, + new NotificationContext(notificationList, mockMetadataStore)); + + Assert.assertFalse(result); + } + + @Test + public void testIsSatisfiedFalseNonMatchingStatus() throws Exception { + Map properties = ImmutableMap.of(ProgramOptionConstants.PROGRAM_RUN_ID, + GSON.toJson(programRunId), + ProgramOptionConstants.PROGRAM_STATUS, ProgramStatus.KILLED.name()); + Notification notification = new Notification(Notification.Type.PROGRAM_STATUS, properties); + List notificationList = ImmutableList.of(notification); + + boolean result = trigger.isSatisfied(null, + new NotificationContext(notificationList, mockMetadataStore)); + + Assert.assertFalse(result); + } +} \ No newline at end of file