Merge pull request #554 from DependencyTrack/issue-907-3
Migrate `RepositoryMetaResultProcessor` from Kafka Streams to Parallel Consumer
nscuro authored Mar 26, 2024
2 parents bf3da71 + e6bf334 commit b250ac6
Showing 7 changed files with 165 additions and 202 deletions.
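At a glance: the Kafka Streams version implemented `Processor<String, AnalysisResult, Void, Void>` and was wired into a streams topology, while the Parallel Consumer based framework registers named processors with a `ProcessorManager` and hands each one plain `ConsumerRecord`s. A minimal sketch of the per-record contract, inferred purely from the signatures visible in this diff (the actual interface lives in `org.dependencytrack.event.kafka.processor.api` and may differ in detail):

    import org.apache.kafka.clients.consumer.ConsumerRecord;

    // Sketch only; shape inferred from this diff, not copied from the actual source.
    interface Processor<K, V> {

        // Called once per consumed record. Throwing ProcessingException signals
        // the surrounding Parallel Consumer machinery to retry the record with backoff.
        void process(ConsumerRecord<K, V> record) throws ProcessingException;
    }

    // Stand-in for org.dependencytrack.event.kafka.processor.exception.ProcessingException.
    class ProcessingException extends Exception {
        ProcessingException(final Throwable cause) {
            super(cause);
        }
    }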
@@ -37,6 +37,8 @@ public void contextInitialized(final ServletContextEvent event) {
 
         PROCESSOR_MANAGER.registerProcessor(VulnerabilityMirrorProcessor.PROCESSOR_NAME,
                 KafkaTopics.NEW_VULNERABILITY, new VulnerabilityMirrorProcessor());
+        PROCESSOR_MANAGER.registerProcessor(RepositoryMetaResultProcessor.PROCESSOR_NAME,
+                KafkaTopics.REPO_META_ANALYSIS_RESULT, new RepositoryMetaResultProcessor());
 
         PROCESSOR_MANAGER.startAll();
     }
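The `registerProcessor`/`startAll` calls above hide the consumer wiring. Assuming the manager wraps Confluent's Parallel Consumer, as the commit title says, startup plausibly looks something like the sketch below; none of this wiring appears in the diff, and everything except `KafkaTopics` and the processor itself is hypothetical:

    import io.confluent.parallelconsumer.ParallelConsumerOptions;
    import io.confluent.parallelconsumer.ParallelConsumerOptions.ProcessingOrder;
    import io.confluent.parallelconsumer.ParallelStreamProcessor;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import java.util.List;

    // Hypothetical wiring inside ProcessorManager.startAll(); sketch only.
    final var options = ParallelConsumerOptions.<String, AnalysisResult>builder()
            .consumer(new KafkaConsumer<>(consumerConfig)) // consumerConfig assumed to be derived from the alpine.kafka.processor.* properties
            .ordering(ProcessingOrder.KEY)                 // matches processing.order=key in application.properties below
            .build();
    final ParallelStreamProcessor<String, AnalysisResult> parallelConsumer =
            ParallelStreamProcessor.createEosStreamProcessor(options);
    parallelConsumer.subscribe(List.of(KafkaTopics.REPO_META_ANALYSIS_RESULT.name()));
    parallelConsumer.poll(ctx -> {
        try {
            processor.process(ctx.getSingleConsumerRecord()); // delegate to RepositoryMetaResultProcessor
        } catch (ProcessingException e) {
            throw new RuntimeException(e); // surfacing the failure lets Parallel Consumer retry the record
        }
    });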
@@ -16,29 +16,25 @@
  * SPDX-License-Identifier: Apache-2.0
  * Copyright (c) OWASP Foundation. All Rights Reserved.
  */
-package org.dependencytrack.event.kafka.streams.processor;
+package org.dependencytrack.event.kafka.processor;
 
 import alpine.common.logging.Logger;
-import alpine.common.metrics.Metrics;
 import com.github.packageurl.MalformedPackageURLException;
 import com.github.packageurl.PackageURL;
-import io.micrometer.core.instrument.Timer;
-import org.apache.commons.lang3.exception.ExceptionUtils;
-import org.apache.kafka.streams.processor.api.Processor;
-import org.apache.kafka.streams.processor.api.Record;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.dependencytrack.event.kafka.processor.api.Processor;
+import org.dependencytrack.event.kafka.processor.exception.ProcessingException;
 import org.dependencytrack.model.FetchStatus;
 import org.dependencytrack.model.IntegrityMetaComponent;
 import org.dependencytrack.model.RepositoryMetaComponent;
 import org.dependencytrack.model.RepositoryType;
 import org.dependencytrack.persistence.QueryManager;
 import org.dependencytrack.proto.repometaanalysis.v1.AnalysisResult;
-import org.postgresql.util.PSQLState;
+import org.dependencytrack.util.PersistenceUtil;
 
-import javax.jdo.JDODataStoreException;
 import javax.jdo.PersistenceManager;
 import javax.jdo.Query;
-import javax.jdo.Transaction;
-import java.sql.SQLException;
 import java.util.Date;
 import java.util.Optional;
@@ -47,16 +43,14 @@
 /**
  * A {@link Processor} responsible for processing result of component repository meta analyses.
  */
-public class RepositoryMetaResultProcessor implements Processor<String, AnalysisResult, Void, Void> {
+public class RepositoryMetaResultProcessor implements Processor<String, AnalysisResult> {
 
+    static final String PROCESSOR_NAME = "repo.meta.analysis.result";
+
     private static final Logger LOGGER = Logger.getLogger(RepositoryMetaResultProcessor.class);
-    private static final Timer TIMER = Timer.builder("repo_meta_result_processing")
-            .description("Time taken to process repository meta analysis results")
-            .register(Metrics.getRegistry());
 
     @Override
-    public void process(final Record<String, AnalysisResult> record) {
-        final Timer.Sample timerSample = Timer.start();
+    public void process(final ConsumerRecord<String, AnalysisResult> record) throws ProcessingException {
         if (!isRecordValid(record)) {
             return;
         }
@@ -67,13 +61,11 @@ public void process(final Record<String, AnalysisResult> record) {
                 performIntegrityCheck(integrityMetaComponent, record.value(), qm);
             }
         } catch (Exception e) {
-            LOGGER.error("An unexpected error occurred while processing record %s".formatted(record), e);
-        } finally {
-            timerSample.stop(TIMER);
+            throw new ProcessingException(e);
         }
     }
 
-    private IntegrityMetaComponent synchronizeIntegrityMetadata(final QueryManager queryManager, final Record<String, AnalysisResult> record) throws MalformedPackageURLException {
+    private IntegrityMetaComponent synchronizeIntegrityMetadata(final QueryManager queryManager, final ConsumerRecord<String, AnalysisResult> record) throws MalformedPackageURLException {
         final AnalysisResult result = record.value();
         PackageURL purl = new PackageURL(result.getComponent().getPurl());
         if (result.hasIntegrityMeta()) {
@@ -84,86 +76,73 @@ private IntegrityMetaComponent synchronizeIntegrityMetadata(final QueryManager q
             }
         }
     }
 
-    private void synchronizeRepositoryMetadata(final QueryManager queryManager, final Record<String, AnalysisResult> record) throws Exception {
-        PersistenceManager pm = queryManager.getPersistenceManager();
+    private void synchronizeRepositoryMetadata(final QueryManager qm, final ConsumerRecord<String, AnalysisResult> record) throws Exception {
+        final PersistenceManager pm = qm.getPersistenceManager();
         final AnalysisResult result = record.value();
-        PackageURL purl = new PackageURL(result.getComponent().getPurl());
+        final var purl = new PackageURL(result.getComponent().getPurl());
 
         // It is possible that the same meta info is reported for multiple components in parallel,
         // causing unique constraint violations when attempting to insert into the REPOSITORY_META_COMPONENT table.
         // In such cases, we can get away with simply retrying to SELECT+UPDATE or INSERT again. We'll attempt
         // up to 3 times before giving up.
-        for (int i = 0; i < 3; i++) {
-            final Transaction trx = pm.currentTransaction();
-            try {
-                RepositoryMetaComponent repositoryMetaComponentResult = createRepositoryMetaResult(record, pm, purl);
-                if (repositoryMetaComponentResult != null) {
-                    trx.begin();
-                    pm.makePersistent(repositoryMetaComponentResult);
-                    trx.commit();
-                    break; // this means that transaction was successful and we do not need to retry
-                }
-            } catch (JDODataStoreException e) {
-                // TODO: DataNucleus doesn't map constraint violation exceptions very well,
-                // so we have to depend on the exception of the underlying JDBC driver to
-                // tell us what happened. We currently only handle PostgreSQL, but we'll have
-                // to do the same for at least H2 and MSSQL.
-                if (ExceptionUtils.getRootCause(e) instanceof final SQLException se
-                        && PSQLState.UNIQUE_VIOLATION.getState().equals(se.getSQLState())) {
-                    continue; // Retry
-                }
-
-                throw e;
-            } finally {
-                if (trx.isActive()) {
-                    trx.rollback();
-                }
-            }
-        }
+        qm.runInRetryableTransaction(() -> {
+            final RepositoryMetaComponent repositoryMetaComponentResult = createRepositoryMetaResult(record, pm, purl);
+            if (repositoryMetaComponentResult != null) {
+                pm.makePersistent(repositoryMetaComponentResult);
+            }
+
+            return null;
+        }, PersistenceUtil::isUniqueConstraintViolation);
     }
 
-    private RepositoryMetaComponent createRepositoryMetaResult(Record<String, AnalysisResult> incomingAnalysisResultRecord, PersistenceManager pm, PackageURL purl) throws Exception {
+    private RepositoryMetaComponent createRepositoryMetaResult(ConsumerRecord<String, AnalysisResult> incomingAnalysisResultRecord, PersistenceManager pm, PackageURL purl) {
         final AnalysisResult result = incomingAnalysisResultRecord.value();
-        if (result.hasLatestVersion()) {
-            try (final Query<RepositoryMetaComponent> query = pm.newQuery(RepositoryMetaComponent.class)) {
-                query.setFilter("repositoryType == :repositoryType && namespace == :namespace && name == :name");
-                query.setParameters(
-                        RepositoryType.resolve(purl),
-                        purl.getNamespace(),
-                        purl.getName()
-                );
-                RepositoryMetaComponent persistentRepoMetaComponent = query.executeUnique();
-                if (persistentRepoMetaComponent == null) {
-                    persistentRepoMetaComponent = new RepositoryMetaComponent();
-                }
-
-                if (persistentRepoMetaComponent.getLastCheck() != null
-                        && persistentRepoMetaComponent.getLastCheck().after(new Date(incomingAnalysisResultRecord.timestamp()))) {
-                    LOGGER.warn("""
-                            Received repository meta information for %s that is older\s
-                            than what's already in the database; Discarding
-                            """.formatted(purl));
-                    return null;
-                }
-
-                persistentRepoMetaComponent.setRepositoryType(RepositoryType.resolve(purl));
-                persistentRepoMetaComponent.setNamespace(purl.getNamespace());
-                persistentRepoMetaComponent.setName(purl.getName());
-                if (result.hasLatestVersion()) {
-                    persistentRepoMetaComponent.setLatestVersion(result.getLatestVersion());
-                }
-                if (result.hasPublished()) {
-                    persistentRepoMetaComponent.setPublished(new Date(result.getPublished().getSeconds() * 1000));
-                }
-                persistentRepoMetaComponent.setLastCheck(new Date(incomingAnalysisResultRecord.timestamp()));
-                return persistentRepoMetaComponent;
-            }
-        } else {
+        if (!result.hasLatestVersion()) {
             return null;
         }
+
+        final Query<RepositoryMetaComponent> query = pm.newQuery(RepositoryMetaComponent.class);
+        query.setFilter("repositoryType == :repositoryType && namespace == :namespace && name == :name");
+        query.setParameters(
+                RepositoryType.resolve(purl),
+                purl.getNamespace(),
+                purl.getName()
+        );
+
+        RepositoryMetaComponent persistentRepoMetaComponent;
+        try {
+            persistentRepoMetaComponent = query.executeUnique();
+        } finally {
+            query.closeAll();
+        }
+
+        if (persistentRepoMetaComponent == null) {
+            persistentRepoMetaComponent = new RepositoryMetaComponent();
+        }
+
+        if (persistentRepoMetaComponent.getLastCheck() != null
+                && persistentRepoMetaComponent.getLastCheck().after(new Date(incomingAnalysisResultRecord.timestamp()))) {
+            LOGGER.warn("""
+                    Received repository meta information for %s that is older\s
+                    than what's already in the database; Discarding
+                    """.formatted(purl));
+            return null;
+        }
+
+        persistentRepoMetaComponent.setRepositoryType(RepositoryType.resolve(purl));
+        persistentRepoMetaComponent.setNamespace(purl.getNamespace());
+        persistentRepoMetaComponent.setName(purl.getName());
+        if (result.hasLatestVersion()) {
+            persistentRepoMetaComponent.setLatestVersion(result.getLatestVersion());
+        }
+        if (result.hasPublished()) {
+            persistentRepoMetaComponent.setPublished(new Date(result.getPublished().getSeconds() * 1000));
+        }
+        persistentRepoMetaComponent.setLastCheck(new Date(incomingAnalysisResultRecord.timestamp()));
+        return persistentRepoMetaComponent;
     }
 
-    private IntegrityMetaComponent synchronizeIntegrityMetaResult(final Record<String, AnalysisResult> incomingAnalysisResultRecord, QueryManager queryManager, PackageURL purl) {
+    private IntegrityMetaComponent synchronizeIntegrityMetaResult(final ConsumerRecord<String, AnalysisResult> incomingAnalysisResultRecord, QueryManager queryManager, PackageURL purl) {
        final AnalysisResult result = incomingAnalysisResultRecord.value();
        IntegrityMetaComponent persistentIntegrityMetaComponent = queryManager.getIntegrityMetaComponent(purl.toString());
        if (persistentIntegrityMetaComponent != null && persistentIntegrityMetaComponent.getStatus() != null && persistentIntegrityMetaComponent.getStatus().equals(FetchStatus.PROCESSED)) {
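In the hunk above, the hand-rolled three-attempt loop with explicit JDO transaction handling collapses into `qm.runInRetryableTransaction(body, retryCondition)`, and the PostgreSQL-specific SQL-state check is replaced by the driver-agnostic `PersistenceUtil::isUniqueConstraintViolation`. A rough sketch of the semantics such a helper plausibly has, assuming it bounds attempts and rolls back failed ones (the actual implementation lives in `QueryManager` and is not part of this diff):

    import java.util.function.Predicate;
    import java.util.function.Supplier;
    import javax.jdo.Transaction;

    // Assumed semantics of runInRetryableTransaction; sketch, not the actual implementation.
    public <T> T runInRetryableTransaction(final Supplier<T> body, final Predicate<Throwable> retryOn) {
        RuntimeException lastException = null;
        for (int attempt = 0; attempt < 3; attempt++) { // attempt bound is an assumption
            final Transaction trx = getPersistenceManager().currentTransaction();
            try {
                trx.begin();
                final T result = body.get();
                trx.commit();
                return result;
            } catch (RuntimeException e) {
                if (!retryOn.test(e)) {
                    throw e; // not retryable, e.g. not a unique constraint violation
                }
                lastException = e; // retryable: fall through to the next attempt
            } finally {
                if (trx.isActive()) {
                    trx.rollback(); // rolls back failed attempts
                }
            }
        }
        throw lastException;
    }

The lambda passed in the diff ends with `return null;` because the helper's return value is unused here; the retry predicate is the interesting argument.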
@@ -178,10 +157,10 @@ private IntegrityMetaComponent synchronizeIntegrityMetaResult(final Record<Strin
 
         if (result.getIntegrityMeta().hasMd5() || result.getIntegrityMeta().hasSha1() || result.getIntegrityMeta().hasSha256()
                 || result.getIntegrityMeta().hasSha512() || result.getIntegrityMeta().hasCurrentVersionLastModified()) {
-            Optional.ofNullable(result.getIntegrityMeta().getMd5()).ifPresent(persistentIntegrityMetaComponent::setMd5);
-            Optional.ofNullable(result.getIntegrityMeta().getSha1()).ifPresent(persistentIntegrityMetaComponent::setSha1);
-            Optional.ofNullable(result.getIntegrityMeta().getSha256()).ifPresent(persistentIntegrityMetaComponent::setSha256);
-            Optional.ofNullable(result.getIntegrityMeta().getSha512()).ifPresent(persistentIntegrityMetaComponent::setSha512);
+            Optional.of(result.getIntegrityMeta().getMd5()).filter(StringUtils::isNotBlank).ifPresent(persistentIntegrityMetaComponent::setMd5);
+            Optional.of(result.getIntegrityMeta().getSha1()).filter(StringUtils::isNotBlank).ifPresent(persistentIntegrityMetaComponent::setSha1);
+            Optional.of(result.getIntegrityMeta().getSha256()).filter(StringUtils::isNotBlank).ifPresent(persistentIntegrityMetaComponent::setSha256);
+            Optional.of(result.getIntegrityMeta().getSha512()).filter(StringUtils::isNotBlank).ifPresent(persistentIntegrityMetaComponent::setSha512);
             persistentIntegrityMetaComponent.setPurl(result.getComponent().getPurl());
             persistentIntegrityMetaComponent.setRepositoryUrl(result.getIntegrityMeta().getMetaSourceUrl());
             persistentIntegrityMetaComponent.setPublishedAt(result.getIntegrityMeta().hasCurrentVersionLastModified() ? new Date(result.getIntegrityMeta().getCurrentVersionLastModified().getSeconds() * 1000) : null);
@@ -198,7 +177,7 @@ private IntegrityMetaComponent synchronizeIntegrityMetaResult(final Record<Strin
         return queryManager.updateIntegrityMetaComponent(persistentIntegrityMetaComponent);
     }
 
-    private static boolean isRecordValid(final Record<String, AnalysisResult> record) {
+    private static boolean isRecordValid(final ConsumerRecord<String, AnalysisResult> record) {
         final AnalysisResult result = record.value();
         if (!result.hasComponent()) {
             LOGGER.warn("""
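A small behavioral fix rides along in the hash-syncing hunk above: protobuf 3 string getters never return null; an unset `md5` field comes back as the empty string, so the old `Optional.ofNullable(...)` chain was always present and could overwrite stored hashes with blanks. Filtering with `StringUtils::isNotBlank` makes unset fields actually skip the setter. A self-contained illustration using a stand-in string rather than the generated protobuf API:

    import org.apache.commons.lang3.StringUtils;
    import java.util.Optional;

    class BlankHashExample {
        public static void main(String[] args) {
            // Protobuf 3 string getters never return null: an unset md5 comes back as "".
            final String md5FromProto = ""; // stand-in for result.getIntegrityMeta().getMd5() on an unset field

            // Old code path: Optional.ofNullable("") is present, so the blank would be stored.
            Optional.ofNullable(md5FromProto)
                    .ifPresent(v -> System.out.println("old path stores: '" + v + "'"));

            // New code path: blank values are filtered out and the setter is never invoked.
            Optional.of(md5FromProto)
                    .filter(StringUtils::isNotBlank)
                    .ifPresent(v -> System.out.println("new path stores: " + v)); // never printed
        }
    }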
@@ -41,7 +41,6 @@
 import org.dependencytrack.event.ProjectPolicyEvaluationEvent;
 import org.dependencytrack.event.kafka.KafkaTopics;
 import org.dependencytrack.event.kafka.streams.processor.DelayedBomProcessedNotificationProcessor;
-import org.dependencytrack.event.kafka.streams.processor.RepositoryMetaResultProcessor;
 import org.dependencytrack.event.kafka.streams.processor.VulnerabilityScanResultProcessor;
 import org.dependencytrack.model.VulnerabilityScan;
 import org.dependencytrack.model.WorkflowState;
@@ -235,12 +234,6 @@ Topology createTopology() {
                     Event.dispatch(policyEvaluationEvent);
                 }, Named.as("trigger_policy_evaluation"));
 
-        streamsBuilder
-                .stream(KafkaTopics.REPO_META_ANALYSIS_RESULT.name(),
-                        Consumed.with(KafkaTopics.REPO_META_ANALYSIS_RESULT.keySerde(), KafkaTopics.REPO_META_ANALYSIS_RESULT.valueSerde())
-                                .withName("consume_from_%s_topic".formatted(KafkaTopics.REPO_META_ANALYSIS_RESULT.name())))
-                .process(RepositoryMetaResultProcessor::new, Named.as("process_repo_meta_analysis_result"));
-
         return streamsBuilder.build(streamsProperties);
     }
12 changes: 12 additions & 0 deletions src/main/resources/application.properties
@@ -504,6 +504,18 @@ alpine.kafka.processor.vuln.mirror.retry.max.delay.ms=180000
 alpine.kafka.processor.vuln.mirror.consumer.group.id=dtrack-apiserver-processor
 alpine.kafka.processor.vuln.mirror.consumer.auto.offset.reset=earliest
 
+# Required
+# Configures the Kafka processor responsible for ingesting repository metadata
+# analysis results from the dtrack.repo-meta-analysis.result topic.
+alpine.kafka.processor.repo.meta.analysis.result.max.concurrency=-1
+alpine.kafka.processor.repo.meta.analysis.result.processing.order=key
+alpine.kafka.processor.repo.meta.analysis.result.retry.initial.delay.ms=1000
+alpine.kafka.processor.repo.meta.analysis.result.retry.multiplier=2
+alpine.kafka.processor.repo.meta.analysis.result.retry.randomization.factor=0.3
+alpine.kafka.processor.repo.meta.analysis.result.retry.max.delay.ms=180000
+alpine.kafka.processor.repo.meta.analysis.result.consumer.group.id=dtrack-apiserver-processor
+alpine.kafka.processor.repo.meta.analysis.result.consumer.auto.offset.reset=earliest
+
 # Scheduling tasks after 3 minutes (3*60*1000) of starting application
 task.scheduler.initial.delay=180000
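Reading the new block: `processing.order=key` presumably maps to Parallel Consumer's key ordering, so results for the same record key stay sequential while different keys proceed in parallel, and `max.concurrency=-1` reads like "use the framework default"; both are inferences from the property names, not documented here. The `retry.*` values describe exponential backoff with jitter. A quick sketch of the schedule they imply, assuming delay ≈ initial × multiplier^attempt, randomized by ±30% and capped at max.delay.ms (the exact formula belongs to the processor framework and may differ):

    // Sketch: assumed backoff semantics of the retry.* properties above.
    class RetryScheduleSketch {
        public static void main(String[] args) {
            final long initialDelayMs = 1_000;   // retry.initial.delay.ms
            final double multiplier = 2.0;       // retry.multiplier
            final double randomization = 0.3;    // retry.randomization.factor
            final long maxDelayMs = 180_000;     // retry.max.delay.ms

            for (int attempt = 0; attempt < 10; attempt++) {
                final double base = Math.min(initialDelayMs * Math.pow(multiplier, attempt), maxDelayMs);
                final double jitter = 1 + ((Math.random() * 2) - 1) * randomization; // ±30%
                final long delay = (long) Math.min(base * jitter, maxDelayMs);
                System.out.printf("retry %d: ~%d ms%n", attempt + 1, delay);
            }
        }
    }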
(Diffs for the remaining three of the 7 changed files were not loaded in this capture.)