Skip to content

Commit

Permalink
Track if proxy protocol messages need buffering (#1669)
Browse files Browse the repository at this point in the history
* Track if proxy protocol messages need buffering

* pr feedback

* More pr feedback
  • Loading branch information
jguerra authored Oct 20, 2023
1 parent 7c0fab9 commit c005516
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,26 +48,39 @@ public void addProxyProtocol(ChannelPipeline pipeline) {

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (withProxyProtocol && isHAPMDetected(msg)) {
if (!withProxyProtocol) {
ctx.pipeline().remove(this);
super.channelRead(ctx, msg);
return;
}

ProtocolDetectionState haProxyState = getDetectionState(msg);
if (haProxyState == ProtocolDetectionState.DETECTED) {
ctx.pipeline()
.addAfter(NAME, null, new HAProxyMessageChannelHandler())
.replace(this, null, new HAProxyMessageDecoder());
} else {
if (withProxyProtocol) {
final int port = ctx.channel()
.attr(SourceAddressChannelHandler.ATTR_SERVER_LOCAL_PORT)
.get();
// This likely means initialization was requested with proxy protocol, but we encountered a non-ppv2
// message
registry.counter("zuul.hapm.decode", "success", "false", "port", String.valueOf(port))
.increment();
}
final int port = ctx.channel()
.attr(SourceAddressChannelHandler.ATTR_SERVER_LOCAL_PORT)
.get();

// This likely means initialization was requested with proxy protocol, but we encountered a non-ppv2
// message
registry.counter(
"zuul.hapm.decode",
"success",
"false",
"port",
String.valueOf(port),
"needs_more_data",
String.valueOf(haProxyState == ProtocolDetectionState.NEEDS_MORE_DATA))
.increment();
ctx.pipeline().remove(this);
}
super.channelRead(ctx, msg);
}

private boolean isHAPMDetected(Object msg) {
return HAProxyMessageDecoder.detectProtocol((ByteBuf) msg).state() == ProtocolDetectionState.DETECTED;
private ProtocolDetectionState getDetectionState(Object msg) {
return HAProxyMessageDecoder.detectProtocol((ByteBuf) msg).state();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,8 @@ void incrementCounterWhenPPEnabledButNonHAPMMessage() {
assertEquals(dropped, buf);
buf.release();

final Counter counter = registry.counter("zuul.hapm.decode", "success", "false", "port", String.valueOf(port));
final Counter counter = registry.counter(
"zuul.hapm.decode", "success", "false", "port", String.valueOf(port), "needs_more_data", "false");
assertEquals(1, counter.count());
}

Expand Down Expand Up @@ -181,6 +182,31 @@ void detectsSplitPpv1Message() {
assertNull(channel.pipeline().context(ElbProxyProtocolChannelHandler.class));
}

@Test
void tracksSplitMessage() {
EmbeddedChannel channel = new EmbeddedChannel();
// This is normally done by Server.
channel.attr(Server.CONN_DIMENSIONS).set(Attrs.newInstance());
int port = 7007;
channel.attr(SourceAddressChannelHandler.ATTR_SERVER_LOCAL_PORT).set(port);

channel.pipeline()
.addLast(ElbProxyProtocolChannelHandler.NAME, new ElbProxyProtocolChannelHandler(registry, true));
ByteBuf buf1 = Unpooled.wrappedBuffer("PROXY TCP4".getBytes(StandardCharsets.US_ASCII));
channel.writeInbound(buf1);

Object msg = channel.readInbound();
assertEquals(buf1, msg);
buf1.release();

// The handler should remove itself.
assertNull(channel.pipeline().context(ElbProxyProtocolChannelHandler.class));

Counter counter = registry.counter(
"zuul.hapm.decode", "success", "false", "port", String.valueOf(port), "needs_more_data", "true");
assertEquals(1, counter.count());
}

@Test
void negotiateProxy_ppv1_ipv4() {
EmbeddedChannel channel = new EmbeddedChannel();
Expand Down

0 comments on commit c005516

Please sign in to comment.