Skip to content

Commit 646ff2d

Browse files
gaborgsomogyiGyula Fora
authored and
Gyula Fora
committed
[FLINK-21108][web] Add custom netty HTTP request inbound/outbound handlers
Closes apache#16463
1 parent 543959e commit 646ff2d

File tree

40 files changed

+847
-159
lines changed

40 files changed

+847
-159
lines changed

flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -207,10 +207,7 @@ private RestClusterClient(
207207
if (restClient != null) {
208208
this.restClient = restClient;
209209
} else {
210-
this.restClient =
211-
new RestClient(
212-
restClusterClientConfiguration.getRestClientConfiguration(),
213-
executorService);
210+
this.restClient = new RestClient(configuration, executorService);
214211
}
215212

216213
this.waitStrategy = checkNotNull(waitStrategy);

flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientSavepointTriggerTest.java

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,7 @@
2626
import org.apache.flink.configuration.UnmodifiableConfiguration;
2727
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
2828
import org.apache.flink.runtime.rest.RestClient;
29-
import org.apache.flink.runtime.rest.RestClientConfiguration;
3029
import org.apache.flink.runtime.rest.RestServerEndpoint;
31-
import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
3230
import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
3331
import org.apache.flink.runtime.rest.handler.HandlerRequest;
3432
import org.apache.flink.runtime.rest.handler.RestHandlerException;
@@ -94,8 +92,6 @@ public class RestClusterClientSavepointTriggerTest extends TestLogger {
9492
private static final GatewayRetriever<DispatcherGateway> mockGatewayRetriever =
9593
() -> CompletableFuture.completedFuture(mockRestfulGateway);
9694

97-
private static RestServerEndpointConfiguration restServerEndpointConfiguration;
98-
9995
private static ExecutorService executor;
10096

10197
private static final Configuration REST_CONFIG;
@@ -112,8 +108,6 @@ public class RestClusterClientSavepointTriggerTest extends TestLogger {
112108

113109
@BeforeClass
114110
public static void setUp() throws ConfigurationException {
115-
restServerEndpointConfiguration =
116-
RestServerEndpointConfiguration.fromConfiguration(REST_CONFIG);
117111
executor =
118112
Executors.newSingleThreadExecutor(
119113
new ExecutorThreadFactory(
@@ -273,7 +267,7 @@ private static RestServerEndpoint createRestServerEndpoint(
273267
final FunctionWithException<TriggerId, SavepointInfo, RestHandlerException>
274268
savepointHandlerLogic)
275269
throws Exception {
276-
return TestRestServerEndpoint.builder(restServerEndpointConfiguration)
270+
return TestRestServerEndpoint.builder(REST_CONFIG)
277271
.withHandler(new TestSavepointTriggerHandler(triggerHandlerLogic))
278272
.withHandler(new TestSavepointHandler(savepointHandlerLogic))
279273
.buildAndStart();
@@ -355,7 +349,7 @@ private RestClusterClient<StandaloneClusterId> createRestClusterClient(final int
355349
clientConfig.setInteger(RestOptions.PORT, port);
356350
return new RestClusterClient<>(
357351
clientConfig,
358-
new RestClient(RestClientConfiguration.fromConfiguration(REST_CONFIG), executor),
352+
new RestClient(REST_CONFIG, executor),
359353
StandaloneClusterId.getInstance(),
360354
(attempt) -> 0);
361355
}

flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,6 @@
4545
import org.apache.flink.runtime.rest.FileUpload;
4646
import org.apache.flink.runtime.rest.HttpMethodWrapper;
4747
import org.apache.flink.runtime.rest.RestClient;
48-
import org.apache.flink.runtime.rest.RestClientConfiguration;
49-
import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
5048
import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
5149
import org.apache.flink.runtime.rest.handler.HandlerRequest;
5250
import org.apache.flink.runtime.rest.handler.RestHandlerException;
@@ -154,8 +152,6 @@ public class RestClusterClientTest extends TestLogger {
154152

155153
private GatewayRetriever<DispatcherGateway> mockGatewayRetriever;
156154

157-
private RestServerEndpointConfiguration restServerEndpointConfiguration;
158-
159155
private volatile FailHttpRequestPredicate failHttpRequest = FailHttpRequestPredicate.never();
160156

161157
private ExecutorService executor;
@@ -177,8 +173,6 @@ public class RestClusterClientTest extends TestLogger {
177173

178174
@Before
179175
public void setUp() throws Exception {
180-
restServerEndpointConfiguration =
181-
RestServerEndpointConfiguration.fromConfiguration(restConfig);
182176
mockGatewayRetriever = () -> CompletableFuture.completedFuture(mockRestfulGateway);
183177

184178
executor =
@@ -209,7 +203,7 @@ private RestClusterClient<StandaloneClusterId> createRestClusterClient(final int
209203

210204
@Nonnull
211205
private RestClient createRestClient() throws ConfigurationException {
212-
return new RestClient(RestClientConfiguration.fromConfiguration(restConfig), executor) {
206+
return new RestClient(restConfig, executor) {
213207
@Override
214208
public <
215209
M extends MessageHeaders<R, P, U>,
@@ -1048,8 +1042,7 @@ private TestHandler(MessageHeaders<R, P, M> headers) {
10481042

10491043
private TestRestServerEndpoint createRestServerEndpoint(
10501044
final AbstractRestHandler<?, ?, ?, ?>... abstractRestHandlers) throws Exception {
1051-
TestRestServerEndpoint.Builder builder =
1052-
TestRestServerEndpoint.builder(restServerEndpointConfiguration);
1045+
TestRestServerEndpoint.Builder builder = TestRestServerEndpoint.builder(restConfig);
10531046
Arrays.stream(abstractRestHandlers).forEach(builder::withHandler);
10541047

10551048
return builder.buildAndStart();

flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.apache.flink.runtime.rest.util.DocumentingDispatcherRestEndpoint;
3232
import org.apache.flink.runtime.rest.util.DocumentingRestEndpoint;
3333
import org.apache.flink.runtime.rest.versioning.RestAPIVersion;
34+
import org.apache.flink.util.ConfigurationException;
3435

3536
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
3637
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.SerializableString;
@@ -99,7 +100,7 @@ public class RestAPIDocGenerator {
99100
* @param args args[0] contains the directory into which the generated files are placed
100101
* @throws IOException if any file operation failed
101102
*/
102-
public static void main(String[] args) throws IOException {
103+
public static void main(String[] args) throws IOException, ConfigurationException {
103104
String outputDirectory = args[0];
104105

105106
for (final RestAPIVersion apiVersion : RestAPIVersion.values()) {

flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/LocalStandaloneFlinkResource.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import org.apache.flink.api.common.JobID;
2222
import org.apache.flink.configuration.Configuration;
2323
import org.apache.flink.runtime.rest.RestClient;
24-
import org.apache.flink.runtime.rest.RestClientConfiguration;
2524
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
2625
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
2726
import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersHeaders;
@@ -133,9 +132,7 @@ public ClusterController startCluster(int numTaskManagers) throws IOException {
133132
distribution.startFlinkCluster();
134133

135134
try (final RestClient restClient =
136-
new RestClient(
137-
RestClientConfiguration.fromConfiguration(new Configuration()),
138-
Executors.directExecutor())) {
135+
new RestClient(new Configuration(), Executors.directExecutor())) {
139136
for (int retryAttempt = 0; retryAttempt < 30; retryAttempt++) {
140137
final CompletableFuture<TaskManagersInfo> localhost =
141138
restClient.sendRequest(

flink-end-to-end-tests/flink-metrics-availability-test/src/test/java/org/apache/flink/metrics/tests/MetricsAvailabilityITCase.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import org.apache.flink.configuration.Configuration;
2424
import org.apache.flink.runtime.clusterframework.types.ResourceID;
2525
import org.apache.flink.runtime.rest.RestClient;
26-
import org.apache.flink.runtime.rest.RestClientConfiguration;
2726
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
2827
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
2928
import org.apache.flink.runtime.rest.messages.job.metrics.JobManagerMetricsHeaders;
@@ -94,9 +93,7 @@ public static void shutdownExecutor() {
9493
public void testReporter() throws Exception {
9594
try (ClusterController ignored = dist.startCluster(1)) {
9695
final RestClient restClient =
97-
new RestClient(
98-
RestClientConfiguration.fromConfiguration(new Configuration()),
99-
scheduledExecutorService);
96+
new RestClient(new Configuration(), scheduledExecutorService);
10097

10198
checkJobManagerMetricAvailability(restClient);
10299

flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/WebFrontendBootstrap.java

Lines changed: 45 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,18 +18,22 @@
1818

1919
package org.apache.flink.runtime.webmonitor.utils;
2020

21+
import org.apache.flink.annotation.VisibleForTesting;
2122
import org.apache.flink.configuration.Configuration;
2223
import org.apache.flink.configuration.JobManagerOptions;
24+
import org.apache.flink.runtime.io.network.netty.InboundChannelHandlerFactory;
2325
import org.apache.flink.runtime.io.network.netty.SSLHandlerFactory;
2426
import org.apache.flink.runtime.rest.handler.router.Router;
2527
import org.apache.flink.runtime.rest.handler.router.RouterHandler;
2628
import org.apache.flink.runtime.webmonitor.HttpRequestHandler;
2729
import org.apache.flink.runtime.webmonitor.PipelineErrorHandler;
30+
import org.apache.flink.util.ConfigurationException;
2831
import org.apache.flink.util.Preconditions;
2932

3033
import org.apache.flink.shaded.netty4.io.netty.bootstrap.ServerBootstrap;
3134
import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
3235
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
36+
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
3337
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer;
3438
import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup;
3539
import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel;
@@ -45,7 +49,14 @@
4549
import java.net.InetAddress;
4650
import java.net.InetSocketAddress;
4751
import java.net.UnknownHostException;
52+
import java.util.ArrayList;
53+
import java.util.Comparator;
4854
import java.util.HashMap;
55+
import java.util.Iterator;
56+
import java.util.List;
57+
import java.util.Map;
58+
import java.util.Optional;
59+
import java.util.ServiceLoader;
4960

5061
/** This classes encapsulates the boot-strapping of netty for the web-frontend. */
5162
public class WebFrontendBootstrap {
@@ -55,6 +66,8 @@ public class WebFrontendBootstrap {
5566
private final ServerBootstrap bootstrap;
5667
private final Channel serverChannel;
5768
private final String restAddress;
69+
private final Map<String, String> responseHeaders;
70+
@VisibleForTesting List<InboundChannelHandlerFactory> inboundChannelHandlerFactories;
5871

5972
public WebFrontendBootstrap(
6073
Router router,
@@ -69,15 +82,34 @@ public WebFrontendBootstrap(
6982
this.router = Preconditions.checkNotNull(router);
7083
this.log = Preconditions.checkNotNull(log);
7184
this.uploadDir = directory;
85+
this.responseHeaders = new HashMap<>();
86+
inboundChannelHandlerFactories = new ArrayList<>();
87+
ServiceLoader<InboundChannelHandlerFactory> loader =
88+
ServiceLoader.load(InboundChannelHandlerFactory.class);
89+
final Iterator<InboundChannelHandlerFactory> factories = loader.iterator();
90+
while (factories.hasNext()) {
91+
try {
92+
final InboundChannelHandlerFactory factory = factories.next();
93+
if (factory != null) {
94+
inboundChannelHandlerFactories.add(factory);
95+
log.info("Loaded channel inbound factory: {}", factory);
96+
}
97+
} catch (Throwable e) {
98+
log.error("Could not load channel inbound factory.", e);
99+
throw e;
100+
}
101+
}
102+
inboundChannelHandlerFactories.sort(
103+
Comparator.comparingInt(InboundChannelHandlerFactory::priority).reversed());
72104

73105
ChannelInitializer<SocketChannel> initializer =
74106
new ChannelInitializer<SocketChannel>() {
75107

76108
@Override
77-
protected void initChannel(SocketChannel ch) {
109+
protected void initChannel(SocketChannel ch) throws ConfigurationException {
78110
RouterHandler handler =
79111
new RouterHandler(
80-
WebFrontendBootstrap.this.router, new HashMap<>());
112+
WebFrontendBootstrap.this.router, responseHeaders);
81113

82114
// SSL should be the first handler in the pipeline
83115
if (serverSSLFactory != null) {
@@ -87,8 +119,18 @@ protected void initChannel(SocketChannel ch) {
87119
serverSSLFactory.createNettySSLHandler(ch.alloc()));
88120
}
89121

122+
ch.pipeline().addLast(new HttpServerCodec());
123+
124+
for (InboundChannelHandlerFactory factory :
125+
inboundChannelHandlerFactories) {
126+
Optional<ChannelHandler> channelHandler =
127+
factory.createHandler(config, responseHeaders);
128+
if (channelHandler.isPresent()) {
129+
ch.pipeline().addLast(channelHandler.get());
130+
}
131+
}
132+
90133
ch.pipeline()
91-
.addLast(new HttpServerCodec())
92134
.addLast(new ChunkedWriteHandler())
93135
.addLast(new HttpRequestHandler(uploadDir))
94136
.addLast(handler.getName(), handler)

flink-runtime-web/src/test/java/org/apache/flink/runtime/rest/compatibility/RestAPIStabilityTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.flink.runtime.rest.util.DocumentingDispatcherRestEndpoint;
2323
import org.apache.flink.runtime.rest.util.DocumentingRestEndpoint;
2424
import org.apache.flink.runtime.rest.versioning.RestAPIVersion;
25+
import org.apache.flink.util.ConfigurationException;
2526
import org.apache.flink.util.TestLogger;
2627

2728
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
@@ -70,7 +71,7 @@ public RestAPIStabilityTest(final RestAPIVersion apiVersion) {
7071
}
7172

7273
@Test
73-
public void testDispatcherRestAPIStability() throws IOException {
74+
public void testDispatcherRestAPIStability() throws IOException, ConfigurationException {
7475
final String versionedSnapshotFileName =
7576
String.format(SNAPSHOT_RESOURCE_PATTERN, apiVersion.getURLVersionPrefix());
7677

flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -424,7 +424,7 @@ private static void runJob() throws Exception {
424424
env.execute();
425425
}
426426

427-
static Tuple2<Integer, String> getFromHTTP(String url) throws Exception {
427+
public static Tuple2<Integer, String> getFromHTTP(String url) throws Exception {
428428
URL u = new URL(url);
429429
HttpURLConnection connection = (HttpURLConnection) u.openConnection();
430430
connection.setConnectTimeout(100000);
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.runtime.webmonitor.utils;
20+
21+
import org.apache.flink.api.java.tuple.Tuple2;
22+
import org.apache.flink.configuration.Configuration;
23+
import org.apache.flink.runtime.io.network.netty.Prio0InboundChannelHandlerFactory;
24+
import org.apache.flink.runtime.io.network.netty.Prio1InboundChannelHandlerFactory;
25+
import org.apache.flink.runtime.rest.handler.router.Router;
26+
import org.apache.flink.runtime.webmonitor.history.HistoryServerStaticFileServerHandler;
27+
import org.apache.flink.runtime.webmonitor.history.HistoryServerTest;
28+
29+
import org.junit.Assert;
30+
import org.junit.Rule;
31+
import org.junit.Test;
32+
import org.junit.rules.TemporaryFolder;
33+
import org.slf4j.LoggerFactory;
34+
35+
import java.io.File;
36+
37+
import static org.junit.Assert.assertEquals;
38+
import static org.junit.Assert.assertTrue;
39+
40+
/** Tests for the WebFrontendBootstrap. */
41+
public class WebFrontendBootstrapTest {
42+
43+
@Rule public TemporaryFolder tmp = new TemporaryFolder();
44+
45+
@Test
46+
public void testHandlersMustBeLoaded() throws Exception {
47+
File webDir = tmp.newFolder("webDir");
48+
Configuration configuration = new Configuration();
49+
configuration.setString(
50+
Prio0InboundChannelHandlerFactory.REDIRECT_FROM_URL, "/nonExisting");
51+
configuration.setString(Prio0InboundChannelHandlerFactory.REDIRECT_TO_URL, "/index.html");
52+
Router router =
53+
new Router().addGet("/:*", new HistoryServerStaticFileServerHandler(webDir));
54+
WebFrontendBootstrap webUI =
55+
new WebFrontendBootstrap(
56+
router,
57+
LoggerFactory.getLogger(WebFrontendBootstrapTest.class),
58+
tmp.newFolder("uploadDir"),
59+
null,
60+
"localhost",
61+
0,
62+
configuration);
63+
64+
assertEquals(webUI.inboundChannelHandlerFactories.size(), 2);
65+
assertTrue(
66+
webUI.inboundChannelHandlerFactories.get(0)
67+
instanceof Prio1InboundChannelHandlerFactory);
68+
assertTrue(
69+
webUI.inboundChannelHandlerFactories.get(1)
70+
instanceof Prio0InboundChannelHandlerFactory);
71+
72+
int port = webUI.getServerPort();
73+
try {
74+
Tuple2<Integer, String> index =
75+
HistoryServerTest.getFromHTTP("http://localhost:" + port + "/index.html");
76+
Assert.assertEquals(index.f0.intValue(), 200);
77+
Assert.assertTrue(index.f1.contains("Apache Flink Web Dashboard"));
78+
79+
Tuple2<Integer, String> index2 =
80+
HistoryServerTest.getFromHTTP("http://localhost:" + port + "/nonExisting");
81+
Assert.assertEquals(index2.f0.intValue(), 200);
82+
Assert.assertEquals(index, index2);
83+
} finally {
84+
webUI.shutdown();
85+
}
86+
}
87+
}

0 commit comments

Comments
 (0)