Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue 3602 Pod nework bandwidth usage #3606

Merged
merged 8 commits into from
Jul 18, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,9 @@ public final class MessageConstants {
public static final String DEBUG_RUN_NOT_IDLED = "debug.run.not.idled";
public static final String DEBUG_RUN_HAS_NOT_NODE_NAME = "debug.run.has.not.node.name";
public static final String DEBUG_MEMORY_METRICS = "debug.memory.metrics.received";
public static final String INFO_RUN_HIGH_NETWORK_CONSUMPTION_NOTIFY = "info.run.high.network.consumption.notify";
public static final String DEBUG_NETWORK_RUN_METRICS_RECEIVED = "debug.network.run.metrics.received";
public static final String DEBUG_RUN_NOT_NETWORK_CONSUMING = "debug.run.not.network.consuming";


// Kubernetes messages
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1014,6 +1014,7 @@ public enum PipelineRunParameters {
LAST_NOTIFICATION_TIME,
PROLONGED_AT_TIME,
LAST_IDLE_NOTIFICATION_TIME,
LAST_NETWORK_CONSUMPTION_NOTIFICATION_TIME,
PROJECT_PIPELINES,
PROJECT_CONFIGS,
EXEC_PREFERENCES,
Expand Down Expand Up @@ -1070,6 +1071,8 @@ static MapSqlParameterSource getParameters(PipelineRun run, Connection connectio
params.addValue(PROLONGED_AT_TIME.name(), run.getProlongedAtTime());
params.addValue(LAST_NOTIFICATION_TIME.name(), run.getLastNotificationTime());
params.addValue(LAST_IDLE_NOTIFICATION_TIME.name(), run.getLastIdleNotificationTime());
params.addValue(LAST_NETWORK_CONSUMPTION_NOTIFICATION_TIME.name(),
run.getLastNetworkConsumptionNotificationTime());
params.addValue(EXEC_PREFERENCES.name(),
JsonMapper.convertDataToJsonStringForQuery(run.getExecutionPreferences()));
params.addValue(PRETTY_URL.name(), run.getPrettyUrl());
Expand Down Expand Up @@ -1214,9 +1217,16 @@ public static PipelineRun parsePipelineRun(ResultSet rs) throws SQLException {
run.setLastNotificationTime(new Date(lastNotificationTime.getTime()));
}

Timestamp lastIdleNotifiactionTime = rs.getTimestamp(LAST_IDLE_NOTIFICATION_TIME.name());
Timestamp lastIdleNotificationTime = rs.getTimestamp(LAST_IDLE_NOTIFICATION_TIME.name());
if (!rs.wasNull()) {
run.setLastIdleNotificationTime(lastIdleNotifiactionTime.toLocalDateTime()); // convert to UTC
run.setLastIdleNotificationTime(lastIdleNotificationTime.toLocalDateTime()); // convert to UTC
}

Timestamp lastNetworkConsumptionNotificationTime = rs.getTimestamp(
LAST_NETWORK_CONSUMPTION_NOTIFICATION_TIME.name());
if (!rs.wasNull()) {
// convert to UTC
run.setLastNetworkConsumptionNotificationTime(lastNetworkConsumptionNotificationTime.toLocalDateTime());
}

Timestamp idleNotificationStartingTime = rs.getTimestamp(PROLONGED_AT_TIME.name());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright 2024 EPAM Systems, Inc. (https://www.epam.com/)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.epam.pipeline.entity.monitoring;

import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collectors;

/**
* Describes actions, that can be taken if a PipelineRun is consuming network resources for a configured period of time,
* controlled by
* {@code SystemPreferences.SYSTEM_MAX_POD_BANDWIDTH_LIMIT_TIMEOUT_MINUTES} and
* {@code SystemPreferences.SYSTEM_POD_BANDWIDTH_ACTION_BACKOFF_PERIOD}
*/
public enum NetworkConsumingRunAction {
LIMIT_BANDWIDTH,
/**
* Just notify the owner of a Run
*/
NOTIFY;

private static final Set<String> VALUE_SET = Arrays.stream(NetworkConsumingRunAction.values())
.map(Enum::name)
.collect(Collectors.toSet());

public static boolean contains(String value) {
return VALUE_SET.contains(value);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,13 @@ private LocalDateTime fallbackMonitoringStart() {

private List<MonitoringStats> getStats(final String nodeName, final LocalDateTime start, final LocalDateTime end,
final Duration interval) {
return Stream.of(MONITORING_METRICS)
return getStats(MONITORING_METRICS, nodeName, start, end, interval);
}

private List<MonitoringStats> getStats(final ELKUsageMetric[] monitoringMetrics, final String nodeName,
final LocalDateTime start, final LocalDateTime end,
final Duration interval) {
return Stream.of(monitoringMetrics)
.map(it -> AbstractMetricRequester.getStatsRequester(it, client))
.map(it -> it.requestStats(nodeName, start, end, interval))
.flatMap(List::stream)
Expand All @@ -179,6 +185,12 @@ private List<MonitoringStats> getStats(final String nodeName, final LocalDateTim
.collect(Collectors.toList());
}

public List<MonitoringStats> getStats(final ELKUsageMetric[] monitoringMetrics, final String nodeName,
final LocalDateTime start, final LocalDateTime end) {
final Duration interval = interval(start, end);
return getStats(monitoringMetrics, nodeName, start, end, interval);
}

private Duration interval(final LocalDateTime start, final LocalDateTime end) {
final Duration requested = Duration.between(start, end).dividedBy(Math.max(1, numberOfIntervals() - 1));
final Duration minimal = minimalDuration();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,18 @@
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Set;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.Collection;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import com.epam.pipeline.entity.cluster.monitoring.ELKUsageMetric;
import com.epam.pipeline.entity.cluster.monitoring.MonitoringStats;
import com.epam.pipeline.entity.monitoring.NetworkConsumingRunAction;
import com.epam.pipeline.entity.monitoring.LongPausedRunAction;
import com.epam.pipeline.entity.pipeline.StopServerlessRun;
import com.epam.pipeline.entity.pipeline.TaskStatus;
Expand Down Expand Up @@ -86,6 +89,7 @@
public class ResourceMonitoringManager extends AbstractSchedulingManager {

public static final String UTILIZATION_LEVEL_LOW = "IDLE";
public static final String NETWORK_CONSUMING_LEVEL_HIGH = "NETWORK_CONSUMING";
public static final String UTILIZATION_LEVEL_HIGH = "PRESSURE";
public static final String TRUE_VALUE_STRING = "true";

Expand Down Expand Up @@ -113,6 +117,7 @@ static class ResourceMonitoringManagerCore {
private static final double PERCENT = 100.0;
private static final double ONE_THOUSANDTH = 0.001;
private static final long ONE = 1L;
private static final int BYTES_IN_KB = 1024;

private final PipelineRunManager pipelineRunManager;
private final RunStatusManager runStatusManager;
Expand All @@ -123,6 +128,7 @@ static class ResourceMonitoringManagerCore {
private final PreferenceManager preferenceManager;
private final StopServerlessRunManager stopServerlessRunManager;
private final InstanceOfferManager instanceOfferManager;
private final ESMonitoringManager monitoringManager;

@Autowired
ResourceMonitoringManagerCore(final PipelineRunManager pipelineRunManager,
Expand All @@ -133,7 +139,8 @@ static class ResourceMonitoringManagerCore {
final PreferenceManager preferenceManager,
final StopServerlessRunManager stopServerlessRunManager,
final InstanceOfferManager instanceOfferManager,
final RunStatusManager runStatusManager) {
final RunStatusManager runStatusManager,
final ESMonitoringManager monitoringManager) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's use already existing way to request metrics (same as for IdleRuns). For this we need implement a method for NetworkRequester.java which now is unimplemented + AbstractMetricRequester.getRequester should be expanded to return NetworkRequester if such metric is requested

this.pipelineRunManager = pipelineRunManager;
this.pipelineRunDockerOperationManager = pipelineRunDockerOperationManager;
this.messageHelper = messageHelper;
Expand All @@ -143,6 +150,7 @@ static class ResourceMonitoringManagerCore {
this.stopServerlessRunManager = stopServerlessRunManager;
this.instanceOfferManager = instanceOfferManager;
this.runStatusManager = runStatusManager;
this.monitoringManager = monitoringManager;
}

@Scheduled(cron = "0 0 0 ? * *")
Expand All @@ -156,6 +164,7 @@ public void removeOldIndices() {
public void monitorResourceUsage() {
List<PipelineRun> runs = pipelineRunManager.loadRunningPipelineRuns();
processIdleRuns(runs);
processHighNetworkConsumingRuns(runs);
processOverloadedRuns(runs);
processPausingResumingRuns();
processServerlessRuns();
Expand Down Expand Up @@ -417,6 +426,136 @@ private void performNotify(PipelineRun run, double cpuUsageRate,
pipelinesToNotify.add(new ImmutablePair<>(run, cpuUsageRate));
}

private void processHighNetworkConsumingRuns(final List<PipelineRun> runs) {
final Map<String, PipelineRun> running = groupedByNode(runs);

final int bandwidthLimitTimeout = preferenceManager.getPreference(
SystemPreferences.SYSTEM_MAX_POD_BANDWIDTH_LIMIT_TIMEOUT_MINUTES);

log.debug(messageHelper.getMessage(MessageConstants.DEBUG_RUN_METRICS_REQUEST,
"NETWORK", running.size(), String.join(", ", running.keySet())));

final LocalDateTime now = DateUtils.nowUTC();
final Map<String, List<MonitoringStats>> networkMetrics = running.keySet().stream()
.collect(Collectors.toMap(nodeName -> nodeName, nodeName ->
monitoringManager.getStats(new ELKUsageMetric[]{ELKUsageMetric.NETWORK},
nodeName, now.minusMinutes(bandwidthLimitTimeout + ONE), now)));

final long bandwidthLimit = preferenceManager.getPreference(
SystemPreferences.SYSTEM_POD_BANDWIDTH_LIMIT);
final int actionTimeout = preferenceManager.getPreference(
SystemPreferences.SYSTEM_POD_BANDWIDTH_ACTION_BACKOFF_PERIOD);
final NetworkConsumingRunAction action = NetworkConsumingRunAction.valueOf(preferenceManager
.getPreference(SystemPreferences.SYSTEM_POD_BANDWIDTH_ACTION));

processHighNetworkConsumingRuns(running, networkMetrics, bandwidthLimit, actionTimeout, action);
}

private void processHighNetworkConsumingRun(PipelineRun run, int actionTimeout,
NetworkConsumingRunAction action,
List<Pair<PipelineRun, Double>> pipelinesToNotify,
List<PipelineRun> runsToUpdateNotificationTime,
Double networkBandwidthLevel, List<PipelineRun> runsToUpdateTags) {
if (shouldPerformActionOnNetworkConsumingRun(run, actionTimeout)) {
performActionOnNetworkConsumingRun(run, action, networkBandwidthLevel, pipelinesToNotify,
runsToUpdateNotificationTime);
return;
}
if (Objects.isNull(run.getLastNetworkConsumptionNotificationTime())) {
run.addTag(NETWORK_CONSUMING_LEVEL_HIGH, TRUE_VALUE_STRING);
Optional.ofNullable(getTimestampTag(NETWORK_CONSUMING_LEVEL_HIGH))
.ifPresent(tag -> run.addTag(tag, DateUtils.nowUTCStr()));
runsToUpdateTags.add(run);
run.setLastNetworkConsumptionNotificationTime(DateUtils.nowUTC());
runsToUpdateNotificationTime.add(run);
}
pipelinesToNotify.add(new ImmutablePair<>(run, networkBandwidthLevel));
log.info(messageHelper.getMessage(MessageConstants.INFO_RUN_HIGH_NETWORK_CONSUMPTION_NOTIFY,
run.getPodId(), networkBandwidthLevel));
}

private void processHighNetworkConsumingRuns(Map<String, PipelineRun> running,
Map<String, List<MonitoringStats>> networkMetrics,
long bandwidthLimit,
int actionTimeout, NetworkConsumingRunAction action) {
final List<PipelineRun> runsToUpdateNotificationTime = new ArrayList<>(running.size());
final List<Pair<PipelineRun, Double>> runsToNotify = new ArrayList<>(running.size());
final List<PipelineRun> runsToUpdateTags = new ArrayList<>(running.size());
for (Map.Entry<String, PipelineRun> entry : running.entrySet()) {
PipelineRun run = entry.getValue();
List<MonitoringStats> metric = networkMetrics.get(entry.getKey());
if (metric != null) {
List<Long> rxBytes = metric.stream()
.map(m -> m.getNetworkUsage().getStatsByInterface().values().stream()
.map(MonitoringStats.NetworkUsage.NetworkStats::getRxBytes)
.collect(Collectors.toList()))
.flatMap(Collection::stream).collect(Collectors.toList());
boolean rxExceeds = rxBytes.stream().allMatch(v -> v >= bandwidthLimit);
List<Long> txBytes = metric.stream()
.map(m -> m.getNetworkUsage().getStatsByInterface().values().stream()
.map(MonitoringStats.NetworkUsage.NetworkStats::getTxBytes)
.collect(Collectors.toList()))
.flatMap(Collection::stream).collect(Collectors.toList());
boolean txExceeds = txBytes.stream().allMatch(v -> v >= bandwidthLimit);
if (rxExceeds || txExceeds) {
long rxMax = Collections.max(rxBytes);
long txMax = Collections.max(txBytes);
processHighNetworkConsumingRun(run, actionTimeout, action, runsToNotify,
runsToUpdateNotificationTime, (double) (Math.max(rxMax, txMax) / BYTES_IN_KB),
runsToUpdateTags);
} else if (run.getLastNetworkConsumptionNotificationTime() != null) {
// No action is longer needed, clear timeout
log.debug(messageHelper.getMessage(MessageConstants.DEBUG_RUN_NOT_NETWORK_CONSUMING,
run.getPodId(), metric));
processFormerHighNetworkConsumingRun(run, runsToUpdateNotificationTime, runsToUpdateTags);
}
}
}
notificationManager.notifyHighNetworkConsumingRuns(runsToNotify,
NotificationType.HIGH_CONSUMED_NETWORK_BANDWIDTH);
pipelineRunManager.updatePipelineRunsLastNotification(runsToUpdateNotificationTime);
pipelineRunManager.updateRunsTags(runsToUpdateTags);
}

private void processFormerHighNetworkConsumingRun(final PipelineRun run,
final List<PipelineRun> runsToUpdateNotificationTime,
final List<PipelineRun> runsToUpdateTags) {
run.setLastNetworkConsumptionNotificationTime(null);
run.removeTag(NETWORK_CONSUMING_LEVEL_HIGH);
run.removeTag(getTimestampTag(NETWORK_CONSUMING_LEVEL_HIGH));
runsToUpdateNotificationTime.add(run);
runsToUpdateTags.add(run);
}

private boolean shouldPerformActionOnNetworkConsumingRun(final PipelineRun run, final int actionTimeout) {
return Objects.nonNull(run.getLastNetworkConsumptionNotificationTime()) &&
run.getLastNetworkConsumptionNotificationTime()
.isBefore(DateUtils.nowUTC().minusMinutes(actionTimeout));
}

private void performActionOnNetworkConsumingRun(final PipelineRun run,
final NetworkConsumingRunAction action,
final double networkBandwidthLevel,
final List<Pair<PipelineRun, Double>> pipelinesToNotify,
final List<PipelineRun> runsToUpdate) {
log.info(messageHelper.getMessage(MessageConstants.INFO_RUN_IDLE_ACTION, run.getPodId(),
networkBandwidthLevel, action));
switch (action) {
case LIMIT_BANDWIDTH:
// TODO
break;
default:
performHighNetworkConsumingNotify(run, networkBandwidthLevel, pipelinesToNotify);
}
runsToUpdate.add(run);
}

private void performHighNetworkConsumingNotify(PipelineRun run, double networkBandwidthLevel,
List<Pair<PipelineRun, Double>> pipelinesToNotify) {
run.setLastNetworkConsumptionNotificationTime(DateUtils.nowUTC());
pipelinesToNotify.add(new ImmutablePair<>(run, networkBandwidthLevel));
}

private void performStop(final PipelineRun run,
final double cpuUsageRate,
final List<PipelineRun> runsToUpdateTags) {
Expand Down
Loading
Loading