Skip to content

Commit

Permalink
Merge pull request #553 from DependencyTrack/issue-907-2
Browse files Browse the repository at this point in the history
Migrate `MirrorVulnerabilityProcessor` from Kafka Streams to Parallel Consumer
  • Loading branch information
nscuro committed Mar 22, 2024
2 parents 570e0cc + 4aa68ef commit 8ff235b
Show file tree
Hide file tree
Showing 5 changed files with 72 additions and 80 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.dependencytrack.event.kafka.processor;

import alpine.common.logging.Logger;
import org.dependencytrack.event.kafka.KafkaTopics;
import org.dependencytrack.event.kafka.processor.api.ProcessorManager;

import javax.servlet.ServletContextEvent;
Expand All @@ -16,7 +17,8 @@ public class ProcessorInitializer implements ServletContextListener {
public void contextInitialized(final ServletContextEvent event) {
LOGGER.info("Initializing processors");

// TODO: Register processor here!
PROCESSOR_MANAGER.registerProcessor(VulnerabilityMirrorProcessor.PROCESSOR_NAME,
KafkaTopics.NEW_VULNERABILITY, new VulnerabilityMirrorProcessor());

PROCESSOR_MANAGER.startAll();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@
package org.dependencytrack.event.kafka.streams.processor;
package org.dependencytrack.event.kafka.processor;

import alpine.common.logging.Logger;
import alpine.common.metrics.Metrics;
import com.github.packageurl.MalformedPackageURLException;
import com.github.packageurl.PackageURL;
import io.micrometer.core.instrument.Timer;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.cyclonedx.proto.v1_4.Bom;
import org.cyclonedx.proto.v1_4.Component;
import org.cyclonedx.proto.v1_4.VulnerabilityAffects;
import org.dependencytrack.event.kafka.processor.api.Processor;
import org.dependencytrack.model.Vulnerability;
import org.dependencytrack.model.VulnerableSoftware;
import org.dependencytrack.parser.dependencytrack.ModelConverterCdxToVuln;
Expand All @@ -27,19 +25,18 @@
import java.util.List;
import java.util.Optional;

/**
* A {@link Processor} that ingests vulnerability data from CycloneDX Bill of Vulnerabilities.
*/
public class VulnerabilityMirrorProcessor implements Processor<String, Bom> {

public class MirrorVulnerabilityProcessor implements Processor<String, Bom, Void, Void> {
static final String PROCESSOR_NAME = "vuln.mirror";

private static final Logger LOGGER = Logger.getLogger(MirrorVulnerabilityProcessor.class);
private static final Timer TIMER = Timer.builder("vuln_mirror_processing")
.description("Time taken to process mirrored vulnerabilities")
.register(Metrics.getRegistry());
private static final Logger LOGGER = Logger.getLogger(VulnerabilityMirrorProcessor.class);

@Override
public void process(final Record<String, Bom> record) {
final Timer.Sample timerSample = Timer.start();

try (QueryManager qm = new QueryManager().withL2CacheDisabled()) {
public void process(final ConsumerRecord<String, Bom> record) {
try (QueryManager qm = new QueryManager()) {
LOGGER.debug("Synchronizing Mirrored Vulnerability : " + record.key());
Bom bom = record.value();
String key = record.key();
Expand Down Expand Up @@ -112,11 +109,6 @@ public void process(final Record<String, Bom> record) {
synchronizedVulnerability.setVulnerableSoftware(reconciledVsList);
}
qm.persist(synchronizedVulnerability);
} catch (Exception e) {
// TODO: Send record to a dead letter topic.
LOGGER.error("Synchronizing vulnerability %s failed".formatted(record.key()), e);
} finally {
timerSample.stop(TIMER);
}
}

Expand Down Expand Up @@ -230,8 +222,8 @@ public VulnerableSoftware mapAffectedRangeToVulnerableSoftware(final QueryManage

for (final Constraint constraint : vers.constraints()) {
if (constraint.version() == null
|| constraint.version().equals("0")
|| constraint.version().equals("*")) {
|| constraint.version().equals("0")
|| constraint.version().equals("*")) {
// Semantically, ">=0" is equivalent to versionStartIncluding=null,
// and ">0" is equivalent to versionStartExcluding=null.
//
Expand All @@ -253,12 +245,12 @@ public VulnerableSoftware mapAffectedRangeToVulnerableSoftware(final QueryManage
}

if (versionStartIncluding == null && versionStartExcluding == null
&& versionEndIncluding == null && versionEndExcluding == null) {
&& versionEndIncluding == null && versionEndExcluding == null) {
LOGGER.warn("Unable to assemble a version range from %s for %s".formatted(vers, vulnId));
return null;
}
if ((versionStartIncluding != null || versionStartExcluding != null)
&& (versionEndIncluding == null && versionEndExcluding == null)) {
&& (versionEndIncluding == null && versionEndExcluding == null)) {
LOGGER.warn("Skipping indefinite version range assembled from %s for %s".formatted(vers, vulnId));
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.dependencytrack.event.ProjectPolicyEvaluationEvent;
import org.dependencytrack.event.kafka.KafkaTopics;
import org.dependencytrack.event.kafka.streams.processor.DelayedBomProcessedNotificationProcessor;
import org.dependencytrack.event.kafka.streams.processor.MirrorVulnerabilityProcessor;
import org.dependencytrack.event.kafka.streams.processor.RepositoryMetaResultProcessor;
import org.dependencytrack.event.kafka.streams.processor.VulnerabilityScanResultProcessor;
import org.dependencytrack.model.VulnerabilityScan;
Expand Down Expand Up @@ -224,12 +223,6 @@ Topology createTopology() {
.withName("consume_from_%s_topic".formatted(KafkaTopics.REPO_META_ANALYSIS_RESULT.name())))
.process(RepositoryMetaResultProcessor::new, Named.as("process_repo_meta_analysis_result"));

streamsBuilder
.stream(KafkaTopics.NEW_VULNERABILITY.name(),
Consumed.with(KafkaTopics.NEW_VULNERABILITY.keySerde(), KafkaTopics.NEW_VULNERABILITY.valueSerde())
.withName("consume_from_%s_topic".formatted(KafkaTopics.NEW_VULNERABILITY.name())))
.process(MirrorVulnerabilityProcessor::new, Named.as("process_mirror_vulnerability"));

return streamsBuilder.build(streamsProperties);
}

Expand Down
13 changes: 13 additions & 0 deletions src/main/resources/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -491,6 +491,19 @@ kafka.streams.transient.processing.exception.threshold.interval=PT30M
# Refer to https://kafka.apache.org/documentation/#consumerconfigs for available options.
# alpine.kafka.processor.<name>.consumer.<consumer.config.name>=

# Required
# Configures the Kafka processor responsible for ingesting mirrored vulnerability
# data from the dtrack.vulnerability topic. The processor only occasionally receives
# records, such that high concurrency is usually not justified.
alpine.kafka.processor.vuln.mirror.max.concurrency=-1
alpine.kafka.processor.vuln.mirror.processing.order=partition
alpine.kafka.processor.vuln.mirror.retry.initial.delay.ms=3000
alpine.kafka.processor.vuln.mirror.retry.multiplier=2
alpine.kafka.processor.vuln.mirror.retry.randomization.factor=0.3
alpine.kafka.processor.vuln.mirror.retry.max.delay.ms=180000
alpine.kafka.processor.vuln.mirror.consumer.group.id=dtrack-apiserver-processor
alpine.kafka.processor.vuln.mirror.consumer.auto.offset.reset=earliest

# Scheduling tasks after 3 minutes (3*60*1000) of starting application
task.scheduler.initial.delay=180000

Expand Down
Original file line number Diff line number Diff line change
@@ -1,55 +1,26 @@
package org.dependencytrack.event.kafka.streams.processor;
package org.dependencytrack.event.kafka.processor;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.cyclonedx.proto.v1_4.Bom;
import org.dependencytrack.PersistenceCapableTest;
import org.dependencytrack.event.kafka.serialization.KafkaProtobufSerde;
import org.dependencytrack.event.kafka.serialization.KafkaProtobufSerializer;
import org.dependencytrack.model.Severity;
import org.dependencytrack.model.Vulnerability;
import org.dependencytrack.persistence.CweImporter;
import org.dependencytrack.util.KafkaTestUtil;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import static org.assertj.core.api.Assertions.assertThat;
import static org.dependencytrack.util.KafkaTestUtil.generateBomFromJson;

public class MirrorVulnerabilityProcessorTest extends PersistenceCapableTest {

private TopologyTestDriver testDriver;
private TestInputTopic<String, Bom> inputTopic;
public class VulnerabilityMirrorProcessorTest extends AbstractProcessorTest {

@Before
public void setUp() throws Exception {
final var streamsBuilder = new StreamsBuilder();
streamsBuilder
.stream("input-topic", Consumed
.with(Serdes.String(), new KafkaProtobufSerde<>(Bom.parser())))
.process(MirrorVulnerabilityProcessor::new);

testDriver = new TopologyTestDriver(streamsBuilder.build());
inputTopic = testDriver.createInputTopic("input-topic",
new StringSerializer(), new KafkaProtobufSerializer<>());
public void before() throws Exception {
super.before();

new CweImporter().processCweDefinitions(); // Required for CWE mapping
}

@After
public void tearDown() {
if (testDriver != null) {
testDriver.close();
}
}

@Test
public void testProcessNvdVuln() throws Exception {
inputTopic.pipeInput("NVD/CVE-2022-40489", KafkaTestUtil.generateBomFromJson("""
final var bovJson = """
{
"components": [
{
Expand Down Expand Up @@ -91,7 +62,10 @@ public void testProcessNvdVuln() throws Exception {
{ "url": "https://github.com/thinkcmf/thinkcmf/issues/736" }
]
}
"""));
""";

final var processor = new VulnerabilityMirrorProcessor();
processor.process(aConsumerRecord("NVD/CVE-2022-40489", generateBomFromJson(bovJson)).build());

final Vulnerability vuln = qm.getVulnerabilityByVulnId("NVD", "CVE-2022-40489");
assertThat(vuln).isNotNull();
Expand Down Expand Up @@ -160,7 +134,7 @@ public void testProcessNvdVuln() throws Exception {

@Test
public void testProcessGitHubVuln() throws Exception {
inputTopic.pipeInput("GITHUB/GHSA-fxwm-579q-49qq", KafkaTestUtil.generateBomFromJson("""
final var bovJson = """
{
"components": [
{
Expand Down Expand Up @@ -223,7 +197,10 @@ public void testProcessGitHubVuln() throws Exception {
{ "url": "https://github.com/advisories/GHSA-fxwm-579q-49qq" }
]
}
"""));
""";

final var processor = new VulnerabilityMirrorProcessor();
processor.process(aConsumerRecord("GITHUB/GHSA-fxwm-579q-49qq", generateBomFromJson(bovJson)).build());

final Vulnerability vuln = qm.getVulnerabilityByVulnId("GITHUB", "GHSA-fxwm-579q-49qq");
assertThat(vuln).isNotNull();
Expand Down Expand Up @@ -375,7 +352,7 @@ public void testProcessGitHubVuln() throws Exception {

@Test
public void testProcessOsvVuln() throws Exception {
inputTopic.pipeInput("OSV/GHSA-2cc5-23r7-vc4v", KafkaTestUtil.generateBomFromJson("""
final var bovJson = """
{
"components": [
{
Expand Down Expand Up @@ -427,7 +404,10 @@ public void testProcessOsvVuln() throws Exception {
{ "url": "https://github.com/ratpack/ratpack/blob/29434f7ac6fd4b36a4495429b70f4c8163100332/ratpack-session/src/main/java/ratpack/session/clientside/ClientSideSessionConfig.java#L29" }
]
}
"""));
""";

final var processor = new VulnerabilityMirrorProcessor();
processor.process(aConsumerRecord("OSV/GHSA-2cc5-23r7-vc4v", generateBomFromJson(bovJson)).build());

final Vulnerability vuln = qm.getVulnerabilityByVulnId("GITHUB", "GHSA-2cc5-23r7-vc4v");
assertThat(vuln).isNotNull();
Expand Down Expand Up @@ -555,7 +535,7 @@ public void testProcessOsvVuln() throws Exception {

@Test
public void testProcessVulnWithoutAffects() throws Exception {
inputTopic.pipeInput("NVD/CVE-2022-40489", KafkaTestUtil.generateBomFromJson("""
final var bovJson = """
{
"components": [
{
Expand All @@ -573,7 +553,10 @@ public void testProcessVulnWithoutAffects() throws Exception {
}
]
}
"""));
""";

final var processor = new VulnerabilityMirrorProcessor();
processor.process(aConsumerRecord("NVD/CVE-2022-40489", generateBomFromJson(bovJson)).build());

final Vulnerability vuln = qm.getVulnerabilityByVulnId("NVD", "CVE-2022-40489");
assertThat(vuln).isNotNull();
Expand Down Expand Up @@ -613,7 +596,7 @@ public void testProcessVulnWithoutAffects() throws Exception {

@Test
public void testProcessVulnWithUnmatchedAffectsBomRef() throws Exception {
inputTopic.pipeInput("NVD/CVE-2022-40489", KafkaTestUtil.generateBomFromJson("""
final var bovJson = """
{
"components": [
{
Expand All @@ -639,7 +622,10 @@ public void testProcessVulnWithUnmatchedAffectsBomRef() throws Exception {
}
]
}
"""));
""";

final var processor = new VulnerabilityMirrorProcessor();
processor.process(aConsumerRecord("NVD/CVE-2022-40489", generateBomFromJson(bovJson)).build());

final Vulnerability vuln = qm.getVulnerabilityByVulnId("NVD", "CVE-2022-40489");
assertThat(vuln).isNotNull();
Expand Down Expand Up @@ -679,7 +665,7 @@ public void testProcessVulnWithUnmatchedAffectsBomRef() throws Exception {

@Test
public void testProcessVulnWithVersConstraints() throws Exception {
inputTopic.pipeInput("NVD/CVE-2022-40489", KafkaTestUtil.generateBomFromJson("""
final var bovJson = """
{
"components": [
{
Expand Down Expand Up @@ -731,7 +717,10 @@ public void testProcessVulnWithVersConstraints() throws Exception {
}
]
}
"""));
""";

final var processor = new VulnerabilityMirrorProcessor();
processor.process(aConsumerRecord("NVD/CVE-2022-40489", generateBomFromJson(bovJson)).build());

final Vulnerability vuln = qm.getVulnerabilityByVulnId("NVD", "CVE-2022-40489");
assertThat(vuln).isNotNull();
Expand Down Expand Up @@ -935,7 +924,7 @@ public void testProcessVulnWithVersConstraints() throws Exception {

@Test
public void testProcessVulnWithInvalidCpeOrPurl() throws Exception {
inputTopic.pipeInput("NVD/CVE-2022-40489", KafkaTestUtil.generateBomFromJson("""
final var bovJson = """
{
"components": [
{
Expand Down Expand Up @@ -997,7 +986,10 @@ public void testProcessVulnWithInvalidCpeOrPurl() throws Exception {
}
]
}
"""));
""";

final var processor = new VulnerabilityMirrorProcessor();
processor.process(aConsumerRecord("NVD/CVE-2022-40489", generateBomFromJson(bovJson)).build());

final Vulnerability vuln = qm.getVulnerabilityByVulnId("NVD", "CVE-2022-40489");
assertThat(vuln).isNotNull();
Expand Down Expand Up @@ -1035,4 +1027,4 @@ public void testProcessVulnWithInvalidCpeOrPurl() throws Exception {
assertThat(vuln.getVulnerableSoftware()).isEmpty();
}

}
}

0 comments on commit 8ff235b

Please sign in to comment.