Skip to content

Commit

Permalink
Issue #3602 fix for network tag removal and notification repeat (#3658)
Browse files Browse the repository at this point in the history
  • Loading branch information
SilinPavel authored Aug 27, 2024
1 parent 584b594 commit f301e19
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ public final class MessageConstants {
public static final String DEBUG_RUN_HAS_NOT_NODE_NAME = "debug.run.has.not.node.name";
public static final String DEBUG_MEMORY_METRICS = "debug.memory.metrics.received";
public static final String INFO_RUN_HIGH_NETWORK_CONSUMPTION_NOTIFY = "info.run.high.network.consumption.notify";
public static final String INFO_RUN_HIGH_NETWORK_CONSUMPTION_ACTION = "info.run.high.network.consumption.action";
public static final String DEBUG_NETWORK_RUN_METRICS_RECEIVED = "debug.network.run.metrics.received";
public static final String DEBUG_RUN_NOT_NETWORK_CONSUMING = "debug.run.not.network.consuming";
public static final String DEBUG_RUN_NOT_NETWORK_CONSUMING_DISABLED = "debug.run.network.consuming.disabled";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -454,25 +454,24 @@ private void processHighNetworkConsumingRuns(final List<PipelineRun> runs) {

private void processHighNetworkConsumingRun(PipelineRun run, int actionTimeout,
NetworkConsumingRunAction action,
List<Pair<PipelineRun, Double>> pipelinesToNotify,
List<Pair<PipelineRun, Double>> runsToNotify,
List<PipelineRun> runsToUpdateNotificationTime,
Double bandwidth, List<PipelineRun> runsToUpdateTags) {
if (shouldPerformActionOnNetworkConsumingRun(run, actionTimeout)) {
performActionOnNetworkConsumingRun(run, action, bandwidth, pipelinesToNotify,
runsToUpdateNotificationTime);
return;
}
if (Objects.isNull(run.getLastNetworkConsumptionNotificationTime())) {
run.addTag(NETWORK_CONSUMING_LEVEL_HIGH, TRUE_VALUE_STRING);
Optional.ofNullable(getTimestampTag(NETWORK_CONSUMING_LEVEL_HIGH))
.ifPresent(tag -> run.addTag(tag, DateUtils.nowUTCStr()));
runsToUpdateTags.add(run);
run.setLastNetworkConsumptionNotificationTime(DateUtils.nowUTC());
runsToUpdateNotificationTime.add(run);

log.info(messageHelper.getMessage(MessageConstants.INFO_RUN_HIGH_NETWORK_CONSUMPTION_NOTIFY,
run.getPodId(), bandwidth));

performHighNetworkConsumingNotify(run, bandwidth, runsToNotify, runsToUpdateNotificationTime);
} else if (shouldPerformActionOnNetworkConsumingRun(run, actionTimeout)) {
performActionOnNetworkConsumingRun(run, action, bandwidth, runsToNotify,
runsToUpdateNotificationTime);
}
pipelinesToNotify.add(new ImmutablePair<>(run, bandwidth));
log.info(messageHelper.getMessage(MessageConstants.INFO_RUN_HIGH_NETWORK_CONSUMPTION_NOTIFY,
run.getPodId(), bandwidth));

}

private void processHighNetworkConsumingRuns(Map<String, PipelineRun> running,
Expand Down Expand Up @@ -522,24 +521,27 @@ private boolean shouldPerformActionOnNetworkConsumingRun(final PipelineRun run,
private void performActionOnNetworkConsumingRun(final PipelineRun run,
final NetworkConsumingRunAction action,
final double bandwidth,
final List<Pair<PipelineRun, Double>> pipelinesToNotify,
final List<PipelineRun> runsToUpdate) {
log.info(messageHelper.getMessage(MessageConstants.INFO_RUN_IDLE_ACTION, run.getPodId(),
bandwidth, action));
final List<Pair<PipelineRun, Double>> runsToNotify,
final List<PipelineRun> runsToUpdateNotificationTime) {
log.info(messageHelper.getMessage(MessageConstants.INFO_RUN_HIGH_NETWORK_CONSUMPTION_ACTION,
run.getPodId(), bandwidth, action.name()));
switch (action) {
case LIMIT_BANDWIDTH:
// TODO
break;
case NOTIFY:
default:
performHighNetworkConsumingNotify(run, bandwidth, pipelinesToNotify);
performHighNetworkConsumingNotify(run, bandwidth, runsToNotify, runsToUpdateNotificationTime);
break;
}
runsToUpdate.add(run);
}

private void performHighNetworkConsumingNotify(PipelineRun run, double networkBandwidthLevel,
List<Pair<PipelineRun, Double>> pipelinesToNotify) {
private void performHighNetworkConsumingNotify(final PipelineRun run, final double networkBandwidthLevel,
final List<Pair<PipelineRun, Double>> pipelinesToNotify,
final List<PipelineRun> runsToUpdateNotificationTime) {
run.setLastNetworkConsumptionNotificationTime(DateUtils.nowUTC());
pipelinesToNotify.add(new ImmutablePair<>(run, networkBandwidthLevel));
runsToUpdateNotificationTime.add(run);
}

private void performStop(final PipelineRun run,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -508,8 +508,11 @@ private void removeUtilizationLevelTags(final PipelineRun run) {
final String suffix = preferenceManager.getPreference(SystemPreferences.SYSTEM_RUN_TAG_DATE_SUFFIX);
Stream.of(ResourceMonitoringManager.UTILIZATION_LEVEL_LOW,
ResourceMonitoringManager.UTILIZATION_LEVEL_HIGH,
ResourceMonitoringManager.NETWORK_CONSUMING_LEVEL_HIGH,
ResourceMonitoringManager.UTILIZATION_LEVEL_LOW + suffix,
ResourceMonitoringManager.UTILIZATION_LEVEL_HIGH + suffix)
ResourceMonitoringManager.UTILIZATION_LEVEL_HIGH + suffix,
ResourceMonitoringManager.NETWORK_CONSUMING_LEVEL_HIGH + suffix,
PipelineRunManager.NETWORK_LIMIT + suffix)
.filter(run::hasTag)
.forEach(run::removeTag);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,6 @@ public void notifyHighNetworkConsumingRuns(final List<Pair<PipelineRun, Double>>
final Map<String, NotificationFilter> runParametersFilters = parseRunExcludeParams();

final List<Pair<PipelineRun, Double>> filtered = pipelineNetworkBandwidthPairs.stream()
.filter(pair -> shouldNotifyHighNetworkConsumingRun(pair.getLeft().getId(), type, settings))
.filter(pair -> noneMatchExcludedInstanceType(pair.getLeft(), instanceTypesToExclude))
.filter(pair -> !matchExcludeRunParameters(pair.getLeft(), runParametersFilters))
.collect(Collectors.toList());
Expand All @@ -377,13 +376,6 @@ public void notifyHighNetworkConsumingRuns(final List<Pair<PipelineRun, Double>>
pair.getRight(), bandwidthLimit, type))
.collect(Collectors.toList());
saveNotifications(messages);

if (NotificationType.HIGH_CONSUMED_NETWORK_BANDWIDTH.equals(type)) {
final List<Long> runIds = filtered.stream()
.map(pair -> pair.getLeft().getId()).collect(Collectors.toList());
monitoringNotificationDao.updateNotificationTimestamp(runIds,
NotificationType.HIGH_CONSUMED_NETWORK_BANDWIDTH);
}
}

private NotificationMessage buildMessageForHighNetworkConsumingRun(final NotificationSettings settings,
Expand Down Expand Up @@ -899,15 +891,6 @@ private boolean shouldNotifyIdleRun(final Long runId, final NotificationType not
return shouldNotify(runId, notificationSettings);
}

private boolean shouldNotifyHighNetworkConsumingRun(final Long runId,
final NotificationType notificationType,
final NotificationSettings notificationSettings) {
if (!NotificationType.HIGH_CONSUMED_NETWORK_BANDWIDTH.equals(notificationType)) {
return true;
}
return shouldNotify(runId, notificationSettings);
}

private Map<String, NotificationFilter> parseRunExcludeParams() {
final Map<String, NotificationFilter> excludeParams = preferenceManager.getPreference(
SystemPreferences.SYSTEM_NOTIFICATIONS_EXCLUDE_PARAMS);
Expand Down
1 change: 1 addition & 0 deletions api/src/main/resources/messages.properties
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@ debug.run.not.idled=Run {0} has CPU usage rate: {1} considered as not idled, tag
debug.run.has.not.node.name=Pipeline with id: {0} has not node name.
debug.memory.metrics.received=Memory and disk metrics received ''{0}''
info.run.high.network.consumption.notify=Pipeline Run {0} is high network consuming: bandwidth: {1}, notification will be sent
info.run.high.network.consumption.action=Pipeline Run {0} is high network consuming: bandwidth: {1}, action: {2} will be performed
debug.network.run.metrics.received=Network metrics ''{0}''
debug.run.not.network.consuming=Run {0} has network bandwidth: {1} considered as not high consuming, tag will be cleared.
debug.run.network.consuming.disabled=SystemPreference system.pod.bandwidth.limit is set to 0, considering it disabled!
Expand Down

0 comments on commit f301e19

Please sign in to comment.