diff --git a/src/main/java/org/dependencytrack/event/kafka/processor/ProcessedVulnerabilityScanResultProcessor.java b/src/main/java/org/dependencytrack/event/kafka/processor/ProcessedVulnerabilityScanResultProcessor.java
index 2c98fae37..e046ed198 100644
--- a/src/main/java/org/dependencytrack/event/kafka/processor/ProcessedVulnerabilityScanResultProcessor.java
+++ b/src/main/java/org/dependencytrack/event/kafka/processor/ProcessedVulnerabilityScanResultProcessor.java
@@ -246,43 +246,35 @@ private static List<WorkflowState> updateWorkflowStates(final Handle jdbiHandle,
     }
 
     private static List<KafkaEvent<Notification>> createBomProcessedNotifications(final Handle jdbiHandle,
                                                                                   final List<VulnerabilityScan> completedVulnScans) {
-        final int numScans = completedVulnScans.size();
-        final var tokens = new ArrayList<String>(numScans);
-        final var statuses = new ArrayList<WorkflowStatus>(numScans);
-        final var failureReasons = new ArrayList<String>(numScans);
-
-        for (final VulnerabilityScan completedVulnScan : completedVulnScans) {
-            if (completedVulnScan.getTargetType() != VulnerabilityScan.TargetType.PROJECT) {
-                // BOM_PROCESSED notifications only make sense when the scan target is a project.
-                continue;
-            }
-
-            tokens.add(completedVulnScan.getToken());
-            statuses.add(WorkflowStatus.COMPLETED);
-            failureReasons.add(null);
-        }
-        if (tokens.isEmpty()) {
-            LOGGER.debug("None of the possible %d completed vulnerability scans target a project".formatted(numScans));
+        // Collect the workflow tokens of all completed scans, as long as they target a project.
+        // Dispatching BOM_PROCESSED notifications does not make sense when individual components,
+        // or even the entire portfolio, were scanned.
+        final Set<String> workflowTokens = completedVulnScans.stream()
+                .filter(vulnScan -> vulnScan.getTargetType() == VulnerabilityScan.TargetType.PROJECT)
+                .map(VulnerabilityScan::getToken)
+                .collect(Collectors.toSet());
+        if (workflowTokens.isEmpty()) {
+            LOGGER.debug("None of the possible %d completed vulnerability scans target a project".formatted(completedVulnScans.size()));
             return Collections.emptyList();
         }
 
+        // Ensure that all eligible workflows have a BOM_PROCESSING step with status COMPLETED.
+        // For example, a scan triggered via the "Reanalyze" button in the UI won't have such a step,
+        // hence it doesn't make sense to dispatch a BOM_PROCESSED notification for it.
         final var workflowDao = jdbiHandle.attach(WorkflowDao.class);
-        final List<WorkflowState> updatedWorkflowStates =
-                workflowDao.updateAllStatesIfPending(WorkflowStep.BOM_PROCESSING, tokens, statuses, failureReasons);
-        if (updatedWorkflowStates.isEmpty()) {
-            LOGGER.debug("None of the possible %d workflow states for %s were transitioned to %s status"
-                    .formatted(tokens.size(), WorkflowStep.BOM_PROCESSING, WorkflowStatus.COMPLETED));
+        final Set<String> workflowTokensWithBomProcessed =
+                workflowDao.getTokensByStepAndStateAndTokenAnyOf(WorkflowStep.BOM_PROCESSING, WorkflowStatus.COMPLETED, workflowTokens);
+        if (workflowTokensWithBomProcessed.isEmpty()) {
+            LOGGER.debug("None of the possible %d workflows have %s steps with status %s"
+                    .formatted(workflowTokens.size(), WorkflowStep.BOM_PROCESSING, WorkflowStatus.COMPLETED));
             return Collections.emptyList();
         }
 
         final var notificationSubjectDao = jdbiHandle.attach(NotificationSubjectDao.class);
-
-        final Set<String> updatedWorkflowStateTokens = updatedWorkflowStates.stream()
-                .map(WorkflowState::getToken).map(UUID::toString).collect(Collectors.toSet());
         final List<BomConsumedOrProcessedSubject> notificationSubjects =
-                notificationSubjectDao.getForDelayedBomProcessed(updatedWorkflowStateTokens);
+                notificationSubjectDao.getForDelayedBomProcessed(workflowTokensWithBomProcessed);
 
-        final var notifications = new ArrayList<KafkaEvent<Notification>>(updatedWorkflowStates.size());
+        final var notifications = new ArrayList<KafkaEvent<Notification>>(workflowTokensWithBomProcessed.size());
         notificationSubjects.stream()
                 .map(subject -> Notification.newBuilder()
                         .setScope(SCOPE_PORTFOLIO)
diff --git a/src/main/java/org/dependencytrack/persistence/jdbi/WorkflowDao.java b/src/main/java/org/dependencytrack/persistence/jdbi/WorkflowDao.java
index b0ebe76a6..e79b4265d 100644
--- a/src/main/java/org/dependencytrack/persistence/jdbi/WorkflowDao.java
+++ b/src/main/java/org/dependencytrack/persistence/jdbi/WorkflowDao.java
@@ -25,8 +25,13 @@
 import org.jdbi.v3.sqlobject.customizer.Bind;
 import org.jdbi.v3.sqlobject.statement.GetGeneratedKeys;
 import org.jdbi.v3.sqlobject.statement.SqlBatch;
+import org.jdbi.v3.sqlobject.statement.SqlQuery;
 
+import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
+import java.util.Set;
 
 public interface WorkflowDao {
 
@@ -46,22 +51,28 @@ List<WorkflowState> updateAllStates(@Bind WorkflowStep step,
                                         @Bind("token") List<String> tokens,
                                         @Bind("status") List<WorkflowStatus> statuses,
                                         @Bind("failureReason") List<String> failureReasons);
 
-    @SqlBatch("""
-            UPDATE "WORKFLOW_STATE"
-               SET "STATUS" = :status
-                 , "FAILURE_REASON" = :failureReason
-                 , "UPDATED_AT" = NOW()
-             WHERE "TOKEN" = :token
-               AND "STEP" = :step
-               AND "STATUS" = 'PENDING'
-            RETURNING *
+    default Optional<WorkflowState> updateState(final WorkflowStep step,
+                                                final String token,
+                                                final WorkflowStatus status,
+                                                final String failureReason) {
+        final List<WorkflowState> updatedStates = updateAllStates(step, List.of(token), List.of(status), Collections.singletonList(failureReason));
+        if (updatedStates.isEmpty()) {
+            return Optional.empty();
+        }
+
+        return Optional.of(updatedStates.getFirst());
+    }
+
+    @SqlQuery("""
+            SELECT "TOKEN"
+              FROM "WORKFLOW_STATE"
+             WHERE "STEP" = :step
+               AND "STATUS" = :status
+               AND "TOKEN" = ANY(:tokens)
             """)
-    @GetGeneratedKeys("*")
-    @RegisterBeanMapper(WorkflowState.class)
-    List<WorkflowState> updateAllStatesIfPending(@Bind WorkflowStep step,
-                                                 @Bind("token") List<String> tokens,
-                                                 @Bind("status") List<WorkflowStatus> statuses,
-                                                 @Bind("failureReason") List<String> failureReasons);
+    Set<String> getTokensByStepAndStateAndTokenAnyOf(@Bind WorkflowStep step,
+                                                     @Bind WorkflowStatus status,
+                                                     @Bind Collection<String> tokens);
 
     @SqlBatch("""
             WITH RECURSIVE
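Reviewer note: for trying the new query in isolation, below is a minimal sketch of how `getTokensByStepAndStateAndTokenAnyOf` gates the delayed BOM_PROCESSED dispatch. It assumes a PostgreSQL-backed `Jdbi` instance with the SqlObject plugin installed and the `WorkflowDao` from this PR on the classpath; the class and method names in the sketch are illustrative and not part of the change.

```java
import org.dependencytrack.model.WorkflowStatus;
import org.dependencytrack.model.WorkflowStep;
import org.dependencytrack.persistence.jdbi.WorkflowDao;
import org.jdbi.v3.core.Jdbi;

import java.util.Set;

class BomProcessedGateSketch {

    // Returns the subset of workflow tokens that may receive a delayed BOM_PROCESSED
    // notification, i.e. those whose BOM_PROCESSING step has already COMPLETED.
    static Set<String> eligibleTokens(final Jdbi jdbi, final Set<String> workflowTokens) {
        if (workflowTokens.isEmpty()) {
            return Set.of();
        }

        // onDemand obtains and releases a handle per invocation; the processor itself
        // uses an already attached handle instead (jdbiHandle.attach(WorkflowDao.class)).
        final WorkflowDao workflowDao = jdbi.onDemand(WorkflowDao.class);
        return workflowDao.getTokensByStepAndStateAndTokenAnyOf(
                WorkflowStep.BOM_PROCESSING, WorkflowStatus.COMPLETED, workflowTokens);
    }
}
```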
diff --git a/src/test/java/org/dependencytrack/event/kafka/processor/ProcessedVulnerabilityScanResultProcessorTest.java b/src/test/java/org/dependencytrack/event/kafka/processor/ProcessedVulnerabilityScanResultProcessorTest.java
index 5ff6f6a87..b9380f1af 100644
--- a/src/test/java/org/dependencytrack/event/kafka/processor/ProcessedVulnerabilityScanResultProcessorTest.java
+++ b/src/test/java/org/dependencytrack/event/kafka/processor/ProcessedVulnerabilityScanResultProcessorTest.java
@@ -30,6 +30,7 @@
 import org.dependencytrack.model.VulnerabilityScan;
 import org.dependencytrack.model.WorkflowStatus;
 import org.dependencytrack.model.WorkflowStep;
+import org.dependencytrack.persistence.jdbi.WorkflowDao;
 import org.dependencytrack.proto.notification.v1.BomConsumedOrProcessedSubject;
 import org.dependencytrack.proto.notification.v1.Notification;
 import org.dependencytrack.proto.notification.v1.ProjectVulnAnalysisCompleteSubject;
@@ -46,6 +47,7 @@
 import java.util.concurrent.ConcurrentLinkedQueue;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.dependencytrack.persistence.jdbi.JdbiFactory.jdbi;
 import static org.dependencytrack.proto.notification.v1.Group.GROUP_BOM_PROCESSED;
 import static org.dependencytrack.proto.notification.v1.Group.GROUP_PROJECT_VULN_ANALYSIS_COMPLETE;
 import static org.dependencytrack.proto.notification.v1.Level.LEVEL_INFORMATIONAL;
@@ -259,6 +261,11 @@ public void testProcessWithDelayedBomProcessedNotification() throws Exception {
         final UUID workflowToken = UUID.randomUUID();
         qm.createWorkflowSteps(workflowToken);
 
+        // Transition the BOM_PROCESSING step to COMPLETED status;
+        // without this, no BOM_PROCESSED notification will be sent.
+        jdbi(qm).useExtension(WorkflowDao.class, dao -> dao.updateState(WorkflowStep.BOM_PROCESSING,
+                workflowToken.toString(), WorkflowStatus.COMPLETED, /* failureReason */ null));
+
         // Create a VulnerabilityScan, and configure it such that no more than 10%
         // of scanners are allowed to fail in order for the scan to be considered successful.
         final var vulnScan = new VulnerabilityScan();
@@ -309,6 +316,11 @@ public void testProcessWithDelayedBomProcessedNotificationWhenVulnerabilityScanF
         final UUID workflowToken = UUID.randomUUID();
         qm.createWorkflowSteps(workflowToken);
 
+        // Transition the BOM_PROCESSING step to COMPLETED status;
+        // without this, no BOM_PROCESSED notification will be sent.
+        jdbi(qm).useExtension(WorkflowDao.class, dao -> dao.updateState(WorkflowStep.BOM_PROCESSING,
+                workflowToken.toString(), WorkflowStatus.COMPLETED, /* failureReason */ null));
+
         final var vulnScan = new VulnerabilityScan();
         vulnScan.setToken(workflowToken.toString());
         vulnScan.setTargetType(VulnerabilityScan.TargetType.PROJECT);
@@ -351,6 +363,41 @@ record -> {
         );
     }
 
+    @Test
+    public void testProcessWithDelayedBomProcessedNotificationWithoutCompletedBomProcessingWorkflowStep() throws Exception {
+        final var project = new Project();
+        project.setName("acme-app");
+        qm.persist(project);
+
+        final UUID workflowToken = UUID.randomUUID();
+        qm.createWorkflowSteps(workflowToken);
+
+        // NB: The BOM_PROCESSING workflow step remains in status PENDING.
+
+        // Create a VulnerabilityScan, and configure it such that no more than 10%
+        // of scanners are allowed to fail in order for the scan to be considered successful.
+        final var vulnScan = new VulnerabilityScan();
+        vulnScan.setToken(workflowToken.toString());
+        vulnScan.setTargetType(VulnerabilityScan.TargetType.PROJECT);
+        vulnScan.setTargetIdentifier(project.getUuid());
+        vulnScan.setStatus(VulnerabilityScan.Status.IN_PROGRESS);
+        vulnScan.setExpectedResults(1);
+        vulnScan.setStartedAt(new Date());
+        vulnScan.setUpdatedAt(vulnScan.getStartedAt());
+        qm.persist(vulnScan);
+
+        // Create a ScanResult without any ScannerResults attached to it.
+        // This might happen when no scanner is capable of scanning a component,
+        // or when all scanners are disabled.
+        final var scanResult = ScanResult.newBuilder().build();
+
+        final var processor = new ProcessedVulnerabilityScanResultProcessor(/* shouldDispatchBomProcessedNotification */ true);
+        processor.process(List.of(aConsumerRecord(vulnScan.getToken(), scanResult).build()));
+
+        assertThat(kafkaMockProducer.history()).satisfiesExactly(record ->
+                assertThat(record.topic()).isEqualTo(KafkaTopics.NOTIFICATION_PROJECT_VULN_ANALYSIS_COMPLETE.name()));
+    }
+
     private static final ConcurrentLinkedQueue<Event> EVENTS = new ConcurrentLinkedQueue<>();
 
     public static class EventSubscriber implements Subscriber {
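As a companion sketch, here is how a caller could use the new `updateState` convenience method to mark the BOM_PROCESSING step of a workflow as completed, which is what the tests above simulate via `jdbi(qm).useExtension(...)`. Again this assumes a `Jdbi` instance with the SqlObject plugin installed; the helper class below is hypothetical and not part of this PR.

```java
import org.dependencytrack.model.WorkflowStatus;
import org.dependencytrack.model.WorkflowStep;
import org.dependencytrack.persistence.jdbi.WorkflowDao;
import org.jdbi.v3.core.Jdbi;

class WorkflowStepCompletionSketch {

    // Marks the BOM_PROCESSING step of the given workflow as COMPLETED.
    // Returns true if a matching workflow state row was updated, false otherwise
    // (updateState yields an empty Optional when updateAllStates touched no rows).
    static boolean completeBomProcessing(final Jdbi jdbi, final String workflowToken) {
        final WorkflowDao workflowDao = jdbi.onDemand(WorkflowDao.class);
        return workflowDao
                .updateState(WorkflowStep.BOM_PROCESSING, workflowToken, WorkflowStatus.COMPLETED, /* failureReason */ null)
                .isPresent();
    }
}
```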