-
-
Notifications
You must be signed in to change notification settings - Fork 12
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Signed-off-by: mehab <[email protected]>
- Loading branch information
Showing
14 changed files
with
562 additions
and
87 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
97 changes: 97 additions & 0 deletions
97
src/main/java/org/dependencytrack/event/IntegrityMetaInitializer.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,97 @@ | ||
package org.dependencytrack.event; | ||
|
||
import alpine.Config; | ||
import alpine.common.logging.Logger; | ||
import net.javacrumbs.shedlock.core.LockConfiguration; | ||
import net.javacrumbs.shedlock.core.LockExtender; | ||
import net.javacrumbs.shedlock.core.LockingTaskExecutor; | ||
import org.dependencytrack.common.ConfigKey; | ||
import org.dependencytrack.event.kafka.KafkaEventDispatcher; | ||
import org.dependencytrack.model.IntegrityMetaComponent; | ||
import org.dependencytrack.persistence.QueryManager; | ||
import org.dependencytrack.util.LockProvider; | ||
|
||
import javax.servlet.ServletContextEvent; | ||
import javax.servlet.ServletContextListener; | ||
import java.time.Duration; | ||
import java.util.ArrayList; | ||
import java.util.List; | ||
|
||
import static org.dependencytrack.tasks.LockName.INTEGRITY_META_INITIALIZER_TASK_LOCK; | ||
import static org.dependencytrack.util.LockProvider.isLockToBeExtended; | ||
|
||
public class IntegrityMetaInitializer implements ServletContextListener { | ||
|
||
private static final Logger LOGGER = Logger.getLogger(IntegrityMetaInitializer.class); | ||
private final KafkaEventDispatcher kafkaEventDispatcher = new KafkaEventDispatcher(); | ||
private final boolean integrityInitializerEnabled; | ||
|
||
public IntegrityMetaInitializer() { | ||
this(Config.getInstance().getPropertyAsBoolean(ConfigKey.INTEGRITY_INITIALIZER_ENABLED)); | ||
} | ||
|
||
IntegrityMetaInitializer(final boolean integrityInitializerEnabled) { | ||
this.integrityInitializerEnabled = integrityInitializerEnabled; | ||
} | ||
|
||
|
||
@Override | ||
public void contextInitialized(final ServletContextEvent event) { | ||
if (integrityInitializerEnabled) { | ||
try { | ||
LockProvider.executeWithLock(INTEGRITY_META_INITIALIZER_TASK_LOCK, (LockingTaskExecutor.Task) () -> process()); | ||
} catch (Throwable e) { | ||
throw new RuntimeException("An unexpected error occurred while running Initializer for integrity meta", e); | ||
} | ||
} else { | ||
LOGGER.info("Component integrity initializer is disabled."); | ||
} | ||
} | ||
|
||
private void process() { | ||
LOGGER.info("Initializing integrity meta component sync"); | ||
try (final var qm = new QueryManager()) { | ||
if (qm.getIntegrityMetaComponentCount() == 0) { | ||
// Sync purls from Component only if IntegrityMetaComponent is empty | ||
qm.synchronizeIntegrityMetaComponent(); | ||
} | ||
// dispatch purls not processed yet | ||
batchProcessPurls(qm); | ||
} | ||
} | ||
|
||
private void batchProcessPurls(QueryManager qm) { | ||
LockConfiguration lockConfiguration = LockProvider.getLockConfigurationByLockName(INTEGRITY_META_INITIALIZER_TASK_LOCK); | ||
long offset = 0; | ||
long startTime = System.currentTimeMillis(); | ||
List<String> purls = qm.fetchNextPurlsPage(offset); | ||
while (!purls.isEmpty()) { | ||
long cumulativeProcessingTime = System.currentTimeMillis() - startTime; | ||
if(isLockToBeExtended(cumulativeProcessingTime, INTEGRITY_META_INITIALIZER_TASK_LOCK)) { | ||
LockExtender.extendActiveLock(Duration.ofMinutes(5).plus(lockConfiguration.getLockAtLeastFor()), lockConfiguration.getLockAtLeastFor()); | ||
} | ||
dispatchPurls(qm, purls); | ||
updateIntegrityMetaForPurls(qm, purls); | ||
offset += purls.size(); | ||
purls = qm.fetchNextPurlsPage(offset); | ||
} | ||
} | ||
|
||
private void updateIntegrityMetaForPurls(QueryManager qm, List<String> purls) { | ||
List<IntegrityMetaComponent> purlRecords = new ArrayList<>(); | ||
for (var purl : purls) { | ||
purlRecords.add(qm.getIntegrityMetaComponent(purl)); | ||
} | ||
qm.batchUpdateIntegrityMetaComponent(purlRecords); | ||
} | ||
|
||
private void dispatchPurls(QueryManager qm, List<String> purls) { | ||
for (final var purl : purls) { | ||
ComponentProjection componentProjection = qm.getComponentByPurl(purl); | ||
kafkaEventDispatcher.dispatchAsync(new ComponentRepositoryMetaAnalysisEvent(componentProjection.purlCoordinates, componentProjection.internal)); | ||
} | ||
} | ||
|
||
public record ComponentProjection(String purlCoordinates, Boolean internal) { | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
162 changes: 162 additions & 0 deletions
162
src/main/java/org/dependencytrack/persistence/IntegrityMetaQueryManager.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,162 @@ | ||
package org.dependencytrack.persistence; | ||
|
||
import alpine.common.logging.Logger; | ||
import alpine.resources.AlpineRequest; | ||
import alpine.server.util.DbUtil; | ||
import org.dependencytrack.model.FetchStatus; | ||
import org.dependencytrack.model.IntegrityMetaComponent; | ||
|
||
import javax.jdo.PersistenceManager; | ||
import javax.jdo.Query; | ||
import java.sql.Connection; | ||
import java.sql.PreparedStatement; | ||
import java.sql.Timestamp; | ||
import java.time.Instant; | ||
import java.time.temporal.ChronoUnit; | ||
import java.util.Date; | ||
import java.util.List; | ||
|
||
public class IntegrityMetaQueryManager extends QueryManager implements IQueryManager { | ||
|
||
private static final Logger LOGGER = Logger.getLogger(IntegrityMetaQueryManager.class); | ||
|
||
/** | ||
* Constructs a new QueryManager. | ||
* @param pm a PersistenceManager object | ||
*/ | ||
IntegrityMetaQueryManager(final PersistenceManager pm) { | ||
super(pm); | ||
} | ||
|
||
/** | ||
* Constructs a new QueryManager. | ||
* @param pm a PersistenceManager object | ||
* @param request an AlpineRequest object | ||
*/ | ||
IntegrityMetaQueryManager(final PersistenceManager pm, final AlpineRequest request) { | ||
super(pm, request); | ||
} | ||
|
||
/** | ||
* Returns a IntegrityMetaComponent object from the specified purl. | ||
* | ||
* @param purl the Package URL string of the component | ||
* @return a IntegrityMetaComponent object, or null if not found | ||
*/ | ||
public IntegrityMetaComponent getIntegrityMetaComponent(String purl) { | ||
final Query<IntegrityMetaComponent> query = pm.newQuery(IntegrityMetaComponent.class, "purl == :purl"); | ||
query.setParameters(purl); | ||
return query.executeUnique(); | ||
} | ||
|
||
/** | ||
* Updates a IntegrityMetaComponent record. | ||
* | ||
* @param transientIntegrityMetaComponent the IntegrityMetaComponent object to synchronize | ||
* @return a synchronized IntegrityMetaComponent object | ||
*/ | ||
public synchronized IntegrityMetaComponent updateIntegrityMetaComponent(final IntegrityMetaComponent transientIntegrityMetaComponent) { | ||
final IntegrityMetaComponent integrityMeta = getIntegrityMetaComponent(transientIntegrityMetaComponent.getPurl()); | ||
if (integrityMeta != null) { | ||
integrityMeta.setMd5(transientIntegrityMetaComponent.getMd5()); | ||
integrityMeta.setSha1(transientIntegrityMetaComponent.getSha1()); | ||
integrityMeta.setSha256(transientIntegrityMetaComponent.getSha256()); | ||
integrityMeta.setPublishedAt(transientIntegrityMetaComponent.getPublishedAt()); | ||
integrityMeta.setStatus(transientIntegrityMetaComponent.getStatus()); | ||
integrityMeta.setLastFetch(Date.from(Instant.now())); | ||
return persist(integrityMeta); | ||
} else { | ||
LOGGER.debug("No record found in IntegrityMetaComponent for purl " + transientIntegrityMetaComponent.getPurl()); | ||
return null; | ||
} | ||
} | ||
|
||
/** | ||
* Synchronizes IntegrityMetaComponent with purls from COMPONENT. This is part of initializer. | ||
*/ | ||
public synchronized void synchronizeIntegrityMetaComponent() { | ||
final String purlSyncQuery = """ | ||
INSERT INTO "INTEGRITY_META_COMPONENT" ("PURL") | ||
SELECT DISTINCT "PURL" | ||
FROM "COMPONENT" | ||
WHERE "PURL" IS NOT NULL | ||
"""; | ||
Connection connection = null; | ||
PreparedStatement preparedStatement = null; | ||
try { | ||
connection = (Connection) pm.getDataStoreConnection(); | ||
preparedStatement = connection.prepareStatement(purlSyncQuery); | ||
var purlCount = preparedStatement.executeUpdate(); | ||
LOGGER.info("Number of component purls synchronized for integrity check : " + purlCount); | ||
} catch (Exception ex) { | ||
LOGGER.error("Error in synchronizing component purls for integrity meta.", ex); | ||
throw new RuntimeException(ex); | ||
} finally { | ||
DbUtil.close(preparedStatement); | ||
DbUtil.close(connection); | ||
} | ||
} | ||
|
||
/** | ||
* Returns the count of records in IntegrityMetaComponent. | ||
* | ||
* @return the count of records | ||
*/ | ||
public long getIntegrityMetaComponentCount() { | ||
try (final Query<IntegrityMetaComponent> query = pm.newQuery(IntegrityMetaComponent.class)) { | ||
query.setResult("count(this)"); | ||
return query.executeResultUnique(Long.class); | ||
} catch (Exception e) { | ||
LOGGER.error("Error in getting count of integrity meta.", e); | ||
throw new RuntimeException(e); | ||
} | ||
} | ||
|
||
/** | ||
* Returns the list of purls in IntegrityMetaComponent which are not yet processed. | ||
* | ||
* @return the list of purls | ||
*/ | ||
public List<String> fetchNextPurlsPage(long offset) { | ||
try (final Query<IntegrityMetaComponent> query = | ||
pm.newQuery(IntegrityMetaComponent.class, "status == null || (status == :inProgress && lastFetch < :latest)")) { | ||
query.setParameters(FetchStatus.IN_PROGRESS, Date.from(Instant.now().minus(1, ChronoUnit.HOURS))); | ||
query.setRange(offset, offset + 5000); | ||
query.setResult("purl"); | ||
return List.copyOf(query.executeResultList(String.class)); | ||
} catch (Exception e) { | ||
LOGGER.error("Error in getting purls from integrity meta.", e); | ||
throw new RuntimeException(e); | ||
} | ||
} | ||
|
||
/** | ||
* Batch updates IntegrityMetaComponent records | ||
*/ | ||
public void batchUpdateIntegrityMetaComponent(List<IntegrityMetaComponent> purls) { | ||
final String updateQuery = """ | ||
UPDATE "INTEGRITY_META_COMPONENT" | ||
SET "LAST_FETCH" = ?, "STATUS" = ? | ||
WHERE "ID" = ? | ||
"""; | ||
Connection connection = null; | ||
PreparedStatement preparedStatement = null; | ||
try { | ||
connection = (Connection) pm.getDataStoreConnection(); | ||
preparedStatement = connection.prepareStatement(updateQuery); | ||
for (var purlRecord : purls) { | ||
preparedStatement.setTimestamp(1, new Timestamp(Date.from(Instant.now()).getTime())); | ||
preparedStatement.setString(2, FetchStatus.IN_PROGRESS.toString()); | ||
preparedStatement.setLong(3, purlRecord.getId()); | ||
preparedStatement.addBatch(); | ||
} | ||
preparedStatement.executeBatch(); | ||
} catch (Exception ex) { | ||
LOGGER.error("Error in batch updating integrity meta.", ex); | ||
throw new RuntimeException(ex); | ||
} finally { | ||
DbUtil.close(preparedStatement); | ||
DbUtil.close(connection); | ||
} | ||
} | ||
} |
Oops, something went wrong.