Skip to content

Commit

Permalink
Issue #3619: Migrate runs to archive table according to user/group co…
Browse files Browse the repository at this point in the history
…nfiguration - archive run statuses
  • Loading branch information
ekazachkova committed Aug 8, 2024
1 parent 410d2f8 commit 011302b
Show file tree
Hide file tree
Showing 7 changed files with 85 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.epam.pipeline.dao.pipeline;

import com.epam.pipeline.entity.pipeline.PipelineRun;
import com.epam.pipeline.entity.pipeline.run.RunStatus;
import lombok.RequiredArgsConstructor;
import org.apache.commons.collections.CollectionUtils;
import org.springframework.beans.factory.annotation.Required;
Expand All @@ -25,26 +26,48 @@
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;

import java.util.Collection;
import java.util.List;

@RequiredArgsConstructor
public class ArchiveRunDao extends NamedParameterJdbcDaoSupport {

private final PipelineRunDao pipelineRunDao;

private String createArchiveRunsQuery;
private String createArchiveRunQuery;
private String createArchiveRunStatusChangeQuery;

@Transactional(propagation = Propagation.MANDATORY)
public void batchInsertArchiveRuns(final List<PipelineRun> runs) {
if (CollectionUtils.isEmpty(runs)) {
return;
}
final MapSqlParameterSource[] params = pipelineRunDao.getParamsForBatchUpdate(runs);
getNamedParameterJdbcTemplate().batchUpdate(createArchiveRunsQuery, params);
getNamedParameterJdbcTemplate().batchUpdate(createArchiveRunQuery, params);
}

@Transactional(propagation = Propagation.MANDATORY)
public void batchInsertArchiveRunsStatusChange(final List<RunStatus> runStatuses) {
if (CollectionUtils.isEmpty(runStatuses)) {
return;
}
final MapSqlParameterSource[] params = getRunStatusParamsForBatchUpdate(runStatuses);
getNamedParameterJdbcTemplate().batchUpdate(createArchiveRunStatusChangeQuery, params);
}

private MapSqlParameterSource[] getRunStatusParamsForBatchUpdate(final Collection<RunStatus> runStatuses) {
return runStatuses.stream()
.map(RunStatusDao.RunStatusParameters::getParameters)
.toArray(MapSqlParameterSource[]::new);
}

@Required
public void setCreateArchiveRunQuery(final String createArchiveRunQuery) {
this.createArchiveRunQuery = createArchiveRunQuery;
}

@Required
public void setCreateArchiveRunsQuery(final String createArchiveRunsQuery) {
this.createArchiveRunsQuery = createArchiveRunsQuery;
public void setCreateArchiveRunStatusChangeQuery(final String createArchiveRunStatusChangeQuery) {
this.createArchiveRunStatusChangeQuery = createArchiveRunStatusChangeQuery;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ public void archiveRunsForOwners(final Map<String, Date> ownersAndDates) {

log.debug("Transferring '{}' runs to archive.", runsToArchive.size());
archiveRunDao.batchInsertArchiveRuns(runsToArchive);
archiveRunDao.batchInsertArchiveRunsStatusChange(runStatusDao.loadRunStatus(runIds));
deleteRunsAndDependents(runIds);
totalArchivedRuns += runIds.size();

Expand Down
18 changes: 17 additions & 1 deletion api/src/main/resources/dao/archive-run-dao.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
<bean class="com.epam.pipeline.dao.pipeline.ArchiveRunDao" id="archiveRunDao" autowire="byName">
<property name="createArchiveRunsQuery">
<property name="createArchiveRunQuery">
<value>
<![CDATA[
INSERT INTO pipeline.archive_run (
Expand Down Expand Up @@ -129,5 +129,21 @@
]]>
</value>
</property>
<property name="createArchiveRunStatusChangeQuery">
<value>
<![CDATA[
INSERT INTO pipeline.archive_run_status_change (
run_id,
status,
reason,
date)
VALUES (
:RUN_ID,
:STATUS,
:REASON,
:DATE)
]]>
</value>
</property>
</bean>
</beans>

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
CREATE TABLE IF NOT EXISTS pipeline.archive_run (LIKE pipeline.pipeline_run INCLUDING ALL);
CREATE SEQUENCE pipeline.s_archive_run START WITH 1 INCREMENT BY 1;
ALTER TABLE pipeline.archive_run DROP CONSTRAINT archive_run_pkey;
CREATE INDEX archive_run_end_date_idx ON pipeline.archive_run (end_date);
ALTER TABLE pipeline.archive_run ADD COLUMN ID BIGINT NOT NULL PRIMARY KEY DEFAULT NEXTVAL('pipeline.s_archive_run');

CREATE TABLE IF NOT EXISTS pipeline.archive_run_status_change (LIKE pipeline.run_status_change INCLUDING ALL);
CREATE SEQUENCE pipeline.s_archive_run_status_change START WITH 1 INCREMENT BY 1;
ALTER TABLE pipeline.archive_run_status_change ADD COLUMN ID BIGINT NOT NULL PRIMARY KEY DEFAULT NEXTVAL('pipeline.s_archive_run_status_change');
CREATE INDEX archive_run_status_change_run_id_idx ON pipeline.archive_run_status_change (run_id);
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,28 @@

import com.epam.pipeline.entity.pipeline.PipelineRun;
import com.epam.pipeline.entity.pipeline.TaskStatus;
import com.epam.pipeline.entity.pipeline.run.RunStatus;
import com.epam.pipeline.test.jdbc.AbstractJdbcTest;
import com.epam.pipeline.util.TestUtils;
import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.transaction.annotation.Transactional;

import java.time.LocalDateTime;
import java.util.Arrays;

@Transactional
public class ArchiveRunDaoTest extends AbstractJdbcTest {
private static final String USER = "OWNER";
private static final String POD = "pod-id";
private static final String TEST = "test";

@Autowired
private PipelineRunDao pipelineRunDao;
@Autowired
private ArchiveRunDao archiveRunDao;
@Autowired
private RunStatusDao runStatusDao;

@Test
public void shouldBatchInsertArchiveRuns() {
Expand All @@ -46,6 +51,25 @@ public void shouldBatchInsertArchiveRuns() {
archiveRunDao.batchInsertArchiveRuns(Arrays.asList(run1, run2));
}

@Test
public void shouldBatchInsertArchiveRunsStatusChange() {
final PipelineRun run1 = run();
pipelineRunDao.createPipelineRun(run1);
final PipelineRun run2 = run();
pipelineRunDao.createPipelineRun(run2);

final RunStatus.RunStatusBuilder runStatusBuilder = RunStatus.builder()
.status(TaskStatus.PAUSED)
.reason(TEST)
.timestamp(LocalDateTime.now());
final RunStatus runStatus1 = runStatusBuilder.runId(run1.getId()).build();
final RunStatus runStatus2 = runStatusBuilder.runId(run2.getId()).build();
runStatusDao.saveStatus(runStatus1);
runStatusDao.saveStatus(runStatus2);

archiveRunDao.batchInsertArchiveRunsStatusChange(Arrays.asList(runStatus1, runStatus2));
}

private PipelineRun run() {
return TestUtils.createPipelineRun(null, null, TaskStatus.RUNNING, USER,
null, null, true, null, null, POD, 1L);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,12 @@
import com.epam.pipeline.common.MessageHelper;
import com.epam.pipeline.controller.vo.EntityVO;
import com.epam.pipeline.dao.metadata.MetadataDao;
import com.epam.pipeline.dao.pipeline.*;
import com.epam.pipeline.dao.pipeline.ArchiveRunDao;
import com.epam.pipeline.dao.pipeline.PipelineRunDao;
import com.epam.pipeline.dao.pipeline.RestartRunDao;
import com.epam.pipeline.dao.pipeline.RunLogDao;
import com.epam.pipeline.dao.pipeline.RunStatusDao;
import com.epam.pipeline.dao.pipeline.StopServerlessRunDao;
import com.epam.pipeline.dao.run.RunServiceUrlDao;
import com.epam.pipeline.entity.metadata.MetadataEntry;
import com.epam.pipeline.entity.metadata.PipeConfValue;
Expand Down

0 comments on commit 011302b

Please sign in to comment.