From 4aa68ef125e5b100bb22e627d8ec4f4d1f0fe922 Mon Sep 17 00:00:00 2001 From: nscuro Date: Mon, 5 Feb 2024 13:50:38 +0100 Subject: [PATCH] Migrate `MirrorVulnerabilityProcessor` from Kafka Streams to Parallel Consumer Depends on https://github.com/DependencyTrack/hyades-apiserver/pull/552 Relates to https://github.com/DependencyTrack/hyades/issues/346 Relates to https://github.com/DependencyTrack/hyades/issues/901 Relates to https://github.com/DependencyTrack/hyades/issues/907 Signed-off-by: nscuro --- .../kafka/processor/ProcessorInitializer.java | 4 +- .../VulnerabilityMirrorProcessor.java} | 38 ++++---- .../streams/KafkaStreamsTopologyFactory.java | 7 -- src/main/resources/application.properties | 13 +++ .../VulnerabilityMirrorProcessorTest.java} | 90 +++++++++---------- 5 files changed, 72 insertions(+), 80 deletions(-) rename src/main/java/org/dependencytrack/event/kafka/{streams/processor/MirrorVulnerabilityProcessor.java => processor/VulnerabilityMirrorProcessor.java} (91%) rename src/test/java/org/dependencytrack/event/kafka/{streams/processor/MirrorVulnerabilityProcessorTest.java => processor/VulnerabilityMirrorProcessorTest.java} (95%) diff --git a/src/main/java/org/dependencytrack/event/kafka/processor/ProcessorInitializer.java b/src/main/java/org/dependencytrack/event/kafka/processor/ProcessorInitializer.java index 81aa599cf..fc47ab994 100644 --- a/src/main/java/org/dependencytrack/event/kafka/processor/ProcessorInitializer.java +++ b/src/main/java/org/dependencytrack/event/kafka/processor/ProcessorInitializer.java @@ -1,6 +1,7 @@ package org.dependencytrack.event.kafka.processor; import alpine.common.logging.Logger; +import org.dependencytrack.event.kafka.KafkaTopics; import org.dependencytrack.event.kafka.processor.api.ProcessorManager; import javax.servlet.ServletContextEvent; @@ -16,7 +17,8 @@ public class ProcessorInitializer implements ServletContextListener { public void contextInitialized(final ServletContextEvent event) { LOGGER.info("Initializing processors"); - // TODO: Register processor here! + PROCESSOR_MANAGER.registerProcessor(VulnerabilityMirrorProcessor.PROCESSOR_NAME, + KafkaTopics.NEW_VULNERABILITY, new VulnerabilityMirrorProcessor()); PROCESSOR_MANAGER.startAll(); } diff --git a/src/main/java/org/dependencytrack/event/kafka/streams/processor/MirrorVulnerabilityProcessor.java b/src/main/java/org/dependencytrack/event/kafka/processor/VulnerabilityMirrorProcessor.java similarity index 91% rename from src/main/java/org/dependencytrack/event/kafka/streams/processor/MirrorVulnerabilityProcessor.java rename to src/main/java/org/dependencytrack/event/kafka/processor/VulnerabilityMirrorProcessor.java index 0b99c2420..323a62ca7 100644 --- a/src/main/java/org/dependencytrack/event/kafka/streams/processor/MirrorVulnerabilityProcessor.java +++ b/src/main/java/org/dependencytrack/event/kafka/processor/VulnerabilityMirrorProcessor.java @@ -1,16 +1,14 @@ -package org.dependencytrack.event.kafka.streams.processor; +package org.dependencytrack.event.kafka.processor; import alpine.common.logging.Logger; -import alpine.common.metrics.Metrics; import com.github.packageurl.MalformedPackageURLException; import com.github.packageurl.PackageURL; -import io.micrometer.core.instrument.Timer; import org.apache.commons.lang3.StringUtils; -import org.apache.kafka.streams.processor.api.Processor; -import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.clients.consumer.ConsumerRecord; import org.cyclonedx.proto.v1_4.Bom; import org.cyclonedx.proto.v1_4.Component; import org.cyclonedx.proto.v1_4.VulnerabilityAffects; +import org.dependencytrack.event.kafka.processor.api.Processor; import org.dependencytrack.model.Vulnerability; import org.dependencytrack.model.VulnerableSoftware; import org.dependencytrack.parser.dependencytrack.ModelConverterCdxToVuln; @@ -27,19 +25,18 @@ import java.util.List; import java.util.Optional; +/** + * A {@link Processor} that ingests vulnerability data from CycloneDX Bill of Vulnerabilities. + */ +public class VulnerabilityMirrorProcessor implements Processor { -public class MirrorVulnerabilityProcessor implements Processor { + static final String PROCESSOR_NAME = "vuln.mirror"; - private static final Logger LOGGER = Logger.getLogger(MirrorVulnerabilityProcessor.class); - private static final Timer TIMER = Timer.builder("vuln_mirror_processing") - .description("Time taken to process mirrored vulnerabilities") - .register(Metrics.getRegistry()); + private static final Logger LOGGER = Logger.getLogger(VulnerabilityMirrorProcessor.class); @Override - public void process(final Record record) { - final Timer.Sample timerSample = Timer.start(); - - try (QueryManager qm = new QueryManager().withL2CacheDisabled()) { + public void process(final ConsumerRecord record) { + try (QueryManager qm = new QueryManager()) { LOGGER.debug("Synchronizing Mirrored Vulnerability : " + record.key()); Bom bom = record.value(); String key = record.key(); @@ -112,11 +109,6 @@ public void process(final Record record) { synchronizedVulnerability.setVulnerableSoftware(reconciledVsList); } qm.persist(synchronizedVulnerability); - } catch (Exception e) { - // TODO: Send record to a dead letter topic. - LOGGER.error("Synchronizing vulnerability %s failed".formatted(record.key()), e); - } finally { - timerSample.stop(TIMER); } } @@ -230,8 +222,8 @@ public VulnerableSoftware mapAffectedRangeToVulnerableSoftware(final QueryManage for (final Constraint constraint : vers.constraints()) { if (constraint.version() == null - || constraint.version().equals("0") - || constraint.version().equals("*")) { + || constraint.version().equals("0") + || constraint.version().equals("*")) { // Semantically, ">=0" is equivalent to versionStartIncluding=null, // and ">0" is equivalent to versionStartExcluding=null. // @@ -253,12 +245,12 @@ public VulnerableSoftware mapAffectedRangeToVulnerableSoftware(final QueryManage } if (versionStartIncluding == null && versionStartExcluding == null - && versionEndIncluding == null && versionEndExcluding == null) { + && versionEndIncluding == null && versionEndExcluding == null) { LOGGER.warn("Unable to assemble a version range from %s for %s".formatted(vers, vulnId)); return null; } if ((versionStartIncluding != null || versionStartExcluding != null) - && (versionEndIncluding == null && versionEndExcluding == null)) { + && (versionEndIncluding == null && versionEndExcluding == null)) { LOGGER.warn("Skipping indefinite version range assembled from %s for %s".formatted(vers, vulnId)); return null; } diff --git a/src/main/java/org/dependencytrack/event/kafka/streams/KafkaStreamsTopologyFactory.java b/src/main/java/org/dependencytrack/event/kafka/streams/KafkaStreamsTopologyFactory.java index 96f446392..f971841df 100644 --- a/src/main/java/org/dependencytrack/event/kafka/streams/KafkaStreamsTopologyFactory.java +++ b/src/main/java/org/dependencytrack/event/kafka/streams/KafkaStreamsTopologyFactory.java @@ -23,7 +23,6 @@ import org.dependencytrack.event.ProjectPolicyEvaluationEvent; import org.dependencytrack.event.kafka.KafkaTopics; import org.dependencytrack.event.kafka.streams.processor.DelayedBomProcessedNotificationProcessor; -import org.dependencytrack.event.kafka.streams.processor.MirrorVulnerabilityProcessor; import org.dependencytrack.event.kafka.streams.processor.RepositoryMetaResultProcessor; import org.dependencytrack.event.kafka.streams.processor.VulnerabilityScanResultProcessor; import org.dependencytrack.model.VulnerabilityScan; @@ -224,12 +223,6 @@ Topology createTopology() { .withName("consume_from_%s_topic".formatted(KafkaTopics.REPO_META_ANALYSIS_RESULT.name()))) .process(RepositoryMetaResultProcessor::new, Named.as("process_repo_meta_analysis_result")); - streamsBuilder - .stream(KafkaTopics.NEW_VULNERABILITY.name(), - Consumed.with(KafkaTopics.NEW_VULNERABILITY.keySerde(), KafkaTopics.NEW_VULNERABILITY.valueSerde()) - .withName("consume_from_%s_topic".formatted(KafkaTopics.NEW_VULNERABILITY.name()))) - .process(MirrorVulnerabilityProcessor::new, Named.as("process_mirror_vulnerability")); - return streamsBuilder.build(streamsProperties); } diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index ef044624c..1fac3c5bc 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -491,6 +491,19 @@ kafka.streams.transient.processing.exception.threshold.interval=PT30M # Refer to https://kafka.apache.org/documentation/#consumerconfigs for available options. # alpine.kafka.processor..consumer.= +# Required +# Configures the Kafka processor responsible for ingesting mirrored vulnerability +# data from the dtrack.vulnerability topic. The processor only occasionally receives +# records, such that high concurrency is usually not justified. +alpine.kafka.processor.vuln.mirror.max.concurrency=-1 +alpine.kafka.processor.vuln.mirror.processing.order=partition +alpine.kafka.processor.vuln.mirror.retry.initial.delay.ms=3000 +alpine.kafka.processor.vuln.mirror.retry.multiplier=2 +alpine.kafka.processor.vuln.mirror.retry.randomization.factor=0.3 +alpine.kafka.processor.vuln.mirror.retry.max.delay.ms=180000 +alpine.kafka.processor.vuln.mirror.consumer.group.id=dtrack-apiserver-processor +alpine.kafka.processor.vuln.mirror.consumer.auto.offset.reset=earliest + # Scheduling tasks after 3 minutes (3*60*1000) of starting application task.scheduler.initial.delay=180000 diff --git a/src/test/java/org/dependencytrack/event/kafka/streams/processor/MirrorVulnerabilityProcessorTest.java b/src/test/java/org/dependencytrack/event/kafka/processor/VulnerabilityMirrorProcessorTest.java similarity index 95% rename from src/test/java/org/dependencytrack/event/kafka/streams/processor/MirrorVulnerabilityProcessorTest.java rename to src/test/java/org/dependencytrack/event/kafka/processor/VulnerabilityMirrorProcessorTest.java index b8a57778c..afbb6149d 100644 --- a/src/test/java/org/dependencytrack/event/kafka/streams/processor/MirrorVulnerabilityProcessorTest.java +++ b/src/test/java/org/dependencytrack/event/kafka/processor/VulnerabilityMirrorProcessorTest.java @@ -1,55 +1,26 @@ -package org.dependencytrack.event.kafka.streams.processor; +package org.dependencytrack.event.kafka.processor; -import org.apache.kafka.common.serialization.Serdes; -import org.apache.kafka.common.serialization.StringSerializer; -import org.apache.kafka.streams.StreamsBuilder; -import org.apache.kafka.streams.TestInputTopic; -import org.apache.kafka.streams.TopologyTestDriver; -import org.apache.kafka.streams.kstream.Consumed; -import org.cyclonedx.proto.v1_4.Bom; -import org.dependencytrack.PersistenceCapableTest; -import org.dependencytrack.event.kafka.serialization.KafkaProtobufSerde; -import org.dependencytrack.event.kafka.serialization.KafkaProtobufSerializer; import org.dependencytrack.model.Severity; import org.dependencytrack.model.Vulnerability; import org.dependencytrack.persistence.CweImporter; -import org.dependencytrack.util.KafkaTestUtil; -import org.junit.After; import org.junit.Before; import org.junit.Test; import static org.assertj.core.api.Assertions.assertThat; +import static org.dependencytrack.util.KafkaTestUtil.generateBomFromJson; -public class MirrorVulnerabilityProcessorTest extends PersistenceCapableTest { - - private TopologyTestDriver testDriver; - private TestInputTopic inputTopic; +public class VulnerabilityMirrorProcessorTest extends AbstractProcessorTest { @Before - public void setUp() throws Exception { - final var streamsBuilder = new StreamsBuilder(); - streamsBuilder - .stream("input-topic", Consumed - .with(Serdes.String(), new KafkaProtobufSerde<>(Bom.parser()))) - .process(MirrorVulnerabilityProcessor::new); - - testDriver = new TopologyTestDriver(streamsBuilder.build()); - inputTopic = testDriver.createInputTopic("input-topic", - new StringSerializer(), new KafkaProtobufSerializer<>()); + public void before() throws Exception { + super.before(); new CweImporter().processCweDefinitions(); // Required for CWE mapping } - @After - public void tearDown() { - if (testDriver != null) { - testDriver.close(); - } - } - @Test public void testProcessNvdVuln() throws Exception { - inputTopic.pipeInput("NVD/CVE-2022-40489", KafkaTestUtil.generateBomFromJson(""" + final var bovJson = """ { "components": [ { @@ -91,7 +62,10 @@ public void testProcessNvdVuln() throws Exception { { "url": "https://github.com/thinkcmf/thinkcmf/issues/736" } ] } - """)); + """; + + final var processor = new VulnerabilityMirrorProcessor(); + processor.process(aConsumerRecord("NVD/CVE-2022-40489", generateBomFromJson(bovJson)).build()); final Vulnerability vuln = qm.getVulnerabilityByVulnId("NVD", "CVE-2022-40489"); assertThat(vuln).isNotNull(); @@ -160,7 +134,7 @@ public void testProcessNvdVuln() throws Exception { @Test public void testProcessGitHubVuln() throws Exception { - inputTopic.pipeInput("GITHUB/GHSA-fxwm-579q-49qq", KafkaTestUtil.generateBomFromJson(""" + final var bovJson = """ { "components": [ { @@ -223,7 +197,10 @@ public void testProcessGitHubVuln() throws Exception { { "url": "https://github.com/advisories/GHSA-fxwm-579q-49qq" } ] } - """)); + """; + + final var processor = new VulnerabilityMirrorProcessor(); + processor.process(aConsumerRecord("GITHUB/GHSA-fxwm-579q-49qq", generateBomFromJson(bovJson)).build()); final Vulnerability vuln = qm.getVulnerabilityByVulnId("GITHUB", "GHSA-fxwm-579q-49qq"); assertThat(vuln).isNotNull(); @@ -375,7 +352,7 @@ public void testProcessGitHubVuln() throws Exception { @Test public void testProcessOsvVuln() throws Exception { - inputTopic.pipeInput("OSV/GHSA-2cc5-23r7-vc4v", KafkaTestUtil.generateBomFromJson(""" + final var bovJson = """ { "components": [ { @@ -427,7 +404,10 @@ public void testProcessOsvVuln() throws Exception { { "url": "https://github.com/ratpack/ratpack/blob/29434f7ac6fd4b36a4495429b70f4c8163100332/ratpack-session/src/main/java/ratpack/session/clientside/ClientSideSessionConfig.java#L29" } ] } - """)); + """; + + final var processor = new VulnerabilityMirrorProcessor(); + processor.process(aConsumerRecord("OSV/GHSA-2cc5-23r7-vc4v", generateBomFromJson(bovJson)).build()); final Vulnerability vuln = qm.getVulnerabilityByVulnId("GITHUB", "GHSA-2cc5-23r7-vc4v"); assertThat(vuln).isNotNull(); @@ -555,7 +535,7 @@ public void testProcessOsvVuln() throws Exception { @Test public void testProcessVulnWithoutAffects() throws Exception { - inputTopic.pipeInput("NVD/CVE-2022-40489", KafkaTestUtil.generateBomFromJson(""" + final var bovJson = """ { "components": [ { @@ -573,7 +553,10 @@ public void testProcessVulnWithoutAffects() throws Exception { } ] } - """)); + """; + + final var processor = new VulnerabilityMirrorProcessor(); + processor.process(aConsumerRecord("NVD/CVE-2022-40489", generateBomFromJson(bovJson)).build()); final Vulnerability vuln = qm.getVulnerabilityByVulnId("NVD", "CVE-2022-40489"); assertThat(vuln).isNotNull(); @@ -613,7 +596,7 @@ public void testProcessVulnWithoutAffects() throws Exception { @Test public void testProcessVulnWithUnmatchedAffectsBomRef() throws Exception { - inputTopic.pipeInput("NVD/CVE-2022-40489", KafkaTestUtil.generateBomFromJson(""" + final var bovJson = """ { "components": [ { @@ -639,7 +622,10 @@ public void testProcessVulnWithUnmatchedAffectsBomRef() throws Exception { } ] } - """)); + """; + + final var processor = new VulnerabilityMirrorProcessor(); + processor.process(aConsumerRecord("NVD/CVE-2022-40489", generateBomFromJson(bovJson)).build()); final Vulnerability vuln = qm.getVulnerabilityByVulnId("NVD", "CVE-2022-40489"); assertThat(vuln).isNotNull(); @@ -679,7 +665,7 @@ public void testProcessVulnWithUnmatchedAffectsBomRef() throws Exception { @Test public void testProcessVulnWithVersConstraints() throws Exception { - inputTopic.pipeInput("NVD/CVE-2022-40489", KafkaTestUtil.generateBomFromJson(""" + final var bovJson = """ { "components": [ { @@ -731,7 +717,10 @@ public void testProcessVulnWithVersConstraints() throws Exception { } ] } - """)); + """; + + final var processor = new VulnerabilityMirrorProcessor(); + processor.process(aConsumerRecord("NVD/CVE-2022-40489", generateBomFromJson(bovJson)).build()); final Vulnerability vuln = qm.getVulnerabilityByVulnId("NVD", "CVE-2022-40489"); assertThat(vuln).isNotNull(); @@ -935,7 +924,7 @@ public void testProcessVulnWithVersConstraints() throws Exception { @Test public void testProcessVulnWithInvalidCpeOrPurl() throws Exception { - inputTopic.pipeInput("NVD/CVE-2022-40489", KafkaTestUtil.generateBomFromJson(""" + final var bovJson = """ { "components": [ { @@ -997,7 +986,10 @@ public void testProcessVulnWithInvalidCpeOrPurl() throws Exception { } ] } - """)); + """; + + final var processor = new VulnerabilityMirrorProcessor(); + processor.process(aConsumerRecord("NVD/CVE-2022-40489", generateBomFromJson(bovJson)).build()); final Vulnerability vuln = qm.getVulnerabilityByVulnId("NVD", "CVE-2022-40489"); assertThat(vuln).isNotNull(); @@ -1035,4 +1027,4 @@ public void testProcessVulnWithInvalidCpeOrPurl() throws Exception { assertThat(vuln.getVulnerableSoftware()).isEmpty(); } -} +} \ No newline at end of file