[FLINK-31860] Ignore Event creation errors during cleanup #577

Closed · wants to merge 1 commit
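This PR threads the full FlinkResourceContext through the autoscaler call chain (scale(), JobVertexScaler, ScalingExecutor) in place of the bare AbstractFlinkResource, so that ineffective-scaling and scaling-report events are triggered via the context. In the hunks below, removed lines carry a leading - and their replacements a leading +. The EventRecorder side of FLINK-31860 is not part of the hunks shown here; what follows is a minimal sketch of how a context-aware overload could ignore Event creation errors during cleanup, with the try/catch behavior and logger name being assumptions rather than the actual implementation:

// Hypothetical sketch only; the real EventRecorder change is outside this diff.
public void triggerEvent(
        FlinkResourceContext<?> ctx,
        Type type, Reason reason, Component component, String message) {
    try {
        // Delegate to the existing resource-based overload seen in the old call sites.
        triggerEvent(ctx.getResource(), type, reason, component, message);
    } catch (Exception e) {
        // During cleanup the backing Kubernetes resource may already be gone;
        // a failed Event creation should not fail the reconciliation itself.
        LOG.warn("Unable to create event {} for {}", reason, ctx.getResource(), e);
    }
}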
@@ -110,7 +110,7 @@ public boolean scale(FlinkResourceContext<? extends AbstractFlinkResource<?, ?>>
registerResourceScalingMetrics(resource, ctx.getResourceMetricGroup());

var specAdjusted =
-scalingExecutor.scaleResource(resource, autoScalerInfo, conf, evaluatedMetrics);
+scalingExecutor.scaleResource(ctx, autoScalerInfo, conf, evaluatedMetrics);
autoScalerInfo.replaceInKubernetes(kubernetesClient);
return specAdjusted;
} catch (Exception e) {
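The only change at this call site is that scale() now forwards its whole context instead of unwrapping it first. The two context accessors this PR relies on, sketched from the usages visible in these hunks (return types elided with var; not a full API listing):

var resource = ctx.getResource();               // the AbstractFlinkResource, still needed by some helpers
var metricGroup = ctx.getResourceMetricGroup(); // used by registerResourceScalingMetrics above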
@@ -19,11 +19,11 @@

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions;
import org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric;
import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
import org.apache.flink.kubernetes.operator.autoscaler.utils.AutoScalerUtils;
+import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.util.Preconditions;
@@ -65,7 +65,7 @@ public JobVertexScaler(EventRecorder eventRecorder) {
}

public int computeScaleTargetParallelism(
-AbstractFlinkResource<?, ?> resource,
+FlinkResourceContext<?> ctx,
Configuration conf,
JobVertexID vertex,
Map<ScalingMetric, EvaluatedScalingMetric> evaluatedMetrics,
@@ -112,7 +112,7 @@ public int computeScaleTargetParallelism(

if (newParallelism == currentParallelism
|| blockScalingBasedOnPastActions(
-resource,
+ctx,
vertex,
conf,
evaluatedMetrics,
@@ -129,7 +129,7 @@
}

private boolean blockScalingBasedOnPastActions(
-AbstractFlinkResource<?, ?> resource,
+FlinkResourceContext<?> ctx,
JobVertexID vertex,
Configuration conf,
Map<ScalingMetric, EvaluatedScalingMetric> evaluatedMetrics,
@@ -148,8 +148,7 @@ private boolean blockScalingBasedOnPastActions(

if (currentParallelism == lastSummary.getNewParallelism() && lastSummary.isScaledUp()) {
if (scaledUp) {
-return detectIneffectiveScaleUp(
-        resource, vertex, conf, evaluatedMetrics, lastSummary);
+return detectIneffectiveScaleUp(ctx, vertex, conf, evaluatedMetrics, lastSummary);
} else {
return detectImmediateScaleDownAfterScaleUp(vertex, conf, lastScalingTs);
}
@@ -172,7 +171,7 @@ private boolean detectImmediateScaleDownAfterScaleUp(
}

private boolean detectIneffectiveScaleUp(
-AbstractFlinkResource<?, ?> resource,
+FlinkResourceContext<?> ctx,
JobVertexID vertex,
Configuration conf,
Map<ScalingMetric, EvaluatedScalingMetric> evaluatedMetrics,
@@ -200,7 +199,7 @@ private boolean detectIneffectiveScaleUp(
var message = String.format(INNEFFECTIVE_MESSAGE_FORMAT, vertex);

eventRecorder.triggerEvent(
-resource,
+ctx,
EventRecorder.Type.Normal,
EventRecorder.Reason.IneffectiveScaling,
EventRecorder.Component.Operator,
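Inside JobVertexScaler the change is a mechanical re-plumbing: the context is passed down through both private helpers to the one place that emits an event. Condensed call chain (not compilable code, unrelated parameters elided):

computeScaleTargetParallelism(ctx, ...)
  -> blockScalingBasedOnPastActions(ctx, ...)
     -> detectIneffectiveScaleUp(ctx, ...)
        -> eventRecorder.triggerEvent(ctx, Type.Normal, Reason.IneffectiveScaling, Component.Operator, message)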
@@ -26,6 +26,7 @@
import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions;
import org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric;
import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.kubernetes.operator.utils.KubernetesClientUtils;
import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -88,18 +89,18 @@ public ScalingExecutor(
}

public boolean scaleResource(
-AbstractFlinkResource<?, ?> resource,
+FlinkResourceContext<?> ctx,
AutoScalerInfo scalingInformation,
Configuration conf,
Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> evaluatedMetrics) {

+var resource = ctx.getResource();
if (!stabilizationPeriodPassed(resource, conf)) {
return false;
}

var scalingHistory = scalingInformation.getScalingHistory();
-var scalingSummaries =
-        computeScalingSummary(resource, conf, evaluatedMetrics, scalingHistory);
+var scalingSummaries = computeScalingSummary(ctx, conf, evaluatedMetrics, scalingHistory);

if (scalingSummaries.isEmpty()) {
LOG.info("All job vertices are currently running at their target parallelism.");
@@ -114,7 +115,7 @@

var scalingReport = scalingReport(scalingSummaries, scalingEnabled);
eventRecorder.triggerEvent(
-resource,
+ctx,
EventRecorder.Type.Normal,
EventRecorder.Reason.ScalingReport,
EventRecorder.Component.Operator,
@@ -217,7 +218,7 @@ protected static boolean allVerticesWithinUtilizationTarget(
}

private Map<JobVertexID, ScalingSummary> computeScalingSummary(
-AbstractFlinkResource<?, ?> resource,
+FlinkResourceContext<?> ctx,
Configuration conf,
Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> evaluatedMetrics,
Map<JobVertexID, SortedMap<Instant, ScalingSummary>> scalingHistory) {
@@ -235,7 +236,7 @@ private Map<JobVertexID, ScalingSummary> computeScalingSummary(
(int) metrics.get(ScalingMetric.PARALLELISM).getCurrent();
var newParallelism =
jobVertexScaler.computeScaleTargetParallelism(
-resource,
+ctx,
conf,
v,
metrics,
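ScalingExecutor still needs the raw resource for the stabilization check, so it unwraps the context once at the top of scaleResource and threads ctx downward only where the event recorder ultimately needs it. The resulting pattern, as applied in the hunks above:

// Unwrap once; helpers that only read the resource keep their old signatures.
var resource = ctx.getResource();
if (!stabilizationPeriodPassed(resource, conf)) {
    return false;
}
var scalingSummaries = computeScalingSummary(ctx, conf, evaluatedMetrics, scalingHistory);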
@@ -23,6 +23,8 @@
import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions;
import org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric;
import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.kubernetes.operator.controller.FlinkDeploymentContext;
+import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
import org.apache.flink.kubernetes.operator.utils.EventCollector;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -58,6 +60,8 @@ public class JobVertexScalerTest {

private FlinkDeployment flinkDep;

+private FlinkResourceContext<?> ctx;

@BeforeEach
public void setup() {
flinkDep = TestUtils.buildApplicationCluster();
@@ -67,6 +71,7 @@ public void setup() {
conf = new Configuration();
conf.set(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 1.);
conf.set(AutoScalerOptions.CATCH_UP_DURATION, Duration.ZERO);
+ctx = new FlinkDeploymentContext(flinkDep, null, null, null, null);
}

@Test
@@ -76,55 +81,55 @@ public void testParallelismScaling() {
assertEquals(
5,
vertexScaler.computeScaleTargetParallelism(
-flinkDep, conf, op, evaluated(10, 50, 100), Collections.emptySortedMap()));
+ctx, conf, op, evaluated(10, 50, 100), Collections.emptySortedMap()));

conf.set(AutoScalerOptions.TARGET_UTILIZATION, .8);
assertEquals(
8,
vertexScaler.computeScaleTargetParallelism(
-flinkDep, conf, op, evaluated(10, 50, 100), Collections.emptySortedMap()));
+ctx, conf, op, evaluated(10, 50, 100), Collections.emptySortedMap()));

conf.set(AutoScalerOptions.TARGET_UTILIZATION, .8);
assertEquals(
10,
vertexScaler.computeScaleTargetParallelism(
-flinkDep, conf, op, evaluated(10, 80, 100), Collections.emptySortedMap()));
+ctx, conf, op, evaluated(10, 80, 100), Collections.emptySortedMap()));

conf.set(AutoScalerOptions.TARGET_UTILIZATION, .8);
assertEquals(
8,
vertexScaler.computeScaleTargetParallelism(
-flinkDep, conf, op, evaluated(10, 60, 100), Collections.emptySortedMap()));
+ctx, conf, op, evaluated(10, 60, 100), Collections.emptySortedMap()));

assertEquals(
8,
vertexScaler.computeScaleTargetParallelism(
-flinkDep, conf, op, evaluated(10, 59, 100), Collections.emptySortedMap()));
+ctx, conf, op, evaluated(10, 59, 100), Collections.emptySortedMap()));

conf.set(AutoScalerOptions.TARGET_UTILIZATION, 0.5);
assertEquals(
10,
vertexScaler.computeScaleTargetParallelism(
-flinkDep, conf, op, evaluated(2, 100, 40), Collections.emptySortedMap()));
+ctx, conf, op, evaluated(2, 100, 40), Collections.emptySortedMap()));

conf.set(AutoScalerOptions.TARGET_UTILIZATION, 0.6);
assertEquals(
4,
vertexScaler.computeScaleTargetParallelism(
-flinkDep, conf, op, evaluated(2, 100, 100), Collections.emptySortedMap()));
+ctx, conf, op, evaluated(2, 100, 100), Collections.emptySortedMap()));

conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.);
conf.set(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 0.5);
assertEquals(
5,
vertexScaler.computeScaleTargetParallelism(
-flinkDep, conf, op, evaluated(10, 10, 100), Collections.emptySortedMap()));
+ctx, conf, op, evaluated(10, 10, 100), Collections.emptySortedMap()));

conf.set(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 0.6);
assertEquals(
4,
vertexScaler.computeScaleTargetParallelism(
-flinkDep, conf, op, evaluated(10, 10, 100), Collections.emptySortedMap()));
+ctx, conf, op, evaluated(10, 10, 100), Collections.emptySortedMap()));
}
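The expected values above follow the proportional rule implied by the assertions, newParallelism ≈ currentParallelism × targetRate / (trueProcessingRate × targetUtilization), rounded and clamped by MAX_SCALE_DOWN_FACTOR. For example: evaluated(10, 50, 100) at utilization 1.0 gives 10 × 50 / 100 = 5; evaluated(10, 60, 100) at 0.8 gives 10 × 60 / (100 × 0.8) = 7.5, rounded up to 8; and evaluated(10, 10, 100) computes a raw target of 1, but a scale-down factor of 0.5 caps the reduction at 10 × (1 - 0.5) = 5.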

@Test
@@ -168,7 +173,7 @@ public void testMinParallelismLimitIsUsed() {
assertEquals(
5,
vertexScaler.computeScaleTargetParallelism(
-flinkDep,
+ctx,
conf,
new JobVertexID(),
evaluated(10, 100, 500),
@@ -182,7 +187,7 @@ public void testMaxParallelismLimitIsUsed() {
assertEquals(
10,
vertexScaler.computeScaleTargetParallelism(
-flinkDep,
+ctx,
conf,
new JobVertexID(),
evaluated(10, 500, 100),
@@ -200,31 +205,27 @@ public void testScaleDownAfterScaleUpDetection() {
var evaluated = evaluated(5, 100, 50);
var history = new TreeMap<Instant, ScalingSummary>();
assertEquals(
-10,
-vertexScaler.computeScaleTargetParallelism(flinkDep, conf, op, evaluated, history));
+10, vertexScaler.computeScaleTargetParallelism(ctx, conf, op, evaluated, history));

history.put(clock.instant(), new ScalingSummary(5, 10, evaluated));

// Should not allow scale back down immediately
evaluated = evaluated(10, 50, 100);
assertEquals(
-10,
-vertexScaler.computeScaleTargetParallelism(flinkDep, conf, op, evaluated, history));
+10, vertexScaler.computeScaleTargetParallelism(ctx, conf, op, evaluated, history));

// Pass some time...
clock = Clock.offset(Clock.systemDefaultZone(), Duration.ofSeconds(61));
vertexScaler.setClock(clock);

assertEquals(
-5,
-vertexScaler.computeScaleTargetParallelism(flinkDep, conf, op, evaluated, history));
+5, vertexScaler.computeScaleTargetParallelism(ctx, conf, op, evaluated, history));
history.put(clock.instant(), new ScalingSummary(10, 5, evaluated));

// Allow immediate scale up
evaluated = evaluated(5, 100, 50);
assertEquals(
-10,
-vertexScaler.computeScaleTargetParallelism(flinkDep, conf, op, evaluated, history));
+10, vertexScaler.computeScaleTargetParallelism(ctx, conf, op, evaluated, history));
history.put(clock.instant(), new ScalingSummary(5, 10, evaluated));
}

@@ -237,61 +238,53 @@ public void testIneffectiveScalingDetection() {
var evaluated = evaluated(5, 100, 50);
var history = new TreeMap<Instant, ScalingSummary>();
assertEquals(
-10,
-vertexScaler.computeScaleTargetParallelism(flinkDep, conf, op, evaluated, history));
+10, vertexScaler.computeScaleTargetParallelism(ctx, conf, op, evaluated, history));
assertEquals(100, evaluated.get(ScalingMetric.EXPECTED_PROCESSING_RATE).getCurrent());
history.put(Instant.now(), new ScalingSummary(5, 10, evaluated));

// Allow to scale higher if scaling was effective (80%)
evaluated = evaluated(10, 180, 90);
assertEquals(
-20,
-vertexScaler.computeScaleTargetParallelism(flinkDep, conf, op, evaluated, history));
+20, vertexScaler.computeScaleTargetParallelism(ctx, conf, op, evaluated, history));
assertEquals(180, evaluated.get(ScalingMetric.EXPECTED_PROCESSING_RATE).getCurrent());
history.put(Instant.now(), new ScalingSummary(10, 20, evaluated));

// Detect ineffective scaling, less than 5% of target increase (instead of 90 -> 180, only
// 90 -> 94. Do not try to scale above 20
evaluated = evaluated(20, 180, 94);
assertEquals(
-20,
-vertexScaler.computeScaleTargetParallelism(flinkDep, conf, op, evaluated, history));
+20, vertexScaler.computeScaleTargetParallelism(ctx, conf, op, evaluated, history));
assertFalse(evaluated.containsKey(ScalingMetric.EXPECTED_PROCESSING_RATE));

// Still considered ineffective (less than <10%)
evaluated = evaluated(20, 180, 98);
assertEquals(
-20,
-vertexScaler.computeScaleTargetParallelism(flinkDep, conf, op, evaluated, history));
+20, vertexScaler.computeScaleTargetParallelism(ctx, conf, op, evaluated, history));
assertFalse(evaluated.containsKey(ScalingMetric.EXPECTED_PROCESSING_RATE));

// Allow scale up if current parallelism doesnt match last (user rescaled manually)
evaluated = evaluated(10, 180, 90);
assertEquals(
-20,
-vertexScaler.computeScaleTargetParallelism(flinkDep, conf, op, evaluated, history));
+20, vertexScaler.computeScaleTargetParallelism(ctx, conf, op, evaluated, history));

// Over 10%, effective
evaluated = evaluated(20, 180, 100);
assertEquals(
-36,
-vertexScaler.computeScaleTargetParallelism(flinkDep, conf, op, evaluated, history));
+36, vertexScaler.computeScaleTargetParallelism(ctx, conf, op, evaluated, history));
assertTrue(evaluated.containsKey(ScalingMetric.EXPECTED_PROCESSING_RATE));

// Ineffective but detection is turned off
conf.set(AutoScalerOptions.SCALING_EFFECTIVENESS_DETECTION_ENABLED, false);
evaluated = evaluated(20, 180, 90);
assertEquals(
-40,
-vertexScaler.computeScaleTargetParallelism(flinkDep, conf, op, evaluated, history));
+40, vertexScaler.computeScaleTargetParallelism(ctx, conf, op, evaluated, history));
assertTrue(evaluated.containsKey(ScalingMetric.EXPECTED_PROCESSING_RATE));
conf.set(AutoScalerOptions.SCALING_EFFECTIVENESS_DETECTION_ENABLED, true);

// Allow scale down even if ineffective
evaluated = evaluated(20, 45, 90);
assertEquals(
-10,
-vertexScaler.computeScaleTargetParallelism(flinkDep, conf, op, evaluated, history));
+10, vertexScaler.computeScaleTargetParallelism(ctx, conf, op, evaluated, history));
assertTrue(evaluated.containsKey(ScalingMetric.EXPECTED_PROCESSING_RATE));
}
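The numbers in this test work out against an apparent 10% effectiveness threshold: scaling 10 -> 20 promised a rate increase from 90 to 180, so an actual rate of 94 realizes only 4 of the expected 90 increase (about 4%) and 98 only 8 (about 9%), both of which block further scale-up, while 100 realizes 10/90 ≈ 11% and the next target of 20 × 180 / 100 = 36 is allowed.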

@@ -306,7 +299,7 @@ public void testSendingIneffectiveScalingEvents() {
assertEquals(
10,
vertexScaler.computeScaleTargetParallelism(
-flinkDep, conf, jobVertexID, evaluated, history));
+ctx, conf, jobVertexID, evaluated, history));
assertEquals(100, evaluated.get(ScalingMetric.EXPECTED_PROCESSING_RATE).getCurrent());
history.put(Instant.now(), new ScalingSummary(5, 10, evaluated));

@@ -315,7 +308,7 @@
assertEquals(
20,
vertexScaler.computeScaleTargetParallelism(
-flinkDep, conf, jobVertexID, evaluated, history));
+ctx, conf, jobVertexID, evaluated, history));
assertEquals(180, evaluated.get(ScalingMetric.EXPECTED_PROCESSING_RATE).getCurrent());
history.put(Instant.now(), new ScalingSummary(10, 20, evaluated));
assertEquals(0, eventCollector.events.size());
@@ -325,7 +318,7 @@
assertEquals(
20,
vertexScaler.computeScaleTargetParallelism(
-flinkDep, conf, jobVertexID, evaluated, history));
+ctx, conf, jobVertexID, evaluated, history));
assertFalse(evaluated.containsKey(ScalingMetric.EXPECTED_PROCESSING_RATE));
assertEquals(1, eventCollector.events.size());
var event = eventCollector.events.poll();
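The tests obtain a context by instantiating FlinkDeploymentContext directly with null collaborators, which is safe here because these code paths only touch ctx.getResource() and the injected event recorder. A usage sketch matching setup() above (the null arguments stand in for services these tests never exercise):

var ctx = new FlinkDeploymentContext(flinkDep, null, null, null, null);
var parallelism = vertexScaler.computeScaleTargetParallelism(
        ctx, conf, new JobVertexID(), evaluated(10, 50, 100), Collections.emptySortedMap());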