Skip to content

Commit 32477e7

Browse files
PatrickRenAHeise
authored andcommitted
[hotfix][testutil] Return Optional in MetricListener getters
1 parent 0136489 commit 32477e7

File tree

5 files changed

+116
-103
lines changed

5 files changed

+116
-103
lines changed

flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -57,12 +57,13 @@
5757
import java.nio.ByteBuffer;
5858
import java.time.Duration;
5959
import java.util.Comparator;
60+
import java.util.Optional;
6061
import java.util.PriorityQueue;
6162
import java.util.Properties;
6263
import java.util.stream.IntStream;
6364

6465
import static org.hamcrest.Matchers.greaterThan;
65-
import static org.junit.Assert.assertThrows;
66+
import static org.junit.jupiter.api.Assertions.assertTrue;
6667

6768
/** Tests for the standalone KafkaWriter. */
6869
public class KafkaWriterITCase {
@@ -115,7 +116,7 @@ public void setUp() {
115116
public void testRegisterMetrics(DeliveryGuarantee guarantee) throws Exception {
116117
try (final KafkaWriter<Integer> ignored =
117118
createWriterWithConfiguration(getKafkaClientConfiguration(), guarantee)) {
118-
metricListener.getGauge(KAFKA_METRIC_WITH_GROUP_NAME);
119+
assertTrue(metricListener.getGauge(KAFKA_METRIC_WITH_GROUP_NAME).isPresent());
119120
}
120121
}
121122

@@ -126,9 +127,8 @@ public void testNotRegisterMetrics(DeliveryGuarantee guarantee) throws Exception
126127
config.put("flink.disable-metrics", "true");
127128
try (final KafkaWriter<Integer> ignored =
128129
createWriterWithConfiguration(config, guarantee)) {
129-
assertThrows(
130-
IllegalArgumentException.class,
131-
() -> metricListener.getGauge(KAFKA_METRIC_WITH_GROUP_NAME));
130+
Assertions.assertFalse(
131+
metricListener.getGauge(KAFKA_METRIC_WITH_GROUP_NAME).isPresent());
132132
}
133133
}
134134

@@ -159,8 +159,10 @@ public void testCurrentSendTimeMetric() throws Exception {
159159
getKafkaClientConfiguration(),
160160
DeliveryGuarantee.AT_LEAST_ONCE,
161161
metricGroup)) {
162-
final Gauge<Long> currentSendTime = metricListener.getGauge("currentSendTime");
163-
Assertions.assertEquals(currentSendTime.getValue(), 0L);
162+
final Optional<Gauge<Long>> currentSendTime =
163+
metricListener.getGauge("currentSendTime");
164+
assertTrue(currentSendTime.isPresent());
165+
Assertions.assertEquals(currentSendTime.get().getValue(), 0L);
164166
IntStream.range(0, 100)
165167
.forEach(
166168
(run) -> {
@@ -174,7 +176,7 @@ public void testCurrentSendTimeMetric() throws Exception {
174176
throw new RuntimeException("Failed writing Kafka record.");
175177
}
176178
});
177-
MatcherAssert.assertThat(currentSendTime.getValue(), greaterThan(0L));
179+
MatcherAssert.assertThat(currentSendTime.get().getValue(), greaterThan(0L));
178180
}
179181
}
180182

flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/metrics/KafkaSourceReaderMetricsTest.java

Lines changed: 25 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -18,16 +18,20 @@
1818

1919
package org.apache.flink.connector.kafka.source.metrics;
2020

21+
import org.apache.flink.metrics.Counter;
2122
import org.apache.flink.metrics.Gauge;
2223
import org.apache.flink.metrics.testutils.MetricListener;
2324

2425
import org.apache.kafka.common.TopicPartition;
2526
import org.junit.Test;
2627

28+
import java.util.Optional;
29+
2730
import static org.apache.flink.connector.kafka.source.metrics.KafkaSourceReaderMetrics.PARTITION_GROUP;
2831
import static org.apache.flink.connector.kafka.source.metrics.KafkaSourceReaderMetrics.TOPIC_GROUP;
2932
import static org.junit.Assert.assertEquals;
3033
import static org.junit.Assert.assertThrows;
34+
import static org.junit.Assert.assertTrue;
3135

