Skip to content

Commit

Permalink
Issue #3619: Migrate runs to archive table according to user/group co…
Browse files Browse the repository at this point in the history
…nfiguration - billing service
  • Loading branch information
ekazachkova committed Aug 6, 2024
1 parent cc18b39 commit 6748684
Show file tree
Hide file tree
Showing 11 changed files with 242 additions and 21 deletions.
13 changes: 10 additions & 3 deletions api/src/main/java/com/epam/pipeline/acl/run/RunApiService.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,13 @@
import com.epam.pipeline.manager.cluster.InstanceOfferManager;
import com.epam.pipeline.manager.filter.FilterManager;
import com.epam.pipeline.manager.filter.WrongFilterException;
import com.epam.pipeline.manager.pipeline.*;
import com.epam.pipeline.manager.pipeline.ArchiveRunService;
import com.epam.pipeline.manager.pipeline.PipelineRunAsManager;
import com.epam.pipeline.manager.pipeline.PipelineRunCRUDService;
import com.epam.pipeline.manager.pipeline.PipelineRunDockerOperationManager;
import com.epam.pipeline.manager.pipeline.PipelineRunKubernetesManager;
import com.epam.pipeline.manager.pipeline.PipelineRunManager;
import com.epam.pipeline.manager.pipeline.RunLogManager;
import com.epam.pipeline.manager.pipeline.runner.ConfigurationRunner;
import com.epam.pipeline.manager.security.acl.AclFilter;
import com.epam.pipeline.manager.security.acl.AclMask;
Expand Down Expand Up @@ -224,8 +230,9 @@ public PipelineRun updateTags(final Long runId, final TagsVO tagsVO, final boole

@PreAuthorize(ADMIN_ONLY)
@AclMask
public List<PipelineRun> loadRunsActivityStats(final LocalDateTime start, final LocalDateTime end) {
return runManager.loadRunsActivityStats(start, end);
public List<PipelineRun> loadRunsActivityStats(final LocalDateTime start, final LocalDateTime end,
final boolean archive) {
return runManager.loadRunsActivityStats(start, end, archive);
}

@AclFilter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,13 @@
import org.springframework.format.annotation.DateTimeFormat;
import org.springframework.http.MediaType;
import org.springframework.util.Assert;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestHeader;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
Expand Down Expand Up @@ -504,8 +510,9 @@ public Result<List<PipelineRun>> loadRunsActivityStats(
@RequestParam(value = "from") @DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME)
final LocalDateTime start,
@RequestParam(value = "to") @DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME)
final LocalDateTime end) {
return Result.success(runApiService.loadRunsActivityStats(start, end));
final LocalDateTime end,
@RequestParam(defaultValue = "false", required = false) final boolean archive) {
return Result.success(runApiService.loadRunsActivityStats(start, end, archive));
}

