Merge pull request #631 from DependencyTrack/refactor-kafka-event-dispatcher

Refactor `KafkaEventDispatcher` to better support efficient Kafka producer usage patterns
nscuro authored Mar 26, 2024
2 parents 6bcd82d + 7e8b9d5 commit bf3da71
Showing 23 changed files with 524 additions and 353 deletions.
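The commit title is terse, so a rough illustration may help: "efficient Kafka producer usage patterns" generally means sharing one long-lived producer and sending asynchronously, rather than creating a producer per send or blocking on every acknowledgement. The sketch below is hypothetical (the class and method names are invented, not the project's actual `KafkaEventDispatcher` API) and uses only the standard kafka-clients API:

    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;

    import java.util.Collection;
    import java.util.List;
    import java.util.concurrent.CompletableFuture;

    // Hypothetical sketch of an event dispatcher built around a single shared producer.
    final class AsyncDispatcherSketch {

        private final Producer<String, String> producer;

        AsyncDispatcherSketch(final Producer<String, String> producer) {
            // One long-lived producer is shared by all callers; creating a new
            // producer per send defeats batching and connection reuse.
            this.producer = producer;
        }

        CompletableFuture<RecordMetadata> dispatchAsync(final ProducerRecord<String, String> record) {
            final var future = new CompletableFuture<RecordMetadata>();
            // Producer#send is asynchronous; the callback completes the future
            // once the broker acknowledges (or rejects) the record.
            producer.send(record, (metadata, exception) -> {
                if (exception != null) {
                    future.completeExceptionally(exception);
                } else {
                    future.complete(metadata);
                }
            });
            return future;
        }

        CompletableFuture<Void> dispatchAllAsync(final Collection<ProducerRecord<String, String>> records) {
            // Fire all sends first and join once, letting the producer batch
            // records instead of paying a round trip per record.
            final List<CompletableFuture<RecordMetadata>> futures = records.stream()
                    .map(this::dispatchAsync)
                    .toList();
            return CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new));
        }
    }

Methods such as convertAllNotificationProtos further down in this diff fit that batch-then-await shape: convert a whole collection of notifications first, then hand the resulting events to the producer in one go.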
pom.xml (7 changes: 2 additions & 5 deletions)
@@ -82,9 +82,6 @@
</ciManagement>

<properties>
-    <maven.compiler.source>21</maven.compiler.source>
-    <maven.compiler.target>21</maven.compiler.target>
-
<!-- Dependency Versions -->
<lib.alpine.version>${project.parent.version}</lib.alpine.version>
<lib.awaitility.version>4.2.0</lib.awaitility.version>
@@ -568,8 +565,8 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
-    <source>17</source>
-    <target>17</target>
+    <source>21</source>
+    <target>21</target>
<compilerArgs>
<arg>-Xlint:all</arg>
<arg>-Xlint:-processing</arg>

This file was deleted.

src/main/java/org/dependencytrack/event/kafka/KafkaEvent.java
@@ -22,5 +22,10 @@

import java.util.Map;

-record KafkaEvent<K, V>(Topic<K, V> topic, K key, V value, Map<String, String> headers) {
+public record KafkaEvent<K, V>(Topic<K, V> topic, K key, V value, Map<String, String> headers) {
+
+    public KafkaEvent(final Topic<K, V> topic, final K key, final V value) {
+        this(topic, key, value, null);
+    }

}
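For illustration, the new compact constructor just defaults headers to null, so call sites without headers can drop the trailing argument. A minimal, hypothetical usage (the wrapper class is invented; the topic constant appears elsewhere in this diff):

    import org.dependencytrack.proto.notification.v1.Notification;

    class KafkaEventExample {

        // Both forms construct the same event; the three-argument constructor
        // delegates to the canonical constructor with headers = null.
        static KafkaEvent<String, Notification> build(final Notification notification) {
            return new KafkaEvent<>(KafkaTopics.NOTIFICATION_ANALYZER, "record-key", notification);
            // equivalent: new KafkaEvent<>(KafkaTopics.NOTIFICATION_ANALYZER, "record-key", notification, null)
        }
    }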
src/main/java/org/dependencytrack/event/kafka/KafkaEventConverter.java (184 changes: 165 additions & 19 deletions)
@@ -18,19 +18,41 @@
*/
package org.dependencytrack.event.kafka;

import alpine.event.framework.Event;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Message;
import org.dependencytrack.event.ComponentRepositoryMetaAnalysisEvent;
import org.dependencytrack.event.ComponentVulnerabilityAnalysisEvent;
import org.dependencytrack.event.GitHubAdvisoryMirrorEvent;
import org.dependencytrack.event.NistMirrorEvent;
import org.dependencytrack.event.OsvMirrorEvent;
import org.dependencytrack.event.kafka.KafkaTopics.Topic;
import org.dependencytrack.model.Vulnerability;
import org.dependencytrack.parser.dependencytrack.NotificationModelConverter;
import org.dependencytrack.proto.notification.v1.BomConsumedOrProcessedSubject;
import org.dependencytrack.proto.notification.v1.BomProcessingFailedSubject;
import org.dependencytrack.proto.notification.v1.NewVulnerabilitySubject;
import org.dependencytrack.proto.notification.v1.NewVulnerableDependencySubject;
import org.dependencytrack.proto.notification.v1.Notification;
import org.dependencytrack.proto.notification.v1.PolicyViolationAnalysisDecisionChangeSubject;
import org.dependencytrack.proto.notification.v1.PolicyViolationSubject;
import org.dependencytrack.proto.notification.v1.Project;
import org.dependencytrack.proto.notification.v1.ProjectVulnAnalysisCompleteSubject;
import org.dependencytrack.proto.notification.v1.VexConsumedOrProcessedSubject;
import org.dependencytrack.proto.notification.v1.VulnerabilityAnalysisDecisionChangeSubject;
import org.dependencytrack.proto.repometaanalysis.v1.AnalysisCommand;
import org.dependencytrack.proto.vulnanalysis.v1.ScanCommand;
import org.dependencytrack.proto.vulnanalysis.v1.ScanKey;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;

import static org.apache.commons.lang3.ObjectUtils.requireNonEmpty;

/**
* Utility class to convert {@link alpine.event.framework.Event}s and {@link alpine.notification.Notification}s
* to {@link KafkaEvent}s.
@@ -40,6 +62,44 @@ final class KafkaEventConverter {
private KafkaEventConverter() {
}

static KafkaEvent<?, ?> convert(final Event event) {
return switch (event) {
case ComponentRepositoryMetaAnalysisEvent e -> convert(e);
case ComponentVulnerabilityAnalysisEvent e -> convert(e);
case GitHubAdvisoryMirrorEvent e -> convert(e);
case NistMirrorEvent e -> convert(e);
case OsvMirrorEvent e -> convert(e);
default -> throw new IllegalArgumentException("Unable to convert event " + event);
};
}

static KafkaEvent<?, ?> convert(final alpine.notification.Notification notification) {
final Notification protoNotification = NotificationModelConverter.convert(notification);
return convert(protoNotification);
}

static KafkaEvent<?, ?> convert(final Notification notification) {
final Topic<String, Notification> topic = extractDestinationTopic(notification);

final String recordKey;
try {
recordKey = extractEventKey(notification);
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException(e);
}

return new KafkaEvent<>(topic, recordKey, notification);
}

static List<KafkaEvent<?, ?>> convertAllNotificationProtos(final Collection<Notification> notifications) {
final var kafkaEvents = new ArrayList<KafkaEvent<?, ?>>(notifications.size());
for (final Notification notification : notifications) {
kafkaEvents.add(convert(notification));
}

return kafkaEvents;
}

static KafkaEvent<ScanKey, ScanCommand> convert(final ComponentVulnerabilityAnalysisEvent event) {
final var componentBuilder = org.dependencytrack.proto.vulnanalysis.v1.Component.newBuilder()
.setUuid(event.uuid().toString());
@@ -73,7 +133,7 @@ static KafkaEvent<String, AnalysisCommand> convert(final ComponentRepositoryMeta
final var componentBuilder = org.dependencytrack.proto.repometaanalysis.v1.Component.newBuilder()
.setPurl(event.purlCoordinates());
Optional.ofNullable(event.internal()).ifPresent(componentBuilder::setInternal);
-        Optional.ofNullable(event.componentUuid()).map(uuid -> uuid.toString()).ifPresent(componentBuilder::setUuid);
+        Optional.ofNullable(event.componentUuid()).map(UUID::toString).ifPresent(componentBuilder::setUuid);

final var analysisCommand = AnalysisCommand.newBuilder()
.setComponent(componentBuilder)
@@ -83,37 +143,123 @@ static KafkaEvent<String, AnalysisCommand> convert(final ComponentRepositoryMeta
return new KafkaEvent<>(KafkaTopics.REPO_META_ANALYSIS_COMMAND, event.purlCoordinates(), analysisCommand, null);
}

-    static KafkaEvent<String, Notification> convert(final String key, final Notification notification) {
-        final Topic<String, Notification> topic = switch (notification.getGroup()) {
+    static KafkaEvent<String, String> convert(final GitHubAdvisoryMirrorEvent ignored) {
+        final String key = Vulnerability.Source.GITHUB.name();
+        return new KafkaEvent<>(KafkaTopics.VULNERABILITY_MIRROR_COMMAND, key, null);
+    }
+
+    static KafkaEvent<String, String> convert(final NistMirrorEvent ignored) {
+        final String key = Vulnerability.Source.NVD.name();
+        return new KafkaEvent<>(KafkaTopics.VULNERABILITY_MIRROR_COMMAND, key, null);
+    }
+
+    static KafkaEvent<String, String> convert(final OsvMirrorEvent event) {
+        final String key = Vulnerability.Source.OSV.name();
+        final String value = event.ecosystem();
+        return new KafkaEvent<>(KafkaTopics.VULNERABILITY_MIRROR_COMMAND, key, value);
+    }
+
+    private static Topic<String, Notification> extractDestinationTopic(final Notification notification) {
+        return switch (notification.getGroup()) {
+            case GROUP_ANALYZER -> KafkaTopics.NOTIFICATION_ANALYZER;
+            case GROUP_BOM_CONSUMED, GROUP_BOM_PROCESSED, GROUP_BOM_PROCESSING_FAILED -> KafkaTopics.NOTIFICATION_BOM;
             case GROUP_CONFIGURATION -> KafkaTopics.NOTIFICATION_CONFIGURATION;
             case GROUP_DATASOURCE_MIRRORING -> KafkaTopics.NOTIFICATION_DATASOURCE_MIRRORING;
-            case GROUP_REPOSITORY -> KafkaTopics.NOTIFICATION_REPOSITORY;
-            case GROUP_INTEGRATION -> KafkaTopics.NOTIFICATION_INTEGRATION;
-            case GROUP_ANALYZER -> KafkaTopics.NOTIFICATION_ANALYZER;
-            case GROUP_BOM_CONSUMED -> KafkaTopics.NOTIFICATION_BOM;
-            case GROUP_BOM_PROCESSED -> KafkaTopics.NOTIFICATION_BOM;
             case GROUP_FILE_SYSTEM -> KafkaTopics.NOTIFICATION_FILE_SYSTEM;
+            case GROUP_INTEGRATION -> KafkaTopics.NOTIFICATION_INTEGRATION;
             case GROUP_NEW_VULNERABILITY -> KafkaTopics.NOTIFICATION_NEW_VULNERABILITY;
             case GROUP_NEW_VULNERABLE_DEPENDENCY -> KafkaTopics.NOTIFICATION_NEW_VULNERABLE_DEPENDENCY;
             case GROUP_POLICY_VIOLATION -> KafkaTopics.NOTIFICATION_POLICY_VIOLATION;
             case GROUP_PROJECT_AUDIT_CHANGE -> KafkaTopics.NOTIFICATION_PROJECT_AUDIT_CHANGE;
             case GROUP_PROJECT_CREATED -> KafkaTopics.NOTIFICATION_PROJECT_CREATED;
-            case GROUP_VEX_CONSUMED -> KafkaTopics.NOTIFICATION_VEX;
-            case GROUP_VEX_PROCESSED -> KafkaTopics.NOTIFICATION_VEX;
-            case GROUP_BOM_PROCESSING_FAILED -> KafkaTopics.NOTIFICATION_BOM;
             case GROUP_PROJECT_VULN_ANALYSIS_COMPLETE -> KafkaTopics.NOTIFICATION_PROJECT_VULN_ANALYSIS_COMPLETE;
-            default -> null;
+            case GROUP_REPOSITORY -> KafkaTopics.NOTIFICATION_REPOSITORY;
+            case GROUP_VEX_CONSUMED, GROUP_VEX_PROCESSED -> KafkaTopics.NOTIFICATION_VEX;
+            case GROUP_UNSPECIFIED, UNRECOGNIZED -> throw new IllegalArgumentException("""
+                    Unable to determine destination topic because the notification does not \
+                    specify a notification group: %s""".formatted(notification.getGroup()));
+            // NB: The lack of a default case is intentional. This way, the compiler will fail
+            // the build when new groups are added, and we don't have a case for it :)
         };
-        if (topic == null) {
-            return null;
-        }
+    }
 
-        return new KafkaEvent<>(topic, key, notification, null);
private static String extractEventKey(final Notification notification) throws InvalidProtocolBufferException {
return switch (notification.getGroup()) {
case GROUP_BOM_CONSUMED, GROUP_BOM_PROCESSED -> {
requireSubjectOfTypeAnyOf(notification, List.of(BomConsumedOrProcessedSubject.class));
final var subject = notification.getSubject().unpack(BomConsumedOrProcessedSubject.class);
yield requireNonEmpty(subject.getProject().getUuid());
}
case GROUP_BOM_PROCESSING_FAILED -> {
requireSubjectOfTypeAnyOf(notification, List.of(BomProcessingFailedSubject.class));
final var subject = notification.getSubject().unpack(BomProcessingFailedSubject.class);
yield requireNonEmpty(subject.getProject().getUuid());
}
case GROUP_NEW_VULNERABILITY -> {
requireSubjectOfTypeAnyOf(notification, List.of(NewVulnerabilitySubject.class));
final var subject = notification.getSubject().unpack(NewVulnerabilitySubject.class);
yield requireNonEmpty(subject.getProject().getUuid());
}
case GROUP_NEW_VULNERABLE_DEPENDENCY -> {
requireSubjectOfTypeAnyOf(notification, List.of(NewVulnerableDependencySubject.class));
final var subject = notification.getSubject().unpack(NewVulnerableDependencySubject.class);
yield requireNonEmpty(subject.getProject().getUuid());
}
case GROUP_POLICY_VIOLATION -> {
requireSubjectOfTypeAnyOf(notification, List.of(PolicyViolationSubject.class));
final var subject = notification.getSubject().unpack(PolicyViolationSubject.class);
yield requireNonEmpty(subject.getProject().getUuid());
}
case GROUP_PROJECT_AUDIT_CHANGE -> {
final Class<? extends Message> matchingSubject = requireSubjectOfTypeAnyOf(notification, List.of(
PolicyViolationAnalysisDecisionChangeSubject.class,
VulnerabilityAnalysisDecisionChangeSubject.class
));

if (matchingSubject == PolicyViolationAnalysisDecisionChangeSubject.class) {
final var subject = notification.getSubject().unpack(PolicyViolationAnalysisDecisionChangeSubject.class);
yield requireNonEmpty(subject.getProject().getUuid());
} else {
final var subject = notification.getSubject().unpack(VulnerabilityAnalysisDecisionChangeSubject.class);
yield requireNonEmpty(subject.getProject().getUuid());
}
}
case GROUP_PROJECT_CREATED -> {
requireSubjectOfTypeAnyOf(notification, List.of(Project.class));
final var subject = notification.getSubject().unpack(Project.class);
yield requireNonEmpty(subject.getUuid());
}
case GROUP_PROJECT_VULN_ANALYSIS_COMPLETE -> {
requireSubjectOfTypeAnyOf(notification, List.of(ProjectVulnAnalysisCompleteSubject.class));
final var subject = notification.getSubject().unpack(ProjectVulnAnalysisCompleteSubject.class);
yield requireNonEmpty(subject.getProject().getUuid());
}
case GROUP_VEX_CONSUMED, GROUP_VEX_PROCESSED -> {
requireSubjectOfTypeAnyOf(notification, List.of(VexConsumedOrProcessedSubject.class));
final var subject = notification.getSubject().unpack(VexConsumedOrProcessedSubject.class);
yield requireNonEmpty(subject.getProject().getUuid());
}
case GROUP_ANALYZER, GROUP_CONFIGURATION, GROUP_DATASOURCE_MIRRORING,
GROUP_FILE_SYSTEM, GROUP_INTEGRATION, GROUP_REPOSITORY -> null;
case GROUP_UNSPECIFIED, UNRECOGNIZED -> throw new IllegalArgumentException("""
Unable to determine record key because the notification does not \
specify a notification group: %s""".formatted(notification.getGroup()));
// NB: The lack of a default case is intentional. This way, the compiler will fail
// the build when new groups are added, and we don't have a case for it :)
};
}

-    static KafkaEvent<String, Notification> convert(final UUID projectUuid, final alpine.notification.Notification alpineNotification) {
-        final Notification notification = NotificationModelConverter.convert(alpineNotification);
-        return convert(projectUuid != null ? projectUuid.toString() : null, notification);
private static Class<? extends Message> requireSubjectOfTypeAnyOf(final Notification notification,
final Collection<Class<? extends Message>> subjectClasses) {
if (!notification.hasSubject()) {
throw new IllegalArgumentException("Expected subject of type matching any of %s, but notification has no subject"
.formatted(subjectClasses));
}

return subjectClasses.stream()
.filter(notification.getSubject()::is).findFirst()
.orElseThrow(() -> new IllegalArgumentException("Expected subject of type matching any of %s, but is %s"
.formatted(subjectClasses, notification.getSubject().getTypeUrl())));
}

}
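The subject checks above lean on protobuf's com.google.protobuf.Any: is(Class) tests which message type is packed into the subject, and unpack(Class) deserializes it, throwing InvalidProtocolBufferException on a malformed payload. A minimal, self-contained illustration of that mechanic, with an invented class name:

    import com.google.protobuf.Any;
    import com.google.protobuf.InvalidProtocolBufferException;
    import org.dependencytrack.proto.notification.v1.Project;

    class SubjectUnpackExample {

        // Mirrors the shape of extractEventKey's per-group branches: verify the
        // packed type first, then unpack and read the project UUID.
        static String projectUuidFrom(final Any subject) throws InvalidProtocolBufferException {
            if (!subject.is(Project.class)) {
                throw new IllegalArgumentException("Expected subject of type Project, but is " + subject.getTypeUrl());
            }
            return subject.unpack(Project.class).getUuid();
        }
    }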
