Skip to content

Commit

Permalink
Issue 3602 Pod nework bandwidth usage: notification (#3606)
Browse files Browse the repository at this point in the history
  • Loading branch information
okolesn authored Jul 18, 2024
1 parent 1f7d07f commit cdb4be4
Show file tree
Hide file tree
Showing 20 changed files with 443 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,9 @@ public final class MessageConstants {
public static final String DEBUG_RUN_NOT_IDLED = "debug.run.not.idled";
public static final String DEBUG_RUN_HAS_NOT_NODE_NAME = "debug.run.has.not.node.name";
public static final String DEBUG_MEMORY_METRICS = "debug.memory.metrics.received";
public static final String INFO_RUN_HIGH_NETWORK_CONSUMPTION_NOTIFY = "info.run.high.network.consumption.notify";
public static final String DEBUG_NETWORK_RUN_METRICS_RECEIVED = "debug.network.run.metrics.received";
public static final String DEBUG_RUN_NOT_NETWORK_CONSUMING = "debug.run.not.network.consuming";


// Kubernetes messages
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,8 @@ public static MetricRequester getRequester(final ELKUsageMetric metric,
return new MemoryRequester(client);
case FS:
return new FSRequester(client);
case NETWORK:
return new NetworkRequester(client);
default:
throw new IllegalArgumentException("Metric type: " + metric.getName() + " isn't supported!");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,19 @@

import com.epam.pipeline.entity.cluster.monitoring.ELKUsageMetric;
import com.epam.pipeline.entity.cluster.monitoring.MonitoringStats;
import org.apache.commons.lang.NotImplementedException;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.builder.SearchSourceBuilder;

import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
Expand All @@ -49,12 +53,27 @@ protected ELKUsageMetric metric() {
@Override
public SearchRequest buildRequest(final Collection<String> resourceIds, final LocalDateTime from,
final LocalDateTime to, final Map<String, String> additional) {
throw new NotImplementedException("Method NetworkRequester::buildRequest is not implemented yet.");
return request(from, to,
new SearchSourceBuilder()
.query(QueryBuilders.boolQuery()
.filter(QueryBuilders.termsQuery(path(FIELD_METRICS_TAGS, FIELD_NODENAME_RAW),
resourceIds))
.filter(QueryBuilders.termQuery(path(FIELD_METRICS_TAGS, FIELD_TYPE), NODE))
.filter(QueryBuilders.rangeQuery(metric().getTimestamp())
.from(from.toInstant(ZoneOffset.UTC).toEpochMilli())
.to(to.toInstant(ZoneOffset.UTC).toEpochMilli())))
.size(0)
.aggregation(ordered(AggregationBuilders.terms(AGGREGATION_NODE_NAME))
.field(path(FIELD_METRICS_TAGS, FIELD_NODENAME_RAW))
.size(resourceIds.size())
.subAggregation(average(AVG_AGGREGATION + RX_RATE, RX_RATE))
.subAggregation(average(AVG_AGGREGATION + TX_RATE, TX_RATE))));
}

@Override
public Map<String, Double> parseResponse(final SearchResponse response) {
throw new NotImplementedException("Method NetworkRequester::buildRequest is not implemented yet.");
return ((Terms) response.getAggregations().get(AGGREGATION_NODE_NAME)).getBuckets().stream()
.collect(Collectors.toMap(MultiBucketsAggregation.Bucket::getKeyAsString, this::getNetworkUsage));
}

@Override
Expand Down Expand Up @@ -101,4 +120,11 @@ private MonitoringStats toMonitoringStats(final MultiBucketsAggregation.Bucket b
monitoringStats.setNetworkUsage(networkUsage);
return monitoringStats;
}

private Double getNetworkUsage(final MultiBucketsAggregation.Bucket bucket) {
final List<Aggregation> aggregations = aggregations(bucket);
final Optional<Double> rxRate = doubleValue(aggregations, AVG_AGGREGATION + RX_RATE);
final Optional<Double> txRate = doubleValue(aggregations, AVG_AGGREGATION + TX_RATE);
return Math.max(rxRate.orElse(0.0), txRate.orElse(0.0));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1014,6 +1014,7 @@ public enum PipelineRunParameters {
LAST_NOTIFICATION_TIME,
PROLONGED_AT_TIME,
LAST_IDLE_NOTIFICATION_TIME,
LAST_NETWORK_CONSUMPTION_NOTIFICATION_TIME,
PROJECT_PIPELINES,
PROJECT_CONFIGS,
EXEC_PREFERENCES,
Expand Down Expand Up @@ -1070,6 +1071,8 @@ static MapSqlParameterSource getParameters(PipelineRun run, Connection connectio
params.addValue(PROLONGED_AT_TIME.name(), run.getProlongedAtTime());
params.addValue(LAST_NOTIFICATION_TIME.name(), run.getLastNotificationTime());
params.addValue(LAST_IDLE_NOTIFICATION_TIME.name(), run.getLastIdleNotificationTime());
params.addValue(LAST_NETWORK_CONSUMPTION_NOTIFICATION_TIME.name(),
run.getLastNetworkConsumptionNotificationTime());
params.addValue(EXEC_PREFERENCES.name(),
JsonMapper.convertDataToJsonStringForQuery(run.getExecutionPreferences()));
params.addValue(PRETTY_URL.name(), run.getPrettyUrl());
Expand Down Expand Up @@ -1214,9 +1217,16 @@ public static PipelineRun parsePipelineRun(ResultSet rs) throws SQLException {
run.setLastNotificationTime(new Date(lastNotificationTime.getTime()));
}

Timestamp lastIdleNotifiactionTime = rs.getTimestamp(LAST_IDLE_NOTIFICATION_TIME.name());
Timestamp lastIdleNotificationTime = rs.getTimestamp(LAST_IDLE_NOTIFICATION_TIME.name());
if (!rs.wasNull()) {
run.setLastIdleNotificationTime(lastIdleNotifiactionTime.toLocalDateTime()); // convert to UTC
run.setLastIdleNotificationTime(lastIdleNotificationTime.toLocalDateTime()); // convert to UTC
}

Timestamp lastNetworkConsumptionNotificationTime = rs.getTimestamp(
LAST_NETWORK_CONSUMPTION_NOTIFICATION_TIME.name());
if (!rs.wasNull()) {
// convert to UTC
run.setLastNetworkConsumptionNotificationTime(lastNetworkConsumptionNotificationTime.toLocalDateTime());
}

Timestamp idleNotificationStartingTime = rs.getTimestamp(PROLONGED_AT_TIME.name());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright 2024 EPAM Systems, Inc. (https://www.epam.com/)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.epam.pipeline.entity.monitoring;

import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collectors;

/**
* Describes actions, that can be taken if a PipelineRun is consuming network resources for a configured period of time,
* controlled by
* {@code SystemPreferences.SYSTEM_MAX_POD_BANDWIDTH_LIMIT_TIMEOUT_MINUTES} and
* {@code SystemPreferences.SYSTEM_POD_BANDWIDTH_ACTION_BACKOFF_PERIOD}
*/
public enum NetworkConsumingRunAction {
LIMIT_BANDWIDTH,
/**
* Just notify the owner of a Run
*/
NOTIFY;

private static final Set<String> VALUE_SET = Arrays.stream(NetworkConsumingRunAction.values())
.map(Enum::name)
.collect(Collectors.toSet());

public static boolean contains(String value) {
return VALUE_SET.contains(value);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.stream.Stream;

import com.epam.pipeline.entity.cluster.monitoring.ELKUsageMetric;
import com.epam.pipeline.entity.monitoring.NetworkConsumingRunAction;
import com.epam.pipeline.entity.monitoring.LongPausedRunAction;
import com.epam.pipeline.entity.pipeline.StopServerlessRun;
import com.epam.pipeline.entity.pipeline.TaskStatus;
Expand Down Expand Up @@ -86,6 +87,7 @@
public class ResourceMonitoringManager extends AbstractSchedulingManager {

public static final String UTILIZATION_LEVEL_LOW = "IDLE";
public static final String NETWORK_CONSUMING_LEVEL_HIGH = "NETWORK_PRESSURE";
public static final String UTILIZATION_LEVEL_HIGH = "PRESSURE";
public static final String TRUE_VALUE_STRING = "true";

Expand Down Expand Up @@ -156,6 +158,7 @@ public void removeOldIndices() {
public void monitorResourceUsage() {
List<PipelineRun> runs = pipelineRunManager.loadRunningPipelineRuns();
processIdleRuns(runs);
processHighNetworkConsumingRuns(runs);
processOverloadedRuns(runs);
processPausingResumingRuns();
processServerlessRuns();
Expand Down Expand Up @@ -417,6 +420,123 @@ private void performNotify(PipelineRun run, double cpuUsageRate,
pipelinesToNotify.add(new ImmutablePair<>(run, cpuUsageRate));
}

private void processHighNetworkConsumingRuns(final List<PipelineRun> runs) {
final Map<String, PipelineRun> running = groupedByNode(runs);

final int bandwidthLimitTimeout = preferenceManager.getPreference(
SystemPreferences.SYSTEM_MAX_POD_BANDWIDTH_LIMIT_TIMEOUT_MINUTES);

log.debug(messageHelper.getMessage(MessageConstants.DEBUG_RUN_METRICS_REQUEST,
"NETWORK", running.size(), String.join(", ", running.keySet())));

final LocalDateTime now = DateUtils.nowUTC();
final Map<String, Double> networkMetrics = monitoringDao.loadMetrics(ELKUsageMetric.NETWORK,
running.keySet(), now.minusMinutes(bandwidthLimitTimeout + ONE), now);
log.debug(messageHelper.getMessage(MessageConstants.DEBUG_NETWORK_RUN_METRICS_RECEIVED,
networkMetrics.entrySet().stream().map(e -> e.getKey() + ":" + e.getValue())
.collect(Collectors.joining(", ")))
);

final double bandwidthLimit = preferenceManager.getPreference(
SystemPreferences.SYSTEM_POD_BANDWIDTH_LIMIT);
final int actionTimeout = preferenceManager.getPreference(
SystemPreferences.SYSTEM_POD_BANDWIDTH_ACTION_BACKOFF_PERIOD);
final NetworkConsumingRunAction action = NetworkConsumingRunAction.valueOf(preferenceManager
.getPreference(SystemPreferences.SYSTEM_POD_BANDWIDTH_ACTION));

processHighNetworkConsumingRuns(running, networkMetrics, bandwidthLimit, actionTimeout, action);
}

private void processHighNetworkConsumingRun(PipelineRun run, int actionTimeout,
NetworkConsumingRunAction action,
List<Pair<PipelineRun, Double>> pipelinesToNotify,
List<PipelineRun> runsToUpdateNotificationTime,
Double networkBandwidthLevel, List<PipelineRun> runsToUpdateTags) {
if (shouldPerformActionOnNetworkConsumingRun(run, actionTimeout)) {
performActionOnNetworkConsumingRun(run, action, networkBandwidthLevel, pipelinesToNotify,
runsToUpdateNotificationTime);
return;
}
if (Objects.isNull(run.getLastNetworkConsumptionNotificationTime())) {
run.addTag(NETWORK_CONSUMING_LEVEL_HIGH, TRUE_VALUE_STRING);
Optional.ofNullable(getTimestampTag(NETWORK_CONSUMING_LEVEL_HIGH))
.ifPresent(tag -> run.addTag(tag, DateUtils.nowUTCStr()));
runsToUpdateTags.add(run);
run.setLastNetworkConsumptionNotificationTime(DateUtils.nowUTC());
runsToUpdateNotificationTime.add(run);
}
pipelinesToNotify.add(new ImmutablePair<>(run, networkBandwidthLevel));
log.info(messageHelper.getMessage(MessageConstants.INFO_RUN_HIGH_NETWORK_CONSUMPTION_NOTIFY,
run.getPodId(), networkBandwidthLevel));
}

private void processHighNetworkConsumingRuns(Map<String, PipelineRun> running,
Map<String, Double> networkMetrics,
double bandwidthLimit,
int actionTimeout, NetworkConsumingRunAction action) {
final List<PipelineRun> runsToUpdateNotificationTime = new ArrayList<>(running.size());
final List<Pair<PipelineRun, Double>> runsToNotify = new ArrayList<>(running.size());
final List<PipelineRun> runsToUpdateTags = new ArrayList<>(running.size());
for (Map.Entry<String, PipelineRun> entry : running.entrySet()) {
PipelineRun run = entry.getValue();
Double metric = networkMetrics.get(entry.getKey());
if (metric != null) {
if (metric >= bandwidthLimit) {
processHighNetworkConsumingRun(run, actionTimeout, action, runsToNotify,
runsToUpdateNotificationTime, metric, runsToUpdateTags);
} else if (run.getLastNetworkConsumptionNotificationTime() != null) {
// No action is longer needed, clear timeout
log.debug(messageHelper.getMessage(MessageConstants.DEBUG_RUN_NOT_NETWORK_CONSUMING,
run.getPodId(), metric));
processFormerHighNetworkConsumingRun(run, runsToUpdateNotificationTime, runsToUpdateTags);
}
}
}
notificationManager.notifyHighNetworkConsumingRuns(runsToNotify,
NotificationType.HIGH_CONSUMED_NETWORK_BANDWIDTH);
pipelineRunManager.updatePipelineRunsLastNotification(runsToUpdateNotificationTime);
pipelineRunManager.updateRunsTags(runsToUpdateTags);
}

private void processFormerHighNetworkConsumingRun(final PipelineRun run,
final List<PipelineRun> runsToUpdateNotificationTime,
final List<PipelineRun> runsToUpdateTags) {
run.setLastNetworkConsumptionNotificationTime(null);
run.removeTag(NETWORK_CONSUMING_LEVEL_HIGH);
run.removeTag(getTimestampTag(NETWORK_CONSUMING_LEVEL_HIGH));
runsToUpdateNotificationTime.add(run);
runsToUpdateTags.add(run);
}

private boolean shouldPerformActionOnNetworkConsumingRun(final PipelineRun run, final int actionTimeout) {
return Objects.nonNull(run.getLastNetworkConsumptionNotificationTime()) &&
run.getLastNetworkConsumptionNotificationTime()
.isBefore(DateUtils.nowUTC().minusMinutes(actionTimeout));
}

private void performActionOnNetworkConsumingRun(final PipelineRun run,
final NetworkConsumingRunAction action,
final double networkBandwidthLevel,
final List<Pair<PipelineRun, Double>> pipelinesToNotify,
final List<PipelineRun> runsToUpdate) {
log.info(messageHelper.getMessage(MessageConstants.INFO_RUN_IDLE_ACTION, run.getPodId(),
networkBandwidthLevel, action));
switch (action) {
case LIMIT_BANDWIDTH:
// TODO
break;
default:
performHighNetworkConsumingNotify(run, networkBandwidthLevel, pipelinesToNotify);
}
runsToUpdate.add(run);
}

private void performHighNetworkConsumingNotify(PipelineRun run, double networkBandwidthLevel,
List<Pair<PipelineRun, Double>> pipelinesToNotify) {
run.setLastNetworkConsumptionNotificationTime(DateUtils.nowUTC());
pipelinesToNotify.add(new ImmutablePair<>(run, networkBandwidthLevel));
}

private void performStop(final PipelineRun run,
final double cpuUsageRate,
final List<PipelineRun> runsToUpdateTags) {
Expand Down
Loading

0 comments on commit cdb4be4

Please sign in to comment.