Skip to content

Commit

Permalink
Issue 3602 pod network bandwidth usage action (#3621)
Browse files Browse the repository at this point in the history
  • Loading branch information
okolesn authored Aug 21, 2024
1 parent e6b4a1a commit 9d77b53
Show file tree
Hide file tree
Showing 16 changed files with 442 additions and 3 deletions.
2 changes: 2 additions & 0 deletions api/profiles/dev/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ commit.run.script.starter.url=
docker.registry.login.script=
container.layers.script.url=
container.size.script.url=
limit.run.bandwidth.script.url=


#pause/resume run scripts
pause.run.script.url=
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -398,4 +398,9 @@ public void archiveRuns() {
public void archiveRuns(final String name, final boolean principal, final Integer days) {
archiveRunService.archiveRuns(name, principal, days);
}

/**
 * Sets or removes the network bandwidth limit boundary of a run.
 * Access is guarded by the {@code ADMIN_ONLY} security expression; the call itself is
 * delegated unchanged to {@code runManager.setLimitBoundary}.
 *
 * @param runId    identifier of the run to update
 * @param enable   flag forwarded to the manager (enable or disable the limit)
 * @param boundary limit value forwarded to the manager
 */
@PreAuthorize(ADMIN_ONLY)
public void setLimitBoundary(final Long runId, final Boolean enable, final Integer boundary) {
runManager.setLimitBoundary(runId, enable, boundary);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ public final class MessageConstants {
public static final String ERROR_RUN_PIPELINES_COMMIT_FAILED = "error.run.pipeline.commit.failed";
public static final String ERROR_GET_CONTAINER_LAYERS_COUNT_FAILED = "error.container.layers.count.failed";
public static final String ERROR_GET_CONTAINER_SIZE_FAILED = "error.container.size.failed";
public static final String ERROR_LIMIT_NETWORK_BANDWIDTH_FAILED = "error.limit.network.bandwidth.failed";
public static final String ERROR_CONTAINER_ID_FOR_RUN_NOT_FOUND = "error.container.id.for.run.not.found";
public static final String INFO_EXECUTE_COMMIT_RUN_PIPELINES = "info.execute.ssh.run.pipeline.command";
public static final String ERROR_RUN_PIPELINES_PAUSE_FAILED = "error.run.pipeline.pause.failed";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -620,4 +620,18 @@ public Result<Boolean> archiveRunsByOwner(@RequestParam final String ownerSid,
runApiService.archiveRuns(ownerSid, principal, days);
return Result.success(true);
}

/**
 * REST endpoint that enables or disables the network bandwidth limit of a run.
 *
 * @param runId    identifier of the run, taken from the request path
 * @param enable   {@code true} (the default) to set the limit, {@code false} to remove it
 * @param boundary optional limit value; per the API notes it is expressed in Bytes/s
 * @return a successful result carrying {@code true}
 */
@PostMapping("/run/{runId}/network/limit")
@ApiOperation(
        value = "Set limit boundary",
        notes = "Sets a special tag for a run based on boundary param: NETWORK_LIMIT: <boundary> (Bytes/s) " +
                "in case of enable = true, otherwise removes the tag.",
        produces = MediaType.APPLICATION_JSON_VALUE)
@ApiResponses(value = {@ApiResponse(code = HTTP_STATUS_OK, message = API_STATUS_DESCRIPTION)})
public Result<Boolean> setLimitBoundary(@PathVariable(value = RUN_ID) final Long runId,
                                        @RequestParam(defaultValue = "true") final Boolean enable,
                                        @RequestParam(required = false) final Integer boundary) {
    runApiService.setLimitBoundary(runId, enable, boundary);
    // The declared payload type is Boolean: return an explicit value, as the other
    // Result<Boolean> endpoints of this controller do, instead of an empty payload.
    return Result.success(true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ public class DockerContainerOperationManager {
private static final String GLOBAL_KNOWN_HOSTS_FILE = "GlobalKnownHostsFile=/dev/null";
private static final String USER_KNOWN_HOSTS_FILE = "UserKnownHostsFile=/dev/null";
private static final String COMMIT_COMMAND_DESCRIPTION = "ssh session to commit pipeline run";
private static final String LIMIT_NETWORK_BANDWIDTH_COMMAND_DESCRIPTION =
"ssh session to limit pipeline run network bandwidth";
private static final String CONTAINER_LAYERS_COUNT_COMMAND_DESCRIPTION =
"ssh session to get docker container layers count";
private static final String PAUSE_COMMAND_DESCRIPTION = "Error is occurred during to pause pipeline run";
Expand All @@ -103,6 +105,7 @@ public class DockerContainerOperationManager {
private static final String CP_NODE_SSH_PORT = "CP_NODE_SSH_PORT";

public static final String PAUSE_RUN_TASK = "PausePipelineRun";
private static final int BYTES_IN_KB = 1024;

@Autowired
private PipelineRunManager runManager;
Expand Down Expand Up @@ -158,6 +161,9 @@ public class DockerContainerOperationManager {
@Value("${pause.run.script.url}")
private String pauseRunScriptUrl;

@Value("${limit.run.bandwidth.script.url}")
private String limitRunBandwidthScriptUrl;

private Lock resumeLock = new ReentrantLock();

public PipelineRun commitContainer(PipelineRun run, DockerRegistry registry,
Expand Down Expand Up @@ -302,6 +308,43 @@ public long getContainerSize(final PipelineRun run) {
return size;
}

/**
 * Applies or removes a network bandwidth limit for the container of the given run by
 * executing the bandwidth script on the run's node over SSH.
 *
 * <p>The same rate is passed to the script as both upload and download rate. When
 * {@code enable} is {@code true} the boundary (bytes per second) is converted to
 * kilobits per second ({@code boundary * 8 / 1024}); otherwise a rate of {@code 0} is passed.
 * NOTE(review): boundaries below 128 bytes/s round down to 0 - confirm the script handles that.
 *
 * @param run      run whose container shall be limited
 * @param boundary limit in bytes per second, read only when {@code enable} is {@code true}
 * @param enable   {@code true} to apply the limit, {@code false} to remove it
 * @throws CmdExecutionException if the container id is not found, the command cannot be
 *                               submitted, does not finish within the configured timeout
 *                               (seconds), exits with a non-zero code, or the wait is interrupted
 */
public void limitNetworkBandwidth(final PipelineRun run, final Integer boundary, final Boolean enable) {
    final String containerId = kubernetesManager.getContainerIdFromKubernetesPod(run.getPodId(),
            run.getActualDockerImage());
    Process sshConnection = null;
    try {
        Assert.notNull(containerId,
                messageHelper.getMessage(MessageConstants.ERROR_CONTAINER_ID_FOR_RUN_NOT_FOUND, run.getId()));

        final String apiToken = authManager.issueTokenForCurrentUser(null).getToken();

        // Multiply as long: "boundary * 8" overflows int for boundaries of 256 MiB/s and above.
        final long boundaryKBitsPerSec = enable ? boundary * 8L / BYTES_IN_KB : 0L;
        final String rate = String.valueOf(boundaryKBitsPerSec);
        final String limitNetworkCommand = LimitBandwidthCommand.builder()
                .runScriptUrl(limitRunBandwidthScriptUrl)
                .runId(String.valueOf(run.getId()))
                .api(preferenceManager.getPreference(SystemPreferences.BASE_API_HOST))
                .apiToken(apiToken)
                .containerId(containerId)
                .enable(String.valueOf(enable))
                .uploadRate(rate)
                .downloadRate(rate)
                .build()
                .getCommand();
        sshConnection = submitCommandViaSSH(run.getInstance().getNodeIP(), limitNetworkCommand,
                getSshPort(run));
        final boolean isFinished = sshConnection.waitFor(
                preferenceManager.getPreference(SystemPreferences.LIMIT_NETWORK_BANDWIDTH_COMMAND_TIMEOUT),
                TimeUnit.SECONDS);
        Assert.state(isFinished && sshConnection.exitValue() == 0,
                messageHelper.getMessage(MessageConstants.ERROR_LIMIT_NETWORK_BANDWIDTH_FAILED, run.getId()));
    } catch (IllegalStateException | IllegalArgumentException | IOException e) {
        // Pass the exception itself so the stack trace is not lost in the log.
        log.error(e.getMessage(), e);
        throw new CmdExecutionException(LIMIT_NETWORK_BANDWIDTH_COMMAND_DESCRIPTION, e);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new CmdExecutionException(LIMIT_NETWORK_BANDWIDTH_COMMAND_DESCRIPTION, e);
    } finally {
        // A timed out or interrupted ssh session would otherwise stay alive after the failure is reported.
        if (sshConnection != null && sshConnection.isAlive()) {
            sshConnection.destroyForcibly();
        }
    }
}

@Async("pauseRunExecutor")
public void pauseRun(final PipelineRun run, final boolean rerunPause) {
try {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright 2024 EPAM Systems, Inc. (https://www.epam.com/)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.epam.pipeline.manager.docker;

import lombok.Builder;

import java.util.ArrayList;
import java.util.List;

/**
 * Command that downloads the bandwidth limiting script with {@code curl} and pipes it to a
 * login {@code bash} started through {@code sudo -E}, followed by the positional arguments
 * produced by {@link #buildCommandArguments()}.
 */
@Builder
public class LimitBandwidthCommand extends AbstractDockerCommand {
    private static final String COMMAND_TEMPLATE = "curl -k -s \"%s\" | sudo -E /bin/bash --login /dev/stdin %s";

    private final String runId;
    private final String api;
    private final String apiToken;
    private final String containerId;
    private final String enable;
    private final String uploadRate;
    private final String downloadRate;

    private final String runScriptUrl;

    /**
     * Builds the command line from the template and the configured script URL.
     */
    @Override
    public String getCommand() {
        return getDockerCommand(COMMAND_TEMPLATE, runScriptUrl);
    }

    /**
     * Lists the script arguments in their positional order: run id, API host, API token,
     * container id, enable flag, upload rate, download rate.
     */
    @Override
    protected List<String> buildCommandArguments() {
        final String[] positionalValues = {
            runId, api, apiToken, containerId, enable, uploadRate, downloadRate
        };
        final List<String> arguments = new ArrayList<>(positionalValues.length);
        for (final String value : positionalValues) {
            arguments.add(value);
        }
        return arguments;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
public class NotificationParameterManager {

private static final double PERCENT = 100.0;
private static final int BYTES_IN_KB = 1024;

private final JsonMapper jsonMapper;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright 2024 EPAM Systems, Inc. (https://www.epam.com/)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.epam.pipeline.manager.pipeline;

import com.epam.pipeline.manager.preference.SystemPreferences;
import com.epam.pipeline.manager.scheduling.AbstractSchedulingManager;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;

/**
 * Registers the bandwidth monitor as a scheduled task. The work itself lives in
 * {@link BandwidthMonitoringServiceCore#monitor()}.
 */
@Service
@RequiredArgsConstructor
public class BandwidthMonitoringService extends AbstractSchedulingManager {
    private static final String MONITOR_TASK_NAME = "BandwidthMonitor";

    private final BandwidthMonitoringServiceCore core;

    /**
     * Schedules the monitor once the bean is constructed; the delay is taken from the
     * {@code SYSTEM_POD_BANDWIDTH_MONITOR_DELAY} preference.
     */
    @PostConstruct
    public void init() {
        scheduleFixedDelaySecured(
                core::monitor,
                SystemPreferences.SYSTEM_POD_BANDWIDTH_MONITOR_DELAY,
                MONITOR_TASK_NAME);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Copyright 2024 EPAM Systems, Inc. (https://www.epam.com/)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.epam.pipeline.manager.pipeline;

import com.epam.pipeline.controller.vo.TagsVO;
import com.epam.pipeline.entity.pipeline.PipelineRun;
import com.epam.pipeline.entity.utils.DateUtils;
import com.epam.pipeline.manager.docker.DockerContainerOperationManager;
import com.epam.pipeline.manager.preference.PreferenceManager;
import com.epam.pipeline.manager.preference.SystemPreferences;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import net.javacrumbs.shedlock.core.SchedulerLock;
import org.springframework.stereotype.Service;

import java.util.List;
import java.util.Map;

import static com.epam.pipeline.manager.pipeline.PipelineRunManager.NETWORK_LIMIT;

@Service
@Slf4j
@RequiredArgsConstructor
public class BandwidthMonitoringServiceCore {
private final PipelineRunManager pipelineRunManager;
private final DockerContainerOperationManager dockerContainerOperationManager;
private final PreferenceManager preferenceManager;

@SchedulerLock(name = "BandwidthMonitoringService_monitor", lockAtMostForString = "PT10M")
public void monitor() {
final List<PipelineRun> runs = pipelineRunManager.loadRunningPipelineRuns();
final String networkLimitDateTag = getNetworkLimitDateTag();
for (PipelineRun run: runs) {
final Map<String, String> tags = run.getTags();
if (shouldSetLimit(tags)) {
final int boundary = Integer.parseInt(tags.get(NETWORK_LIMIT));
dockerContainerOperationManager.limitNetworkBandwidth(run, boundary, true);
run.addTag(networkLimitDateTag, DateUtils.nowUTCStr());
} else if (shouldCleanLimit(tags)) {
dockerContainerOperationManager.limitNetworkBandwidth(run, 0, false);
run.removeTag(networkLimitDateTag);
}
pipelineRunManager.updateTags(run.getId(), new TagsVO(run.getTags()), true);
}
}

private boolean shouldSetLimit(final Map<String, String> tags) {
return tags.containsKey(NETWORK_LIMIT) && !tags.containsKey(getNetworkLimitDateTag());
}

private boolean shouldCleanLimit(final Map<String, String> tags) {
return !tags.containsKey(NETWORK_LIMIT) && tags.containsKey(getNetworkLimitDateTag());
}

private String getNetworkLimitDateTag() {
final String suffix = preferenceManager.getPreference(SystemPreferences.SYSTEM_RUN_TAG_DATE_SUFFIX);
return String.format("%s_%s", NETWORK_LIMIT, suffix);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -151,12 +151,14 @@ public class PipelineRunManager {
private static final int DIVIDER_TO_GB = 1024 * 1024 * 1024;
private static final int USER_PRICE_SCALE = 2;
private static final int BILLING_PRICE_SCALE = 5;
public static final String CP_CAP_LIMIT_MOUNTS = "CP_CAP_LIMIT_MOUNTS";
private static final String LIMIT_MOUNTS_NONE = "none";
private static final String CP_REPORT_RUN_STATUS = "CP_REPORT_RUN_STATUS";
private static final String CP_REPORT_RUN_PROCESSED_DATE = "CP_REPORT_RUN_PROCESSED_DATE";
private static final String CP_GPU_COUNT = "CP_GPU_COUNT";

public static final String CP_CAP_LIMIT_MOUNTS = "CP_CAP_LIMIT_MOUNTS";
public static final String NETWORK_LIMIT = "NETWORK_LIMIT";

@Autowired
private PipelineRunDao pipelineRunDao;

Expand Down Expand Up @@ -371,7 +373,6 @@ public void prolongIdleRun(Long runId) {
updateProlongIdleRunAndLastIdleNotificationTime(run);
}


/**
* Internal method for creating a pipeline run,
* it assumes that ACL filtering was already applied to input arguments
Expand Down Expand Up @@ -1340,6 +1341,27 @@ public RunChartInfo loadActiveRunsCharts(final RunChartFilterVO filter) {
.build();
}

/**
 * Stores or removes the {@code NETWORK_LIMIT} tag of a run.
 *
 * <p>When enabling, the tag is set to the boundary and the companion
 * {@code NETWORK_LIMIT_<date suffix>} tag is removed. When disabling, only the
 * {@code NETWORK_LIMIT} tag is removed.
 *
 * @param runId    identifier of the run to update
 * @param enable   {@code true} to set the limit, {@code false} to remove it; must not be {@code null}
 * @param boundary limit value; required and non-negative when {@code enable} is {@code true}
 * @throws IllegalArgumentException if an argument is invalid or the run does not exist
 */
@Transactional(propagation = Propagation.REQUIRED)
public void setLimitBoundary(final Long runId, final Boolean enable, final Integer boundary) {
    // Fail with a clear message instead of an unboxing NullPointerException.
    Assert.notNull(enable, "Enable flag should be specified to change network bandwidth limit");
    Assert.isTrue(!enable || boundary != null, "Boundary value should be specified to limit network bandwidth");
    // A negative boundary would be handed to the limiting script as a negative rate.
    Assert.isTrue(!enable || boundary >= 0, "Boundary value should not be negative");
    final PipelineRun run = pipelineRunDao.loadPipelineRun(runId);
    Assert.notNull(run,
            messageHelper.getMessage(MessageConstants.ERROR_PIPELINE_NOT_FOUND, runId));
    final Map<String, String> tags = new HashMap<>(MapUtils.emptyIfNull(run.getTags()));
    if (enable) {
        tags.put(NETWORK_LIMIT, String.valueOf(boundary));
        // Clear tag with timestamp of applied limit, to re-enable it with a new bound.
        final String timestampTagSuffix = preferenceManager.getPreference(
                SystemPreferences.SYSTEM_RUN_TAG_DATE_SUFFIX
        );
        tags.remove(String.format("%s_%s", NETWORK_LIMIT, timestampTagSuffix));
    } else {
        tags.remove(NETWORK_LIMIT);
    }
    run.setTags(tags);
    pipelineRunDao.updateRunTags(run);
}

/**
 * Sums the sizes of the given disks and narrows the total to {@code int}.
 */
private int getTotalSize(final List<InstanceDisk> disks) {
    long total = 0L;
    for (final InstanceDisk disk : disks) {
        total += disk.getSize();
    }
    return (int) total;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1166,6 +1166,7 @@ public class SystemPreferences {
public static final BooleanPreference SYSTEM_NOTIFICATIONS_ENABLE = new BooleanPreference(
"system.notifications.enable", false, SYSTEM_GROUP, pass);

//in bytes per second
public static final DoublePreference SYSTEM_POD_BANDWIDTH_LIMIT = new DoublePreference(
"system.pod.bandwidth.limit", 0.0, SYSTEM_GROUP, isGreaterThanOrEquals(0.0f));

Expand All @@ -1187,6 +1188,12 @@ public class SystemPreferences {
"system.archive.run.owners.chunk.size", 100, SYSTEM_GROUP, isGreaterThan(0));


// Delay between two runs of the pod bandwidth monitor; must be positive.
// NOTE(review): the unit is presumably milliseconds - confirm against the scheduler.
public static final IntPreference SYSTEM_POD_BANDWIDTH_MONITOR_DELAY = new IntPreference(
        "system.pod.bandwidth.monitor.delay", 30000, SYSTEM_GROUP, isGreaterThan(0));

// in seconds
public static final IntPreference LIMIT_NETWORK_BANDWIDTH_COMMAND_TIMEOUT = new IntPreference(
        "limit.network.bandwidth.command.timeout", 600, SYSTEM_GROUP, isGreaterThan(0));

// FireCloud Integration
public static final ObjectPreference<List<String>> FIRECLOUD_SCOPES = new ObjectPreference<>(
"firecloud.api.scopes", null, new TypeReference<List<String>>() {}, FIRECLOUD_GROUP,
Expand Down
1 change: 1 addition & 0 deletions api/src/main/resources/messages.properties
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ error.run.pipeline.commit.failed=Process of the commit of the run ''{0}'' via ss
error.container.id.for.run.not.found=Container id for run ''{0}'' not found!
error.container.layers.count.failed=Get container layers count for run ''{0}'' failed!
error.container.size.failed=Get container size for run ''{0}'' failed!
error.limit.network.bandwidth.failed=Limit network bandwidth for run ''{0}'' failed!
error.pipeline.run.finished=Pipeline run with id: ''{0}'' already finished.
error.pipeline.run.not.stopped=Pipeline run with id: ''{0}'' not stopped.
error.pipeline.run.not.running=Run {0} shall be in 'RUNNING' status to perform this operation.
Expand Down
1 change: 1 addition & 0 deletions api/src/test/resources/test-application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ commit.run.scripts.root.url=
commit.run.script.starter.url=
container.layers.script.url=
container.size.script.url=
limit.run.bandwidth.script.url=

#pause/resume run scripts
pause.run.script.url=
Expand Down
1 change: 1 addition & 0 deletions deploy/docker/cp-api-srv/config/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ commit.run.scripts.root.url=https://${CP_API_SRV_INTERNAL_HOST}:${CP_API_SRV_INT
commit.run.script.starter.url=https://${CP_API_SRV_INTERNAL_HOST}:${CP_API_SRV_INTERNAL_PORT}/pipeline/commit-run-scripts/commit_run_starter.sh
container.layers.script.url=https://${CP_API_SRV_INTERNAL_HOST}:${CP_API_SRV_INTERNAL_PORT}/pipeline/commit-run-scripts/container_layers_count.sh
container.size.script.url=https://${CP_API_SRV_INTERNAL_HOST}:${CP_API_SRV_INTERNAL_PORT}/pipeline/commit-run-scripts/container_size.sh
limit.run.bandwidth.script.url=https://${CP_API_SRV_INTERNAL_HOST}:${CP_API_SRV_INTERNAL_PORT}/pipeline/commit-run-scripts/limit_bandwidth.sh
launch.script.url.linux=https://${CP_API_SRV_INTERNAL_HOST}:${CP_API_SRV_INTERNAL_PORT}/pipeline/launch.sh
init.script.url.linux=https://${CP_API_SRV_INTERNAL_HOST}:${CP_API_SRV_INTERNAL_PORT}/pipeline/init.sh
launch.script.url.windows=https://${CP_API_SRV_INTERNAL_HOST}:${CP_API_SRV_INTERNAL_PORT}/pipeline/launch.py
Expand Down
Loading

0 comments on commit 9d77b53

Please sign in to comment.