Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue 3602 pod network bandwidth usage action #3621

Merged
merged 11 commits into from
Aug 21, 2024
Merged
2 changes: 2 additions & 0 deletions api/profiles/dev/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ commit.run.script.starter.url=
docker.registry.login.script=
container.layers.script.url=
container.size.script.url=
limit.run.bandwidth.script.url=


#pause/resume run scripts
pause.run.script.url=
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -398,4 +398,9 @@ public void archiveRuns() {
public void archiveRuns(final String name, final boolean principal, final Integer days) {
archiveRunService.archiveRuns(name, principal, days);
}

/**
 * Sets or removes the network bandwidth limit boundary of a run by delegating to
 * {@code runManager.setLimitBoundary}. Access is guarded by the {@code ADMIN_ONLY} expression.
 *
 * @param runId    id of the run to update
 * @param enable   flag forwarded to the manager telling whether the limit is set or removed
 * @param boundary limit boundary value forwarded to the manager
 */
@PreAuthorize(ADMIN_ONLY)
public void setLimitBoundary(final Long runId, final Boolean enable, final Integer boundary) {
runManager.setLimitBoundary(runId, enable, boundary);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ public final class MessageConstants {
public static final String ERROR_RUN_PIPELINES_COMMIT_FAILED = "error.run.pipeline.commit.failed";
public static final String ERROR_GET_CONTAINER_LAYERS_COUNT_FAILED = "error.container.layers.count.failed";
public static final String ERROR_GET_CONTAINER_SIZE_FAILED = "error.container.size.failed";
public static final String ERROR_LIMIT_NETWORK_BANDWIDTH_FAILED = "error.limit.network.bandwidth.failed";
public static final String ERROR_CONTAINER_ID_FOR_RUN_NOT_FOUND = "error.container.id.for.run.not.found";
public static final String INFO_EXECUTE_COMMIT_RUN_PIPELINES = "info.execute.ssh.run.pipeline.command";
public static final String ERROR_RUN_PIPELINES_PAUSE_FAILED = "error.run.pipeline.pause.failed";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -620,4 +620,18 @@ public Result<Boolean> archiveRunsByOwner(@RequestParam final String ownerSid,
runApiService.archiveRuns(ownerSid, principal, days);
return Result.success(true);
}

/**
 * Sets or removes the network bandwidth limit boundary of a run.
 *
 * @param runId    id of the run, taken from the request path
 * @param enable   {@code true} (the default) to set the limit, {@code false} to remove it
 * @param boundary limit boundary in Bytes/s; optional at the HTTP level
 * @return a successful result carrying {@code true}
 */
@PostMapping("/run/{runId}/network/limit")
@ApiOperation(
        value = "Set limit boundary",
        notes = "Sets a special tag for a run based on boundary param: NETWORK_LIMIT: <boundary> (Bytes/s) " +
                "in case of enable = true, otherwise removes the tag.",
        produces = MediaType.APPLICATION_JSON_VALUE)
@ApiResponses(value = {@ApiResponse(code = HTTP_STATUS_OK, message = API_STATUS_DESCRIPTION)})
public Result<Boolean> setLimitBoundary(@PathVariable(value = RUN_ID) final Long runId,
                                        @RequestParam(defaultValue = "true") final Boolean enable,
                                        @RequestParam(required = false) final Integer boundary) {
    runApiService.setLimitBoundary(runId, enable, boundary);
    // The declared payload type is Boolean: return an explicit value, as the sibling
    // archive endpoint does, instead of a success result with no payload.
    return Result.success(true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ public class DockerContainerOperationManager {
private static final String GLOBAL_KNOWN_HOSTS_FILE = "GlobalKnownHostsFile=/dev/null";
private static final String USER_KNOWN_HOSTS_FILE = "UserKnownHostsFile=/dev/null";
private static final String COMMIT_COMMAND_DESCRIPTION = "ssh session to commit pipeline run";
private static final String LIMIT_NETWORK_BANDWIDTH_COMMAND_DESCRIPTION =
"ssh session to limit pipeline run network bandwidth";
private static final String CONTAINER_LAYERS_COUNT_COMMAND_DESCRIPTION =
"ssh session to get docker container layers count";
private static final String PAUSE_COMMAND_DESCRIPTION = "Error is occurred during to pause pipeline run";
Expand All @@ -103,6 +105,7 @@ public class DockerContainerOperationManager {
private static final String CP_NODE_SSH_PORT = "CP_NODE_SSH_PORT";

public static final String PAUSE_RUN_TASK = "PausePipelineRun";
private static final int BYTES_IN_KB = 1024;

@Autowired
private PipelineRunManager runManager;
Expand Down Expand Up @@ -158,6 +161,9 @@ public class DockerContainerOperationManager {
@Value("${pause.run.script.url}")
private String pauseRunScriptUrl;

@Value("${limit.run.bandwidth.script.url}")
private String limitRunBandwidthScriptUrl;

private Lock resumeLock = new ReentrantLock();

public PipelineRun commitContainer(PipelineRun run, DockerRegistry registry,
Expand Down Expand Up @@ -302,6 +308,43 @@ public long getContainerSize(final PipelineRun run) {
return size;
}

/**
 * Applies or removes a network bandwidth limit for the container of a run by executing
 * the limit script on the run's node over ssh.
 *
 * The same rate is used for upload and download. When {@code enable} is {@code false}
 * the rate passed to the script is {@code 0} and {@code boundary} is ignored.
 *
 * @param run      run whose container shall be limited
 * @param boundary limit in bytes per second; required when {@code enable} is {@code true}
 * @param enable   {@code true} to apply the limit, {@code false} to remove it
 * @throws CmdExecutionException if arguments are invalid, the container id cannot be resolved,
 *                               the command does not finish within the configured timeout,
 *                               exits with a non-zero code, or the waiting thread is interrupted
 */
public void limitNetworkBandwidth(final PipelineRun run, final Integer boundary, final Boolean enable) {
    final String containerId = kubernetesManager.getContainerIdFromKubernetesPod(run.getPodId(),
            run.getActualDockerImage());
    try {
        Assert.notNull(containerId,
                messageHelper.getMessage(MessageConstants.ERROR_CONTAINER_ID_FOR_RUN_NOT_FOUND, run.getId()));
        // Fail with a descriptive error instead of an unboxing NullPointerException.
        Assert.notNull(enable, "Enable flag should be specified to limit network bandwidth");
        Assert.isTrue(!enable || boundary != null,
                "Boundary value should be specified to limit network bandwidth");

        final String apiToken = authManager.issueTokenForCurrentUser(null).getToken();

        final long boundaryKBitsPerSec = enable ? toKBitsPerSecond(boundary) : 0L;
        final String limitNetworkCommand = LimitBandwidthCommand.builder()
                .runScriptUrl(limitRunBandwidthScriptUrl)
                .runId(String.valueOf(run.getId()))
                .api(preferenceManager.getPreference(SystemPreferences.BASE_API_HOST))
                .apiToken(apiToken)
                .containerId(containerId)
                .enable(String.valueOf(enable))
                .uploadRate(String.valueOf(boundaryKBitsPerSec))
                .downloadRate(String.valueOf(boundaryKBitsPerSec))
                .build()
                .getCommand();
        final Process sshConnection = submitCommandViaSSH(run.getInstance().getNodeIP(), limitNetworkCommand,
                getSshPort(run));
        try {
            final boolean isFinished = sshConnection.waitFor(
                    preferenceManager.getPreference(SystemPreferences.LIMIT_NETWORK_BANDWIDTH_COMMAND_TIMEOUT),
                    TimeUnit.SECONDS);
            Assert.state(isFinished && sshConnection.exitValue() == 0,
                    messageHelper.getMessage(MessageConstants.ERROR_LIMIT_NETWORK_BANDWIDTH_FAILED, run.getId()));
        } finally {
            // Do not leave the ssh process behind after a timeout or an interruption.
            if (sshConnection.isAlive()) {
                sshConnection.destroy();
            }
        }
    } catch (IllegalStateException | IllegalArgumentException | IOException e) {
        // Pass the exception itself so that the stack trace is logged as well.
        log.error(e.getMessage(), e);
        throw new CmdExecutionException(LIMIT_NETWORK_BANDWIDTH_COMMAND_DESCRIPTION, e);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new CmdExecutionException(LIMIT_NETWORK_BANDWIDTH_COMMAND_DESCRIPTION, e);
    }
}

/**
 * Converts a rate in bytes per second to kilobits per second (1 kilobit = 1024 bits here).
 * The multiplication is done in {@code long}: in {@code int} arithmetic it overflows
 * for rates above {@code Integer.MAX_VALUE / 8} bytes per second.
 */
private static long toKBitsPerSecond(final int bytesPerSecond) {
    return (long) bytesPerSecond * Byte.SIZE / BYTES_IN_KB;
}

@Async("pauseRunExecutor")
public void pauseRun(final PipelineRun run, final boolean rerunPause) {
try {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright 2024 EPAM Systems, Inc. (https://www.epam.com/)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.epam.pipeline.manager.docker;

import lombok.Builder;

import java.util.ArrayList;
import java.util.List;

/**
 * Shell command that fetches the bandwidth limit script with {@code curl} and pipes it into
 * a login {@code bash} started via {@code sudo -E}, followed by the script arguments.
 *
 * NOTE(review): the final string is assembled by {@code getDockerCommand} of the parent class,
 * presumably from the template, the script url and {@link #buildCommandArguments()} - confirm
 * against {@code AbstractDockerCommand}.
 */
@Builder
public class LimitBandwidthCommand extends AbstractDockerCommand {
    private static final String COMMAND_TEMPLATE = "curl -k -s \"%s\" | sudo -E /bin/bash --login /dev/stdin %s";

    private final String runId;
    private final String api;
    private final String apiToken;
    private final String containerId;
    private final String enable;
    private final String uploadRate;
    private final String downloadRate;

    private final String runScriptUrl;

    /**
     * Builds the command line from the template and the script url.
     */
    @Override
    public String getCommand() {
        return getDockerCommand(COMMAND_TEMPLATE, runScriptUrl);
    }

    /**
     * Returns the script arguments in their positional order: run id, api, api token,
     * container id, enable flag, upload rate, download rate.
     */
    @Override
    protected List<String> buildCommandArguments() {
        final String[] orderedValues = {runId, api, apiToken, containerId, enable, uploadRate, downloadRate};
        final List<String> arguments = new ArrayList<>(orderedValues.length);
        for (final String value : orderedValues) {
            arguments.add(value);
        }
        return arguments;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
public class NotificationParameterManager {

private static final double PERCENT = 100.0;
private static final int BYTES_IN_KB = 1024;

private final JsonMapper jsonMapper;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright 2024 EPAM Systems, Inc. (https://www.epam.com/)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.epam.pipeline.manager.pipeline;

import com.epam.pipeline.manager.preference.SystemPreferences;
import com.epam.pipeline.manager.scheduling.AbstractSchedulingManager;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;

/**
 * Schedules the bandwidth monitoring job once the bean is constructed.
 * The job body lives in {@link BandwidthMonitoringServiceCore#monitor()}.
 */
@Service
@RequiredArgsConstructor
public class BandwidthMonitoringService extends AbstractSchedulingManager {
    private static final String TASK_NAME = "BandwidthMonitor";

    private final BandwidthMonitoringServiceCore core;

    /**
     * Registers the monitoring task with a fixed delay taken from
     * {@code SystemPreferences.SYSTEM_POD_BANDWIDTH_MONITOR_DELAY}.
     */
    @PostConstruct
    public void init() {
        scheduleFixedDelaySecured(
                () -> core.monitor(),
                SystemPreferences.SYSTEM_POD_BANDWIDTH_MONITOR_DELAY,
                TASK_NAME);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Copyright 2024 EPAM Systems, Inc. (https://www.epam.com/)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.epam.pipeline.manager.pipeline;

import com.epam.pipeline.controller.vo.TagsVO;
import com.epam.pipeline.entity.pipeline.PipelineRun;
import com.epam.pipeline.entity.utils.DateUtils;
import com.epam.pipeline.manager.docker.DockerContainerOperationManager;
import com.epam.pipeline.manager.preference.PreferenceManager;
import com.epam.pipeline.manager.preference.SystemPreferences;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import net.javacrumbs.shedlock.core.SchedulerLock;
import org.springframework.stereotype.Service;

import java.util.List;
import java.util.Map;

import static com.epam.pipeline.manager.pipeline.PipelineRunManager.NETWORK_LIMIT;

@Service
@Slf4j
@RequiredArgsConstructor
public class BandwidthMonitoringServiceCore {
private final PipelineRunManager pipelineRunManager;
private final DockerContainerOperationManager dockerContainerOperationManager;
private final PreferenceManager preferenceManager;

@SchedulerLock(name = "BandwidthMonitoringService_monitor", lockAtMostForString = "PT10M")
public void monitor() {
final List<PipelineRun> runs = pipelineRunManager.loadRunningPipelineRuns();
final String networkLimitDateTag = getNetworkLimitDateTag();
for (PipelineRun run: runs) {
final Map<String, String> tags = run.getTags();
if (shouldSetLimit(tags)) {
final int boundary = Integer.parseInt(tags.get(NETWORK_LIMIT));
dockerContainerOperationManager.limitNetworkBandwidth(run, boundary, true);
run.addTag(networkLimitDateTag, DateUtils.nowUTCStr());
} else if (shouldCleanLimit(tags)) {
dockerContainerOperationManager.limitNetworkBandwidth(run, 0, false);
run.removeTag(networkLimitDateTag);
}
pipelineRunManager.updateTags(run.getId(), new TagsVO(run.getTags()), true);
}
}

private boolean shouldSetLimit(final Map<String, String> tags) {
return tags.containsKey(NETWORK_LIMIT) && !tags.containsKey(getNetworkLimitDateTag());
}

private boolean shouldCleanLimit(final Map<String, String> tags) {
return !tags.containsKey(NETWORK_LIMIT) && tags.containsKey(getNetworkLimitDateTag());
}

private String getNetworkLimitDateTag() {
final String suffix = preferenceManager.getPreference(SystemPreferences.SYSTEM_RUN_TAG_DATE_SUFFIX);
return String.format("%s_%s", NETWORK_LIMIT, suffix);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -151,12 +151,14 @@ public class PipelineRunManager {
private static final int DIVIDER_TO_GB = 1024 * 1024 * 1024;
private static final int USER_PRICE_SCALE = 2;
private static final int BILLING_PRICE_SCALE = 5;
public static final String CP_CAP_LIMIT_MOUNTS = "CP_CAP_LIMIT_MOUNTS";
private static final String LIMIT_MOUNTS_NONE = "none";
private static final String CP_REPORT_RUN_STATUS = "CP_REPORT_RUN_STATUS";
private static final String CP_REPORT_RUN_PROCESSED_DATE = "CP_REPORT_RUN_PROCESSED_DATE";
private static final String CP_GPU_COUNT = "CP_GPU_COUNT";

public static final String CP_CAP_LIMIT_MOUNTS = "CP_CAP_LIMIT_MOUNTS";
public static final String NETWORK_LIMIT = "NETWORK_LIMIT";

@Autowired
private PipelineRunDao pipelineRunDao;

Expand Down Expand Up @@ -371,7 +373,6 @@ public void prolongIdleRun(Long runId) {
updateProlongIdleRunAndLastIdleNotificationTime(run);
}


/**
* Internal method for creating a pipeline run,
* it assumes that ACL filtering was already applied to input arguments
Expand Down Expand Up @@ -1340,6 +1341,27 @@ public RunChartInfo loadActiveRunsCharts(final RunChartFilterVO filter) {
.build();
}

/**
 * Sets or removes the {@code NETWORK_LIMIT} tag of a run.
 *
 * When enabling, the tag is set to {@code boundary} and the tag holding the timestamp of the
 * previously applied limit is removed. When disabling, only the {@code NETWORK_LIMIT} tag is
 * removed.
 *
 * @param runId    id of the run to update
 * @param enable   {@code true} to set the limit tag, {@code false} to remove it; must not be null
 * @param boundary limit boundary; required and non-negative when {@code enable} is {@code true}
 * @throws IllegalArgumentException if {@code enable} is null, the boundary is missing or negative
 *                                  while enabling, or the run cannot be loaded
 */
@Transactional(propagation = Propagation.REQUIRED)
public void setLimitBoundary(final Long runId, final Boolean enable, final Integer boundary) {
    // Fail with a descriptive error instead of an unboxing NullPointerException.
    Assert.notNull(enable, "Enable flag should be specified to change network bandwidth limit");
    Assert.isTrue(!enable || boundary != null, "Boundary value should be specified to limit network bandwidth");
    // A negative boundary would later be turned into a negative rate.
    Assert.isTrue(!enable || boundary >= 0, "Boundary value should not be negative to limit network bandwidth");
    final PipelineRun run = pipelineRunDao.loadPipelineRun(runId);
    Assert.notNull(run,
            messageHelper.getMessage(MessageConstants.ERROR_PIPELINE_NOT_FOUND, runId));
    final Map<String, String> tags = new HashMap<>(MapUtils.emptyIfNull(run.getTags()));
    if (enable) {
        tags.put(NETWORK_LIMIT, String.valueOf(boundary));
        // Clear tag with timestamp of applied limit, to re-enable it with a new bound.
        final String timestampTagSuffix = preferenceManager.getPreference(
                SystemPreferences.SYSTEM_RUN_TAG_DATE_SUFFIX
        );
        tags.remove(String.format("%s_%s", NETWORK_LIMIT, timestampTagSuffix));
    } else {
        tags.remove(NETWORK_LIMIT);
    }
    run.setTags(tags);
    pipelineRunDao.updateRunTags(run);
}

private int getTotalSize(final List<InstanceDisk> disks) {
return (int) disks.stream().mapToLong(InstanceDisk::getSize).sum();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1166,6 +1166,7 @@ public class SystemPreferences {
public static final BooleanPreference SYSTEM_NOTIFICATIONS_ENABLE = new BooleanPreference(
"system.notifications.enable", false, SYSTEM_GROUP, pass);

//in bytes per second
public static final DoublePreference SYSTEM_POD_BANDWIDTH_LIMIT = new DoublePreference(
"system.pod.bandwidth.limit", 0.0, SYSTEM_GROUP, isGreaterThanOrEquals(0.0f));

Expand All @@ -1187,6 +1188,12 @@ public class SystemPreferences {
"system.archive.run.owners.chunk.size", 100, SYSTEM_GROUP, isGreaterThan(0));


// Delay between runs of the pod bandwidth monitoring job (used when the job is scheduled
// with a fixed delay). NOTE(review): presumably milliseconds - confirm against the scheduler.
// NOTE(review): validated with `pass`, so zero or negative values are not rejected here.
public static final IntPreference SYSTEM_POD_BANDWIDTH_MONITOR_DELAY = new IntPreference(
"system.pod.bandwidth.monitor.delay", 30000, SYSTEM_GROUP, pass);

// Timeout, in seconds, to wait for the ssh command that limits a run's network bandwidth.
public static final IntPreference LIMIT_NETWORK_BANDWIDTH_COMMAND_TIMEOUT = new IntPreference(
"limit.network.bandwidth.command.timeout", 600, SYSTEM_GROUP, isGreaterThan(0));

// FireCloud Integration
public static final ObjectPreference<List<String>> FIRECLOUD_SCOPES = new ObjectPreference<>(
"firecloud.api.scopes", null, new TypeReference<List<String>>() {}, FIRECLOUD_GROUP,
Expand Down
1 change: 1 addition & 0 deletions api/src/main/resources/messages.properties
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ error.run.pipeline.commit.failed=Process of the commit of the run ''{0}'' via ss
error.container.id.for.run.not.found=Container id for run ''{0}'' not found!
error.container.layers.count.failed=Get container layers count for run ''{0}'' failed!
error.container.size.failed=Get container size for run ''{0}'' failed!
error.limit.network.bandwidth.failed=Limit network bandwidth for run ''{0}'' failed!
error.pipeline.run.finished=Pipeline run with id: ''{0}'' already finished.
error.pipeline.run.not.stopped=Pipeline run with id: ''{0}'' not stopped.
error.pipeline.run.not.running=Run {0} shall be in 'RUNNING' status to perform this operation.
Expand Down
1 change: 1 addition & 0 deletions api/src/test/resources/test-application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ commit.run.scripts.root.url=
commit.run.script.starter.url=
container.layers.script.url=
container.size.script.url=
limit.run.bandwidth.script.url=

#pause/resume run scripts
pause.run.script.url=
Expand Down
1 change: 1 addition & 0 deletions deploy/docker/cp-api-srv/config/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ commit.run.scripts.root.url=https://${CP_API_SRV_INTERNAL_HOST}:${CP_API_SRV_INT
commit.run.script.starter.url=https://${CP_API_SRV_INTERNAL_HOST}:${CP_API_SRV_INTERNAL_PORT}/pipeline/commit-run-scripts/commit_run_starter.sh
container.layers.script.url=https://${CP_API_SRV_INTERNAL_HOST}:${CP_API_SRV_INTERNAL_PORT}/pipeline/commit-run-scripts/container_layers_count.sh
container.size.script.url=https://${CP_API_SRV_INTERNAL_HOST}:${CP_API_SRV_INTERNAL_PORT}/pipeline/commit-run-scripts/container_size.sh
limit.run.bandwidth.script.url=https://${CP_API_SRV_INTERNAL_HOST}:${CP_API_SRV_INTERNAL_PORT}/pipeline/commit-run-scripts/limit_bandwidth.sh
launch.script.url.linux=https://${CP_API_SRV_INTERNAL_HOST}:${CP_API_SRV_INTERNAL_PORT}/pipeline/launch.sh
init.script.url.linux=https://${CP_API_SRV_INTERNAL_HOST}:${CP_API_SRV_INTERNAL_PORT}/pipeline/init.sh
launch.script.url.windows=https://${CP_API_SRV_INTERNAL_HOST}:${CP_API_SRV_INTERNAL_PORT}/pipeline/launch.py
Expand Down
Loading
Loading