Skip to content

Commit

Permalink
Handle delayed BOM_PROCESSED notifications
Browse files Browse the repository at this point in the history
Signed-off-by: nscuro <[email protected]>
  • Loading branch information
nscuro committed Mar 28, 2024
1 parent f3ee3a4 commit 0237fa7
Show file tree
Hide file tree
Showing 4 changed files with 174 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@
*/
package org.dependencytrack.event.kafka.processor;

import alpine.Config;
import alpine.common.logging.Logger;
import alpine.event.framework.ChainableEvent;
import alpine.event.framework.Event;
import com.google.protobuf.Any;
import com.google.protobuf.util.Timestamps;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.dependencytrack.event.ComponentMetricsUpdateEvent;
import org.dependencytrack.event.ProjectMetricsUpdateEvent;
Expand All @@ -40,21 +42,27 @@
import org.dependencytrack.persistence.jdbi.NotificationSubjectDao;
import org.dependencytrack.persistence.jdbi.VulnerabilityScanDao;
import org.dependencytrack.persistence.jdbi.WorkflowDao;
import org.dependencytrack.proto.notification.v1.BomConsumedOrProcessedSubject;
import org.dependencytrack.proto.notification.v1.Notification;
import org.dependencytrack.proto.notification.v1.ProjectVulnAnalysisCompleteSubject;
import org.dependencytrack.proto.vulnanalysis.v1.ScanResult;
import org.dependencytrack.proto.vulnanalysis.v1.ScanStatus;
import org.jdbi.v3.core.Handle;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;

import static java.lang.Math.toIntExact;
import static org.dependencytrack.common.ConfigKey.TMP_DELAY_BOM_PROCESSED_NOTIFICATION;
import static org.dependencytrack.persistence.jdbi.JdbiFactory.jdbi;
import static org.dependencytrack.proto.notification.v1.Group.GROUP_BOM_PROCESSED;
import static org.dependencytrack.proto.notification.v1.Group.GROUP_PROJECT_VULN_ANALYSIS_COMPLETE;
import static org.dependencytrack.proto.notification.v1.Level.LEVEL_INFORMATIONAL;
import static org.dependencytrack.proto.notification.v1.Scope.SCOPE_PORTFOLIO;
Expand All @@ -71,6 +79,15 @@ public class ProcessedVulnerabilityScanResultProcessor implements BatchProcessor
private static final Logger LOGGER = Logger.getLogger(ProcessedVulnerabilityScanResultProcessor.class);

private final KafkaEventDispatcher eventDispatcher = new KafkaEventDispatcher();
private final boolean shouldDispatchBomProcessedNotification;

public ProcessedVulnerabilityScanResultProcessor() {
this(Config.getInstance().getPropertyAsBoolean(TMP_DELAY_BOM_PROCESSED_NOTIFICATION));
}

ProcessedVulnerabilityScanResultProcessor(final boolean shouldDispatchBomProcessedNotification) {
this.shouldDispatchBomProcessedNotification = shouldDispatchBomProcessedNotification;
}

@Override
public void process(final List<ConsumerRecord<String, ScanResult>> records) throws ProcessingException {
Expand All @@ -82,6 +99,10 @@ public void process(final List<ConsumerRecord<String, ScanResult>> records) thro
jdbi(qm).useTransaction(jdbiHandle -> {
completedVulnScans.addAll(processScanResults(jdbiHandle, records));
notifications.addAll(createVulnAnalysisCompleteNotifications(jdbiHandle, completedVulnScans));

if (shouldDispatchBomProcessedNotification) {
notifications.addAll(createBomProcessedNotifications(jdbiHandle, completedVulnScans));
}
});
}

