
send integrity meta events outside of transaction and handle integrity violation #380

Merged · 2 commits · Oct 23, 2023
---------- changed file ----------
@@ -29,7 +29,6 @@
import org.dependencytrack.model.Component;
import org.dependencytrack.model.ComponentIdentity;
import org.dependencytrack.model.ConfigPropertyConstants;
import org.dependencytrack.model.IntegrityMetaComponent;
import org.dependencytrack.model.Project;
import org.dependencytrack.model.RepositoryMetaComponent;
import org.dependencytrack.model.RepositoryType;
@@ -760,8 +759,4 @@ private void getDirectDependenciesForPathDependencies(Map<String, Component> dep
}
dependencyGraph.putAll(addToDependencyGraph);
}

public IntegrityMetaComponent createIntegrityMetaComponent(IntegrityMetaComponent integrityMetaComponent) {
return persist(integrityMetaComponent);
}
}
---------- changed file ----------
@@ -73,6 +73,34 @@ public synchronized IntegrityMetaComponent updateIntegrityMetaComponent(final In
}
}

public IntegrityMetaComponent createIntegrityMetaComponent(IntegrityMetaComponent integrityMetaComponent) {
return persist(integrityMetaComponent);
}

public void createIntegrityMetaHandlingConflict(IntegrityMetaComponent integrityMetaComponent) {
final String createQuery = """
INSERT INTO "INTEGRITY_META_COMPONENT" ("PURL", "STATUS", "LAST_FETCH")
VALUES (?, ?, ?)
ON CONFLICT DO NOTHING
""";
Connection connection = null;
PreparedStatement preparedStatement = null;
try {
connection = (Connection) pm.getDataStoreConnection();
preparedStatement = connection.prepareStatement(createQuery);
preparedStatement.setString(1, integrityMetaComponent.getPurl().toString());
preparedStatement.setString(2, integrityMetaComponent.getStatus().toString());
preparedStatement.setTimestamp(3, new java.sql.Timestamp(integrityMetaComponent.getLastFetch().getTime()));
preparedStatement.execute();
} catch (Exception ex) {
LOGGER.error("Error in creating integrity meta component", ex);
throw new RuntimeException(ex);
} finally {
DbUtil.close(preparedStatement);
DbUtil.close(connection);
}
}

/**
* Synchronizes IntegrityMetaComponent with purls from COMPONENT. This is part of initializer.
*/
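For context: the new createIntegrityMetaHandlingConflict method deliberately bypasses JDO and issues a plain JDBC insert, because PostgreSQL's ON CONFLICT DO NOTHING clause turns a duplicate-key insert into a silent no-op instead of a unique-constraint violation. That matters when several BOM uploads race to create the row for the same PURL. Below is a minimal sketch of the same pattern using try-with-resources; it is illustrative only and not the code added by this PR, which obtains its Connection from pm.getDataStoreConnection() and closes resources via DbUtil.

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.Timestamp;
import java.time.Instant;

class ConflictSafeInsertSketch {

    // Inserts a row keyed by PURL; a second call with the same PURL is silently skipped by PostgreSQL.
    static void insertIgnoringDuplicate(final Connection connection, final String purl, final String status) throws Exception {
        final String sql = """
                INSERT INTO "INTEGRITY_META_COMPONENT" ("PURL", "STATUS", "LAST_FETCH")
                VALUES (?, ?, ?)
                ON CONFLICT DO NOTHING
                """;
        try (PreparedStatement statement = connection.prepareStatement(sql)) {
            statement.setString(1, purl);
            statement.setString(2, status);
            statement.setTimestamp(3, Timestamp.from(Instant.now()));
            statement.execute(); // no SQLException for a duplicate key, and no row is added
        }
    }
}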
---------- changed file ----------
@@ -1742,7 +1742,11 @@ public void batchUpdateIntegrityMetaComponent(List<IntegrityMetaComponent> purls
}

public IntegrityMetaComponent createIntegrityMetaComponent(IntegrityMetaComponent integrityMetaComponent) {
return getComponentQueryManager().createIntegrityMetaComponent(integrityMetaComponent);
return getIntegrityMetaQueryManager().createIntegrityMetaComponent(integrityMetaComponent);
}

public void createIntegrityMetaHandlingConflict(IntegrityMetaComponent integrityMetaComponent) {
getIntegrityMetaQueryManager().createIntegrityMetaHandlingConflict(integrityMetaComponent);
}

public IntegrityAnalysis getIntegrityAnalysisByComponentUuid(UUID uuid) {
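The facade change above only reroutes the existing createIntegrityMetaComponent delegate to IntegrityMetaQueryManager and exposes the new conflict-tolerant variant beside it, so callers keep going through QueryManager. A minimal usage fragment (the PURL value is illustrative; the names are taken from the diff):

try (final var qm = new QueryManager()) {
    // Safe to call even if another BOM upload already created the row for this PURL.
    qm.createIntegrityMetaHandlingConflict(
            AbstractMetaHandler.createIntegrityMetaComponent("pkg:maven/acme/[email protected]"));
}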
---------- changed file ----------
@@ -26,6 +26,7 @@
import alpine.event.framework.Subscriber;
import alpine.notification.Notification;
import alpine.notification.NotificationLevel;
import com.google.common.collect.ImmutableMap;
import io.micrometer.core.instrument.Timer;
import org.apache.commons.collections4.MultiValuedMap;
import org.apache.commons.collections4.multimap.HashSetValuedHashMap;
@@ -263,6 +264,8 @@ private void processBom(final Context ctx, final File bomFile) throws BomConsump

final var vulnAnalysisEvents = new ArrayList<ComponentVulnerabilityAnalysisEvent>();
final var repoMetaAnalysisEvents = new ArrayList<ComponentRepositoryMetaAnalysisEvent>();
final var integrityMetaAnalysisEvents = new ArrayList<ComponentRepositoryMetaAnalysisEvent>();
Map<ComponentIdentity, Component> copyOfPersistentComponents;

try (final var qm = new QueryManager()) {
final PersistenceManager pm = qm.getPersistenceManager();
@@ -311,6 +314,7 @@ private void processBom(final Context ctx, final File bomFile) throws BomConsump
processServices(qm, project, services, identitiesByBomRef, bomRefsByIdentity);
processDependencyGraph(ctx, pm, cdxBom, project, persistentComponents, persistentServices, identitiesByBomRef);
recordBomImport(ctx, pm, project);
copyOfPersistentComponents = ImmutableMap.copyOf(persistentComponents);

// BOM ref <-> ComponentIdentity indexes are no longer needed.
// Let go of their contents to make it eligible for GC sooner.
@@ -322,15 +326,8 @@ private void processBom(final Context ctx, final File bomFile) throws BomConsump
// The constructors of ComponentRepositoryMetaAnalysisEvent and ComponentVulnerabilityAnalysisEvent
// merely call a few getters on it, but the component object itself is not passed around.
// Detaching would imply additional database interactions that we'd rather not do.
if (component.getPurl() != null) {
boolean result = SUPPORTED_PACKAGE_URLS_FOR_INTEGRITY_CHECK.contains(component.getPurl().getType());
ComponentRepositoryMetaAnalysisEvent event;
if (result) {
event = createRepoMetaAnalysisEvent(component, qm);
} else {
event = new ComponentRepositoryMetaAnalysisEvent(component.getUuid(), component.getPurlCoordinates().toString(), component.isInternal(), FetchMeta.FETCH_META_LATEST_VERSION);
}
repoMetaAnalysisEvents.add(event);
if(component.getPurl() != null) {
repoMetaAnalysisEvents.add(new ComponentRepositoryMetaAnalysisEvent(component.getUuid(), component.getPurlCoordinates().toString(), component.isInternal(), FetchMeta.FETCH_META_LATEST_VERSION));
}
vulnAnalysisEvents.add(new ComponentVulnerabilityAnalysisEvent(
ctx.uploadToken, component, VulnerabilityAnalysisLevel.BOM_UPLOAD_ANALYSIS, component.isNew()));
@@ -343,6 +340,23 @@ private void processBom(final Context ctx, final File bomFile) throws BomConsump
}
}

//only integrity metadata events have to be sent because latest version events
//have been sent already
for (final Component component : copyOfPersistentComponents.values()) {
// Note: component does not need to be detached.
// The constructors of ComponentRepositoryMetaAnalysisEvent and ComponentVulnerabilityAnalysisEvent
// merely call a few getters on it, but the component object itself is not passed around.
// Detaching would imply additional database interactions that we'd rather not do
if (component.getPurl() != null) {
if(SUPPORTED_PACKAGE_URLS_FOR_INTEGRITY_CHECK.contains(component.getPurl().getType())) {
ComponentRepositoryMetaAnalysisEvent event = integrityRepoMetaAnalysisEvent(component, qm);
if (event != null) {
integrityMetaAnalysisEvents.add(event);
}
}
}
}

var vulnAnalysisState = qm.getWorkflowStateByTokenAndStep(ctx.uploadToken, WorkflowStep.VULN_ANALYSIS);
if (!vulnAnalysisEvents.isEmpty()) {
qm.createVulnerabilityScan(TargetType.PROJECT, ctx.project.getUuid(), ctx.uploadToken.toString(), vulnAnalysisEvents.size());
@@ -389,6 +403,13 @@ private void processBom(final Context ctx, final File bomFile) throws BomConsump
}
}));

integrityMetaAnalysisEvents.forEach(event -> kafkaEventDispatcher.dispatchAsync(event, (metadata, exception) -> {
if (exception != null) {
// Include context in the log message to make log correlation easier.
LOGGER.error("Failed to produce %s to Kafka (%s)".formatted(event, ctx), exception);
}
}));

// TODO: Trigger index updates
}
}
@@ -1013,20 +1034,20 @@ public String toString() {

}

private ComponentRepositoryMetaAnalysisEvent createRepoMetaAnalysisEvent(Component component, QueryManager qm) {

private ComponentRepositoryMetaAnalysisEvent integrityRepoMetaAnalysisEvent(Component component, QueryManager qm) {
IntegrityMetaComponent integrityMetaComponent = qm.getIntegrityMetaComponent(component.getPurl().toString());
if (integrityMetaComponent == null) {
qm.getPersistenceManager().makePersistent(AbstractMetaHandler.createIntegrityMetaComponent(component.getPurl().toString()));
return new ComponentRepositoryMetaAnalysisEvent(component.getUuid(), component.getPurl().canonicalize(), component.isInternal(), FetchMeta.FETCH_META_INTEGRITY_DATA_AND_LATEST_VERSION);
qm.createIntegrityMetaHandlingConflict(AbstractMetaHandler.createIntegrityMetaComponent(component.getPurl().toString()));
return new ComponentRepositoryMetaAnalysisEvent(component.getUuid(), component.getPurl().canonicalize(), component.isInternal(), FetchMeta.FETCH_META_INTEGRITY_DATA);
}
if (integrityMetaComponent.getStatus() == null || (integrityMetaComponent.getStatus() == FetchStatus.IN_PROGRESS && (Date.from(Instant.now()).getTime() - integrityMetaComponent.getLastFetch().getTime()) > TIME_SPAN)) {
else if (integrityMetaComponent.getStatus() == null || (integrityMetaComponent.getStatus() == FetchStatus.IN_PROGRESS && (Date.from(Instant.now()).getTime() - integrityMetaComponent.getLastFetch().getTime()) > TIME_SPAN)) {
integrityMetaComponent.setLastFetch(Date.from(Instant.now()));
qm.getPersistenceManager().makePersistent(integrityMetaComponent);
return new ComponentRepositoryMetaAnalysisEvent(component.getUuid(), component.getPurl().canonicalize(), component.isInternal(), FetchMeta.FETCH_META_INTEGRITY_DATA_AND_LATEST_VERSION);
} else {
return new ComponentRepositoryMetaAnalysisEvent(component.getUuid(), component.getPurlCoordinates().canonicalize(), component.isInternal(), FetchMeta.FETCH_META_LATEST_VERSION);
return new ComponentRepositoryMetaAnalysisEvent(component.getUuid(), component.getPurl().canonicalize(), component.isInternal(), FetchMeta.FETCH_META_INTEGRITY_DATA);
}
//don't send event because integrity metadata would be in db already in Processed state or sent recently
//and don't want to send again
return null;
}


}
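Because the scraped diff interleaves removed and added lines, the resulting helper is easier to read in one piece. Reconstructed from the added lines above (not an authoritative copy of the merged file), the renamed integrityRepoMetaAnalysisEvent now only ever requests FETCH_META_INTEGRITY_DATA, since the main processing loop already dispatches a FETCH_META_LATEST_VERSION event for every component that has a PURL:

// Reconstruction from the diff above; the surrounding class, imports and TIME_SPAN constant are as in the processing task.
private ComponentRepositoryMetaAnalysisEvent integrityRepoMetaAnalysisEvent(Component component, QueryManager qm) {
    IntegrityMetaComponent integrityMetaComponent = qm.getIntegrityMetaComponent(component.getPurl().toString());
    if (integrityMetaComponent == null) {
        // First sighting of this PURL: record it conflict-safely and request integrity data.
        qm.createIntegrityMetaHandlingConflict(AbstractMetaHandler.createIntegrityMetaComponent(component.getPurl().toString()));
        return new ComponentRepositoryMetaAnalysisEvent(component.getUuid(), component.getPurl().canonicalize(),
                component.isInternal(), FetchMeta.FETCH_META_INTEGRITY_DATA);
    } else if (integrityMetaComponent.getStatus() == null
            || (integrityMetaComponent.getStatus() == FetchStatus.IN_PROGRESS
                && (Date.from(Instant.now()).getTime() - integrityMetaComponent.getLastFetch().getTime()) > TIME_SPAN)) {
        // A previous fetch never finished (or is stale): refresh the timestamp and ask again.
        integrityMetaComponent.setLastFetch(Date.from(Instant.now()));
        qm.getPersistenceManager().makePersistent(integrityMetaComponent);
        return new ComponentRepositoryMetaAnalysisEvent(component.getUuid(), component.getPurl().canonicalize(),
                component.isInternal(), FetchMeta.FETCH_META_INTEGRITY_DATA);
    }
    // Integrity metadata is already processed or was requested recently; no event is dispatched.
    return null;
}

As a consequence, a component whose PURL type supports integrity checks can now produce two REPO_META_ANALYSIS_COMMAND records (one latest-version, one integrity), which is why the test expectations below gain an extra REPO_META_ANALYSIS_COMMAND event and the bloated-BOM count doubles from 9056 to 18112.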
---------- changed file ----------
@@ -0,0 +1,29 @@
package org.dependencytrack.persistence;

import org.dependencytrack.AbstractPostgresEnabledTest;
import org.dependencytrack.model.FetchStatus;
import org.dependencytrack.model.IntegrityMetaComponent;
import org.junit.Test;

import java.util.Date;

import static org.assertj.core.api.Assertions.assertThat;

public class IntegrityMetaQueryManagerPostgresTest extends AbstractPostgresEnabledTest {

@Test
public void testCreateIntegrityMetadataHandlingConflict() {
var integrityMeta = new IntegrityMetaComponent();
integrityMeta.setPurl("pkg:maven/acme/[email protected]?type=jar");
integrityMeta.setStatus(FetchStatus.IN_PROGRESS);
integrityMeta.setLastFetch(new Date());
qm.createIntegrityMetaHandlingConflict(integrityMeta);

var integrityMeta2 = new IntegrityMetaComponent();
//inserting same purl twice should not cause exception
integrityMeta2.setPurl("pkg:maven/acme/[email protected]?type=jar");
integrityMeta2.setStatus(FetchStatus.IN_PROGRESS);
integrityMeta2.setLastFetch(new Date());
assertThat(qm.getIntegrityMetaComponentCount()).isEqualTo(1);
}
}
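Note that, as merged, the test constructs integrityMeta2 with the same PURL but never issues the second insert before the count assertion. A hypothetical two-line extension of the test body (not part of this PR) would exercise the conflict path explicitly; assertThatNoException is standard AssertJ and needs the matching static import:

// Hypothetical addition to the test method above, after integrityMeta2 is populated
// (requires: import static org.assertj.core.api.Assertions.assertThatNoException;)
assertThatNoException()
        .isThrownBy(() -> qm.createIntegrityMetaHandlingConflict(integrityMeta2));
assertThat(qm.getIntegrityMetaComponentCount()).isEqualTo(1);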
---------- changed file ----------
@@ -20,7 +20,6 @@

import com.github.packageurl.PackageURL;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.awaitility.Awaitility;
import org.dependencytrack.AbstractPostgresEnabledTest;
import org.dependencytrack.event.BomUploadEvent;
import org.dependencytrack.event.kafka.KafkaEventDispatcher;
@@ -100,6 +99,7 @@ public void informTest() throws Exception {
event -> assertThat(event.topic()).isEqualTo(KafkaTopics.NOTIFICATION_BOM.name()),
event -> assertThat(event.topic()).isEqualTo(KafkaTopics.VULN_ANALYSIS_COMMAND.name()),
event -> assertThat(event.topic()).isEqualTo(KafkaTopics.REPO_META_ANALYSIS_COMMAND.name()),
event -> assertThat(event.topic()).isEqualTo(KafkaTopics.REPO_META_ANALYSIS_COMMAND.name()),
event -> assertThat(event.topic()).isEqualTo(KafkaTopics.NOTIFICATION_BOM.name())
);
qm.getPersistenceManager().refresh(project);
@@ -185,6 +185,7 @@ public void informTestWithComponentAlreadyExistsForIntegrityCheck() throws Excep
event -> assertThat(event.topic()).isEqualTo(KafkaTopics.NOTIFICATION_BOM.name()),
event -> assertThat(event.topic()).isEqualTo(KafkaTopics.VULN_ANALYSIS_COMMAND.name()),
event -> assertThat(event.topic()).isEqualTo(KafkaTopics.REPO_META_ANALYSIS_COMMAND.name()),
event -> assertThat(event.topic()).isEqualTo(KafkaTopics.REPO_META_ANALYSIS_COMMAND.name()),
event -> assertThat(event.topic()).isEqualTo(KafkaTopics.NOTIFICATION_BOM.name())
);
qm.getPersistenceManager().refresh(project);
@@ -519,7 +520,7 @@ public void informWithBloatedBomTest() throws Exception {
.map(ProducerRecord::topic)
.filter(KafkaTopics.REPO_META_ANALYSIS_COMMAND.name()::equals)
.count();
assertThat(repoMetaAnalysisCommandsSent).isEqualTo(9056);
assertThat(repoMetaAnalysisCommandsSent).isEqualTo(18112);
}

@Test // https://github.com/DependencyTrack/dependency-track/issues/2519
@@ -647,6 +648,7 @@ public void informWithDelayedBomProcessedNotification() throws Exception {
assertThat(notification.getGroup()).isEqualTo(Group.GROUP_BOM_CONSUMED);
},
event -> assertThat(event.topic()).isEqualTo(KafkaTopics.VULN_ANALYSIS_COMMAND.name()),
event -> assertThat(event.topic()).isEqualTo(KafkaTopics.REPO_META_ANALYSIS_COMMAND.name()),
event -> assertThat(event.topic()).isEqualTo(KafkaTopics.REPO_META_ANALYSIS_COMMAND.name())
// BOM_PROCESSED notification should not have been sent.
);