From 3d5e4138e9e7274cf77519d2ddcf3a7708c71dec Mon Sep 17 00:00:00 2001 From: Oksana_Kolesnikova Date: Mon, 15 Jul 2024 22:32:35 +0300 Subject: [PATCH 1/8] Issue 3602 Pod nework bandwidth usage --- .../pipeline/common/MessageConstants.java | 3 + .../pipeline/dao/pipeline/PipelineRunDao.java | 14 ++- .../monitoring/NetworkConsumingRunAction.java | 42 +++++++ .../ResourceMonitoringManager.java | 118 ++++++++++++++++++ .../notification/NotificationManager.java | 72 +++++++++++ .../notification/NotificationService.java | 5 + .../preference/PreferenceValidators.java | 4 + .../manager/preference/SystemPreferences.java | 16 +++ api/src/main/resources/dao/filter-dao.xml | 1 + .../main/resources/dao/pipeline-run-dao.xml | 28 ++++- ...k_consumption_notification_time_column.sql | 2 + api/src/main/resources/messages.properties | 3 + .../ResourceMonitoringManagerTest.java | 49 +++++--- .../entity/notification/NotificationType.java | 4 +- .../pipeline/entity/pipeline/PipelineRun.java | 1 + 15 files changed, 339 insertions(+), 23 deletions(-) create mode 100644 api/src/main/java/com/epam/pipeline/entity/monitoring/NetworkConsumingRunAction.java create mode 100644 api/src/main/resources/db/migration/v2024.07.17_14.00__issue_3602_added_last_network_consumption_notification_time_column.sql diff --git a/api/src/main/java/com/epam/pipeline/common/MessageConstants.java b/api/src/main/java/com/epam/pipeline/common/MessageConstants.java index e3f3290601..9937d8bd89 100644 --- a/api/src/main/java/com/epam/pipeline/common/MessageConstants.java +++ b/api/src/main/java/com/epam/pipeline/common/MessageConstants.java @@ -215,6 +215,9 @@ public final class MessageConstants { public static final String DEBUG_RUN_NOT_IDLED = "debug.run.not.idled"; public static final String DEBUG_RUN_HAS_NOT_NODE_NAME = "debug.run.has.not.node.name"; public static final String DEBUG_MEMORY_METRICS = "debug.memory.metrics.received"; + public static final String INFO_RUN_HIGH_NETWORK_CONSUMPTION_NOTIFY = "info.run.high.network.consumption.notify"; + public static final String DEBUG_NETWORK_RUN_METRICS_RECEIVED = "debug.network.run.metrics.received"; + public static final String DEBUG_RUN_NOT_NETWORK_CONSUMING = "debug.run.not.network.consuming"; // Kubernetes messages diff --git a/api/src/main/java/com/epam/pipeline/dao/pipeline/PipelineRunDao.java b/api/src/main/java/com/epam/pipeline/dao/pipeline/PipelineRunDao.java index 3e5b902a93..ae1a24ec09 100644 --- a/api/src/main/java/com/epam/pipeline/dao/pipeline/PipelineRunDao.java +++ b/api/src/main/java/com/epam/pipeline/dao/pipeline/PipelineRunDao.java @@ -1014,6 +1014,7 @@ public enum PipelineRunParameters { LAST_NOTIFICATION_TIME, PROLONGED_AT_TIME, LAST_IDLE_NOTIFICATION_TIME, + LAST_NETWORK_CONSUMPTION_NOTIFICATION_TIME, PROJECT_PIPELINES, PROJECT_CONFIGS, EXEC_PREFERENCES, @@ -1070,6 +1071,8 @@ static MapSqlParameterSource getParameters(PipelineRun run, Connection connectio params.addValue(PROLONGED_AT_TIME.name(), run.getProlongedAtTime()); params.addValue(LAST_NOTIFICATION_TIME.name(), run.getLastNotificationTime()); params.addValue(LAST_IDLE_NOTIFICATION_TIME.name(), run.getLastIdleNotificationTime()); + params.addValue(LAST_NETWORK_CONSUMPTION_NOTIFICATION_TIME.name(), + run.getLastNetworkConsumptionNotificationTime()); params.addValue(EXEC_PREFERENCES.name(), JsonMapper.convertDataToJsonStringForQuery(run.getExecutionPreferences())); params.addValue(PRETTY_URL.name(), run.getPrettyUrl()); @@ -1214,9 +1217,16 @@ public static PipelineRun parsePipelineRun(ResultSet rs) throws SQLException { run.setLastNotificationTime(new Date(lastNotificationTime.getTime())); } - Timestamp lastIdleNotifiactionTime = rs.getTimestamp(LAST_IDLE_NOTIFICATION_TIME.name()); + Timestamp lastIdleNotificationTime = rs.getTimestamp(LAST_IDLE_NOTIFICATION_TIME.name()); if (!rs.wasNull()) { - run.setLastIdleNotificationTime(lastIdleNotifiactionTime.toLocalDateTime()); // convert to UTC + run.setLastIdleNotificationTime(lastIdleNotificationTime.toLocalDateTime()); // convert to UTC + } + + Timestamp lastNetworkConsumptionNotificationTime = rs.getTimestamp( + LAST_NETWORK_CONSUMPTION_NOTIFICATION_TIME.name()); + if (!rs.wasNull()) { + // convert to UTC + run.setLastNetworkConsumptionNotificationTime(lastNetworkConsumptionNotificationTime.toLocalDateTime()); } Timestamp idleNotificationStartingTime = rs.getTimestamp(PROLONGED_AT_TIME.name()); diff --git a/api/src/main/java/com/epam/pipeline/entity/monitoring/NetworkConsumingRunAction.java b/api/src/main/java/com/epam/pipeline/entity/monitoring/NetworkConsumingRunAction.java new file mode 100644 index 0000000000..990454bc80 --- /dev/null +++ b/api/src/main/java/com/epam/pipeline/entity/monitoring/NetworkConsumingRunAction.java @@ -0,0 +1,42 @@ +/* + * Copyright 2024 EPAM Systems, Inc. (https://www.epam.com/) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.epam.pipeline.entity.monitoring; + +import java.util.Arrays; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * Describes actions, that can be taken if a PipelineRun is consuming network resources for a configured period of time, + * controlled by + * {@code SystemPreferences.SYSTEM_MAX_POD_BANDWIDTH_LIMIT_TIMEOUT_MINUTES} and + * {@code SystemPreferences.SYSTEM_POD_BANDWIDTH_ACTION_BACKOFF_PERIOD} + */ +public enum NetworkConsumingRunAction { + /** + * Just notify the owner of a Run + */ + NOTIFY; + + private static final Set VALUE_SET = Arrays.stream(NetworkConsumingRunAction.values()) + .map(Enum::name) + .collect(Collectors.toSet()); + + public static boolean contains(String value) { + return VALUE_SET.contains(value); + } +} diff --git a/api/src/main/java/com/epam/pipeline/manager/cluster/performancemonitoring/ResourceMonitoringManager.java b/api/src/main/java/com/epam/pipeline/manager/cluster/performancemonitoring/ResourceMonitoringManager.java index e8a2acd83e..a3e7a983e3 100644 --- a/api/src/main/java/com/epam/pipeline/manager/cluster/performancemonitoring/ResourceMonitoringManager.java +++ b/api/src/main/java/com/epam/pipeline/manager/cluster/performancemonitoring/ResourceMonitoringManager.java @@ -33,6 +33,7 @@ import java.util.stream.Stream; import com.epam.pipeline.entity.cluster.monitoring.ELKUsageMetric; +import com.epam.pipeline.entity.monitoring.NetworkConsumingRunAction; import com.epam.pipeline.entity.monitoring.LongPausedRunAction; import com.epam.pipeline.entity.pipeline.StopServerlessRun; import com.epam.pipeline.entity.pipeline.TaskStatus; @@ -86,6 +87,7 @@ public class ResourceMonitoringManager extends AbstractSchedulingManager { public static final String UTILIZATION_LEVEL_LOW = "IDLE"; + public static final String NETWORK_CONSUMING_LEVEL_HIGH = "NETWORK_CONSUMING"; public static final String UTILIZATION_LEVEL_HIGH = "PRESSURE"; public static final String TRUE_VALUE_STRING = "true"; @@ -156,6 +158,7 @@ public void removeOldIndices() { public void monitorResourceUsage() { List runs = pipelineRunManager.loadRunningPipelineRuns(); processIdleRuns(runs); + processHighNetworkConsumingRuns(runs); processOverloadedRuns(runs); processPausingResumingRuns(); processServerlessRuns(); @@ -417,6 +420,121 @@ private void performNotify(PipelineRun run, double cpuUsageRate, pipelinesToNotify.add(new ImmutablePair<>(run, cpuUsageRate)); } + private void processHighNetworkConsumingRuns(final List runs) { + final Map running = groupedByNode(runs); + + final int bandwidthLimitTimeout = preferenceManager.getPreference( + SystemPreferences.SYSTEM_MAX_POD_BANDWIDTH_LIMIT_TIMEOUT_MINUTES); + + log.debug(messageHelper.getMessage(MessageConstants.DEBUG_RUN_METRICS_REQUEST, + "NETWORK", running.size(), String.join(", ", running.keySet()))); + + final LocalDateTime now = DateUtils.nowUTC(); + final Map networkMetrics = monitoringDao.loadMetrics(ELKUsageMetric.NETWORK, + running.keySet(), now.minusMinutes(bandwidthLimitTimeout + ONE), now); + log.debug(messageHelper.getMessage(MessageConstants.DEBUG_NETWORK_RUN_METRICS_RECEIVED, + networkMetrics.entrySet().stream().map(e -> e.getKey() + ":" + e.getValue()) + .collect(Collectors.joining(", "))) + ); + + final double bandwidthLimit = preferenceManager.getPreference( + SystemPreferences.SYSTEM_POD_BANDWIDTH_LIMIT); + final int actionTimeout = preferenceManager.getPreference( + SystemPreferences.SYSTEM_POD_BANDWIDTH_ACTION_BACKOFF_PERIOD); + final NetworkConsumingRunAction action = NetworkConsumingRunAction.valueOf(preferenceManager + .getPreference(SystemPreferences.SYSTEM_POD_BANDWIDTH_ACTION)); + + processHighNetworkConsumingRuns(running, networkMetrics, bandwidthLimit, actionTimeout, action); + } + + private void processHighNetworkConsumingRun(PipelineRun run, int actionTimeout, + NetworkConsumingRunAction action, + List> pipelinesToNotify, + List runsToUpdateNotificationTime, + Double networkBandwidthLevel, List runsToUpdateTags) { + if (shouldPerformActionOnNetworkConsumingRun(run, actionTimeout)) { + performActionOnNetworkConsumingRun(run, action, networkBandwidthLevel, pipelinesToNotify, + runsToUpdateNotificationTime, runsToUpdateTags); + return; + } + if (Objects.isNull(run.getLastNetworkConsumptionNotificationTime())) { + run.addTag(NETWORK_CONSUMING_LEVEL_HIGH, TRUE_VALUE_STRING); + Optional.ofNullable(getTimestampTag(NETWORK_CONSUMING_LEVEL_HIGH)) + .ifPresent(tag -> run.addTag(tag, DateUtils.nowUTCStr())); + runsToUpdateTags.add(run); + run.setLastNetworkConsumptionNotificationTime(DateUtils.nowUTC()); + runsToUpdateNotificationTime.add(run); + } + pipelinesToNotify.add(new ImmutablePair<>(run, networkBandwidthLevel)); + log.info(messageHelper.getMessage(MessageConstants.INFO_RUN_HIGH_NETWORK_CONSUMPTION_NOTIFY, + run.getPodId(), networkBandwidthLevel)); + } + + private void processHighNetworkConsumingRuns(Map running, + Map networkMetrics, + double bandwidthLimit, + int actionTimeout, NetworkConsumingRunAction action) { + final List runsToUpdateNotificationTime = new ArrayList<>(running.size()); + final List> runsToNotify = new ArrayList<>(running.size()); + final List runsToUpdateTags = new ArrayList<>(running.size()); + for (Map.Entry entry : running.entrySet()) { + PipelineRun run = entry.getValue(); + Double metric = networkMetrics.get(entry.getKey()); + if (metric != null) { + if (Precision.compareTo(metric, bandwidthLimit, ONE_THOUSANDTH) >= 0) { + processHighNetworkConsumingRun(run, actionTimeout, action, runsToNotify, + runsToUpdateNotificationTime, metric, runsToUpdateTags); + } else if (run.getLastNetworkConsumptionNotificationTime() != null) { + // No action is longer needed, clear timeout + log.debug(messageHelper.getMessage(MessageConstants.DEBUG_RUN_NOT_NETWORK_CONSUMING, + run.getPodId(), metric)); + processFormerHighNetworkConsumingRun(run, runsToUpdateNotificationTime, runsToUpdateTags); + } + } + } + notificationManager.notifyHighNetworkConsumingRuns(runsToNotify, + NotificationType.HIGH_CONSUMED_NETWORK_BANDWIDTH); + pipelineRunManager.updatePipelineRunsLastNotification(runsToUpdateNotificationTime); + pipelineRunManager.updateRunsTags(runsToUpdateTags); + } + + private void processFormerHighNetworkConsumingRun(final PipelineRun run, + final List runsToUpdateNotificationTime, + final List runsToUpdateTags) { + run.setLastNetworkConsumptionNotificationTime(null); + run.removeTag(NETWORK_CONSUMING_LEVEL_HIGH); + run.removeTag(getTimestampTag(NETWORK_CONSUMING_LEVEL_HIGH)); + runsToUpdateNotificationTime.add(run); + runsToUpdateTags.add(run); + } + + private boolean shouldPerformActionOnNetworkConsumingRun(final PipelineRun run, final int actionTimeout) { + return Objects.nonNull(run.getLastNetworkConsumptionNotificationTime()) && + run.getLastNetworkConsumptionNotificationTime() + .isBefore(DateUtils.nowUTC().minusMinutes(actionTimeout)); + } + + private void performActionOnNetworkConsumingRun(final PipelineRun run, + final NetworkConsumingRunAction action, + final double networkBandwidthLevel, + final List> pipelinesToNotify, + final List runsToUpdate, + final List runsToUpdateTags) { + log.info(messageHelper.getMessage(MessageConstants.INFO_RUN_IDLE_ACTION, run.getPodId(), + networkBandwidthLevel, action)); + switch (action) { + default: + performHighNetworkConsumingNotify(run, networkBandwidthLevel, pipelinesToNotify); + } + runsToUpdate.add(run); + } + + private void performHighNetworkConsumingNotify(PipelineRun run, double networkBandwidthLevel, + List> pipelinesToNotify) { + run.setLastNetworkConsumptionNotificationTime(DateUtils.nowUTC()); + pipelinesToNotify.add(new ImmutablePair<>(run, networkBandwidthLevel)); + } + private void performStop(final PipelineRun run, final double cpuUsageRate, final List runsToUpdateTags) { diff --git a/api/src/main/java/com/epam/pipeline/manager/notification/NotificationManager.java b/api/src/main/java/com/epam/pipeline/manager/notification/NotificationManager.java index 956759d4ac..87ae798775 100644 --- a/api/src/main/java/com/epam/pipeline/manager/notification/NotificationManager.java +++ b/api/src/main/java/com/epam/pipeline/manager/notification/NotificationManager.java @@ -341,6 +341,69 @@ private NotificationMessage buildMessageForIdleRun(final NotificationSettings id return message; } + @Transactional(propagation = Propagation.REQUIRED) + public void notifyHighNetworkConsumingRuns(final List> pipelineNetworkBandwidthPairs, + final NotificationType type) { + if (CollectionUtils.isEmpty(pipelineNetworkBandwidthPairs)) { + return; + } + + Assert.isTrue(NotificationGroup.RESOURCE_CONSUMING == type.getGroup(), + "Only RESOURCE_CONSUMING group notification types are allowed"); + + final NotificationSettings settings = settingsManager.load(type); + if (settings == null || !settings.isEnabled()) { + log.info("No template configured for high network consuming pipeline run notifications " + + "or it was disabled!"); + return; + } + + final List ccUserIds = getCCUsers(settings); + final Map pipelineOwners = getPipelinesOwners(pipelineNetworkBandwidthPairs); + + final double bandwidthLimit = preferenceManager.getPreference( + SystemPreferences.SYSTEM_POD_BANDWIDTH_LIMIT); + final String instanceTypesToExclude = preferenceManager.getPreference(SystemPreferences + .SYSTEM_NOTIFICATIONS_EXCLUDE_INSTANCE_TYPES); + final Map runParametersFilters = parseRunExcludeParams(); + + final List> filtered = pipelineNetworkBandwidthPairs.stream() + .filter(pair -> shouldNotifyHighNetworkConsumingRun(pair.getLeft().getId(), type, settings)) + .filter(pair -> noneMatchExcludedInstanceType(pair.getLeft(), instanceTypesToExclude)) + .filter(pair -> !matchExcludeRunParameters(pair.getLeft(), runParametersFilters)) + .collect(Collectors.toList()); + final List messages = filtered.stream() + .map(pair -> buildMessageForHighNetworkConsumingRun(settings, ccUserIds, pipelineOwners, pair.getLeft(), + pair.getRight(), bandwidthLimit, type)) + .collect(Collectors.toList()); + saveNotifications(messages); + + if (NotificationType.HIGH_CONSUMED_NETWORK_BANDWIDTH.equals(type)) { + final List runIds = filtered.stream() + .map(pair -> pair.getLeft().getId()).collect(Collectors.toList()); + monitoringNotificationDao.updateNotificationTimestamp(runIds, + NotificationType.HIGH_CONSUMED_NETWORK_BANDWIDTH); + } + } + + private NotificationMessage buildMessageForHighNetworkConsumingRun(final NotificationSettings settings, + final List ccUserIds, + final Map pipelineOwners, + final PipelineRun run, + final double bandwidth, + final double bandwidthLimit, + final NotificationType type) { + log.debug("Sending high network consuming run notification for run '{}'.", run.getId()); + final NotificationMessage message = new NotificationMessage(); + message.setTemplate(new NotificationTemplate(settings.getTemplateId())); + message.setTemplateParameters(parameterManager.build(type, run, bandwidth, bandwidthLimit)); + if (settings.isKeepInformedOwner()) { + message.setToUserId(pipelineOwners.getOrDefault(run.getOwner(), new PipelineUser()).getId()); + } + message.setCopyUserIds(ccUserIds); + return message; + } + @Override @Transactional(propagation = Propagation.REQUIRED) public void notifyHighResourceConsumingRuns(final List>> metrics, @@ -835,6 +898,15 @@ private boolean shouldNotifyIdleRun(final Long runId, final NotificationType not return shouldNotify(runId, notificationSettings); } + private boolean shouldNotifyHighNetworkConsumingRun(final Long runId, + final NotificationType notificationType, + final NotificationSettings notificationSettings) { + if (!NotificationType.HIGH_CONSUMED_NETWORK_BANDWIDTH.equals(notificationType)) { + return true; + } + return shouldNotify(runId, notificationSettings); + } + private Map parseRunExcludeParams() { final Map excludeParams = preferenceManager.getPreference( SystemPreferences.SYSTEM_NOTIFICATIONS_EXCLUDE_PARAMS); diff --git a/api/src/main/java/com/epam/pipeline/manager/notification/NotificationService.java b/api/src/main/java/com/epam/pipeline/manager/notification/NotificationService.java index 38536ba6c1..a1d05b3169 100644 --- a/api/src/main/java/com/epam/pipeline/manager/notification/NotificationService.java +++ b/api/src/main/java/com/epam/pipeline/manager/notification/NotificationService.java @@ -61,6 +61,11 @@ default void notifyIdleRuns(List> pipelineCpuRatePairs } + default void notifyHighNetworkConsumingRuns(List> pipelineCpuRatePairs, + NotificationType type) { + + } + default void notifyHighResourceConsumingRuns( List>> pipelinesMetrics, NotificationType notificationType) { diff --git a/api/src/main/java/com/epam/pipeline/manager/preference/PreferenceValidators.java b/api/src/main/java/com/epam/pipeline/manager/preference/PreferenceValidators.java index d85a4b82cd..4d60548351 100644 --- a/api/src/main/java/com/epam/pipeline/manager/preference/PreferenceValidators.java +++ b/api/src/main/java/com/epam/pipeline/manager/preference/PreferenceValidators.java @@ -28,6 +28,7 @@ import com.epam.pipeline.entity.execution.OSSpecificLaunchCommandTemplate; import com.epam.pipeline.entity.monitoring.IdleRunAction; import com.epam.pipeline.entity.monitoring.LongPausedRunAction; +import com.epam.pipeline.entity.monitoring.NetworkConsumingRunAction; import com.epam.pipeline.entity.preference.Preference; import com.epam.pipeline.security.ExternalServiceEndpoint; import com.epam.pipeline.utils.PipelineStringUtils; @@ -271,6 +272,9 @@ public static BiPredicate> isNullOrValidEnum(fin public static final BiPredicate> isValidIdleAction = (pref, ignored) -> IdleRunAction.contains(pref); + public static final BiPredicate> isValidNetworkConsumptionAction = + (pref, ignored) -> NetworkConsumingRunAction.contains(pref); + public static final BiPredicate> isValidLongPauseRunAction = (pref, ignored) -> LongPausedRunAction.contains(pref); diff --git a/api/src/main/java/com/epam/pipeline/manager/preference/SystemPreferences.java b/api/src/main/java/com/epam/pipeline/manager/preference/SystemPreferences.java index f49cf6f620..93e8015117 100644 --- a/api/src/main/java/com/epam/pipeline/manager/preference/SystemPreferences.java +++ b/api/src/main/java/com/epam/pipeline/manager/preference/SystemPreferences.java @@ -38,6 +38,7 @@ import com.epam.pipeline.entity.ldap.LdapBlockedUserSearchMethod; import com.epam.pipeline.entity.monitoring.IdleRunAction; import com.epam.pipeline.entity.monitoring.LongPausedRunAction; +import com.epam.pipeline.entity.monitoring.NetworkConsumingRunAction; import com.epam.pipeline.entity.notification.filter.NotificationFilter; import com.epam.pipeline.entity.metadata.CommonInstanceTagsType; import com.epam.pipeline.entity.pipeline.run.RunVisibilityPolicy; @@ -1152,6 +1153,21 @@ public class SystemPreferences { public static final BooleanPreference SYSTEM_NOTIFICATIONS_ENABLE = new BooleanPreference( "system.notifications.enable", false, SYSTEM_GROUP, pass); + public static final DoublePreference SYSTEM_POD_BANDWIDTH_LIMIT = + new DoublePreference("system.pod.bandwidth.limit", 300.0, SYSTEM_GROUP, isGreaterThan(0)); + + public static final IntPreference SYSTEM_MAX_POD_BANDWIDTH_LIMIT_TIMEOUT_MINUTES = + new IntPreference("system.max.pod.bandwidth.minutes", 30, SYSTEM_GROUP, isGreaterThan(0)); + + + public static final IntPreference SYSTEM_POD_BANDWIDTH_ACTION_BACKOFF_PERIOD = + new IntPreference("system.pod.bandwidth.action.backoff.period", 30, SYSTEM_GROUP, + isGreaterThan(0)); + + public static final StringPreference SYSTEM_POD_BANDWIDTH_ACTION = new StringPreference( + "system.pod.bandwidth.action", NetworkConsumingRunAction.NOTIFY.name(), + SYSTEM_GROUP, PreferenceValidators.isValidNetworkConsumptionAction); + // FireCloud Integration public static final ObjectPreference> FIRECLOUD_SCOPES = new ObjectPreference<>( "firecloud.api.scopes", null, new TypeReference>() {}, FIRECLOUD_GROUP, diff --git a/api/src/main/resources/dao/filter-dao.xml b/api/src/main/resources/dao/filter-dao.xml index 7f03792287..8b691e4b11 100644 --- a/api/src/main/resources/dao/filter-dao.xml +++ b/api/src/main/resources/dao/filter-dao.xml @@ -60,6 +60,7 @@ r.prolonged_at_time, r.last_notification_time, r.last_idle_notification_time, + r.last_network_consumption_notification_time, r.exec_preferences, r.pretty_url, r.price_per_hour, diff --git a/api/src/main/resources/dao/pipeline-run-dao.xml b/api/src/main/resources/dao/pipeline-run-dao.xml index 9b6d05f6f5..e745741794 100644 --- a/api/src/main/resources/dao/pipeline-run-dao.xml +++ b/api/src/main/resources/dao/pipeline-run-dao.xml @@ -171,6 +171,7 @@ r.prolonged_at_time, r.last_notification_time, r.last_idle_notification_time, + r.last_network_consumption_notification_time, r.exec_preferences, r.pretty_url, r.price_per_hour, @@ -266,6 +267,7 @@ prolonged_at_time, last_notification_time, last_idle_notification_time, + last_network_consumption_notification_time, exec_preferences, pretty_url, price_per_hour, @@ -332,6 +334,7 @@ prolonged_at_time, last_notification_time, last_idle_notification_time, + last_network_consumption_notification_time, exec_preferences, pretty_url, price_per_hour, @@ -398,6 +401,7 @@ prolonged_at_time, last_notification_time, last_idle_notification_time, + last_network_consumption_notification_time, exec_preferences, pretty_url, price_per_hour, @@ -577,7 +581,8 @@ @@ -635,6 +640,7 @@ r.prolonged_at_time, r.last_notification_time, r.last_idle_notification_time, + r.last_network_consumption_notification_time, r.exec_preferences, r.pretty_url, r.price_per_hour, @@ -703,6 +709,7 @@ r.prolonged_at_time, r.last_notification_time, r.last_idle_notification_time, + r.last_network_consumption_notification_time, r.exec_preferences, r.pretty_url, r.price_per_hour, @@ -770,6 +777,7 @@ r.prolonged_at_time, r.last_notification_time, r.last_idle_notification_time, + r.last_network_consumption_notification_time, r.exec_preferences, r.pretty_url, r.price_per_hour, @@ -836,6 +844,7 @@ active_run.prolonged_at_time, active_run.last_notification_time, active_run.last_idle_notification_time, + active_run.last_network_consumption_notification_time, active_run.exec_preferences, active_run.pretty_url, active_run.price_per_hour, @@ -915,6 +924,7 @@ r.prolonged_at_time, r.last_notification_time, r.last_idle_notification_time, + r.last_network_consumption_notification_time, r.exec_preferences, r.pretty_url, r.price_per_hour, @@ -978,6 +988,7 @@ active_run.prolonged_at_time, active_run.last_notification_time, active_run.last_idle_notification_time, + active_run.last_network_consumption_notification_time, active_run.exec_preferences, active_run.pretty_url, active_run.price_per_hour, @@ -1053,6 +1064,7 @@ r.prolonged_at_time, r.last_notification_time, r.last_idle_notification_time, + r.last_network_consumption_notification_time, r.exec_preferences, r.pretty_url, r.price_per_hour, @@ -1117,6 +1129,7 @@ r.prolonged_at_time, r.last_notification_time, r.last_idle_notification_time, + r.last_network_consumption_notification_time, r.exec_preferences, r.pretty_url, r.price_per_hour, @@ -1181,6 +1194,7 @@ r.prolonged_at_time, r.last_notification_time, r.last_idle_notification_time, + r.last_network_consumption_notification_time, r.exec_preferences, r.pretty_url, r.price_per_hour, @@ -1272,6 +1286,7 @@ runs.prolonged_at_time, runs.last_notification_time, runs.last_idle_notification_time, + runs.last_network_consumption_notification_time, runs.exec_preferences, runs.pretty_url, runs.price_per_hour, @@ -1351,6 +1366,7 @@ p.prolonged_at_time, p.last_notification_time, p.last_idle_notification_time, + p.last_network_consumption_notification_time, p.exec_preferences, p.pretty_url, p.price_per_hour, @@ -1411,6 +1427,7 @@ c.prolonged_at_time, c.last_notification_time, c.last_idle_notification_time, + c.last_network_consumption_notification_time, c.exec_preferences, c.pretty_url, c.price_per_hour, @@ -1469,6 +1486,7 @@ runs.prolonged_at_time, runs.last_notification_time, runs.last_idle_notification_time, + runs.last_network_consumption_notification_time, runs.exec_preferences, runs.pretty_url, runs.price_per_hour, @@ -1545,6 +1563,7 @@ r.prolonged_at_time, r.last_notification_time, r.last_idle_notification_time, + r.last_network_consumption_notification_time, r.exec_preferences, r.pretty_url, r.price_per_hour, @@ -1711,6 +1730,7 @@ prolonged_at_time, last_notification_time, last_idle_notification_time, + last_network_consumption_notification_time, exec_preferences, pretty_url, price_per_hour, @@ -1791,6 +1811,7 @@ prolonged_at_time, last_notification_time, last_idle_notification_time, + last_network_consumption_notification_time, exec_preferences, pretty_url, price_per_hour, @@ -1857,6 +1878,7 @@ prolonged_at_time, last_notification_time, last_idle_notification_time, + last_network_consumption_notification_time, exec_preferences, pretty_url, price_per_hour, @@ -1923,6 +1945,7 @@ prolonged_at_time, last_notification_time, last_idle_notification_time, + last_network_consumption_notification_time, exec_preferences, pretty_url, price_per_hour, @@ -1989,6 +2012,7 @@ prolonged_at_time, last_notification_time, last_idle_notification_time, + last_network_consumption_notification_time, exec_preferences, pretty_url, price_per_hour, @@ -2065,6 +2089,7 @@ prolonged_at_time, last_notification_time, last_idle_notification_time, + last_network_consumption_notification_time, exec_preferences, pretty_url, price_per_hour, @@ -2131,6 +2156,7 @@ prolonged_at_time, last_notification_time, last_idle_notification_time, + last_network_consumption_notification_time, exec_preferences, pretty_url, price_per_hour, diff --git a/api/src/main/resources/db/migration/v2024.07.17_14.00__issue_3602_added_last_network_consumption_notification_time_column.sql b/api/src/main/resources/db/migration/v2024.07.17_14.00__issue_3602_added_last_network_consumption_notification_time_column.sql new file mode 100644 index 0000000000..43ab6b46c9 --- /dev/null +++ b/api/src/main/resources/db/migration/v2024.07.17_14.00__issue_3602_added_last_network_consumption_notification_time_column.sql @@ -0,0 +1,2 @@ +-- a field to record last time the notification on high network consuming run was issued +ALTER TABLE pipeline.pipeline_run ADD COLUMN last_network_consumption_notification_time TIMESTAMP WITH TIME ZONE; \ No newline at end of file diff --git a/api/src/main/resources/messages.properties b/api/src/main/resources/messages.properties index fee5582b48..624aa29aa6 100644 --- a/api/src/main/resources/messages.properties +++ b/api/src/main/resources/messages.properties @@ -211,6 +211,9 @@ debug.run.idle.skip.check=Run {0} marked as nonPause or is cluster run! Skip. debug.run.not.idled=Run {0} has CPU usage rate: {1} considered as not idled, tag will be cleared. debug.run.has.not.node.name=Pipeline with id: {0} has not node name. debug.memory.metrics.received=Memory and disk metrics received ''{0}'' +info.run.high.network.consumption.notify=Pipeline Run {0} is high network consuming: bandwidth: {1}, notification will be sent +debug.network.run.metrics.received=Network metrics ''{0}'' +debug.run.not.network.consuming=Run {0} has network bandwidth: {1} considered as not high consuming, tag will be cleared. # Kubernetes diff --git a/api/src/test/java/com/epam/pipeline/manager/cluster/performancemonitoring/ResourceMonitoringManagerTest.java b/api/src/test/java/com/epam/pipeline/manager/cluster/performancemonitoring/ResourceMonitoringManagerTest.java index 39ad2a6d33..df2251e1f2 100644 --- a/api/src/test/java/com/epam/pipeline/manager/cluster/performancemonitoring/ResourceMonitoringManagerTest.java +++ b/api/src/test/java/com/epam/pipeline/manager/cluster/performancemonitoring/ResourceMonitoringManagerTest.java @@ -22,6 +22,7 @@ import com.epam.pipeline.entity.cluster.monitoring.ELKUsageMetric; import com.epam.pipeline.entity.monitoring.IdleRunAction; import com.epam.pipeline.entity.monitoring.LongPausedRunAction; +import com.epam.pipeline.entity.monitoring.NetworkConsumingRunAction; import com.epam.pipeline.entity.notification.NotificationType; import com.epam.pipeline.entity.pipeline.PipelineRun; import com.epam.pipeline.entity.pipeline.RunInstance; @@ -95,8 +96,11 @@ public class ResourceMonitoringManagerTest { private static final long TEST_AUTOSCALE_RUN_ID = 6; private static final int TEST_HIGH_CONSUMING_RUN_LOAD = 80; private static final double TEST_IDLE_ON_DEMAND_RUN_CPU_LOAD = 200.0; + private static final double TEST_POD_BANDWIDTH_LIMIT = 300.0; + private static final int TEST_POD_BANDWIDTH_ACTION_BACKOFF_PERIOD = 30; private static final Integer TEST_RESOURCE_MONITORING_DELAY = 111; private static final int TEST_MAX_IDLE_MONITORING_TIMEOUT = 30; + private static final int TEST_MAX_POD_BANDWIDTH_LIMIT_TIMEOUT_MINUTES = 30; private static final int TEST_IDLE_THRESHOLD_PERCENT = 30; private static final double NON_IDLE_CPU_LOAD = 700.0; private static final double MILICORES_TO_CORES = 1000.0; @@ -203,6 +207,14 @@ public void setUp() throws Exception { when(preferenceManager.getPreference(SystemPreferences.SYSTEM_LONG_PAUSED_ACTION_TIMEOUT_MINUTES)) .thenReturn(LONG_PAUSED_ACTION_TIMEOUT); when(stopServerlessRunManager.loadActiveServerlessRuns()).thenReturn(Collections.emptyList()); + when(preferenceManager.getPreference(SystemPreferences.SYSTEM_MAX_POD_BANDWIDTH_LIMIT_TIMEOUT_MINUTES)) + .thenReturn(TEST_MAX_POD_BANDWIDTH_LIMIT_TIMEOUT_MINUTES); + when(preferenceManager.getPreference(SystemPreferences.SYSTEM_POD_BANDWIDTH_LIMIT)) + .thenReturn(TEST_POD_BANDWIDTH_LIMIT); + when(preferenceManager.getPreference(SystemPreferences.SYSTEM_POD_BANDWIDTH_ACTION_BACKOFF_PERIOD)) + .thenReturn(TEST_POD_BANDWIDTH_ACTION_BACKOFF_PERIOD); + when(preferenceManager.getPreference(SystemPreferences.SYSTEM_POD_BANDWIDTH_ACTION)) + .thenReturn(NetworkConsumingRunAction.NOTIFY.name()); SecurityContext context = SecurityContextHolder.createEmptyContext(); UserContext userContext = new UserContext(1L, "admin"); @@ -313,10 +325,10 @@ public void testNotifyOnce() { resourceMonitoringManager.monitorResourceUsage(); - verify(pipelineRunManager).updatePipelineRunsLastNotification(runsToUpdateCaptor.capture()); + verify(pipelineRunManager, times(2)).updatePipelineRunsLastNotification(runsToUpdateCaptor.capture()); verify(notificationManager).notifyIdleRuns(runsToNotifyIdleCaptor.capture(), eq(NotificationType.IDLE_RUN)); - List updatedRuns = runsToUpdateCaptor.getValue(); + List updatedRuns = runsToUpdateCaptor.getAllValues().get(0); Assert.assertEquals(2, updatedRuns.size()); Assert.assertTrue(updatedRuns.stream().anyMatch(r -> r.getPodId().equals(idleSpotRun.getPodId()))); Assert.assertTrue(updatedRuns.stream().anyMatch(r -> r.getPodId().equals(idleOnDemandRun.getPodId()))); @@ -358,10 +370,10 @@ public void testSkipProlongRun() { //First time checks that notification is sent resourceMonitoringManager.monitorResourceUsage(); - verify(pipelineRunManager).updatePipelineRunsLastNotification(runsToUpdateCaptor.capture()); + verify(pipelineRunManager, times(2)).updatePipelineRunsLastNotification(runsToUpdateCaptor.capture()); verify(notificationManager).notifyIdleRuns(runsToNotifyIdleCaptor.capture(), eq(NotificationType.IDLE_RUN)); - List updatedRuns = runsToUpdateCaptor.getValue(); + List updatedRuns = runsToUpdateCaptor.getAllValues().get(0); Assert.assertEquals(1, updatedRuns.size()); Assert.assertTrue(updatedRuns.stream().anyMatch(r -> r.getPodId().equals(idleRunToProlong.getPodId()))); @@ -376,7 +388,7 @@ public void testSkipProlongRun() { idleRunToProlong.setLastIdleNotificationTime(null); resourceMonitoringManager.monitorResourceUsage(); - verify(pipelineRunManager, times(2)) + verify(pipelineRunManager, times(4)) .updatePipelineRunsLastNotification(runsToUpdateCaptor.capture()); verify(notificationManager, times(2)) .notifyIdleRuns(runsToNotifyIdleCaptor.capture(), eq(NotificationType.IDLE_RUN)); @@ -393,12 +405,12 @@ public void testSkipProlongRun() { resourceMonitoringManager.monitorResourceUsage(); - verify(pipelineRunManager, times(3)) + verify(pipelineRunManager, times(6)) .updatePipelineRunsLastNotification(runsToUpdateCaptor.capture()); verify(notificationManager, times(3)) .notifyIdleRuns(runsToNotifyIdleCaptor.capture(), eq(NotificationType.IDLE_RUN)); - updatedRuns = runsToUpdateCaptor.getValue(); + updatedRuns = runsToUpdateCaptor.getAllValues().get(0); Assert.assertEquals(1, updatedRuns.size()); Assert.assertTrue(updatedRuns.stream().anyMatch(r -> r.getPodId().equals(idleRunToProlong.getPodId()))); @@ -406,8 +418,6 @@ public void testSkipProlongRun() { Assert.assertEquals(1, runsToNotify.size()); Assert.assertTrue(runsToNotify.stream() .anyMatch(r -> r.getLeft().getPodId().equals(idleRunToProlong.getPodId()))); - - } @Test @@ -421,10 +431,10 @@ public void testNotifyTwice() throws InterruptedException { resourceMonitoringManager.monitorResourceUsage(); - verify(pipelineRunManager).updatePipelineRunsLastNotification(runsToUpdateCaptor.capture()); + verify(pipelineRunManager, times(2)).updatePipelineRunsLastNotification(runsToUpdateCaptor.capture()); verify(notificationManager).notifyIdleRuns(runsToNotifyIdleCaptor.capture(), eq(NotificationType.IDLE_RUN)); - List updatedRuns = runsToUpdateCaptor.getValue(); + List updatedRuns = runsToUpdateCaptor.getAllValues().get(0); Assert.assertEquals(2, updatedRuns.size()); Assert.assertFalse(updatedRuns.stream() .anyMatch(r -> r.getLastIdleNotificationTime().equals(lastNotificationDate))); @@ -460,11 +470,12 @@ public void testPauseOnDemand() throws InterruptedException { resourceMonitoringManager.monitorResourceUsage(); - verify(pipelineRunManager).updatePipelineRunsLastNotification(runsToUpdateCaptor.capture()); + verify(pipelineRunManager, times(2)) + .updatePipelineRunsLastNotification(runsToUpdateCaptor.capture()); verify(notificationManager).notifyIdleRuns(runsToNotifyIdleCaptor.capture(), eq(NotificationType.IDLE_RUN)); verify(notificationManager).notifyIdleRuns(any(), eq(NotificationType.IDLE_RUN_PAUSED)); - List updatedRuns = runsToUpdateCaptor.getValue(); + List updatedRuns = runsToUpdateCaptor.getAllValues().get(0); Assert.assertEquals(2, updatedRuns.size()); Assert.assertFalse(updatedRuns.stream() .anyMatch(r -> lastNotificationDate.equals(r.getLastIdleNotificationTime()))); @@ -511,12 +522,12 @@ public void testPauseOrStop() throws InterruptedException { resourceMonitoringManager.monitorResourceUsage(); - verify(pipelineRunManager).updatePipelineRunsLastNotification(runsToUpdateCaptor.capture()); + verify(pipelineRunManager, times(2)).updatePipelineRunsLastNotification(runsToUpdateCaptor.capture()); verify(notificationManager).notifyIdleRuns(runsToNotifyIdleCaptor.capture(), eq(NotificationType.IDLE_RUN)); verify(notificationManager).notifyIdleRuns(any(), eq(NotificationType.IDLE_RUN_STOPPED)); verify(notificationManager).notifyIdleRuns(any(), eq(NotificationType.IDLE_RUN_PAUSED)); - List updatedRuns = runsToUpdateCaptor.getValue(); + List updatedRuns = runsToUpdateCaptor.getAllValues().get(0); Assert.assertEquals(2, updatedRuns.size()); Assert.assertNull(updatedRuns.stream() .filter(r -> r.getPodId().equals(idleOnDemandRun.getPodId())) @@ -542,14 +553,14 @@ public void testStop() throws InterruptedException { resourceMonitoringManager.monitorResourceUsage(); - verify(pipelineRunManager).updatePipelineRunsLastNotification(runsToUpdateCaptor.capture()); + verify(pipelineRunManager, times(2)).updatePipelineRunsLastNotification(runsToUpdateCaptor.capture()); verify(notificationManager).notifyIdleRuns(runsToNotifyIdleCaptor.capture(), eq(NotificationType.IDLE_RUN)); verify(notificationManager, times(2)).notifyIdleRuns(any(), eq(NotificationType.IDLE_RUN_STOPPED)); Assert.assertTrue(runsToNotifyIdleCaptor.getValue().isEmpty()); - List updatedRuns = runsToUpdateCaptor.getValue(); + List updatedRuns = runsToUpdateCaptor.getAllValues().get(0); Assert.assertEquals(2, updatedRuns.size()); verify(pipelineRunManager).stop(TEST_IDLE_ON_DEMAND_RUN_ID); @@ -569,10 +580,10 @@ public void testRemoveLastNotificationTimeIfNotIdle() throws InterruptedExceptio idleSpotRun.setTags(new HashMap<>()); resourceMonitoringManager.monitorResourceUsage(); - verify(pipelineRunManager).updatePipelineRunsLastNotification(runsToUpdateCaptor.capture()); + verify(pipelineRunManager, times(2)).updatePipelineRunsLastNotification(runsToUpdateCaptor.capture()); verify(notificationManager).notifyIdleRuns(runsToNotifyIdleCaptor.capture(), eq(NotificationType.IDLE_RUN)); - List updatedRuns = runsToUpdateCaptor.getValue(); + List updatedRuns = runsToUpdateCaptor.getAllValues().get(0); Assert.assertEquals(2, updatedRuns.size()); Assert.assertNull(updatedRuns.stream() diff --git a/core/src/main/java/com/epam/pipeline/entity/notification/NotificationType.java b/core/src/main/java/com/epam/pipeline/entity/notification/NotificationType.java index 282e1e336d..eb8e583913 100644 --- a/core/src/main/java/com/epam/pipeline/entity/notification/NotificationType.java +++ b/core/src/main/java/com/epam/pipeline/entity/notification/NotificationType.java @@ -70,7 +70,9 @@ public enum NotificationType { LDAP_BLOCKED_USERS(19, -1L, -1L, Collections.emptyList(), true, NotificationGroup.USER), LDAP_BLOCKED_POSTPONED_USERS(20, -1L, -1L, Collections.emptyList(), true, - NotificationGroup.USER); + NotificationGroup.USER), + HIGH_CONSUMED_NETWORK_BANDWIDTH(21, -1L, -1L, Collections.emptyList(), true, + NotificationGroup.RESOURCE_CONSUMING); private static final Map BY_ID; diff --git a/core/src/main/java/com/epam/pipeline/entity/pipeline/PipelineRun.java b/core/src/main/java/com/epam/pipeline/entity/pipeline/PipelineRun.java index 8d80f8be22..e7f63a53c9 100644 --- a/core/src/main/java/com/epam/pipeline/entity/pipeline/PipelineRun.java +++ b/core/src/main/java/com/epam/pipeline/entity/pipeline/PipelineRun.java @@ -104,6 +104,7 @@ public class PipelineRun extends AbstractSecuredEntity { * Last time the notification on idle pipeline was issued */ private LocalDateTime lastIdleNotificationTime; + private LocalDateTime lastNetworkConsumptionNotificationTime; private LocalDateTime prolongedAtTime; private ExecutionPreferences executionPreferences = ExecutionPreferences.getDefault(); private String prettyUrl; From 1bf66febb54a947c153d45dda6e11e500bc028cc Mon Sep 17 00:00:00 2001 From: Oksana_Kolesnikova Date: Tue, 16 Jul 2024 10:19:07 +0300 Subject: [PATCH 2/8] Issue 3602 Pod nework bandwidth usage --- .../entity/monitoring/NetworkConsumingRunAction.java | 1 + .../ResourceMonitoringManager.java | 10 ++++++---- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/api/src/main/java/com/epam/pipeline/entity/monitoring/NetworkConsumingRunAction.java b/api/src/main/java/com/epam/pipeline/entity/monitoring/NetworkConsumingRunAction.java index 990454bc80..f9560c0f76 100644 --- a/api/src/main/java/com/epam/pipeline/entity/monitoring/NetworkConsumingRunAction.java +++ b/api/src/main/java/com/epam/pipeline/entity/monitoring/NetworkConsumingRunAction.java @@ -27,6 +27,7 @@ * {@code SystemPreferences.SYSTEM_POD_BANDWIDTH_ACTION_BACKOFF_PERIOD} */ public enum NetworkConsumingRunAction { + LIMIT_BANDWIDTH, /** * Just notify the owner of a Run */ diff --git a/api/src/main/java/com/epam/pipeline/manager/cluster/performancemonitoring/ResourceMonitoringManager.java b/api/src/main/java/com/epam/pipeline/manager/cluster/performancemonitoring/ResourceMonitoringManager.java index a3e7a983e3..99e6b7644c 100644 --- a/api/src/main/java/com/epam/pipeline/manager/cluster/performancemonitoring/ResourceMonitoringManager.java +++ b/api/src/main/java/com/epam/pipeline/manager/cluster/performancemonitoring/ResourceMonitoringManager.java @@ -454,7 +454,7 @@ private void processHighNetworkConsumingRun(PipelineRun run, int actionTimeout, Double networkBandwidthLevel, List runsToUpdateTags) { if (shouldPerformActionOnNetworkConsumingRun(run, actionTimeout)) { performActionOnNetworkConsumingRun(run, action, networkBandwidthLevel, pipelinesToNotify, - runsToUpdateNotificationTime, runsToUpdateTags); + runsToUpdateNotificationTime); return; } if (Objects.isNull(run.getLastNetworkConsumptionNotificationTime())) { @@ -518,12 +518,14 @@ private void performActionOnNetworkConsumingRun(final PipelineRun run, final NetworkConsumingRunAction action, final double networkBandwidthLevel, final List> pipelinesToNotify, - final List runsToUpdate, - final List runsToUpdateTags) { + final List runsToUpdate) { log.info(messageHelper.getMessage(MessageConstants.INFO_RUN_IDLE_ACTION, run.getPodId(), networkBandwidthLevel, action)); switch (action) { - default: + case LIMIT_BANDWIDTH: +// TODO + break; + case NOTIFY: performHighNetworkConsumingNotify(run, networkBandwidthLevel, pipelinesToNotify); } runsToUpdate.add(run); From 38511bc80d2dad75ed824a1d2a7b1dc08ccfefd5 Mon Sep 17 00:00:00 2001 From: Oksana_Kolesnikova Date: Tue, 16 Jul 2024 10:25:23 +0300 Subject: [PATCH 3/8] Issue 3602 Pod nework bandwidth usage --- .../performancemonitoring/ResourceMonitoringManager.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/api/src/main/java/com/epam/pipeline/manager/cluster/performancemonitoring/ResourceMonitoringManager.java b/api/src/main/java/com/epam/pipeline/manager/cluster/performancemonitoring/ResourceMonitoringManager.java index 99e6b7644c..1a608045ab 100644 --- a/api/src/main/java/com/epam/pipeline/manager/cluster/performancemonitoring/ResourceMonitoringManager.java +++ b/api/src/main/java/com/epam/pipeline/manager/cluster/performancemonitoring/ResourceMonitoringManager.java @@ -525,7 +525,7 @@ private void performActionOnNetworkConsumingRun(final PipelineRun run, case LIMIT_BANDWIDTH: // TODO break; - case NOTIFY: + default: performHighNetworkConsumingNotify(run, networkBandwidthLevel, pipelinesToNotify); } runsToUpdate.add(run); From 52bd6223b22948e2c4e457f29484ec01a58930b1 Mon Sep 17 00:00:00 2001 From: Oksana_Kolesnikova Date: Wed, 17 Jul 2024 10:40:26 +0300 Subject: [PATCH 4/8] Issue 3602 Pod nework bandwidth usage - fix remarks --- .../ESMonitoringManager.java | 14 ++++- .../ResourceMonitoringManager.java | 46 ++++++++++++----- .../notification/NotificationManager.java | 5 +- .../NotificationParameterManager.java | 10 ++++ .../manager/preference/SystemPreferences.java | 4 +- .../ResourceMonitoringManagerTest.java | 28 ++++++++-- .../HIGH_CONSUMED_NETWORK_BANDWIDTH.json | 8 +++ .../HIGH_CONSUMED_NETWORK_BANDWIDTH.html | 51 +++++++++++++++++++ 8 files changed, 144 insertions(+), 22 deletions(-) create mode 100644 deploy/contents/install/email-templates/configs/HIGH_CONSUMED_NETWORK_BANDWIDTH.json create mode 100644 deploy/contents/install/email-templates/contents/HIGH_CONSUMED_NETWORK_BANDWIDTH.html diff --git a/api/src/main/java/com/epam/pipeline/manager/cluster/performancemonitoring/ESMonitoringManager.java b/api/src/main/java/com/epam/pipeline/manager/cluster/performancemonitoring/ESMonitoringManager.java index 43d53941f0..d9b85e2019 100644 --- a/api/src/main/java/com/epam/pipeline/manager/cluster/performancemonitoring/ESMonitoringManager.java +++ b/api/src/main/java/com/epam/pipeline/manager/cluster/performancemonitoring/ESMonitoringManager.java @@ -161,7 +161,13 @@ private LocalDateTime fallbackMonitoringStart() { private List getStats(final String nodeName, final LocalDateTime start, final LocalDateTime end, final Duration interval) { - return Stream.of(MONITORING_METRICS) + return getStats(MONITORING_METRICS, nodeName, start, end, interval); + } + + private List getStats(final ELKUsageMetric[] monitoringMetrics, final String nodeName, + final LocalDateTime start, final LocalDateTime end, + final Duration interval) { + return Stream.of(monitoringMetrics) .map(it -> AbstractMetricRequester.getStatsRequester(it, client)) .map(it -> it.requestStats(nodeName, start, end, interval)) .flatMap(List::stream) @@ -179,6 +185,12 @@ private List getStats(final String nodeName, final LocalDateTim .collect(Collectors.toList()); } + public List getStats(final ELKUsageMetric[] monitoringMetrics, final String nodeName, + final LocalDateTime start, final LocalDateTime end) { + final Duration interval = interval(start, end); + return getStats(monitoringMetrics, nodeName, start, end, interval); + } + private Duration interval(final LocalDateTime start, final LocalDateTime end) { final Duration requested = Duration.between(start, end).dividedBy(Math.max(1, numberOfIntervals() - 1)); final Duration minimal = minimalDuration(); diff --git a/api/src/main/java/com/epam/pipeline/manager/cluster/performancemonitoring/ResourceMonitoringManager.java b/api/src/main/java/com/epam/pipeline/manager/cluster/performancemonitoring/ResourceMonitoringManager.java index 1a608045ab..7f9447f4e4 100644 --- a/api/src/main/java/com/epam/pipeline/manager/cluster/performancemonitoring/ResourceMonitoringManager.java +++ b/api/src/main/java/com/epam/pipeline/manager/cluster/performancemonitoring/ResourceMonitoringManager.java @@ -24,15 +24,17 @@ import java.util.Comparator; import java.util.HashMap; import java.util.List; -import java.util.Set; import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.Set; +import java.util.Collection; import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.Stream; import com.epam.pipeline.entity.cluster.monitoring.ELKUsageMetric; +import com.epam.pipeline.entity.cluster.monitoring.MonitoringStats; import com.epam.pipeline.entity.monitoring.NetworkConsumingRunAction; import com.epam.pipeline.entity.monitoring.LongPausedRunAction; import com.epam.pipeline.entity.pipeline.StopServerlessRun; @@ -125,6 +127,7 @@ static class ResourceMonitoringManagerCore { private final PreferenceManager preferenceManager; private final StopServerlessRunManager stopServerlessRunManager; private final InstanceOfferManager instanceOfferManager; + private final ESMonitoringManager monitoringManager; @Autowired ResourceMonitoringManagerCore(final PipelineRunManager pipelineRunManager, @@ -135,7 +138,8 @@ static class ResourceMonitoringManagerCore { final PreferenceManager preferenceManager, final StopServerlessRunManager stopServerlessRunManager, final InstanceOfferManager instanceOfferManager, - final RunStatusManager runStatusManager) { + final RunStatusManager runStatusManager, + final ESMonitoringManager monitoringManager) { this.pipelineRunManager = pipelineRunManager; this.pipelineRunDockerOperationManager = pipelineRunDockerOperationManager; this.messageHelper = messageHelper; @@ -145,6 +149,7 @@ static class ResourceMonitoringManagerCore { this.stopServerlessRunManager = stopServerlessRunManager; this.instanceOfferManager = instanceOfferManager; this.runStatusManager = runStatusManager; + this.monitoringManager = monitoringManager; } @Scheduled(cron = "0 0 0 ? * *") @@ -430,14 +435,12 @@ private void processHighNetworkConsumingRuns(final List runs) { "NETWORK", running.size(), String.join(", ", running.keySet()))); final LocalDateTime now = DateUtils.nowUTC(); - final Map networkMetrics = monitoringDao.loadMetrics(ELKUsageMetric.NETWORK, - running.keySet(), now.minusMinutes(bandwidthLimitTimeout + ONE), now); - log.debug(messageHelper.getMessage(MessageConstants.DEBUG_NETWORK_RUN_METRICS_RECEIVED, - networkMetrics.entrySet().stream().map(e -> e.getKey() + ":" + e.getValue()) - .collect(Collectors.joining(", "))) - ); + final Map> networkMetrics = running.keySet().stream() + .collect(Collectors.toMap(nodeName -> nodeName, nodeName -> + monitoringManager.getStats(new ELKUsageMetric[]{ELKUsageMetric.NETWORK}, + nodeName, now.minusMinutes(bandwidthLimitTimeout + ONE), now))); - final double bandwidthLimit = preferenceManager.getPreference( + final long bandwidthLimit = preferenceManager.getPreference( SystemPreferences.SYSTEM_POD_BANDWIDTH_LIMIT); final int actionTimeout = preferenceManager.getPreference( SystemPreferences.SYSTEM_POD_BANDWIDTH_ACTION_BACKOFF_PERIOD); @@ -471,19 +474,34 @@ private void processHighNetworkConsumingRun(PipelineRun run, int actionTimeout, } private void processHighNetworkConsumingRuns(Map running, - Map networkMetrics, - double bandwidthLimit, + Map> networkMetrics, + long bandwidthLimit, int actionTimeout, NetworkConsumingRunAction action) { final List runsToUpdateNotificationTime = new ArrayList<>(running.size()); final List> runsToNotify = new ArrayList<>(running.size()); final List runsToUpdateTags = new ArrayList<>(running.size()); for (Map.Entry entry : running.entrySet()) { PipelineRun run = entry.getValue(); - Double metric = networkMetrics.get(entry.getKey()); + List metric = networkMetrics.get(entry.getKey()); if (metric != null) { - if (Precision.compareTo(metric, bandwidthLimit, ONE_THOUSANDTH) >= 0) { + List rxBytes = metric.stream() + .map(m -> m.getNetworkUsage().getStatsByInterface().values().stream() + .map(MonitoringStats.NetworkUsage.NetworkStats::getRxBytes) + .collect(Collectors.toList())) + .flatMap(Collection::stream).collect(Collectors.toList()); + boolean rxExceeds = rxBytes.stream().allMatch(v -> v >= bandwidthLimit); + List txBytes = metric.stream() + .map(m -> m.getNetworkUsage().getStatsByInterface().values().stream() + .map(MonitoringStats.NetworkUsage.NetworkStats::getTxBytes) + .collect(Collectors.toList())) + .flatMap(Collection::stream).collect(Collectors.toList()); + boolean txExceeds = txBytes.stream().allMatch(v -> v >= bandwidthLimit); + if (rxExceeds || txExceeds) { + long rxMax = Collections.max(rxBytes); + long txMax = Collections.max(txBytes); processHighNetworkConsumingRun(run, actionTimeout, action, runsToNotify, - runsToUpdateNotificationTime, metric, runsToUpdateTags); + runsToUpdateNotificationTime, (double) (Math.max(rxMax, txMax) / 1024), + runsToUpdateTags); } else if (run.getLastNetworkConsumptionNotificationTime() != null) { // No action is longer needed, clear timeout log.debug(messageHelper.getMessage(MessageConstants.DEBUG_RUN_NOT_NETWORK_CONSUMING, diff --git a/api/src/main/java/com/epam/pipeline/manager/notification/NotificationManager.java b/api/src/main/java/com/epam/pipeline/manager/notification/NotificationManager.java index 87ae798775..4de40c28b2 100644 --- a/api/src/main/java/com/epam/pipeline/manager/notification/NotificationManager.java +++ b/api/src/main/java/com/epam/pipeline/manager/notification/NotificationManager.java @@ -374,7 +374,7 @@ public void notifyHighNetworkConsumingRuns(final List> .collect(Collectors.toList()); final List messages = filtered.stream() .map(pair -> buildMessageForHighNetworkConsumingRun(settings, ccUserIds, pipelineOwners, pair.getLeft(), - pair.getRight(), bandwidthLimit, type)) + pair.getRight(), bandwidthLimit / 1024, type)) .collect(Collectors.toList()); saveNotifications(messages); @@ -396,7 +396,8 @@ private NotificationMessage buildMessageForHighNetworkConsumingRun(final Notific log.debug("Sending high network consuming run notification for run '{}'.", run.getId()); final NotificationMessage message = new NotificationMessage(); message.setTemplate(new NotificationTemplate(settings.getTemplateId())); - message.setTemplateParameters(parameterManager.build(type, run, bandwidth, bandwidthLimit)); + message.setTemplateParameters(parameterManager.buildHighNetworkConsumingRunParams(type, run, + bandwidth, bandwidthLimit)); if (settings.isKeepInformedOwner()) { message.setToUserId(pipelineOwners.getOrDefault(run.getOwner(), new PipelineUser()).getId()); } diff --git a/api/src/main/java/com/epam/pipeline/manager/notification/NotificationParameterManager.java b/api/src/main/java/com/epam/pipeline/manager/notification/NotificationParameterManager.java index c262959a83..61a738062a 100644 --- a/api/src/main/java/com/epam/pipeline/manager/notification/NotificationParameterManager.java +++ b/api/src/main/java/com/epam/pipeline/manager/notification/NotificationParameterManager.java @@ -90,6 +90,16 @@ public Map build(final NotificationType type, final PipelineRun return parameters; } + public Map buildHighNetworkConsumingRunParams(final NotificationType type, final PipelineRun run, + final double bandwidth, final double bandwidthLimit) { + final Map parameters = build(type); + parameters.putAll(buildEntities(NotificationEntityClass.RUN, run.getId())); + parameters.putAll(PipelineRunMapper.map(run)); + parameters.put("bandwidth", bandwidth); + parameters.put("bandwidthLimit", bandwidthLimit); + return parameters; + } + public Map build(final NotificationType type, final PipelineRun run, final Map metrics, diff --git a/api/src/main/java/com/epam/pipeline/manager/preference/SystemPreferences.java b/api/src/main/java/com/epam/pipeline/manager/preference/SystemPreferences.java index 93e8015117..efd5a39885 100644 --- a/api/src/main/java/com/epam/pipeline/manager/preference/SystemPreferences.java +++ b/api/src/main/java/com/epam/pipeline/manager/preference/SystemPreferences.java @@ -1153,8 +1153,8 @@ public class SystemPreferences { public static final BooleanPreference SYSTEM_NOTIFICATIONS_ENABLE = new BooleanPreference( "system.notifications.enable", false, SYSTEM_GROUP, pass); - public static final DoublePreference SYSTEM_POD_BANDWIDTH_LIMIT = - new DoublePreference("system.pod.bandwidth.limit", 300.0, SYSTEM_GROUP, isGreaterThan(0)); + public static final LongPreference SYSTEM_POD_BANDWIDTH_LIMIT = + new LongPreference("system.pod.bandwidth.limit", 300000L, SYSTEM_GROUP, isGreaterThan(0)); public static final IntPreference SYSTEM_MAX_POD_BANDWIDTH_LIMIT_TIMEOUT_MINUTES = new IntPreference("system.max.pod.bandwidth.minutes", 30, SYSTEM_GROUP, isGreaterThan(0)); diff --git a/api/src/test/java/com/epam/pipeline/manager/cluster/performancemonitoring/ResourceMonitoringManagerTest.java b/api/src/test/java/com/epam/pipeline/manager/cluster/performancemonitoring/ResourceMonitoringManagerTest.java index df2251e1f2..a92f1e3d50 100644 --- a/api/src/test/java/com/epam/pipeline/manager/cluster/performancemonitoring/ResourceMonitoringManagerTest.java +++ b/api/src/test/java/com/epam/pipeline/manager/cluster/performancemonitoring/ResourceMonitoringManagerTest.java @@ -20,6 +20,7 @@ import com.epam.pipeline.dao.monitoring.MonitoringESDao; import com.epam.pipeline.entity.cluster.InstanceType; import com.epam.pipeline.entity.cluster.monitoring.ELKUsageMetric; +import com.epam.pipeline.entity.cluster.monitoring.MonitoringStats; import com.epam.pipeline.entity.monitoring.IdleRunAction; import com.epam.pipeline.entity.monitoring.LongPausedRunAction; import com.epam.pipeline.entity.monitoring.NetworkConsumingRunAction; @@ -72,6 +73,8 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.stream.Collectors; +import java.util.stream.IntStream; import static org.junit.Assert.assertThat; import static org.mockito.Matchers.any; @@ -96,7 +99,7 @@ public class ResourceMonitoringManagerTest { private static final long TEST_AUTOSCALE_RUN_ID = 6; private static final int TEST_HIGH_CONSUMING_RUN_LOAD = 80; private static final double TEST_IDLE_ON_DEMAND_RUN_CPU_LOAD = 200.0; - private static final double TEST_POD_BANDWIDTH_LIMIT = 300.0; + private static final long TEST_POD_BANDWIDTH_LIMIT = 100000; private static final int TEST_POD_BANDWIDTH_ACTION_BACKOFF_PERIOD = 30; private static final Integer TEST_RESOURCE_MONITORING_DELAY = 111; private static final int TEST_MAX_IDLE_MONITORING_TIMEOUT = 30; @@ -146,6 +149,8 @@ public class ResourceMonitoringManagerTest { private PipelineRunDockerOperationManager pipelineRunDockerOperationManager; @Mock private RunStatusManager runStatusManager; + @Mock + private ESMonitoringManager monitoringManager; @Captor ArgumentCaptor> runsToUpdateCaptor; @@ -178,7 +183,8 @@ public void setUp() throws Exception { preferenceManager, stopServerlessRunManager, instanceOfferManager, - runStatusManager); + runStatusManager, + monitoringManager); resourceMonitoringManager = new ResourceMonitoringManager(core); Whitebox.setInternalState(resourceMonitoringManager, "authManager", authManager); Whitebox.setInternalState(resourceMonitoringManager, "preferenceManager", preferenceManager); @@ -215,6 +221,7 @@ public void setUp() throws Exception { .thenReturn(TEST_POD_BANDWIDTH_ACTION_BACKOFF_PERIOD); when(preferenceManager.getPreference(SystemPreferences.SYSTEM_POD_BANDWIDTH_ACTION)) .thenReturn(NetworkConsumingRunAction.NOTIFY.name()); + when(monitoringManager.getStats(any(), any(), any(), any())).thenReturn(createStatsList()); SecurityContext context = SecurityContextHolder.createEmptyContext(); UserContext userContext = new UserContext(1L, "admin"); @@ -362,7 +369,7 @@ public void testSkipProlongRun() { when(pipelineRunManager.loadPipelineRun(idleRunToProlong.getId())).thenReturn(idleRunToProlong); when(monitoringESDao.loadMetrics(eq(ELKUsageMetric.CPU), any(), any(LocalDateTime.class), any(LocalDateTime.class))) - .thenReturn(Collections.singletonMap(idleRunToProlong.getInstance().getNodeName(), + .thenReturn(Collections.singletonMap(idleRunToProlong.getInstance().getNodeName(), TEST_IDLE_ON_DEMAND_RUN_CPU_LOAD)); when(preferenceManager.getPreference(SystemPreferences.SYSTEM_IDLE_ACTION)) .thenReturn(IdleRunAction.NOTIFY.name()); @@ -753,4 +760,19 @@ private HashMap getMockedHighConsumingStats() { stats.put(okayRun.getInstance().getNodeName(), TEST_HIGH_CONSUMING_RUN_LOAD / PERCENTS - DELTA); return stats; } + + private List createStatsList() { + return IntStream.range(0, 2).mapToObj(i -> createMonitoringStats()).collect(Collectors.toList()); + } + + private MonitoringStats createMonitoringStats() { + final MonitoringStats stats = new MonitoringStats(); + final MonitoringStats.NetworkUsage networkUsage = new MonitoringStats.NetworkUsage(); + final MonitoringStats.NetworkUsage.NetworkStats value = new MonitoringStats.NetworkUsage.NetworkStats(); + value.setRxBytes(101000); + value.setTxBytes(202000); + networkUsage.setStatsByInterface(Collections.singletonMap("interface", value)); + stats.setNetworkUsage(networkUsage); + return stats; + } } diff --git a/deploy/contents/install/email-templates/configs/HIGH_CONSUMED_NETWORK_BANDWIDTH.json b/deploy/contents/install/email-templates/configs/HIGH_CONSUMED_NETWORK_BANDWIDTH.json new file mode 100644 index 0000000000..2c052b3bcc --- /dev/null +++ b/deploy/contents/install/email-templates/configs/HIGH_CONSUMED_NETWORK_BANDWIDTH.json @@ -0,0 +1,8 @@ +{ + "keepInformedAdmins":true, + "keepInformedOwner":true, + "enabled":true, + "resendDelay": -1, + "threshold": -1, + "subject":"Job #$templateParameters[\"id\"] is high network consuming for a long time" +} \ No newline at end of file diff --git a/deploy/contents/install/email-templates/contents/HIGH_CONSUMED_NETWORK_BANDWIDTH.html b/deploy/contents/install/email-templates/contents/HIGH_CONSUMED_NETWORK_BANDWIDTH.html new file mode 100644 index 0000000000..60453596f4 --- /dev/null +++ b/deploy/contents/install/email-templates/contents/HIGH_CONSUMED_NETWORK_BANDWIDTH.html @@ -0,0 +1,51 @@ + + + + + + + + + +

Dear user,

+

*** This is a system generated email, do not reply to this email ***

+

Looks like a job with ID ${CP_DOLLAR}templateParameters["id"], launched from ${CP_DOLLAR}templateParameters.get("owner") account, is high network consuming: network bandwidth ${CP_DOLLAR}numberTool.format("#0.00", ${CP_DOLLAR}templateParameters.get("bandwidth")) Kb/sec is over the limit ${CP_DOLLAR}templateParameters.get("bandwidthLimit") Kb/sec.

+
+

Job details:

+ + + + + + + #if( ${CP_DOLLAR}templateParameters.get("pipelineName") && ${CP_DOLLAR}templateParameters.get("pipelineName") != "pipeline" )#else#end + + + + + + + ${CP_DOLLAR}calendar.setTime(${CP_DOLLAR}dateFormat.parse(${CP_DOLLAR}templateParameters.get("startDate"))) + + + + + + + + + +
Run #${CP_DOLLAR}templateParameters.get("id")
Job/Tool${CP_DOLLAR}templateParameters.get("pipelineName")${CP_DOLLAR}templateParameters.get("dockerImage")
Running Time${CP_DOLLAR}templateParameters.get("runningTime") minutes
Start Date${CP_DOLLAR}calendar.getTime().toString()
Instance Type${CP_DOLLAR}templateParameters.get("instance").get("nodeType")
Instance Disk${CP_DOLLAR}templateParameters.get("instance").get("nodeDisk")
+
+

Best regards,

+

$CP_PREF_UI_PIPELINE_DEPLOYMENT_NAME Platform

+ + + \ No newline at end of file From 3e6645dd606466fb53dd7bbfad369447e88001c2 Mon Sep 17 00:00:00 2001 From: Oksana_Kolesnikova Date: Wed, 17 Jul 2024 10:51:04 +0300 Subject: [PATCH 5/8] Issue 3602 Pod nework bandwidth usage - fix remarks --- .../performancemonitoring/ResourceMonitoringManager.java | 3 ++- .../ResourceMonitoringManagerTest.java | 6 ++++-- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/api/src/main/java/com/epam/pipeline/manager/cluster/performancemonitoring/ResourceMonitoringManager.java b/api/src/main/java/com/epam/pipeline/manager/cluster/performancemonitoring/ResourceMonitoringManager.java index 7f9447f4e4..90c3d83979 100644 --- a/api/src/main/java/com/epam/pipeline/manager/cluster/performancemonitoring/ResourceMonitoringManager.java +++ b/api/src/main/java/com/epam/pipeline/manager/cluster/performancemonitoring/ResourceMonitoringManager.java @@ -117,6 +117,7 @@ static class ResourceMonitoringManagerCore { private static final double PERCENT = 100.0; private static final double ONE_THOUSANDTH = 0.001; private static final long ONE = 1L; + private static final int BYTES_IN_KB = 1024; private final PipelineRunManager pipelineRunManager; private final RunStatusManager runStatusManager; @@ -500,7 +501,7 @@ private void processHighNetworkConsumingRuns(Map running, long rxMax = Collections.max(rxBytes); long txMax = Collections.max(txBytes); processHighNetworkConsumingRun(run, actionTimeout, action, runsToNotify, - runsToUpdateNotificationTime, (double) (Math.max(rxMax, txMax) / 1024), + runsToUpdateNotificationTime, (double) (Math.max(rxMax, txMax) / BYTES_IN_KB), runsToUpdateTags); } else if (run.getLastNetworkConsumptionNotificationTime() != null) { // No action is longer needed, clear timeout diff --git a/api/src/test/java/com/epam/pipeline/manager/cluster/performancemonitoring/ResourceMonitoringManagerTest.java b/api/src/test/java/com/epam/pipeline/manager/cluster/performancemonitoring/ResourceMonitoringManagerTest.java index a92f1e3d50..4daa812bb0 100644 --- a/api/src/test/java/com/epam/pipeline/manager/cluster/performancemonitoring/ResourceMonitoringManagerTest.java +++ b/api/src/test/java/com/epam/pipeline/manager/cluster/performancemonitoring/ResourceMonitoringManagerTest.java @@ -123,6 +123,8 @@ public class ResourceMonitoringManagerTest { private static final int LONG_PAUSED_ACTION_TIMEOUT = 30; public static final long PAUSED_RUN_ID = 234L; public static final int ONE_HOUR = 60; + private static final int RX_BYTES = 101000; + private static final int TX_BYTES = 202000; @InjectMocks private ResourceMonitoringManager resourceMonitoringManager; @@ -769,8 +771,8 @@ private MonitoringStats createMonitoringStats() { final MonitoringStats stats = new MonitoringStats(); final MonitoringStats.NetworkUsage networkUsage = new MonitoringStats.NetworkUsage(); final MonitoringStats.NetworkUsage.NetworkStats value = new MonitoringStats.NetworkUsage.NetworkStats(); - value.setRxBytes(101000); - value.setTxBytes(202000); + value.setRxBytes(RX_BYTES); + value.setTxBytes(TX_BYTES); networkUsage.setStatsByInterface(Collections.singletonMap("interface", value)); stats.setNetworkUsage(networkUsage); return stats; From 133d7c2e10542ae2a37bc2aee964e4d319c8c4e9 Mon Sep 17 00:00:00 2001 From: Oksana_Kolesnikova Date: Thu, 18 Jul 2024 10:19:36 +0300 Subject: [PATCH 6/8] Issue 3591 Pod nework bandwidth usage --- .../AbstractMetricRequester.java | 2 + .../metricrequester/NetworkRequester.java | 34 ++++++++++++-- .../ESMonitoringManager.java | 14 +----- .../ResourceMonitoringManager.java | 47 ++++++------------- .../notification/NotificationManager.java | 4 +- .../NotificationParameterManager.java | 5 +- .../manager/preference/SystemPreferences.java | 13 +++-- .../ResourceMonitoringManagerTest.java | 30 ++---------- 8 files changed, 61 insertions(+), 88 deletions(-) diff --git a/api/src/main/java/com/epam/pipeline/dao/monitoring/metricrequester/AbstractMetricRequester.java b/api/src/main/java/com/epam/pipeline/dao/monitoring/metricrequester/AbstractMetricRequester.java index 1e513e5bfe..a336ef1722 100644 --- a/api/src/main/java/com/epam/pipeline/dao/monitoring/metricrequester/AbstractMetricRequester.java +++ b/api/src/main/java/com/epam/pipeline/dao/monitoring/metricrequester/AbstractMetricRequester.java @@ -133,6 +133,8 @@ public static MetricRequester getRequester(final ELKUsageMetric metric, return new MemoryRequester(client); case FS: return new FSRequester(client); + case NETWORK: + return new NetworkRequester(client); default: throw new IllegalArgumentException("Metric type: " + metric.getName() + " isn't supported!"); } diff --git a/api/src/main/java/com/epam/pipeline/dao/monitoring/metricrequester/NetworkRequester.java b/api/src/main/java/com/epam/pipeline/dao/monitoring/metricrequester/NetworkRequester.java index 9aeb96443c..7312808c8b 100644 --- a/api/src/main/java/com/epam/pipeline/dao/monitoring/metricrequester/NetworkRequester.java +++ b/api/src/main/java/com/epam/pipeline/dao/monitoring/metricrequester/NetworkRequester.java @@ -18,15 +18,19 @@ import com.epam.pipeline.entity.cluster.monitoring.ELKUsageMetric; import com.epam.pipeline.entity.cluster.monitoring.MonitoringStats; -import org.apache.commons.lang.NotImplementedException; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.search.aggregations.Aggregation; +import org.elasticsearch.search.aggregations.AggregationBuilders; import org.elasticsearch.search.aggregations.Aggregations; import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation; +import org.elasticsearch.search.aggregations.bucket.terms.Terms; +import org.elasticsearch.search.builder.SearchSourceBuilder; import java.time.Duration; import java.time.LocalDateTime; +import java.time.ZoneOffset; import java.util.Collection; import java.util.HashMap; import java.util.List; @@ -49,13 +53,28 @@ protected ELKUsageMetric metric() { @Override public SearchRequest buildRequest(final Collection resourceIds, final LocalDateTime from, final LocalDateTime to, final Map additional) { - throw new NotImplementedException("Method NetworkRequester::buildRequest is not implemented yet."); + return request(from, to, + new SearchSourceBuilder() + .query(QueryBuilders.boolQuery() + .filter(QueryBuilders.termsQuery(path(FIELD_METRICS_TAGS, FIELD_NODENAME_RAW), + resourceIds)) + .filter(QueryBuilders.termQuery(path(FIELD_METRICS_TAGS, FIELD_TYPE), NODE)) + .filter(QueryBuilders.rangeQuery(metric().getTimestamp()) + .from(from.toInstant(ZoneOffset.UTC).toEpochMilli()) + .to(to.toInstant(ZoneOffset.UTC).toEpochMilli()))) + .size(0) + .aggregation(ordered(AggregationBuilders.terms(AGGREGATION_NODE_NAME)) + .field(path(FIELD_METRICS_TAGS, FIELD_NODENAME_RAW)) + .size(resourceIds.size()) + .subAggregation(average(AVG_AGGREGATION + RX_RATE, RX_RATE)) + .subAggregation(average(AVG_AGGREGATION + TX_RATE, TX_RATE)))); } @Override public Map parseResponse(final SearchResponse response) { - throw new NotImplementedException("Method NetworkRequester::buildRequest is not implemented yet."); - } + return ((Terms) response.getAggregations().get(AGGREGATION_NODE_NAME)).getBuckets().stream() + .collect(Collectors.toMap(MultiBucketsAggregation.Bucket::getKeyAsString, this::getNetworkUsage)); + } @Override protected SearchRequest buildStatsRequest(final String nodeName, final LocalDateTime from, final LocalDateTime to, @@ -101,4 +120,11 @@ private MonitoringStats toMonitoringStats(final MultiBucketsAggregation.Bucket b monitoringStats.setNetworkUsage(networkUsage); return monitoringStats; } + + private Double getNetworkUsage(final MultiBucketsAggregation.Bucket bucket) { + final List aggregations = aggregations(bucket); + final Optional rxRate = doubleValue(aggregations, AVG_AGGREGATION + RX_RATE); + final Optional txRate = doubleValue(aggregations, AVG_AGGREGATION + TX_RATE); + return Math.max(rxRate.orElse(0.0), txRate.orElse(0.0)); + } } diff --git a/api/src/main/java/com/epam/pipeline/manager/cluster/performancemonitoring/ESMonitoringManager.java b/api/src/main/java/com/epam/pipeline/manager/cluster/performancemonitoring/ESMonitoringManager.java index d9b85e2019..43d53941f0 100644 --- a/api/src/main/java/com/epam/pipeline/manager/cluster/performancemonitoring/ESMonitoringManager.java +++ b/api/src/main/java/com/epam/pipeline/manager/cluster/performancemonitoring/ESMonitoringManager.java @@ -161,13 +161,7 @@ private LocalDateTime fallbackMonitoringStart() { private List getStats(final String nodeName, final LocalDateTime start, final LocalDateTime end, final Duration interval) { - return getStats(MONITORING_METRICS, nodeName, start, end, interval); - } - - private List getStats(final ELKUsageMetric[] monitoringMetrics, final String nodeName, - final LocalDateTime start, final LocalDateTime end, - final Duration interval) { - return Stream.of(monitoringMetrics) + return Stream.of(MONITORING_METRICS) .map(it -> AbstractMetricRequester.getStatsRequester(it, client)) .map(it -> it.requestStats(nodeName, start, end, interval)) .flatMap(List::stream) @@ -185,12 +179,6 @@ private List getStats(final ELKUsageMetric[] monitoringMetrics, .collect(Collectors.toList()); } - public List getStats(final ELKUsageMetric[] monitoringMetrics, final String nodeName, - final LocalDateTime start, final LocalDateTime end) { - final Duration interval = interval(start, end); - return getStats(monitoringMetrics, nodeName, start, end, interval); - } - private Duration interval(final LocalDateTime start, final LocalDateTime end) { final Duration requested = Duration.between(start, end).dividedBy(Math.max(1, numberOfIntervals() - 1)); final Duration minimal = minimalDuration(); diff --git a/api/src/main/java/com/epam/pipeline/manager/cluster/performancemonitoring/ResourceMonitoringManager.java b/api/src/main/java/com/epam/pipeline/manager/cluster/performancemonitoring/ResourceMonitoringManager.java index 90c3d83979..340d158884 100644 --- a/api/src/main/java/com/epam/pipeline/manager/cluster/performancemonitoring/ResourceMonitoringManager.java +++ b/api/src/main/java/com/epam/pipeline/manager/cluster/performancemonitoring/ResourceMonitoringManager.java @@ -24,17 +24,15 @@ import java.util.Comparator; import java.util.HashMap; import java.util.List; +import java.util.Set; import java.util.Map; import java.util.Objects; import java.util.Optional; -import java.util.Set; -import java.util.Collection; import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.Stream; import com.epam.pipeline.entity.cluster.monitoring.ELKUsageMetric; -import com.epam.pipeline.entity.cluster.monitoring.MonitoringStats; import com.epam.pipeline.entity.monitoring.NetworkConsumingRunAction; import com.epam.pipeline.entity.monitoring.LongPausedRunAction; import com.epam.pipeline.entity.pipeline.StopServerlessRun; @@ -117,7 +115,6 @@ static class ResourceMonitoringManagerCore { private static final double PERCENT = 100.0; private static final double ONE_THOUSANDTH = 0.001; private static final long ONE = 1L; - private static final int BYTES_IN_KB = 1024; private final PipelineRunManager pipelineRunManager; private final RunStatusManager runStatusManager; @@ -128,7 +125,6 @@ static class ResourceMonitoringManagerCore { private final PreferenceManager preferenceManager; private final StopServerlessRunManager stopServerlessRunManager; private final InstanceOfferManager instanceOfferManager; - private final ESMonitoringManager monitoringManager; @Autowired ResourceMonitoringManagerCore(final PipelineRunManager pipelineRunManager, @@ -139,8 +135,7 @@ static class ResourceMonitoringManagerCore { final PreferenceManager preferenceManager, final StopServerlessRunManager stopServerlessRunManager, final InstanceOfferManager instanceOfferManager, - final RunStatusManager runStatusManager, - final ESMonitoringManager monitoringManager) { + final RunStatusManager runStatusManager) { this.pipelineRunManager = pipelineRunManager; this.pipelineRunDockerOperationManager = pipelineRunDockerOperationManager; this.messageHelper = messageHelper; @@ -150,7 +145,6 @@ static class ResourceMonitoringManagerCore { this.stopServerlessRunManager = stopServerlessRunManager; this.instanceOfferManager = instanceOfferManager; this.runStatusManager = runStatusManager; - this.monitoringManager = monitoringManager; } @Scheduled(cron = "0 0 0 ? * *") @@ -436,12 +430,14 @@ private void processHighNetworkConsumingRuns(final List runs) { "NETWORK", running.size(), String.join(", ", running.keySet()))); final LocalDateTime now = DateUtils.nowUTC(); - final Map> networkMetrics = running.keySet().stream() - .collect(Collectors.toMap(nodeName -> nodeName, nodeName -> - monitoringManager.getStats(new ELKUsageMetric[]{ELKUsageMetric.NETWORK}, - nodeName, now.minusMinutes(bandwidthLimitTimeout + ONE), now))); + final Map networkMetrics = monitoringDao.loadMetrics(ELKUsageMetric.NETWORK, + running.keySet(), now.minusMinutes(bandwidthLimitTimeout + ONE), now); + log.debug(messageHelper.getMessage(MessageConstants.DEBUG_NETWORK_RUN_METRICS_RECEIVED, + networkMetrics.entrySet().stream().map(e -> e.getKey() + ":" + e.getValue()) + .collect(Collectors.joining(", "))) + ); - final long bandwidthLimit = preferenceManager.getPreference( + final double bandwidthLimit = preferenceManager.getPreference( SystemPreferences.SYSTEM_POD_BANDWIDTH_LIMIT); final int actionTimeout = preferenceManager.getPreference( SystemPreferences.SYSTEM_POD_BANDWIDTH_ACTION_BACKOFF_PERIOD); @@ -475,34 +471,19 @@ private void processHighNetworkConsumingRun(PipelineRun run, int actionTimeout, } private void processHighNetworkConsumingRuns(Map running, - Map> networkMetrics, - long bandwidthLimit, + Map networkMetrics, + double bandwidthLimit, int actionTimeout, NetworkConsumingRunAction action) { final List runsToUpdateNotificationTime = new ArrayList<>(running.size()); final List> runsToNotify = new ArrayList<>(running.size()); final List runsToUpdateTags = new ArrayList<>(running.size()); for (Map.Entry entry : running.entrySet()) { PipelineRun run = entry.getValue(); - List metric = networkMetrics.get(entry.getKey()); + Double metric = networkMetrics.get(entry.getKey()); if (metric != null) { - List rxBytes = metric.stream() - .map(m -> m.getNetworkUsage().getStatsByInterface().values().stream() - .map(MonitoringStats.NetworkUsage.NetworkStats::getRxBytes) - .collect(Collectors.toList())) - .flatMap(Collection::stream).collect(Collectors.toList()); - boolean rxExceeds = rxBytes.stream().allMatch(v -> v >= bandwidthLimit); - List txBytes = metric.stream() - .map(m -> m.getNetworkUsage().getStatsByInterface().values().stream() - .map(MonitoringStats.NetworkUsage.NetworkStats::getTxBytes) - .collect(Collectors.toList())) - .flatMap(Collection::stream).collect(Collectors.toList()); - boolean txExceeds = txBytes.stream().allMatch(v -> v >= bandwidthLimit); - if (rxExceeds || txExceeds) { - long rxMax = Collections.max(rxBytes); - long txMax = Collections.max(txBytes); + if (metric >= bandwidthLimit) { processHighNetworkConsumingRun(run, actionTimeout, action, runsToNotify, - runsToUpdateNotificationTime, (double) (Math.max(rxMax, txMax) / BYTES_IN_KB), - runsToUpdateTags); + runsToUpdateNotificationTime, metric, runsToUpdateTags); } else if (run.getLastNetworkConsumptionNotificationTime() != null) { // No action is longer needed, clear timeout log.debug(messageHelper.getMessage(MessageConstants.DEBUG_RUN_NOT_NETWORK_CONSUMING, diff --git a/api/src/main/java/com/epam/pipeline/manager/notification/NotificationManager.java b/api/src/main/java/com/epam/pipeline/manager/notification/NotificationManager.java index 4de40c28b2..ea90dee49b 100644 --- a/api/src/main/java/com/epam/pipeline/manager/notification/NotificationManager.java +++ b/api/src/main/java/com/epam/pipeline/manager/notification/NotificationManager.java @@ -85,7 +85,7 @@ @Service @Slf4j -public class NotificationManager implements NotificationService { // TODO: rewrite with Strategy pattern? +public class NotificationManager implements NotificationService { private static final Pattern MENTION_PATTERN = Pattern.compile("@([^ ]*\\b)"); @@ -374,7 +374,7 @@ public void notifyHighNetworkConsumingRuns(final List> .collect(Collectors.toList()); final List messages = filtered.stream() .map(pair -> buildMessageForHighNetworkConsumingRun(settings, ccUserIds, pipelineOwners, pair.getLeft(), - pair.getRight(), bandwidthLimit / 1024, type)) + pair.getRight(), bandwidthLimit, type)) .collect(Collectors.toList()); saveNotifications(messages); diff --git a/api/src/main/java/com/epam/pipeline/manager/notification/NotificationParameterManager.java b/api/src/main/java/com/epam/pipeline/manager/notification/NotificationParameterManager.java index 61a738062a..ba5aa92da9 100644 --- a/api/src/main/java/com/epam/pipeline/manager/notification/NotificationParameterManager.java +++ b/api/src/main/java/com/epam/pipeline/manager/notification/NotificationParameterManager.java @@ -38,6 +38,7 @@ public class NotificationParameterManager { private static final double PERCENT = 100.0; + private static final int BYTES_IN_KB = 1024; private final JsonMapper jsonMapper; @@ -95,8 +96,8 @@ public Map buildHighNetworkConsumingRunParams(final Notification final Map parameters = build(type); parameters.putAll(buildEntities(NotificationEntityClass.RUN, run.getId())); parameters.putAll(PipelineRunMapper.map(run)); - parameters.put("bandwidth", bandwidth); - parameters.put("bandwidthLimit", bandwidthLimit); + parameters.put("bandwidth", bandwidth / BYTES_IN_KB); + parameters.put("bandwidthLimit", bandwidthLimit / BYTES_IN_KB); return parameters; } diff --git a/api/src/main/java/com/epam/pipeline/manager/preference/SystemPreferences.java b/api/src/main/java/com/epam/pipeline/manager/preference/SystemPreferences.java index efd5a39885..8cea33131e 100644 --- a/api/src/main/java/com/epam/pipeline/manager/preference/SystemPreferences.java +++ b/api/src/main/java/com/epam/pipeline/manager/preference/SystemPreferences.java @@ -1153,15 +1153,14 @@ public class SystemPreferences { public static final BooleanPreference SYSTEM_NOTIFICATIONS_ENABLE = new BooleanPreference( "system.notifications.enable", false, SYSTEM_GROUP, pass); - public static final LongPreference SYSTEM_POD_BANDWIDTH_LIMIT = - new LongPreference("system.pod.bandwidth.limit", 300000L, SYSTEM_GROUP, isGreaterThan(0)); + public static final DoublePreference SYSTEM_POD_BANDWIDTH_LIMIT = new DoublePreference( + "system.pod.bandwidth.limit", 300000.0, SYSTEM_GROUP, isGreaterThan(0)); - public static final IntPreference SYSTEM_MAX_POD_BANDWIDTH_LIMIT_TIMEOUT_MINUTES = - new IntPreference("system.max.pod.bandwidth.minutes", 30, SYSTEM_GROUP, isGreaterThan(0)); + public static final IntPreference SYSTEM_MAX_POD_BANDWIDTH_LIMIT_TIMEOUT_MINUTES = new IntPreference( + "system.max.pod.bandwidth.minutes", 30, SYSTEM_GROUP, isGreaterThan(0)); - - public static final IntPreference SYSTEM_POD_BANDWIDTH_ACTION_BACKOFF_PERIOD = - new IntPreference("system.pod.bandwidth.action.backoff.period", 30, SYSTEM_GROUP, + public static final IntPreference SYSTEM_POD_BANDWIDTH_ACTION_BACKOFF_PERIOD = new IntPreference( + "system.pod.bandwidth.action.backoff.period", 30, SYSTEM_GROUP, isGreaterThan(0)); public static final StringPreference SYSTEM_POD_BANDWIDTH_ACTION = new StringPreference( diff --git a/api/src/test/java/com/epam/pipeline/manager/cluster/performancemonitoring/ResourceMonitoringManagerTest.java b/api/src/test/java/com/epam/pipeline/manager/cluster/performancemonitoring/ResourceMonitoringManagerTest.java index 4daa812bb0..df2251e1f2 100644 --- a/api/src/test/java/com/epam/pipeline/manager/cluster/performancemonitoring/ResourceMonitoringManagerTest.java +++ b/api/src/test/java/com/epam/pipeline/manager/cluster/performancemonitoring/ResourceMonitoringManagerTest.java @@ -20,7 +20,6 @@ import com.epam.pipeline.dao.monitoring.MonitoringESDao; import com.epam.pipeline.entity.cluster.InstanceType; import com.epam.pipeline.entity.cluster.monitoring.ELKUsageMetric; -import com.epam.pipeline.entity.cluster.monitoring.MonitoringStats; import com.epam.pipeline.entity.monitoring.IdleRunAction; import com.epam.pipeline.entity.monitoring.LongPausedRunAction; import com.epam.pipeline.entity.monitoring.NetworkConsumingRunAction; @@ -73,8 +72,6 @@ import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.stream.Collectors; -import java.util.stream.IntStream; import static org.junit.Assert.assertThat; import static org.mockito.Matchers.any; @@ -99,7 +96,7 @@ public class ResourceMonitoringManagerTest { private static final long TEST_AUTOSCALE_RUN_ID = 6; private static final int TEST_HIGH_CONSUMING_RUN_LOAD = 80; private static final double TEST_IDLE_ON_DEMAND_RUN_CPU_LOAD = 200.0; - private static final long TEST_POD_BANDWIDTH_LIMIT = 100000; + private static final double TEST_POD_BANDWIDTH_LIMIT = 300.0; private static final int TEST_POD_BANDWIDTH_ACTION_BACKOFF_PERIOD = 30; private static final Integer TEST_RESOURCE_MONITORING_DELAY = 111; private static final int TEST_MAX_IDLE_MONITORING_TIMEOUT = 30; @@ -123,8 +120,6 @@ public class ResourceMonitoringManagerTest { private static final int LONG_PAUSED_ACTION_TIMEOUT = 30; public static final long PAUSED_RUN_ID = 234L; public static final int ONE_HOUR = 60; - private static final int RX_BYTES = 101000; - private static final int TX_BYTES = 202000; @InjectMocks private ResourceMonitoringManager resourceMonitoringManager; @@ -151,8 +146,6 @@ public class ResourceMonitoringManagerTest { private PipelineRunDockerOperationManager pipelineRunDockerOperationManager; @Mock private RunStatusManager runStatusManager; - @Mock - private ESMonitoringManager monitoringManager; @Captor ArgumentCaptor> runsToUpdateCaptor; @@ -185,8 +178,7 @@ public void setUp() throws Exception { preferenceManager, stopServerlessRunManager, instanceOfferManager, - runStatusManager, - monitoringManager); + runStatusManager); resourceMonitoringManager = new ResourceMonitoringManager(core); Whitebox.setInternalState(resourceMonitoringManager, "authManager", authManager); Whitebox.setInternalState(resourceMonitoringManager, "preferenceManager", preferenceManager); @@ -223,7 +215,6 @@ public void setUp() throws Exception { .thenReturn(TEST_POD_BANDWIDTH_ACTION_BACKOFF_PERIOD); when(preferenceManager.getPreference(SystemPreferences.SYSTEM_POD_BANDWIDTH_ACTION)) .thenReturn(NetworkConsumingRunAction.NOTIFY.name()); - when(monitoringManager.getStats(any(), any(), any(), any())).thenReturn(createStatsList()); SecurityContext context = SecurityContextHolder.createEmptyContext(); UserContext userContext = new UserContext(1L, "admin"); @@ -371,7 +362,7 @@ public void testSkipProlongRun() { when(pipelineRunManager.loadPipelineRun(idleRunToProlong.getId())).thenReturn(idleRunToProlong); when(monitoringESDao.loadMetrics(eq(ELKUsageMetric.CPU), any(), any(LocalDateTime.class), any(LocalDateTime.class))) - .thenReturn(Collections.singletonMap(idleRunToProlong.getInstance().getNodeName(), + .thenReturn(Collections.singletonMap(idleRunToProlong.getInstance().getNodeName(), TEST_IDLE_ON_DEMAND_RUN_CPU_LOAD)); when(preferenceManager.getPreference(SystemPreferences.SYSTEM_IDLE_ACTION)) .thenReturn(IdleRunAction.NOTIFY.name()); @@ -762,19 +753,4 @@ private HashMap getMockedHighConsumingStats() { stats.put(okayRun.getInstance().getNodeName(), TEST_HIGH_CONSUMING_RUN_LOAD / PERCENTS - DELTA); return stats; } - - private List createStatsList() { - return IntStream.range(0, 2).mapToObj(i -> createMonitoringStats()).collect(Collectors.toList()); - } - - private MonitoringStats createMonitoringStats() { - final MonitoringStats stats = new MonitoringStats(); - final MonitoringStats.NetworkUsage networkUsage = new MonitoringStats.NetworkUsage(); - final MonitoringStats.NetworkUsage.NetworkStats value = new MonitoringStats.NetworkUsage.NetworkStats(); - value.setRxBytes(RX_BYTES); - value.setTxBytes(TX_BYTES); - networkUsage.setStatsByInterface(Collections.singletonMap("interface", value)); - stats.setNetworkUsage(networkUsage); - return stats; - } } From e7020696551d0b4da2973781354b3e9af0de48ce Mon Sep 17 00:00:00 2001 From: Oksana_Kolesnikova Date: Thu, 18 Jul 2024 10:25:22 +0300 Subject: [PATCH 7/8] Issue 3591 Pod nework bandwidth usage --- .../dao/monitoring/metricrequester/NetworkRequester.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/api/src/main/java/com/epam/pipeline/dao/monitoring/metricrequester/NetworkRequester.java b/api/src/main/java/com/epam/pipeline/dao/monitoring/metricrequester/NetworkRequester.java index 7312808c8b..cc20bcbe3b 100644 --- a/api/src/main/java/com/epam/pipeline/dao/monitoring/metricrequester/NetworkRequester.java +++ b/api/src/main/java/com/epam/pipeline/dao/monitoring/metricrequester/NetworkRequester.java @@ -74,7 +74,7 @@ public SearchRequest buildRequest(final Collection resourceIds, final Lo public Map parseResponse(final SearchResponse response) { return ((Terms) response.getAggregations().get(AGGREGATION_NODE_NAME)).getBuckets().stream() .collect(Collectors.toMap(MultiBucketsAggregation.Bucket::getKeyAsString, this::getNetworkUsage)); - } + } @Override protected SearchRequest buildStatsRequest(final String nodeName, final LocalDateTime from, final LocalDateTime to, From a016ffbff16094cb192b5be32b60a62a45c79e16 Mon Sep 17 00:00:00 2001 From: Pavel Silin Date: Thu, 18 Jul 2024 14:46:49 +0200 Subject: [PATCH 8/8] cleanup --- .../performancemonitoring/ResourceMonitoringManager.java | 2 +- .../epam/pipeline/manager/notification/NotificationManager.java | 2 +- ..._added_last_network_consumption_notification_time_column.sql | 2 +- .../configs/HIGH_CONSUMED_NETWORK_BANDWIDTH.json | 2 +- .../contents/HIGH_CONSUMED_NETWORK_BANDWIDTH.html | 2 +- 5 files changed, 5 insertions(+), 5 deletions(-) diff --git a/api/src/main/java/com/epam/pipeline/manager/cluster/performancemonitoring/ResourceMonitoringManager.java b/api/src/main/java/com/epam/pipeline/manager/cluster/performancemonitoring/ResourceMonitoringManager.java index 340d158884..b54bbb0af1 100644 --- a/api/src/main/java/com/epam/pipeline/manager/cluster/performancemonitoring/ResourceMonitoringManager.java +++ b/api/src/main/java/com/epam/pipeline/manager/cluster/performancemonitoring/ResourceMonitoringManager.java @@ -87,7 +87,7 @@ public class ResourceMonitoringManager extends AbstractSchedulingManager { public static final String UTILIZATION_LEVEL_LOW = "IDLE"; - public static final String NETWORK_CONSUMING_LEVEL_HIGH = "NETWORK_CONSUMING"; + public static final String NETWORK_CONSUMING_LEVEL_HIGH = "NETWORK_PRESSURE"; public static final String UTILIZATION_LEVEL_HIGH = "PRESSURE"; public static final String TRUE_VALUE_STRING = "true"; diff --git a/api/src/main/java/com/epam/pipeline/manager/notification/NotificationManager.java b/api/src/main/java/com/epam/pipeline/manager/notification/NotificationManager.java index ea90dee49b..1614d82ab1 100644 --- a/api/src/main/java/com/epam/pipeline/manager/notification/NotificationManager.java +++ b/api/src/main/java/com/epam/pipeline/manager/notification/NotificationManager.java @@ -85,7 +85,7 @@ @Service @Slf4j -public class NotificationManager implements NotificationService { +public class NotificationManager implements NotificationService { // TODO: rewrite with Strategy pattern? private static final Pattern MENTION_PATTERN = Pattern.compile("@([^ ]*\\b)"); diff --git a/api/src/main/resources/db/migration/v2024.07.17_14.00__issue_3602_added_last_network_consumption_notification_time_column.sql b/api/src/main/resources/db/migration/v2024.07.17_14.00__issue_3602_added_last_network_consumption_notification_time_column.sql index 43ab6b46c9..046a57e843 100644 --- a/api/src/main/resources/db/migration/v2024.07.17_14.00__issue_3602_added_last_network_consumption_notification_time_column.sql +++ b/api/src/main/resources/db/migration/v2024.07.17_14.00__issue_3602_added_last_network_consumption_notification_time_column.sql @@ -1,2 +1,2 @@ -- a field to record last time the notification on high network consuming run was issued -ALTER TABLE pipeline.pipeline_run ADD COLUMN last_network_consumption_notification_time TIMESTAMP WITH TIME ZONE; \ No newline at end of file +ALTER TABLE pipeline.pipeline_run ADD COLUMN last_network_consumption_notification_time TIMESTAMP WITH TIME ZONE; diff --git a/deploy/contents/install/email-templates/configs/HIGH_CONSUMED_NETWORK_BANDWIDTH.json b/deploy/contents/install/email-templates/configs/HIGH_CONSUMED_NETWORK_BANDWIDTH.json index 2c052b3bcc..080aa9e44f 100644 --- a/deploy/contents/install/email-templates/configs/HIGH_CONSUMED_NETWORK_BANDWIDTH.json +++ b/deploy/contents/install/email-templates/configs/HIGH_CONSUMED_NETWORK_BANDWIDTH.json @@ -5,4 +5,4 @@ "resendDelay": -1, "threshold": -1, "subject":"Job #$templateParameters[\"id\"] is high network consuming for a long time" -} \ No newline at end of file +} diff --git a/deploy/contents/install/email-templates/contents/HIGH_CONSUMED_NETWORK_BANDWIDTH.html b/deploy/contents/install/email-templates/contents/HIGH_CONSUMED_NETWORK_BANDWIDTH.html index 60453596f4..5a0e3adbb2 100644 --- a/deploy/contents/install/email-templates/contents/HIGH_CONSUMED_NETWORK_BANDWIDTH.html +++ b/deploy/contents/install/email-templates/contents/HIGH_CONSUMED_NETWORK_BANDWIDTH.html @@ -48,4 +48,4 @@

$CP_PREF_UI_PIPELINE_DEPLOYMENT_NAME Platform

- \ No newline at end of file +