Expand Down Expand Up @@ -174,7 +195,8 @@ private static List<WorkflowState> updateWorkflowStates(final Handle jdbiHandle,
statuses.add(switch (completedVulnScan.getStatus()) {
case COMPLETED -> WorkflowStatus.COMPLETED;
case FAILED -> WorkflowStatus.FAILED;
default -> throw new IllegalStateException("");
default -> throw new IllegalStateException("""
Unexpected vulnerability scan status %s""".formatted(completedVulnScan.getStatus()));
});
failureReasons.add(completedVulnScan.getFailureReason());
}
Expand Down Expand Up @@ -212,6 +234,7 @@ private static List<WorkflowState> updateWorkflowStates(final Handle jdbiHandle,
.setScope(SCOPE_PORTFOLIO)
.setGroup(GROUP_PROJECT_VULN_ANALYSIS_COMPLETE)
.setLevel(LEVEL_INFORMATIONAL)
.setTimestamp(Timestamps.now())
.setTitle(NotificationConstants.Title.PROJECT_VULN_ANALYSIS_COMPLETE)
.setSubject(Any.pack(optionalSubject.get()))
.build();
Expand All @@ -222,6 +245,60 @@ private static List<WorkflowState> updateWorkflowStates(final Handle jdbiHandle,
return notifications;
}

private static List<KafkaEvent<?, ?>> createBomProcessedNotifications(final Handle jdbiHandle, final List<VulnerabilityScan> completedVulnScans) {
final int numScans = completedVulnScans.size();
final var tokens = new ArrayList<String>(numScans);
final var statuses = new ArrayList<WorkflowStatus>(numScans);
final var failureReasons = new ArrayList<String>(numScans);

for (final VulnerabilityScan completedVulnScan : completedVulnScans) {
if (completedVulnScan.getTargetType() != VulnerabilityScan.TargetType.PROJECT) {
// BOM_PROCESSED notifications only make sense when the scan target is a project.
continue;
}

tokens.add(completedVulnScan.getToken());
statuses.add(WorkflowStatus.COMPLETED);
failureReasons.add(null);
}
if (tokens.isEmpty()) {
LOGGER.debug("None of the possible %d completed vulnerability scans target a project".formatted(numScans));
return Collections.emptyList();
}

final var workflowDao = jdbiHandle.attach(WorkflowDao.class);
final List<WorkflowState> updatedWorkflowStates =
workflowDao.updateAllStatesIfPending(WorkflowStep.BOM_PROCESSING, tokens, statuses, failureReasons);
if (updatedWorkflowStates.isEmpty()) {
LOGGER.debug("None of the possible %d workflow states for %s were transitioned to %s status"
.formatted(tokens.size(), WorkflowStep.BOM_PROCESSING, WorkflowStatus.COMPLETED));
return Collections.emptyList();
}

final var notificationSubjectDao = jdbiHandle.attach(NotificationSubjectDao.class);

final Set<String> updatedWorkflowStateTokens = updatedWorkflowStates.stream()
.map(WorkflowState::getToken).map(UUID::toString).collect(Collectors.toSet());
final List<BomConsumedOrProcessedSubject> notificationSubjects =
notificationSubjectDao.getForDelayedBomProcessed(updatedWorkflowStateTokens);

final var notifications = new ArrayList<KafkaEvent<?, ?>>(updatedWorkflowStates.size());
notificationSubjects.stream()
.map(subject -> Notification.newBuilder()
.setScope(SCOPE_PORTFOLIO)
.setGroup(GROUP_BOM_PROCESSED)
.setLevel(LEVEL_INFORMATIONAL)
.setTimestamp(Timestamps.now())
.setTitle(NotificationConstants.Title.BOM_PROCESSED)
.setContent("A %s BOM was processed".formatted(subject.getBom().getFormat()))
.setSubject(Any.pack(subject))
.build())
.map(KafkaEventConverter::convert)
.forEach(notifications::add);

return notifications;
}

private static class Aggregate {
private int resultsTotal;
private int scannerResultsTotal;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@
import org.dependencytrack.model.VulnerabilityScan;
import org.dependencytrack.persistence.jdbi.mapping.NotificationComponentRowMapper;
import org.dependencytrack.persistence.jdbi.mapping.NotificationProjectRowMapper;
import org.dependencytrack.persistence.jdbi.mapping.NotificationSubjectBomConsumedOrProcessedRowMapper;
import org.dependencytrack.persistence.jdbi.mapping.NotificationSubjectNewVulnerabilityRowMapper;
import org.dependencytrack.persistence.jdbi.mapping.NotificationSubjectNewVulnerableDependencyRowReducer;
import org.dependencytrack.persistence.jdbi.mapping.NotificationSubjectProjectAuditChangeRowMapper;
import org.dependencytrack.persistence.jdbi.mapping.NotificationVulnerabilityRowMapper;
import org.dependencytrack.proto.notification.v1.BomConsumedOrProcessedSubject;
import org.dependencytrack.proto.notification.v1.Component;
import org.dependencytrack.proto.notification.v1.ComponentVulnAnalysisCompleteSubject;
import org.dependencytrack.proto.notification.v1.NewVulnerabilitySubject;
Expand Down Expand Up @@ -376,6 +378,33 @@ LEFT JOIN LATERAL (
@RegisterRowMapper(NotificationSubjectProjectAuditChangeRowMapper.class)
Optional<VulnerabilityAnalysisDecisionChangeSubject> getForProjectAuditChange(final UUID componentUuid, final UUID vulnUuid, AnalysisState analysisState, boolean isSuppressed);

@SqlQuery("""
SELECT "P"."UUID" AS "projectUuid"
, "P"."NAME" AS "projectName"
, "P"."VERSION" AS "projectVersion"
, "P"."DESCRIPTION" AS "projectDescription"
, "P"."PURL" AS "projectPurl"
, (SELECT ARRAY_AGG(DISTINCT "T"."NAME")
FROM "TAG" AS "T"
INNER JOIN "PROJECTS_TAGS" AS "PT"
ON "PT"."TAG_ID" = "T"."ID"
WHERE "PT"."PROJECT_ID" = "P"."ID"
) AS "projectTags"
, 'CycloneDX' AS "bomFormat"
, '(Unknown)' AS "bomSpecVersion"
, '(Omitted)' AS "bomContent"
FROM "VULNERABILITYSCAN" AS "VS"
INNER JOIN "PROJECT" AS "P"
ON "P"."UUID" = "VS"."TARGET_IDENTIFIER"
INNER JOIN "WORKFLOW_STATE" AS "WFS"
ON "WFS"."TOKEN" = "VS"."TOKEN"
AND "WFS"."STEP" = 'BOM_PROCESSING'
AND "WFS"."STATUS" = 'COMPLETED'
WHERE "VS"."TOKEN" = ANY(:tokens)
""")
@RegisterRowMapper(NotificationSubjectBomConsumedOrProcessedRowMapper.class)
List<BomConsumedOrProcessedSubject> getForDelayedBomProcessed(Collection<String> workflowTokens);

@SqlQuery("""
SELECT "P"."UUID" AS "projectUuid"
, "P"."NAME" AS "projectName"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,23 @@ List<WorkflowState> updateAllStates(@Bind WorkflowStep step,
@Bind("status") List<WorkflowStatus> statuses,
@Bind("failureReason") List<String> failureReasons);

@SqlBatch("""
UPDATE "WORKFLOW_STATE"
SET "STATUS" = :status
, "FAILURE_REASON" = :failureReason
, "UPDATED_AT" = NOW()
WHERE "TOKEN" = :token
AND "STEP" = :step
AND "STATUS" = 'PENDING'
RETURNING *
""")
@GetGeneratedKeys("*")
@RegisterBeanMapper(WorkflowState.class)
List<WorkflowState> updateAllStatesIfPending(@Bind WorkflowStep step,
@Bind("token") List<String> tokens,
@Bind("status") List<WorkflowStatus> statuses,
@Bind("failureReason") List<String> failureReasons);

@SqlBatch("""
WITH RECURSIVE
"CTE_PARENT" ("ID") AS (
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* This file is part of Dependency-Track.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
* Copyright (c) OWASP Foundation. All Rights Reserved.
*/
package org.dependencytrack.persistence.jdbi.mapping;

import org.dependencytrack.proto.notification.v1.Bom;
import org.dependencytrack.proto.notification.v1.BomConsumedOrProcessedSubject;
import org.dependencytrack.proto.notification.v1.Project;
import org.jdbi.v3.core.mapper.NoSuchMapperException;
import org.jdbi.v3.core.mapper.RowMapper;
import org.jdbi.v3.core.statement.StatementContext;

import java.sql.ResultSet;
import java.sql.SQLException;

import static org.dependencytrack.persistence.jdbi.mapping.RowMapperUtil.maybeSet;

public class NotificationSubjectBomConsumedOrProcessedRowMapper implements RowMapper<BomConsumedOrProcessedSubject> {

@Override
public BomConsumedOrProcessedSubject map(final ResultSet rs, final StatementContext ctx) throws SQLException {
final RowMapper<Project> projectRowMapper = ctx.findRowMapperFor(Project.class)
.orElseThrow(() -> new NoSuchMapperException("No mapper registered for %s".formatted(Project.class)));
final RowMapper<Bom> bomRowMapper = ctx.findRowMapperFor(Bom.class)
.orElseThrow(() -> new NoSuchMapperException("No mapper registered for %s".formatted(Bom.class)));

final BomConsumedOrProcessedSubject.Builder builder = BomConsumedOrProcessedSubject.newBuilder()
.setProject(projectRowMapper.map(rs, ctx))
.setBom(bomRowMapper.map(rs, ctx));
maybeSet(rs, "token", ResultSet::getString, builder::setToken);

return builder.build();
}

}

0 comments on commit 0237fa7

Please sign in to comment.