Skip to content

Commit

Permalink
Fix missing dispatch of BOM_PROCESSED notification
Browse files Browse the repository at this point in the history
The status of the workflow step will always be `COMPLETED`, regardless of whether delayed dispatch is enabled. This is existing behavior, and the regression was caught via `BomProcessedNotificationDelayedE2ET`.

Signed-off-by: nscuro <[email protected]>
  • Loading branch information
nscuro committed Mar 28, 2024
1 parent 5fe0ab3 commit 370dd74
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -246,43 +246,35 @@ private static List<WorkflowState> updateWorkflowStates(final Handle jdbiHandle,
}

private static List<KafkaEvent<?, ?>> createBomProcessedNotifications(final Handle jdbiHandle, final List<VulnerabilityScan> completedVulnScans) {
final int numScans = completedVulnScans.size();
final var tokens = new ArrayList<String>(numScans);
final var statuses = new ArrayList<WorkflowStatus>(numScans);
final var failureReasons = new ArrayList<String>(numScans);

for (final VulnerabilityScan completedVulnScan : completedVulnScans) {
if (completedVulnScan.getTargetType() != VulnerabilityScan.TargetType.PROJECT) {
// BOM_PROCESSED notifications only make sense when the scan target is a project.
continue;
}

tokens.add(completedVulnScan.getToken());
statuses.add(WorkflowStatus.COMPLETED);
failureReasons.add(null);
}
if (tokens.isEmpty()) {
LOGGER.debug("None of the possible %d completed vulnerability scans target a project".formatted(numScans));
// Collect the workflow tokens for all completed scans, as long as they target a project.
// Dispatching BOM_PROCESSED notifications does not make sense when individual components,
// or even the entire portfolio was scanned.
final Set<String> workflowTokens = completedVulnScans.stream()
.filter(vulnScan -> vulnScan.getTargetType() == VulnerabilityScan.TargetType.PROJECT)
.map(VulnerabilityScan::getToken)
.collect(Collectors.toSet());
if (workflowTokens.isEmpty()) {
LOGGER.debug("None of the possible %d completed vulnerability scans target a project".formatted(completedVulnScans.size()));
return Collections.emptyList();
}

// Ensure that all eligible workflows have a BOM_PROCESSED step with status COMPLETED.
// For example, a scan triggered via "Reanalyze" button in the UI won't have such as step,
// hence it doesn't make sense to dispatch a BOM_PROCESSED notification for it.
final var workflowDao = jdbiHandle.attach(WorkflowDao.class);
final List<WorkflowState> updatedWorkflowStates =
workflowDao.updateAllStatesIfPending(WorkflowStep.BOM_PROCESSING, tokens, statuses, failureReasons);
if (updatedWorkflowStates.isEmpty()) {
LOGGER.debug("None of the possible %d workflow states for %s were transitioned to %s status"
.formatted(tokens.size(), WorkflowStep.BOM_PROCESSING, WorkflowStatus.COMPLETED));
final Set<String> workflowTokensWithBomProcessed =
workflowDao.getTokensByStepAndStateAndTokenAnyOf(WorkflowStep.BOM_PROCESSING, WorkflowStatus.COMPLETED, workflowTokens);
if (workflowTokensWithBomProcessed.isEmpty()) {
LOGGER.debug("None of the possible %d workflows have %s steps with status %s"
.formatted(workflowTokens.size(), WorkflowStep.BOM_PROCESSING, WorkflowStatus.COMPLETED));
return Collections.emptyList();
}

final var notificationSubjectDao = jdbiHandle.attach(NotificationSubjectDao.class);

final Set<String> updatedWorkflowStateTokens = updatedWorkflowStates.stream()
.map(WorkflowState::getToken).map(UUID::toString).collect(Collectors.toSet());
final List<BomConsumedOrProcessedSubject> notificationSubjects =
notificationSubjectDao.getForDelayedBomProcessed(updatedWorkflowStateTokens);
notificationSubjectDao.getForDelayedBomProcessed(workflowTokensWithBomProcessed);

final var notifications = new ArrayList<KafkaEvent<?, ?>>(updatedWorkflowStates.size());
final var notifications = new ArrayList<KafkaEvent<?, ?>>(workflowTokensWithBomProcessed.size());
notificationSubjects.stream()
.map(subject -> Notification.newBuilder()
.setScope(SCOPE_PORTFOLIO)
Expand Down
41 changes: 26 additions & 15 deletions src/main/java/org/dependencytrack/persistence/jdbi/WorkflowDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,13 @@
import org.jdbi.v3.sqlobject.customizer.Bind;
import org.jdbi.v3.sqlobject.statement.GetGeneratedKeys;
import org.jdbi.v3.sqlobject.statement.SqlBatch;
import org.jdbi.v3.sqlobject.statement.SqlQuery;

import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Set;

public interface WorkflowDao {

Expand All @@ -46,22 +51,28 @@ List<WorkflowState> updateAllStates(@Bind WorkflowStep step,
@Bind("status") List<WorkflowStatus> statuses,
@Bind("failureReason") List<String> failureReasons);

@SqlBatch("""
UPDATE "WORKFLOW_STATE"
SET "STATUS" = :status
, "FAILURE_REASON" = :failureReason
, "UPDATED_AT" = NOW()
WHERE "TOKEN" = :token
AND "STEP" = :step
AND "STATUS" = 'PENDING'
RETURNING *
/**
 * Transitions the state of a single workflow step for a single token.
 * <p>
 * Convenience wrapper around {@link #updateAllStates(WorkflowStep, List, List, List)}
 * for the one-token case.
 *
 * @param step          the workflow step to transition
 * @param token         the workflow token identifying the workflow
 * @param status        the status to transition the step to
 * @param failureReason an optional failure reason; may be {@code null}
 * @return the updated {@link WorkflowState}, or {@link Optional#empty()} if no state was updated
 */
default Optional<WorkflowState> updateState(final WorkflowStep step,
                                            final String token,
                                            final WorkflowStatus status,
                                            final String failureReason) {
    // NB: Collections.singletonList is used deliberately because failureReason may be null,
    // which List.of would reject.
    final List<WorkflowState> results = updateAllStates(
            step, List.of(token), List.of(status), Collections.singletonList(failureReason));
    return results.stream().findFirst();
}

@SqlQuery("""
SELECT "TOKEN"
FROM "WORKFLOW_STATE"
WHERE "STEP" = :step
AND "STATUS" = :status
AND "TOKEN" = ANY(:tokens)
""")
@GetGeneratedKeys("*")
@RegisterBeanMapper(WorkflowState.class)
List<WorkflowState> updateAllStatesIfPending(@Bind WorkflowStep step,
@Bind("token") List<String> tokens,
@Bind("status") List<WorkflowStatus> statuses,
@Bind("failureReason") List<String> failureReasons);
Set<String> getTokensByStepAndStateAndTokenAnyOf(@Bind WorkflowStep step,
@Bind WorkflowStatus status,
@Bind Collection<String> tokens);

@SqlBatch("""
WITH RECURSIVE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.dependencytrack.model.VulnerabilityScan;
import org.dependencytrack.model.WorkflowStatus;
import org.dependencytrack.model.WorkflowStep;
import org.dependencytrack.persistence.jdbi.WorkflowDao;
import org.dependencytrack.proto.notification.v1.BomConsumedOrProcessedSubject;
import org.dependencytrack.proto.notification.v1.Notification;
import org.dependencytrack.proto.notification.v1.ProjectVulnAnalysisCompleteSubject;
Expand All @@ -46,6 +47,7 @@
import java.util.concurrent.ConcurrentLinkedQueue;

import static org.assertj.core.api.Assertions.assertThat;
import static org.dependencytrack.persistence.jdbi.JdbiFactory.jdbi;
import static org.dependencytrack.proto.notification.v1.Group.GROUP_BOM_PROCESSED;
import static org.dependencytrack.proto.notification.v1.Group.GROUP_PROJECT_VULN_ANALYSIS_COMPLETE;
import static org.dependencytrack.proto.notification.v1.Level.LEVEL_INFORMATIONAL;
Expand Down Expand Up @@ -259,6 +261,11 @@ public void testProcessWithDelayedBomProcessedNotification() throws Exception {
final UUID workflowToken = UUID.randomUUID();
qm.createWorkflowSteps(workflowToken);

// Transition BOM_PROCESSING step to COMPLETED status,
// without this we won't send a BOM_PROCESSED notification.
jdbi(qm).useExtension(WorkflowDao.class, dao -> dao.updateState(WorkflowStep.BOM_PROCESSING,
workflowToken.toString(), WorkflowStatus.COMPLETED, /* failureReason */ null));

// Create a VulnerabilityScan, and configure it such that no more than 10%
// of scanners are allowed to fail in order for the scan to be considered successful.
final var vulnScan = new VulnerabilityScan();
Expand Down Expand Up @@ -309,6 +316,11 @@ public void testProcessWithDelayedBomProcessedNotificationWhenVulnerabilityScanF
final UUID workflowToken = UUID.randomUUID();
qm.createWorkflowSteps(workflowToken);

// Transition BOM_PROCESSING step to COMPLETED status,
// without this we won't send a BOM_PROCESSED notification.
jdbi(qm).useExtension(WorkflowDao.class, dao -> dao.updateState(WorkflowStep.BOM_PROCESSING,
workflowToken.toString(), WorkflowStatus.COMPLETED, /* failureReason */ null));

final var vulnScan = new VulnerabilityScan();
vulnScan.setToken(workflowToken.toString());
vulnScan.setTargetType(VulnerabilityScan.TargetType.PROJECT);
Expand Down Expand Up @@ -351,6 +363,41 @@ record -> {
);
}

@Test
public void testProcessWithDelayedBomProcessedNotificationWithoutCompletedBomProcessingWorkflowStep() throws Exception {
    final var targetProject = new Project();
    targetProject.setName("acme-app");
    qm.persist(targetProject);

    final UUID workflowToken = UUID.randomUUID();
    qm.createWorkflowSteps(workflowToken);

    // NB: The BOM_PROCESSING workflow step is deliberately left in status PENDING.

    // Set up a VulnerabilityScan targeting the project. Allowing no more than 10%
    // of scanners to fail means the scan counts as successful when none fail.
    final var scan = new VulnerabilityScan();
    scan.setToken(workflowToken.toString());
    scan.setTargetType(VulnerabilityScan.TargetType.PROJECT);
    scan.setTargetIdentifier(targetProject.getUuid());
    scan.setStatus(VulnerabilityScan.Status.IN_PROGRESS);
    scan.setExpectedResults(1);
    scan.setStartedAt(new Date());
    scan.setUpdatedAt(scan.getStartedAt());
    qm.persist(scan);

    // An empty ScanResult (no ScannerResults) models the situation where no scanner
    // could scan a component, or all scanners are disabled.
    final var emptyScanResult = ScanResult.newBuilder().build();

    final var processor = new ProcessedVulnerabilityScanResultProcessor(/* shouldDispatchBomProcessedNotification */ true);
    processor.process(List.of(aConsumerRecord(scan.getToken(), emptyScanResult).build()));

    // Because the BOM_PROCESSING step never reached COMPLETED, only the
    // PROJECT_VULN_ANALYSIS_COMPLETE notification must be emitted.
    assertThat(kafkaMockProducer.history()).satisfiesExactly(
            producedRecord -> assertThat(producedRecord.topic())
                    .isEqualTo(KafkaTopics.NOTIFICATION_PROJECT_VULN_ANALYSIS_COMPLETE.name()));
}

private static final ConcurrentLinkedQueue<Event> EVENTS = new ConcurrentLinkedQueue<>();

Check notice on line 401 in src/test/java/org/dependencytrack/event/kafka/processor/ProcessedVulnerabilityScanResultProcessorTest.java

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

src/test/java/org/dependencytrack/event/kafka/processor/ProcessedVulnerabilityScanResultProcessorTest.java#L401

Fields should be declared at the top of the class, before any method declarations, constructors, initializers or inner classes.

public static class EventSubscriber implements Subscriber {
Expand Down

0 comments on commit 370dd74

Please sign in to comment.