Skip to content

Commit

Permalink
Issue #3619: Migrate runs to archive table according to user/group co…
Browse files Browse the repository at this point in the history
…nfiguration - fix for async and transactions
  • Loading branch information
ekazachkova committed Aug 9, 2024
1 parent 9cdc072 commit 43e640f
Show file tree
Hide file tree
Showing 4 changed files with 137 additions and 75 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright 2024 EPAM Systems, Inc. (https://www.epam.com/)
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.epam.pipeline.manager.pipeline;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

import java.util.Date;
import java.util.List;
import java.util.Map;

@Service
@Slf4j
@RequiredArgsConstructor
public class ArchiveRunAsynchronousService {

private final ArchiveRunCoreService archiveRunCoreService;

@Async("archiveRunExecutor")
public void archiveRunsAsynchronous(final Map<String, Date> ownersAndDates, final List<Long> terminalStates,
final Integer chunkSize) {
archiveRunCoreService.archiveRuns(ownersAndDates, terminalStates, chunkSize);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Copyright 2024 EPAM Systems, Inc. (https://www.epam.com/)
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.epam.pipeline.manager.pipeline;

import com.epam.pipeline.dao.pipeline.ArchiveRunDao;
import com.epam.pipeline.dao.pipeline.PipelineRunDao;
import com.epam.pipeline.dao.pipeline.RestartRunDao;
import com.epam.pipeline.dao.pipeline.RunLogDao;
import com.epam.pipeline.dao.pipeline.RunStatusDao;
import com.epam.pipeline.dao.pipeline.StopServerlessRunDao;
import com.epam.pipeline.dao.run.RunServiceUrlDao;
import com.epam.pipeline.entity.pipeline.PipelineRun;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.ListUtils;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;

import java.util.Date;
import java.util.List;
import java.util.Map;

import static java.util.stream.Collectors.toList;

@Service
@Slf4j
@RequiredArgsConstructor
public class ArchiveRunCoreService {
private final ArchiveRunDao archiveRunDao;
private final PipelineRunDao pipelineRunDao;
private final RunLogDao runLogDao;
private final RestartRunDao restartRunDao;
private final RunServiceUrlDao runServiceUrlDao;
private final RunStatusDao runStatusDao;
private final StopServerlessRunDao stopServerlessRunDao;

@Transactional(propagation = Propagation.REQUIRED)
public void archiveRuns(final Map<String, Date> ownersAndDates, final List<Long> terminalStates,
final Integer chunkSize) {
List<PipelineRun> runsToArchive = ListUtils.emptyIfNull(pipelineRunDao
.loadRunsByOwnerAndEndDateBeforeAndStatusIn(ownersAndDates, terminalStates, chunkSize));

if (CollectionUtils.isEmpty(runsToArchive)) {
log.debug("No runs found to archive.");
return;
}

int totalArchivedRuns = 0;
while (!runsToArchive.isEmpty()) {
final List<Long> runIds = runsToArchive.stream().map(PipelineRun::getId).collect(toList());
final List<PipelineRun> children = ListUtils.emptyIfNull(pipelineRunDao.loadRunsByParentRuns(runIds));
runsToArchive.addAll(children);
runIds.addAll(children.stream().map(PipelineRun::getId).collect(toList()));

log.debug("Transferring '{}' runs to archive.", runsToArchive.size());
archiveRunDao.batchInsertArchiveRuns(runsToArchive);
archiveRunDao.batchInsertArchiveRunsStatusChange(runStatusDao.loadRunStatus(runIds, false));
deleteRunsAndDependents(runIds);
totalArchivedRuns += runIds.size();

runsToArchive = ListUtils.emptyIfNull(pipelineRunDao
.loadRunsByOwnerAndEndDateBeforeAndStatusIn(ownersAndDates, terminalStates, chunkSize));
}
log.debug("Transferring runs to archive completed. Total archived runs count: '{}'", totalArchivedRuns);
}

private void deleteRunsAndDependents(final List<Long> runIds) {
pipelineRunDao.deleteRunSidsByRunIdIn(runIds);
runLogDao.deleteTaskByRunIdsIn(runIds);
restartRunDao.deleteRestartRunByIdsIn(runIds);
runServiceUrlDao.deleteByRunIdsIn(runIds);
runStatusDao.deleteRunStatusByRunIdsIn(runIds);
stopServerlessRunDao.deleteByRunIdIn(runIds);
pipelineRunDao.deleteRunByIdIn(runIds);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,8 @@
import com.epam.pipeline.common.MessageHelper;
import com.epam.pipeline.controller.vo.EntityVO;
import com.epam.pipeline.dao.metadata.MetadataDao;
import com.epam.pipeline.dao.pipeline.ArchiveRunDao;
import com.epam.pipeline.dao.pipeline.PipelineRunDao;
import com.epam.pipeline.dao.pipeline.RestartRunDao;
import com.epam.pipeline.dao.pipeline.RunLogDao;
import com.epam.pipeline.dao.pipeline.RunStatusDao;
import com.epam.pipeline.dao.pipeline.StopServerlessRunDao;
import com.epam.pipeline.dao.run.RunServiceUrlDao;
import com.epam.pipeline.entity.metadata.MetadataEntry;
import com.epam.pipeline.entity.metadata.PipeConfValue;
import com.epam.pipeline.entity.pipeline.PipelineRun;
import com.epam.pipeline.entity.pipeline.TaskStatus;
import com.epam.pipeline.entity.security.acl.AclClass;
import com.epam.pipeline.entity.user.ExtendedRole;
Expand All @@ -42,14 +34,10 @@
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.ListUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.commons.math3.util.Pair;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.Assert;

import java.util.ArrayList;
Expand Down Expand Up @@ -78,13 +66,7 @@ public class ArchiveRunService {
private final MetadataDao metadataDao;
private final UserManager userManager;
private final RoleManager roleManager;
private final ArchiveRunDao archiveRunDao;
private final PipelineRunDao pipelineRunDao;
private final RunLogDao runLogDao;
private final RestartRunDao restartRunDao;
private final RunServiceUrlDao runServiceUrlDao;
private final RunStatusDao runStatusDao;
private final StopServerlessRunDao stopServerlessRunDao;
private final ArchiveRunAsynchronousService archiveRunAsyncService;

public void archiveRuns(final String identifier, final boolean principal, final Integer days) {
final String metadataKey = preferenceManager.getPreference(SystemPreferences.SYSTEM_ARCHIVE_RUN_METADATA_KEY);
Expand All @@ -104,9 +86,7 @@ public void archiveRuns() {
archiveRunsForOwners(ownersAndDates);
}

@Transactional(propagation = Propagation.REQUIRED)
@Async("archiveRunExecutor")
public void archiveRunsForOwners(final Map<String, Date> ownersAndDates) {
private void archiveRunsForOwners(final Map<String, Date> ownersAndDates) {
if (MapUtils.isEmpty(ownersAndDates)) {
log.debug("No run owners found to archive runs.");
return;
Expand All @@ -119,31 +99,7 @@ public void archiveRunsForOwners(final Map<String, Date> ownersAndDates) {

final Integer chunkSize = preferenceManager.getPreference(SystemPreferences.SYSTEM_ARCHIVE_RUN_CHUNK_SIZE);

List<PipelineRun> runsToArchive = ListUtils.emptyIfNull(pipelineRunDao
.loadRunsByOwnerAndEndDateBeforeAndStatusIn(ownersAndDates, terminalStates, chunkSize));

if (CollectionUtils.isEmpty(runsToArchive)) {
log.debug("No runs found to archive.");
return;
}

int totalArchivedRuns = 0;
while (!runsToArchive.isEmpty()) {
final List<Long> runIds = runsToArchive.stream().map(PipelineRun::getId).collect(toList());
final List<PipelineRun> children = ListUtils.emptyIfNull(pipelineRunDao.loadRunsByParentRuns(runIds));
runsToArchive.addAll(children);
runIds.addAll(children.stream().map(PipelineRun::getId).collect(toList()));

log.debug("Transferring '{}' runs to archive.", runsToArchive.size());
archiveRunDao.batchInsertArchiveRuns(runsToArchive);
archiveRunDao.batchInsertArchiveRunsStatusChange(runStatusDao.loadRunStatus(runIds, false));
deleteRunsAndDependents(runIds);
totalArchivedRuns += runIds.size();

runsToArchive = ListUtils.emptyIfNull(pipelineRunDao
.loadRunsByOwnerAndEndDateBeforeAndStatusIn(ownersAndDates, terminalStates, chunkSize));
}
log.debug("Transferring runs to archive completed. Total archived runs count: '{}'", totalArchivedRuns);
archiveRunAsyncService.archiveRunsAsynchronous(ownersAndDates, terminalStates, chunkSize);
}

private Integer metadataToDays(final Map<String, PipeConfValue> metadata, final String key,
Expand All @@ -164,16 +120,6 @@ private Date daysToDate(final Optional<Integer> optionalDays) {
return DateUtils.convertLocalDateTimeToDate(DateUtils.nowUTC().minusDays(days));
}

private void deleteRunsAndDependents(final List<Long> runIds) {
pipelineRunDao.deleteRunSidsByRunIdIn(runIds);
runLogDao.deleteTaskByRunIdsIn(runIds);
restartRunDao.deleteRestartRunByIdsIn(runIds);
runServiceUrlDao.deleteByRunIdsIn(runIds);
runStatusDao.deleteRunStatusByRunIdsIn(runIds);
stopServerlessRunDao.deleteByRunIdIn(runIds);
pipelineRunDao.deleteRunByIdIn(runIds);
}

private Map<String, Date> findAllOwnersAndDates(final String metadataKey) {
final Map<Long, Optional<Integer>> daysByUser = findAllOwnersByRole(metadataKey);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,6 @@
import com.epam.pipeline.common.MessageHelper;
import com.epam.pipeline.controller.vo.EntityVO;
import com.epam.pipeline.dao.metadata.MetadataDao;
import com.epam.pipeline.dao.pipeline.ArchiveRunDao;
import com.epam.pipeline.dao.pipeline.PipelineRunDao;
import com.epam.pipeline.dao.pipeline.RestartRunDao;
import com.epam.pipeline.dao.pipeline.RunLogDao;
import com.epam.pipeline.dao.pipeline.RunStatusDao;
import com.epam.pipeline.dao.pipeline.StopServerlessRunDao;
import com.epam.pipeline.dao.run.RunServiceUrlDao;
import com.epam.pipeline.entity.metadata.MetadataEntry;
import com.epam.pipeline.entity.metadata.PipeConfValue;
import com.epam.pipeline.entity.security.acl.AclClass;
Expand Down Expand Up @@ -75,16 +68,9 @@ public class ArchiveRunServiceUnitTest {
private final MetadataDao metadataDao = mock(MetadataDao.class);
private final UserManager userManager = mock(UserManager.class);
private final RoleManager roleManager = mock(RoleManager.class);
private final ArchiveRunDao archiveRunDao = mock(ArchiveRunDao.class);
private final PipelineRunDao pipelineRunDao = mock(PipelineRunDao.class);
private final RunLogDao runLogDao = mock(RunLogDao.class);
private final RestartRunDao restartRunDao = mock(RestartRunDao.class);
private final RunServiceUrlDao runServiceUrlDao = mock(RunServiceUrlDao.class);
private final RunStatusDao runStatusDao = mock(RunStatusDao.class);
private final StopServerlessRunDao stopServerlessRunDao = mock(StopServerlessRunDao.class);
private final ArchiveRunAsynchronousService archiveRunAsyncService = mock(ArchiveRunAsynchronousService.class);
private final ArchiveRunService archiveRunService = new ArchiveRunService(preferenceManager, messageHelper,
metadataDao, userManager, roleManager, archiveRunDao, pipelineRunDao, runLogDao, restartRunDao,
runServiceUrlDao, runStatusDao, stopServerlessRunDao);
metadataDao, userManager, roleManager, archiveRunAsyncService);

@Before
public void setUp() {
Expand Down Expand Up @@ -173,7 +159,7 @@ public void shouldNotArchiveRunsForGroupIfNoOwnersFound() {

archiveRunService.archiveRuns(GROUP1, false, INPUT_DAYS);

notInvoked(pipelineRunDao).loadRunsByOwnerAndEndDateBeforeAndStatusIn(any(), any(), anyInt());
notInvoked(archiveRunAsyncService).archiveRunsAsynchronous(any(), any(), anyInt());
}

@Test
Expand Down Expand Up @@ -218,7 +204,7 @@ public void shouldUseUsersDaysToArchiveRuns() {

private void verifyDays(final int expectedDays) {
final ArgumentCaptor<Map<String, Date>> argument = ArgumentCaptor.forClass((Class) Map.class);
verify(pipelineRunDao).loadRunsByOwnerAndEndDateBeforeAndStatusIn(argument.capture(), any(), anyInt());
verify(archiveRunAsyncService).archiveRunsAsynchronous(argument.capture(), any(), any());
final Map<String, Date> results = argument.getValue();
assertThat(results).hasSize(1);
assertDays(results.get(USER1), expectedDays);
Expand Down

0 comments on commit 43e640f

Please sign in to comment.