3236
/** Unit test for {@link KafkaSourceReaderMetrics}. */
3337
public class KafkaSourceReaderMetricsTest {
@@ -82,23 +86,16 @@ public void testCommitOffsetTracking() {
8286
assertCommittedOffset(BAR_0, 18613L, metricListener);
8387
assertCommittedOffset(BAR_1, 15513L, metricListener);
8488

85-
assertEquals(
86-
0L,
87-
metricListener
88-
.getCounter(
89-
KafkaSourceReaderMetrics.KAFKA_SOURCE_READER_METRIC_GROUP,
90-
KafkaSourceReaderMetrics.COMMITS_SUCCEEDED_METRIC_COUNTER)
91-
.getCount());
89+
final Optional<Counter> commitsSucceededCounter =
90+
metricListener.getCounter(
91+
KafkaSourceReaderMetrics.KAFKA_SOURCE_READER_METRIC_GROUP,
92+
KafkaSourceReaderMetrics.COMMITS_SUCCEEDED_METRIC_COUNTER);
93+
assertTrue(commitsSucceededCounter.isPresent());
94+
assertEquals(0L, commitsSucceededCounter.get().getCount());
9295

9396
kafkaSourceReaderMetrics.recordSucceededCommit();
9497

95-
assertEquals(
96-
1L,
97-
metricListener
98-
.getCounter(
99-
KafkaSourceReaderMetrics.KAFKA_SOURCE_READER_METRIC_GROUP,
100-
KafkaSourceReaderMetrics.COMMITS_SUCCEEDED_METRIC_COUNTER)
101-
.getCount());
98+
assertEquals(1L, commitsSucceededCounter.get().getCount());
10299
}
103100

104101
@Test
@@ -120,47 +117,41 @@ public void testFailedCommit() {
120117
final KafkaSourceReaderMetrics kafkaSourceReaderMetrics =
121118
new KafkaSourceReaderMetrics(metricListener.getMetricGroup());
122119
kafkaSourceReaderMetrics.recordFailedCommit();
123-
assertEquals(
124-
1L,
125-
metricListener
126-
.getCounter(
127-
KafkaSourceReaderMetrics.KAFKA_SOURCE_READER_METRIC_GROUP,
128-
KafkaSourceReaderMetrics.COMMITS_FAILED_METRIC_COUNTER)
129-
.getCount());
120+
final Optional<Counter> commitsFailedCounter =
121+
metricListener.getCounter(
122+
KafkaSourceReaderMetrics.KAFKA_SOURCE_READER_METRIC_GROUP,
123+
KafkaSourceReaderMetrics.COMMITS_FAILED_METRIC_COUNTER);
124+
assertTrue(commitsFailedCounter.isPresent());
125+
assertEquals(1L, commitsFailedCounter.get().getCount());
130126
}
131127

132128
// ----------- Assertions --------------
133129

134130
private void assertCurrentOffset(
135131
TopicPartition tp, long expectedOffset, MetricListener metricListener) {
136-
assertGaugeValueEquals(
137-
expectedOffset,
132+
final Optional<Gauge<Long>> currentOffsetGauge =
138133
metricListener.getGauge(
139134
KafkaSourceReaderMetrics.KAFKA_SOURCE_READER_METRIC_GROUP,
140135
TOPIC_GROUP,
141136
tp.topic(),
142137
PARTITION_GROUP,
143138
String.valueOf(tp.partition()),
144-
KafkaSourceReaderMetrics.CURRENT_OFFSET_METRIC_GAUGE),
145-
Long.class);
139+
KafkaSourceReaderMetrics.CURRENT_OFFSET_METRIC_GAUGE);
140+
assertTrue(currentOffsetGauge.isPresent());
141+
assertEquals(expectedOffset, (long) currentOffsetGauge.get().getValue());
146142
}
147143

148144
private void assertCommittedOffset(
149145
TopicPartition tp, long expectedOffset, MetricListener metricListener) {
150-
assertGaugeValueEquals(
151-
expectedOffset,
146+
final Optional<Gauge<Long>> committedOffsetGauge =
152147
metricListener.getGauge(
153148
KafkaSourceReaderMetrics.KAFKA_SOURCE_READER_METRIC_GROUP,
154149
TOPIC_GROUP,
155150
tp.topic(),
156151
PARTITION_GROUP,
157152
String.valueOf(tp.partition()),
158-
KafkaSourceReaderMetrics.COMMITTED_OFFSET_METRIC_GAUGE),
159-
Long.class);
160-
}
161-
162-
private <T> void assertGaugeValueEquals(T expected, Gauge<?> gauge, Class<T> type) {
163-
final T actual = type.cast(gauge.getValue());
164-
assertEquals(expected, actual);
153+
KafkaSourceReaderMetrics.COMMITTED_OFFSET_METRIC_GAUGE);
154+
assertTrue(committedOffsetGauge.isPresent());
155+
assertEquals(expectedOffset, (long) committedOffsetGauge.get().getValue());
165156
}
166157
}

flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReaderTest.java

Lines changed: 31 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@
3333
import org.apache.flink.connector.testutils.source.reader.TestingReaderContext;
3434
import org.apache.flink.connector.testutils.source.reader.TestingReaderOutput;
3535
import org.apache.flink.core.io.InputStatus;
36+
import org.apache.flink.metrics.Counter;
37+
import org.apache.flink.metrics.Gauge;
3638
import org.apache.flink.metrics.MetricGroup;
3739
import org.apache.flink.metrics.testutils.MetricListener;
3840
import org.apache.flink.runtime.metrics.groups.InternalSourceReaderMetricGroup;
@@ -56,6 +58,7 @@
5658
import java.util.Collections;
5759
import java.util.List;
5860
import java.util.Map;
61+
import java.util.Optional;
5962
import java.util.function.Supplier;
6063

6164
import static org.apache.flink.connector.kafka.source.metrics.KafkaSourceReaderMetrics.COMMITS_SUCCEEDED_METRIC_COUNTER;
@@ -294,13 +297,11 @@ public void testKafkaSourceMetrics() throws Exception {
294297
assertEquals(NUM_RECORDS_PER_SPLIT, getCommittedOffsetMetric(tp1, metricListener));
295298

296299
// Number of successful commits should be 1
297-
assertEquals(
298-
1L,
299-
metricListener
300-
.getCounter(
301-
KAFKA_SOURCE_READER_METRIC_GROUP,
302-
COMMITS_SUCCEEDED_METRIC_COUNTER)
303-
.getCount());
300+
final Optional<Counter> commitsSucceeded =
301+
metricListener.getCounter(
302+
KAFKA_SOURCE_READER_METRIC_GROUP, COMMITS_SUCCEEDED_METRIC_COUNTER);
303+
assertTrue(commitsSucceeded.isPresent());
304+
assertEquals(1L, commitsSucceeded.get().getCount());
304305
}
305306
}
306307

@@ -396,37 +397,37 @@ private void pollUntil(
396397
}
397398

398399
private long getKafkaConsumerMetric(String name, MetricListener listener) {
399-
return ((Double)
400-
listener.getGauge(
401-
KAFKA_SOURCE_READER_METRIC_GROUP,
402-
KAFKA_CONSUMER_METRIC_GROUP,
403-
name)
404-
.getValue())
405-
.longValue();
400+
final Optional<Gauge<Object>> kafkaConsumerGauge =
401+
listener.getGauge(
402+
KAFKA_SOURCE_READER_METRIC_GROUP, KAFKA_CONSUMER_METRIC_GROUP, name);
403+
assertTrue(kafkaConsumerGauge.isPresent());
404+
return ((Double) kafkaConsumerGauge.get().getValue()).longValue();
406405
}
407406

408407
private long getCurrentOffsetMetric(TopicPartition tp, MetricListener listener) {
409-
return (long)
408+
final Optional<Gauge<Object>> currentOffsetGauge =
410409
listener.getGauge(
411-
KAFKA_SOURCE_READER_METRIC_GROUP,
412-
TOPIC_GROUP,
413-
tp.topic(),
414-
PARTITION_GROUP,
415-
String.valueOf(tp.partition()),
416-
CURRENT_OFFSET_METRIC_GAUGE)
417-
.getValue();
410+
KAFKA_SOURCE_READER_METRIC_GROUP,
411+
TOPIC_GROUP,
412+
tp.topic(),
413+
PARTITION_GROUP,
414+
String.valueOf(tp.partition()),
415+
CURRENT_OFFSET_METRIC_GAUGE);
416+
assertTrue(currentOffsetGauge.isPresent());
417+
return (long) currentOffsetGauge.get().getValue();
418418
}
419419

