diff --git a/zuul-core/src/main/java/com/netflix/netty/common/proxyprotocol/ElbProxyProtocolChannelHandler.java b/zuul-core/src/main/java/com/netflix/netty/common/proxyprotocol/ElbProxyProtocolChannelHandler.java index 12e483f405..12ad24ea13 100644 --- a/zuul-core/src/main/java/com/netflix/netty/common/proxyprotocol/ElbProxyProtocolChannelHandler.java +++ b/zuul-core/src/main/java/com/netflix/netty/common/proxyprotocol/ElbProxyProtocolChannelHandler.java @@ -48,26 +48,39 @@ public void addProxyProtocol(ChannelPipeline pipeline) { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - if (withProxyProtocol && isHAPMDetected(msg)) { + if (!withProxyProtocol) { + ctx.pipeline().remove(this); + super.channelRead(ctx, msg); + return; + } + + ProtocolDetectionState haProxyState = getDetectionState(msg); + if (haProxyState == ProtocolDetectionState.DETECTED) { ctx.pipeline() .addAfter(NAME, null, new HAProxyMessageChannelHandler()) .replace(this, null, new HAProxyMessageDecoder()); } else { - if (withProxyProtocol) { - final int port = ctx.channel() - .attr(SourceAddressChannelHandler.ATTR_SERVER_LOCAL_PORT) - .get(); - // This likely means initialization was requested with proxy protocol, but we encountered a non-ppv2 - // message - registry.counter("zuul.hapm.decode", "success", "false", "port", String.valueOf(port)) - .increment(); - } + final int port = ctx.channel() + .attr(SourceAddressChannelHandler.ATTR_SERVER_LOCAL_PORT) + .get(); + + // This likely means initialization was requested with proxy protocol, but we encountered a non-ppv2 + // message + registry.counter( + "zuul.hapm.decode", + "success", + "false", + "port", + String.valueOf(port), + "needs_more_data", + String.valueOf(haProxyState == ProtocolDetectionState.NEEDS_MORE_DATA)) + .increment(); ctx.pipeline().remove(this); } super.channelRead(ctx, msg); } - private boolean isHAPMDetected(Object msg) { - return HAProxyMessageDecoder.detectProtocol((ByteBuf) msg).state() == ProtocolDetectionState.DETECTED; + private ProtocolDetectionState getDetectionState(Object msg) { + return HAProxyMessageDecoder.detectProtocol((ByteBuf) msg).state(); } } diff --git a/zuul-core/src/test/java/com/netflix/netty/common/proxyprotocol/ElbProxyProtocolChannelHandlerTest.java b/zuul-core/src/test/java/com/netflix/netty/common/proxyprotocol/ElbProxyProtocolChannelHandlerTest.java index f7b2960a3d..120b553acf 100644 --- a/zuul-core/src/test/java/com/netflix/netty/common/proxyprotocol/ElbProxyProtocolChannelHandlerTest.java +++ b/zuul-core/src/test/java/com/netflix/netty/common/proxyprotocol/ElbProxyProtocolChannelHandlerTest.java @@ -151,7 +151,8 @@ void incrementCounterWhenPPEnabledButNonHAPMMessage() { assertEquals(dropped, buf); buf.release(); - final Counter counter = registry.counter("zuul.hapm.decode", "success", "false", "port", String.valueOf(port)); + final Counter counter = registry.counter( + "zuul.hapm.decode", "success", "false", "port", String.valueOf(port), "needs_more_data", "false"); assertEquals(1, counter.count()); } @@ -181,6 +182,31 @@ void detectsSplitPpv1Message() { assertNull(channel.pipeline().context(ElbProxyProtocolChannelHandler.class)); } + @Test + void tracksSplitMessage() { + EmbeddedChannel channel = new EmbeddedChannel(); + // This is normally done by Server. + channel.attr(Server.CONN_DIMENSIONS).set(Attrs.newInstance()); + int port = 7007; + channel.attr(SourceAddressChannelHandler.ATTR_SERVER_LOCAL_PORT).set(port); + + channel.pipeline() + .addLast(ElbProxyProtocolChannelHandler.NAME, new ElbProxyProtocolChannelHandler(registry, true)); + ByteBuf buf1 = Unpooled.wrappedBuffer("PROXY TCP4".getBytes(StandardCharsets.US_ASCII)); + channel.writeInbound(buf1); + + Object msg = channel.readInbound(); + assertEquals(buf1, msg); + buf1.release(); + + // The handler should remove itself. + assertNull(channel.pipeline().context(ElbProxyProtocolChannelHandler.class)); + + Counter counter = registry.counter( + "zuul.hapm.decode", "success", "false", "port", String.valueOf(port), "needs_more_data", "true"); + assertEquals(1, counter.count()); + } + @Test void negotiateProxy_ppv1_ipv4() { EmbeddedChannel channel = new EmbeddedChannel();