Skip to content

Commit 7d5bf95

Browse files
committed
[hotfix] Clean up code warnings and remove some unnecessary bits + enforce junit5 assertions
1 parent 0cba89c commit 7d5bf95

File tree

42 files changed

+149
-215
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

42 files changed

+149
-215
lines changed

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ public class FlinkOperator {
6969
private final FlinkServiceFactory flinkServiceFactory;
7070
private final FlinkConfigManager configManager;
7171
private final Set<FlinkResourceValidator> validators;
72-
@VisibleForTesting final Set<RegisteredController> registeredControllers = new HashSet<>();
72+
@VisibleForTesting final Set<RegisteredController<?>> registeredControllers = new HashSet<>();
7373
private final KubernetesOperatorMetricGroup metricGroup;
7474
private final Collection<FlinkResourceListener> listeners;
7575

@@ -149,9 +149,7 @@ void registerSessionJobController() {
149149
var reconciler =
150150
new SessionJobReconciler(
151151
client, flinkServiceFactory, configManager, eventRecorder, statusRecorder);
152-
var observer =
153-
new SessionJobObserver(
154-
flinkServiceFactory, configManager, statusRecorder, eventRecorder);
152+
var observer = new SessionJobObserver(flinkServiceFactory, configManager, eventRecorder);
155153
var controller =
156154
new FlinkSessionJobController(
157155
configManager, validators, reconciler, observer, statusRecorder);

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ public Map<String, EventSource> prepareEventSources(
122122
EventSourceUtils.getFlinkDeploymentInformerEventSource(context));
123123
}
124124

125-
private boolean validateSessionJob(FlinkSessionJob sessionJob, Context context) {
125+
private boolean validateSessionJob(FlinkSessionJob sessionJob, Context<?> context) {
126126
for (FlinkResourceValidator validator : validators) {
127127
Optional<String> validationError =
128128
validator.validateSessionJob(

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/FlinkVersion.java

Lines changed: 0 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,6 @@
2020

2121
import org.apache.flink.annotation.Experimental;
2222

23-
import java.util.LinkedHashSet;
24-
import java.util.Set;
25-
import java.util.stream.Collectors;
26-
import java.util.stream.Stream;
27-
2823
/** Enumeration for supported Flink versions. */
2924
@Experimental
3025
public enum FlinkVersion {
@@ -37,19 +32,6 @@ public boolean isNewerVersionThan(FlinkVersion otherVersion) {
3732
return this.ordinal() > otherVersion.ordinal();
3833
}
3934

40-
/**
41-
* Returns all versions within the defined range, inclusive both start and end.
42-
*
43-
* @param start Starting version.
44-
* @param end Last version.
45-
* @return Versions within the range.
46-
*/
47-
public static Set<FlinkVersion> rangeOf(FlinkVersion start, FlinkVersion end) {
48-
return Stream.of(FlinkVersion.values())
49-
.filter(v -> v.ordinal() >= start.ordinal() && v.ordinal() <= end.ordinal())
50-
.collect(Collectors.toCollection(LinkedHashSet::new));
51-
}
52-
5335
/**
5436
* Returns the current version.
5537
*

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/ReconciliationState.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,5 +26,5 @@ public enum ReconciliationState {
2626
/** In the process of rolling back to the lastStableSpec. */
2727
ROLLING_BACK,
2828
/** Rolled back to the lastStableSpec. */
29-
ROLLED_BACK;
29+
ROLLED_BACK
3030
}

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/docs/CrdReferenceDoclet.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ public boolean run(DocletEnvironment environment) {
9393
printStream.write(FileUtils.readFileToByteArray(new File(templateFile)));
9494

9595
MdPrinter se = new MdPrinter(printStream);
96-
printStream.println("");
96+
printStream.println();
9797
printStream.println("## Spec");
9898
var spec =
9999
sortedByName(
@@ -103,7 +103,7 @@ public boolean run(DocletEnvironment environment) {
103103
handleAbstractClass(spec, environment.getTypeUtils());
104104
se.show(spec);
105105

106-
printStream.println("");
106+
printStream.println();
107107
printStream.println("## Status");
108108
var status =
109109
sortedByName(
@@ -173,12 +173,12 @@ public Void scan(Element e, Integer depth) {
173173
ElementKind kind = e.getKind();
174174
switch (kind) {
175175
case CLASS:
176-
out.println("");
176+
out.println();
177177
out.println("### " + e.getSimpleName());
178178
out.println("**Class**: " + e);
179-
out.println("");
179+
out.println();
180180
out.println("**Description**: " + dcTree);
181-
out.println("");
181+
out.println();
182182
out.println("| Parameter | Type | Docs |");
183183
out.println("| ----------| ---- | ---- |");
184184
// if this is a child class, print it's parent's enclosed elements.
@@ -198,12 +198,12 @@ public Void scan(Element e, Integer depth) {
198198
+ " |");
199199
return null;
200200
case ENUM:
201-
out.println("");
201+
out.println();
202202
out.println("### " + e.getSimpleName());
203203
out.println("**Class**: " + e);
204-
out.println("");
204+
out.println();
205205
out.println("**Description**: " + dcTree);
206-
out.println("");
206+
out.println();
207207
out.println("| Value | Docs |");
208208
out.println("| ----- | ---- |");
209209
break;

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/OperatorJosdkMetrics.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,6 @@ public <T> T timeControllerExecution(ControllerExecution<T> execution) throws Ex
8484
histogram(execution, execution.successTypeName(result)).update(toSeconds(startTime));
8585
return result;
8686
} catch (Exception e) {
87-
var h = histogram(execution, "failed");
8887
histogram(execution, "failed").update(toSeconds(startTime));
8988
throw e;
9089
}

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/OperatorMetricUtils.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -98,10 +98,6 @@ private static MetricRegistryImpl createMetricRegistry(
9898
ReporterSetup.fromConfiguration(configuration, pluginManager));
9999
}
100100

101-
public static Histogram synchronizedHistogram(Histogram histogram) {
102-
return new SynchronizedHistogram(histogram);
103-
}
104-
105101
public static Counter synchronizedCounter(Counter counter) {
106102
return new SynchronizedCounter(counter);
107103
}

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/LifecycleMetrics.java

Lines changed: 34 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -177,48 +177,46 @@ private synchronized void init(CR cr) {
177177
private Map<String, List<Histogram>> getTransitionHistograms(CR cr) {
178178
var histos = new HashMap<String, List<Histogram>>();
179179
transitionMetrics.forEach(
180-
(metricName, t) -> {
181-
histos.put(
182-
metricName,
183-
namespaceHistosEnabled
184-
? List.of(
185-
t.f0,
186-
t.f1.computeIfAbsent(
187-
cr.getMetadata().getNamespace(),
188-
ns ->
189-
createTransitionHistogram(
190-
metricName,
191-
operatorMetricGroup
192-
.createResourceNamespaceGroup(
193-
configManager
194-
.getDefaultConfig(),
195-
ns))))
196-
: List.of(t.f0));
197-
});
180+
(metricName, t) ->
181+
histos.put(
182+
metricName,
183+
namespaceHistosEnabled
184+
? List.of(
185+
t.f0,
186+
t.f1.computeIfAbsent(
187+
cr.getMetadata().getNamespace(),
188+
ns ->
189+
createTransitionHistogram(
190+
metricName,
191+
operatorMetricGroup
192+
.createResourceNamespaceGroup(
193+
configManager
194+
.getDefaultConfig(),
195+
ns))))
196+
: List.of(t.f0)));
198197
return histos;
199198
}
200199

201200
private Map<ResourceLifecycleState, List<Histogram>> getStateTimeHistograms(CR cr) {
202201
var histos = new HashMap<ResourceLifecycleState, List<Histogram>>();
203202
stateTimeMetrics.forEach(
204-
(state, t) -> {
205-
histos.put(
206-
state,
207-
namespaceHistosEnabled
208-
? List.of(
209-
t.f0,
210-
t.f1.computeIfAbsent(
211-
cr.getMetadata().getNamespace(),
212-
ns ->
213-
createStateTimeHistogram(
214-
state,
215-
operatorMetricGroup
216-
.createResourceNamespaceGroup(
217-
configManager
218-
.getDefaultConfig(),
219-
ns))))
220-
: List.of(t.f0));
221-
});
203+
(state, t) ->
204+
histos.put(
205+
state,
206+
namespaceHistosEnabled
207+
? List.of(
208+
t.f0,
209+
t.f1.computeIfAbsent(
210+
cr.getMetadata().getNamespace(),
211+
ns ->
212+
createStateTimeHistogram(
213+
state,
214+
operatorMetricGroup
215+
.createResourceNamespaceGroup(
216+
configManager
217+
.getDefaultConfig(),
218+
ns))))
219+
: List.of(t.f0)));
222220
return histos;
223221
}
224222

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/Observer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,5 +28,5 @@ public interface Observer<CR> {
2828
* @param cr the target custom resource
2929
* @param context the context with which the operation is executed
3030
*/
31-
void observe(CR cr, Context context);
31+
void observe(CR cr, Context<?> context);
3232
}

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SavepointObserver.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@
3232
import org.apache.flink.kubernetes.operator.utils.ConfigOptionUtils;
3333
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
3434
import org.apache.flink.kubernetes.operator.utils.SavepointUtils;
35-
import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
3635

3736
import org.slf4j.Logger;
3837
import org.slf4j.LoggerFactory;
@@ -49,22 +48,18 @@ public class SavepointObserver<
4948

5049
private final FlinkService flinkService;
5150
private final FlinkConfigManager configManager;
52-
private final StatusRecorder<CR, STATUS> statusRecorder;
5351
private final EventRecorder eventRecorder;
5452

5553
public SavepointObserver(
5654
FlinkService flinkService,
5755
FlinkConfigManager configManager,
58-
StatusRecorder<CR, STATUS> statusRecorder,
5956
EventRecorder eventRecorder) {
6057
this.flinkService = flinkService;
6158
this.configManager = configManager;
62-
this.statusRecorder = statusRecorder;
6359
this.eventRecorder = eventRecorder;
6460
}
6561

66-
public void observeSavepointStatus(
67-
AbstractFlinkResource<?, STATUS> resource, Configuration deployedConfig) {
62+
public void observeSavepointStatus(CR resource, Configuration deployedConfig) {
6863

6964
var jobStatus = resource.getStatus().getJobStatus();
7065
var savepointInfo = jobStatus.getSavepointInfo();

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/context/ApplicationObserverContext.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,11 @@
2626
public class ApplicationObserverContext {
2727

2828
public final FlinkDeployment flinkApp;
29-
public final Context context;
29+
public final Context<?> context;
3030
public final Configuration deployedConfig;
3131

3232
public ApplicationObserverContext(
33-
FlinkDeployment flinkApp, Context context, Configuration deployedConfig) {
33+
FlinkDeployment flinkApp, Context<?> context, Configuration deployedConfig) {
3434
this.flinkApp = flinkApp;
3535
this.context = context;
3636
this.deployedConfig = deployedConfig;

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractDeploymentObserver.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
2828
import org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
2929
import org.apache.flink.kubernetes.operator.crd.status.ReconciliationState;
30-
import org.apache.flink.kubernetes.operator.crd.status.ReconciliationStatus;
3130
import org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
3231
import org.apache.flink.kubernetes.operator.observer.Observer;
3332
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
@@ -72,7 +71,7 @@ public AbstractDeploymentObserver(
7271
}
7372

7473
@Override
75-
public void observe(FlinkDeployment flinkApp, Context context) {
74+
public void observe(FlinkDeployment flinkApp, Context<?> context) {
7675
var status = flinkApp.getStatus();
7776
var reconciliationStatus = status.getReconciliationStatus();
7877

@@ -119,7 +118,7 @@ private void observeClusterInfo(FlinkDeployment flinkApp, Configuration configur
119118
}
120119

121120
protected void observeJmDeployment(
122-
FlinkDeployment flinkApp, Context context, Configuration effectiveConfig) {
121+
FlinkDeployment flinkApp, Context<?> context, Configuration effectiveConfig) {
123122
FlinkDeploymentStatus deploymentStatus = flinkApp.getStatus();
124123
JobManagerDeploymentStatus previousJmStatus =
125124
deploymentStatus.getJobManagerDeploymentStatus();
@@ -215,7 +214,7 @@ protected boolean isJmDeploymentReady(FlinkDeployment dep) {
215214

216215
protected void clearErrorsIfDeploymentIsHealthy(FlinkDeployment dep) {
217216
FlinkDeploymentStatus status = dep.getStatus();
218-
ReconciliationStatus reconciliationStatus = status.getReconciliationStatus();
217+
var reconciliationStatus = status.getReconciliationStatus();
219218
if (status.getJobManagerDeploymentStatus() != JobManagerDeploymentStatus.ERROR
220219
&& !JobStatus.FAILED.name().equals(dep.getStatus().getJobStatus().getState())
221220
&& reconciliationStatus.isLastReconciledSpecStable()) {
@@ -261,7 +260,7 @@ private void onMissingDeployment(FlinkDeployment deployment) {
261260
* @param flinkDep Flink resource to check.
262261
* @param context Context for reconciliation.
263262
*/
264-
private void checkIfAlreadyUpgraded(FlinkDeployment flinkDep, Context context) {
263+
private void checkIfAlreadyUpgraded(FlinkDeployment flinkDep, Context<?> context) {
265264
var status = flinkDep.getStatus();
266265
if (status.getReconciliationStatus().isFirstDeployment()) {
267266
return;
@@ -309,5 +308,5 @@ private void checkIfAlreadyUpgraded(FlinkDeployment flinkDep, Context context) {
309308
* @param deployedConfig config that is deployed on the Flink cluster
310309
*/
311310
protected abstract void observeFlinkCluster(
312-
FlinkDeployment flinkApp, Context context, Configuration deployedConfig);
311+
FlinkDeployment flinkApp, Context<?> context, Configuration deployedConfig);
313312
}

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserver.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
import org.apache.flink.kubernetes.operator.observer.context.ApplicationObserverContext;
2828
import org.apache.flink.kubernetes.operator.service.FlinkService;
2929
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
30-
import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
3130
import org.apache.flink.runtime.client.JobStatusMessage;
3231

3332
import io.javaoperatorsdk.operator.api.reconciler.Context;
@@ -44,11 +43,10 @@ public class ApplicationObserver extends AbstractDeploymentObserver {
4443
public ApplicationObserver(
4544
FlinkService flinkService,
4645
FlinkConfigManager configManager,
47-
StatusRecorder<FlinkDeployment, FlinkDeploymentStatus> statusRecorder,
4846
EventRecorder eventRecorder) {
4947
super(flinkService, configManager, eventRecorder);
5048
this.savepointObserver =
51-
new SavepointObserver(flinkService, configManager, statusRecorder, eventRecorder);
49+
new SavepointObserver<>(flinkService, configManager, eventRecorder);
5250
this.jobStatusObserver =
5351
new JobStatusObserver<>(flinkService, eventRecorder) {
5452
@Override
@@ -69,7 +67,7 @@ protected Optional<JobStatusMessage> filterTargetJob(
6967

7068
@Override
7169
protected void observeFlinkCluster(
72-
FlinkDeployment flinkApp, Context context, Configuration deployedConfig) {
70+
FlinkDeployment flinkApp, Context<?> context, Configuration deployedConfig) {
7371

7472
boolean jobFound =
7573
jobStatusObserver.observe(

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/ObserverFactory.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,6 @@ public Observer<FlinkDeployment> getOrCreate(FlinkDeployment flinkApp) {
6969
return new ApplicationObserver(
7070
flinkServiceFactory.getOrCreate(flinkApp),
7171
configManager,
72-
statusRecorder,
7372
eventRecorder);
7473
default:
7574
throw new UnsupportedOperationException(

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/SessionObserver.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ public SessionObserver(
4040

4141
@Override
4242
public void observeFlinkCluster(
43-
FlinkDeployment deployment, Context context, Configuration deployedConfig) {
43+
FlinkDeployment deployment, Context<?> context, Configuration deployedConfig) {
4444
// Check if session cluster can serve rest calls following our practice in JobObserver
4545
try {
4646
flinkService.listJobs(deployedConfig);

0 commit comments

Comments
 (0)