420420
private long getCommittedOffsetMetric(TopicPartition tp, MetricListener listener) {
421-
return (long)
421+
final Optional<Gauge<Object>> committedOffsetGauge =
422422
listener.getGauge(
423-
KAFKA_SOURCE_READER_METRIC_GROUP,
424-
TOPIC_GROUP,
425-
tp.topic(),
426-
PARTITION_GROUP,
427-
String.valueOf(tp.partition()),
428-
COMMITTED_OFFSET_METRIC_GAUGE)
429-
.getValue();
423+
KAFKA_SOURCE_READER_METRIC_GROUP,
424+
TOPIC_GROUP,
425+
tp.topic(),
426+
PARTITION_GROUP,
427+
String.valueOf(tp.partition()),
428+
COMMITTED_OFFSET_METRIC_GAUGE);
429+
assertTrue(committedOffsetGauge.isPresent());
430+
return (long) committedOffsetGauge.get().getValue();
430431
}
431432

432433
// ---------------------

flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/metrics/testutils/MetricListener.java

Lines changed: 23 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929

3030
import java.util.HashMap;
3131
import java.util.Map;
32+
import java.util.Optional;
3233

3334
/**
3435
* A MetricListener listens metric and group registration under the provided root metric group, and
@@ -75,56 +76,61 @@ public MetricGroup getMetricGroup() {
7576
* metric group can be reached by identifier ("myGroup", "myMetric")
7677
*
7778
* @param identifier identifier relative to the root metric group
78-
* @return Registered metric
79+
* @return Optional registered metric
7980
*/
80-
public <T extends Metric> T getMetric(Class<T> metricType, String... identifier) {
81-
String actualIdentifier =
82-
ROOT_METRIC_GROUP_NAME + DELIMITER + String.join(DELIMITER, identifier);
83-
if (!metrics.containsKey(actualIdentifier)) {
84-
throw new IllegalArgumentException(
85-
String.format("Metric '%s' is not registered", actualIdentifier));
81+
public <T extends Metric> Optional<T> getMetric(Class<T> metricType, String... identifier) {
82+
if (!metrics.containsKey(getActualIdentifier(identifier))) {
83+
return Optional.empty();
8684
}
87-
return metricType.cast(metrics.get(actualIdentifier));
85+
return Optional.of(metricType.cast(metrics.get(getActualIdentifier(identifier))));
8886
}
8987

9088
/**
9189
* Get registered {@link Meter} with identifier relative to the root metric group.
9290
*
9391
* @param identifier identifier relative to the root metric group
94-
* @return Registered meter
92+
* @return Optional registered meter
9593
*/
96-
public Meter getMeter(String... identifier) {
94+
public Optional<Meter> getMeter(String... identifier) {
9795
return getMetric(Meter.class, identifier);
9896
}
9997

10098
/**
10199
* Get registered {@link Counter} with identifier relative to the root metric group.
102100
*
103101
* @param identifier identifier relative to the root metric group
104-
* @return Registered counter
102+
* @return Optional registered counter
105103
*/
106-
public Counter getCounter(String... identifier) {
104+
public Optional<Counter> getCounter(String... identifier) {
107105
return getMetric(Counter.class, identifier);
108106
}
109107

110108
/**
111109
* Get registered {@link Histogram} with identifier relative to the root metric group.
112110
*
113111
* @param identifier identifier relative to the root metric group
114-
* @return Registered histogram
112+
* @return Optional registered histogram
115113
*/
116-
public Histogram getHistogram(String... identifier) {
114+
public Optional<Histogram> getHistogram(String... identifier) {
117115
return getMetric(Histogram.class, identifier);
118116
}
119117

120118
/**
121119
* Get registered {@link Gauge} with identifier relative to the root metric group.
122120
*
123121
* @param identifier identifier relative to the root metric group
124-
* @return Registered gauge
122+
* @return Optional registered gauge
125123
*/
126124
@SuppressWarnings("unchecked")
127-
public <T> Gauge<T> getGauge(String... identifier) {
128-
return (Gauge<T>) getMetric(Gauge.class, identifier);
125+
public <T> Optional<Gauge<T>> getGauge(String... identifier) {
126+
if (!metrics.containsKey(getActualIdentifier(identifier))) {
127+
return Optional.empty();
128+
} else {
129+
return Optional.of((Gauge<T>) metrics.get(getActualIdentifier(identifier)));
130+
}
131+
}
132+
133+
private String getActualIdentifier(String... identifier) {
134+
return ROOT_METRIC_GROUP_NAME + DELIMITER + String.join(DELIMITER, identifier);
129135
}
130136
}

0 commit comments

Comments
 (0)