Skip to content

Commit

Permalink
DRILL-8489: Sender memory leak when rpc encode exception
Browse files Browse the repository at this point in the history
  • Loading branch information
shfshihuafeng committed Apr 17, 2024
1 parent af7cfcd commit 55a9546
Showing 1 changed file with 10 additions and 2 deletions.
12 changes: 10 additions & 2 deletions exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcEncoder.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,14 @@
import io.netty.buffer.ByteBufOutputStream;
import io.netty.buffer.CompositeByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.EncoderException;
import io.netty.handler.codec.MessageToMessageEncoder;

import java.io.OutputStream;
import java.util.Arrays;
import java.util.List;

import io.netty.util.ReferenceCounted;
import org.apache.drill.exec.proto.GeneralRPCProtos.CompleteRpcMessage;
import org.apache.drill.exec.proto.GeneralRPCProtos.RpcHeader;

Expand Down Expand Up @@ -61,7 +64,7 @@ protected void encode(ChannelHandlerContext ctx, OutboundRpcMessage msg, List<Ob
msg.release();
return;
}

boolean encodeSucess = false;
try{
if (RpcConstants.EXTRA_DEBUGGING) {
logger.debug("Encoding outbound message {}", msg);
Expand Down Expand Up @@ -131,9 +134,14 @@ protected void encode(ChannelHandlerContext ctx, OutboundRpcMessage msg, List<Ob
if (RpcConstants.EXTRA_DEBUGGING) {
logger.debug("Sent message. Ending writer index was {}.", buf.writerIndex());
}
encodeSucess = true;
} finally {
// make sure to release Rpc Messages underlying byte buffers.
//msg.release();

//if msg instanceof ReferenceCounted,netty can release msg.
if (!encodeSucess && !(msg instanceof ReferenceCounted)) {
msg.release();
}
}
}

Expand Down

0 comments on commit 55a9546

Please sign in to comment.