@PostMapping(value = "/run/cmd")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ public class PipelineRunDao extends NamedParameterJdbcDaoSupport {
private String loadRunByPrettyUrlQuery;
private String updateTagsQuery;
private String loadAllRunsPossiblyActiveInPeriodQuery;
private String loadAllRunsPossiblyActiveInPeriodWithArchiveQuery;
private String loadAllRunsByStatusQuery;
private String loadAllRunsByIdsQuery;
private String loadRunByPodIPQuery;
Expand Down Expand Up @@ -189,7 +190,8 @@ public PipelineRun loadPipelineRun(Long id) {
}

@Transactional(propagation = Propagation.REQUIRED)
public List<PipelineRun> loadPipelineRunsActiveInPeriod(final LocalDateTime start, final LocalDateTime end) {
public List<PipelineRun> loadPipelineRunsActiveInPeriod(final LocalDateTime start, final LocalDateTime end,
final boolean archive) {
final MapSqlParameterSource params = new MapSqlParameterSource();
params.addValue("PERIOD_START", start);
params.addValue("PERIOD_END", end);
Expand All @@ -198,8 +200,11 @@ public List<PipelineRun> loadPipelineRunsActiveInPeriod(final LocalDateTime star
TaskStatus.PAUSED.getId(),
TaskStatus.RESUMING.getId());
params.addValue("TARGET_LAST_STATUSES", targetLastStatuses);
return addServiceUrls(getNamedParameterJdbcTemplate().query(loadAllRunsPossiblyActiveInPeriodQuery,
params, PipelineRunParameters.getRowMapper()));
final String query = archive
? loadAllRunsPossiblyActiveInPeriodWithArchiveQuery
: loadAllRunsPossiblyActiveInPeriodQuery;
return addServiceUrls(getNamedParameterJdbcTemplate()
.query(query, params, PipelineRunParameters.getRowMapper()));
}

public String loadSshPassword(Long id) {
Expand Down Expand Up @@ -1594,6 +1599,12 @@ public void setLoadAllRunsPossiblyActiveInPeriodQuery(final String loadAllRunsPo
this.loadAllRunsPossiblyActiveInPeriodQuery = loadAllRunsPossiblyActiveInPeriodQuery;
}

@Required
public void setLoadAllRunsPossiblyActiveInPeriodWithArchiveQuery(
final String loadAllRunsPossiblyActiveInPeriodWithArchiveQuery) {
this.loadAllRunsPossiblyActiveInPeriodWithArchiveQuery = loadAllRunsPossiblyActiveInPeriodWithArchiveQuery;
}

@Required
public void setLoadAllRunsByStatusQuery(final String loadAllRunsByStatusQuery) {
this.loadAllRunsByStatusQuery = loadAllRunsByStatusQuery;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1072,11 +1072,13 @@ public PipelineRun updateTags(final Long runId, final TagsVO newTags, final bool
*
* @param start beginning of evaluating period
* @param end ending of evaluating period
* @param archive optional archived runs loading
* @return run with statuses adjusted
*/
@Transactional(propagation = Propagation.REQUIRED)
public List<PipelineRun> loadRunsActivityStats(final LocalDateTime start, final LocalDateTime end) {
final List<PipelineRun> runs = pipelineRunDao.loadPipelineRunsActiveInPeriod(start, end);
public List<PipelineRun> loadRunsActivityStats(final LocalDateTime start, final LocalDateTime end,
final boolean archive) {
final List<PipelineRun> runs = pipelineRunDao.loadPipelineRunsActiveInPeriod(start, end, archive);
final List<Long> runIds = runs.stream()
.map(BaseEntity::getId)
.collect(Collectors.toList());
Expand Down
130 changes: 130 additions & 0 deletions api/src/main/resources/dao/pipeline-run-dao.xml
Original file line number Diff line number Diff line change
Expand Up @@ -1767,6 +1767,136 @@
]]>
</value>
</property>
<property name="loadAllRunsPossiblyActiveInPeriodWithArchiveQuery">
<value>
<![CDATA[
SELECT
r.run_id as run_id,
pipeline_id,
version,
start_date,
end_date,
parameters,
r.status as status,
terminating,
pod_id,
node_type,
node_disk,
node_ip,
node_name,
node_id,
node_image,
node_cloud_region,
node_platform,
docker_image,
actual_docker_image,
platform,
cmd_template,
actual_cmd,
timeout,
owner,
pod_ip,
commit_status,
last_change_commit_time,
config_name,
node_count,
parent_id,
entities_ids,
is_spot,
configuration_id,
pod_status,
prolonged_at_time,
last_notification_time,
last_idle_notification_time,
last_network_consumption_notification_time,
exec_preferences,
pretty_url,
price_per_hour,
compute_price_per_hour,
disk_price_per_hour,
state_reason,
non_pause,
node_real_disk,
node_cloud_provider,
tags,
sensitive,
kube_service_enabled,
pipeline_name,
cluster_price,
node_pool_id,
node_start_date
FROM
pipeline.pipeline_run r
INNER JOIN (SELECT DISTINCT ON (run_id) run_id, status
FROM pipeline.run_status_change
WHERE date < :PERIOD_END
ORDER BY run_id, date DESC
) as last_statuses
ON r.run_id = last_statuses.run_id
WHERE last_statuses.status IN (:TARGET_LAST_STATUSES)
OR r.end_date BETWEEN :PERIOD_START AND :PERIOD_END
UNION ALL
SELECT
ar.run_id,
ar.pipeline_id,
ar.version,
ar.start_date,
ar.end_date,
ar.parameters,
ar.status,
ar.terminating,
ar.pod_id,
ar.node_type,
ar.node_disk,
ar.node_ip,
ar.node_name,
ar.node_id,
ar.node_image,
ar.node_cloud_region,
ar.node_platform,
ar.docker_image,
ar.actual_docker_image,
ar.platform,
ar.cmd_template,
ar.actual_cmd,
ar.timeout,
ar.owner,
ar.pod_ip,
ar.commit_status,
ar.last_change_commit_time,
ar.config_name,
ar.node_count,
ar.parent_id,
ar.entities_ids,
ar.is_spot,
ar.configuration_id,
ar.pod_status,
ar.prolonged_at_time,
ar.last_notification_time,
ar.last_idle_notification_time,
ar.last_network_consumption_notification_time,
ar.exec_preferences,
ar.pretty_url,
ar.price_per_hour,
ar.compute_price_per_hour,
ar.disk_price_per_hour,
ar.state_reason,
ar.non_pause,
ar.node_real_disk,
ar.node_cloud_provider,
ar.tags,
ar.sensitive,
ar.kube_service_enabled,
ar.pipeline_name,
ar.cluster_price,
ar.node_pool_id,
ar.node_start_date
FROM
pipeline.archive_run ar
WHERE ar.end_date BETWEEN :PERIOD_START AND :PERIOD_END
]]>
</value>
</property>
<property name="updateTagsQuery">
<value>
<![CDATA[
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,22 @@
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.*;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static com.epam.pipeline.test.creator.CommonCreatorConstants.*;
import static com.epam.pipeline.test.creator.CommonCreatorConstants.ID;
import static com.epam.pipeline.test.creator.CommonCreatorConstants.ID_2;
import static com.epam.pipeline.utils.PasswordGenerator.generateRandomString;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.hasItems;
Expand Down Expand Up @@ -158,6 +168,9 @@ public class PipelineRunDaoTest extends AbstractJdbcTest {
@Autowired
private RunServiceUrlDao runServiceUrlDao;

@Autowired
private ArchiveRunDao archiveRunDao;

@Value("${run.pipeline.init.task.name?:InitializeEnvironment}")
private String initTaskName;

Expand Down Expand Up @@ -211,6 +224,7 @@ public void testLoadPipelineRunsActiveInPeriod() {
createRunWithStartEndDates(afterSyncStart, afterSyncStart.plusHours(6));
createRunWithStartEndDates(beforeSyncStart, null);
createRunWithStartEndDates(afterSyncStart, null);
archiveRunWithStartEndDates(beforeSyncStart, afterSyncStart.plusHours(6));

pipelineRunDao.loadAllRunsForPipeline(testPipeline.getId())
.forEach(run -> {
Expand All @@ -224,8 +238,15 @@ public void testLoadPipelineRunsActiveInPeriod() {
}
});
final List<PipelineRun> pipelineRuns = pipelineRunDao.loadPipelineRunsActiveInPeriod(SYNC_PERIOD_START,
SYNC_PERIOD_END);
SYNC_PERIOD_END,
false);
assertEquals(4, pipelineRuns.size());

final List<PipelineRun> pipelineRunsWithArchive = pipelineRunDao.loadPipelineRunsActiveInPeriod(
SYNC_PERIOD_START,
SYNC_PERIOD_END,
true);
assertEquals(5, pipelineRunsWithArchive.size());
}

@Test
Expand Down Expand Up @@ -1535,6 +1556,15 @@ private void createRunWithStartEndDates(final LocalDateTime startDate, final Loc
pipelineRunDao.createPipelineRun(run);
}

private void archiveRunWithStartEndDates(final LocalDateTime startDate, final LocalDateTime endDate) {
final PipelineRun run = buildPipelineRun(testPipeline.getId(),
TestUtils.convertLocalDateTimeToDate(startDate),
TestUtils.convertLocalDateTimeToDate(endDate));
run.setStatus(TaskStatus.STOPPED);
run.setId(pipelineRunDao.createRunId());
archiveRunDao.batchInsertArchiveRuns(Collections.singletonList(run));
}

private PipelineRun buildRunWithTool(final Long pipelineId, final String prettyUrl, final List<RunSid> sids) {
final PipelineRun pipelineRun = buildPipelineRun(pipelineId);
pipelineRun.setDockerImage(DOCKER_IMAGE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -399,16 +399,18 @@ public void launchPipelineShouldRegisterNotificationsRequestsIfSpecified() {
@Test
public void shouldLoadRunsActivityStats() {
doReturn(asList(getPipelineRun(ID, TEST_USER), getPipelineRun(ID_2, TEST_USER)))
.when(pipelineRunDao).loadPipelineRunsActiveInPeriod(eq(TEST_PERIOD), eq(TEST_PERIOD_18));
.when(pipelineRunDao).loadPipelineRunsActiveInPeriod(eq(TEST_PERIOD), eq(TEST_PERIOD_18), eq(false));
doReturn(getStatusMap()).when(runStatusManager).loadRunStatus(anyListOf(Long.class));

Map<Long, PipelineRun> runMap = pipelineRunManager.loadRunsActivityStats(TEST_PERIOD, TEST_PERIOD_18).stream()
Map<Long, PipelineRun> runMap = pipelineRunManager
.loadRunsActivityStats(TEST_PERIOD, TEST_PERIOD_18, false).stream()
.collect(toMap(BaseEntity::getId, identity()));

assertEquals(asList(TEST_STATUS_1, TEST_STATUS_2), runMap.get(ID).getRunStatuses());
assertEquals(singletonList(TEST_STATUS_3), runMap.get(ID_2).getRunStatuses());

verify(pipelineRunDao).loadPipelineRunsActiveInPeriod(any(LocalDateTime.class), any(LocalDateTime.class));
verify(pipelineRunDao).loadPipelineRunsActiveInPeriod(
any(LocalDateTime.class), any(LocalDateTime.class), anyBoolean());
verify(runStatusManager).loadRunStatus(anyListOf(Long.class));
}

Expand Down
24 changes: 23 additions & 1 deletion api/src/test/java/com/epam/pipeline/test/acl/AclTestBeans.java
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,29 @@
import com.epam.pipeline.manager.notification.SystemNotificationManager;
import com.epam.pipeline.manager.notification.UserNotificationManager;
import com.epam.pipeline.manager.ontology.OntologyManager;
import com.epam.pipeline.manager.pipeline.*;
import com.epam.pipeline.manager.pipeline.ArchiveRunService;
import com.epam.pipeline.manager.pipeline.DocumentGenerationPropertyManager;
import com.epam.pipeline.manager.pipeline.FolderCrudManager;
import com.epam.pipeline.manager.pipeline.FolderManager;
import com.epam.pipeline.manager.pipeline.FolderTemplateManager;
import com.epam.pipeline.manager.pipeline.ParameterMapper;
import com.epam.pipeline.manager.pipeline.PipelineConfigurationManager;
import com.epam.pipeline.manager.pipeline.PipelineFileGenerationManager;
import com.epam.pipeline.manager.pipeline.PipelineManager;
import com.epam.pipeline.manager.pipeline.PipelineRunAsManager;
import com.epam.pipeline.manager.pipeline.PipelineRunCRUDService;
import com.epam.pipeline.manager.pipeline.PipelineRunDockerOperationManager;
import com.epam.pipeline.manager.pipeline.PipelineRunKubernetesManager;
import com.epam.pipeline.manager.pipeline.PipelineRunManager;
import com.epam.pipeline.manager.pipeline.PipelineVersionManager;
import com.epam.pipeline.manager.pipeline.RestartRunManager;
import com.epam.pipeline.manager.pipeline.RunLogManager;
import com.epam.pipeline.manager.pipeline.RunScheduleManager;
import com.epam.pipeline.manager.pipeline.RunStatusManager;
import com.epam.pipeline.manager.pipeline.StopServerlessRunManager;
import com.epam.pipeline.manager.pipeline.ToolGroupManager;
import com.epam.pipeline.manager.pipeline.ToolManager;
import com.epam.pipeline.manager.pipeline.ToolScanInfoManager;
import com.epam.pipeline.manager.pipeline.runner.ConfigurationProviderManager;
import com.epam.pipeline.manager.pipeline.runner.ConfigurationRunner;
import com.epam.pipeline.manager.preference.PreferenceManager;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,16 @@
import com.epam.pipeline.dao.notification.NotificationDao;
import com.epam.pipeline.dao.notification.NotificationSettingsDao;
import com.epam.pipeline.dao.notification.NotificationTemplateDao;
import com.epam.pipeline.dao.pipeline.*;
import com.epam.pipeline.dao.pipeline.ArchiveRunDao;
import com.epam.pipeline.dao.pipeline.DocumentGenerationPropertyDao;
import com.epam.pipeline.dao.pipeline.FolderDao;
import com.epam.pipeline.dao.pipeline.PipelineDao;
import com.epam.pipeline.dao.pipeline.PipelineRunDao;
import com.epam.pipeline.dao.pipeline.RestartRunDao;
import com.epam.pipeline.dao.pipeline.RunLogDao;
import com.epam.pipeline.dao.pipeline.RunScheduleDao;
import com.epam.pipeline.dao.pipeline.RunStatusDao;
import com.epam.pipeline.dao.pipeline.StopServerlessRunDao;
import com.epam.pipeline.dao.preference.PreferenceDao;
import com.epam.pipeline.dao.region.CloudRegionDao;
import com.epam.pipeline.dao.run.RunServiceUrlDao;
Expand Down
Loading

0 comments on commit 6748684

Please sign in to comment.