Skip to content

Commit

Permalink
[fix] Fix ByteBuf release/retain in PerChannelBookieClient (#4289)
Browse files Browse the repository at this point in the history
* [fix] ByteBuf release/retain incorrect

* improve the code comment

* fix other cases

* modify the code comment

* improve the code

* improve the test

* add description
  • Loading branch information
poorbarcode authored and merlimat committed Apr 17, 2024
1 parent 996b5d4 commit 02a06c4
Show file tree
Hide file tree
Showing 2 changed files with 179 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -842,10 +842,10 @@ void addEntry(final long ledgerId, byte[] masterKey, final long entryId, Referen
cb, ctx, ledgerId, entryId));
final Channel c = channel;
if (c == null) {
// usually checked in writeAndFlush, but we have extra check
// because we need to release toSend.
// Manually release the binary data (variable "request") that we created ourselves when it cannot be sent out
// because the channel is switching.
errorOut(completionKey);
ReferenceCountUtil.release(toSend);
ReferenceCountUtil.release(request);
return;
} else {
// addEntry times out on backpressure
Expand Down Expand Up @@ -1180,6 +1180,7 @@ private void writeAndFlush(final Channel channel,
if (channel == null) {
LOG.warn("Operation {} failed: channel == null", StringUtils.requestToString(request));
errorOut(key);
ReferenceCountUtil.release(request);
return;
}

Expand All @@ -1194,6 +1195,7 @@ private void writeAndFlush(final Channel channel,
StringUtils.requestToString(request));

errorOut(key, BKException.Code.TooManyRequestsException);
ReferenceCountUtil.release(request);
return;
}

Expand All @@ -1215,6 +1217,9 @@ private void writeAndFlush(final Channel channel,
} catch (Throwable e) {
LOG.warn("Operation {} failed", StringUtils.requestToString(request), e);
errorOut(key);
// If the request reaches writeAndFlush, it should be handled properly by Netty. So for any exception
// caught here, we can release the request.
ReferenceCountUtil.release(request);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,25 +27,31 @@
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.ReferenceCounted;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.bookie.MockUncleanShutdownDetection;
import org.apache.bookkeeper.bookie.TestBookieImpl;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BKException.Code;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.BookKeeperClientStats;
import org.apache.bookkeeper.client.BookieInfoReader.BookieInfo;
import org.apache.bookkeeper.client.api.WriteFlag;
Expand All @@ -57,27 +63,32 @@
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.BookieClient;
import org.apache.bookkeeper.proto.BookieClientImpl;
import org.apache.bookkeeper.proto.BookieProtoEncoding;
import org.apache.bookkeeper.proto.BookieProtocol;
import org.apache.bookkeeper.proto.BookieServer;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GetBookieInfoCallback;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
import org.apache.bookkeeper.proto.BookkeeperProtocol;
import org.apache.bookkeeper.proto.DataFormats;
import org.apache.bookkeeper.proto.PerChannelBookieClient;
import org.apache.bookkeeper.proto.PerChannelBookieClientPool;
import org.apache.bookkeeper.proto.checksum.DigestManager;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.test.TestStatsProvider.TestOpStatsLogger;
import org.apache.bookkeeper.test.TestStatsProvider.TestStatsLogger;
import org.apache.bookkeeper.util.ByteBufList;
import org.apache.bookkeeper.util.IOUtils;
import org.awaitility.Awaitility;
import org.awaitility.reflect.WhiteboxImpl;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

/**
* Test the bookie client.
*/
@Slf4j
public class BookieClientTest {
BookieServer bs;
File tmpDir;
Expand Down Expand Up @@ -745,4 +756,164 @@ public void testBatchedReadWithMaxSizeLimitCase2() throws Exception {
assertTrue(Arrays.equals(kbData, bytes));
}
}

/**
 * Adds {@code tryTimes} entries while another thread closes the channel in use, and checks after each add that
 * the reference count of the data handed to {@code BookieClientImpl.addEntry} is back to {@code 1}.
 *
 * <p>The stacks of "BookieClientImpl.addEntry" are explained here:
 * 1.`BookieClientImpl.addEntry`.
 *   a.Retain the `ByteBuf` before getting the `PerChannelBookieClient`. This `ByteBuf` is called `toSend` in the
 *     following sections. `toSend.refCnt` is `2` now.
 * 2.`Get PerChannelBookieClient`.
 * 3.`ChannelReadyForAddEntryCallback.operationComplete`
 *   a.`PerChannelBookieClient.addEntry`
 *     a-1.Build a new ByteBuf for the request command. This new `ByteBuf` is called `request` in the following
 *         sections.
 *     a-2.`channel.writeAndFlush(request)` or release the ByteBuf when `channel` is switching.
 *         Note the callback will be called immediately if the channel is switching.
 *   b.Release the `ByteBuf` since it has been retained at `step 1`. `toSend.refCnt` should be `1` now.
 *
 * @param useV2WireProtocol passed to both the digest manager and the client configuration.
 * @param smallPayload if true the payload is a single byte, otherwise it is
 *                     {@code BookieProtoEncoding.SMALL_ENTRY_SIZE_THRESHOLD} bytes long.
 * @param withDelayReconnect if true the thread that closes the channel sleeps before closing it.
 * @param withDelayAddEntry if true the calling thread sleeps before adding the entry.
 * @param tryTimes how many entries are added; iteration {@code i} uses {@code i} as the requested delay.
 */
public void testDataRefCnfWhenReconnect(boolean useV2WireProtocol, boolean smallPayload,
                                        boolean withDelayReconnect, boolean withDelayAddEntry,
                                        int tryTimes) throws Exception {
    final long ledgerId = 1;
    final BookieId addr = bs.getBookieId();
    // The original payload is only checked and released by this test for this combination of flags.
    // NOTE(review): none of the callers visible in this file pass (useV2WireProtocol=false, smallPayload=false),
    // so the branches guarded by this flag are not exercised by them.
    final boolean checkPayload = !useV2WireProtocol && !smallPayload;
    // Build passwd.
    byte[] passwd = new byte[20];
    Arrays.fill(passwd, (byte) 'a');
    // Build digest manager.
    DigestManager digestManager = DigestManager.instantiate(1, passwd,
            BookKeeper.DigestType.toProtoDigestType(BookKeeper.DigestType.DUMMY),
            PooledByteBufAllocator.DEFAULT, useV2WireProtocol);
    // Build client.
    ClientConfiguration clientConf = new ClientConfiguration();
    clientConf.setUseV2WireProtocol(useV2WireProtocol);
    BookieClientImpl client = new BookieClientImpl(clientConf, eventLoopGroup,
            UnpooledByteBufAllocator.DEFAULT, executor, scheduler, NullStatsLogger.INSTANCE,
            BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER);

    // Close the client even when an assertion fails, so a failing run does not leak it into later tests.
    try {
        // Inject a reconnect event.
        // 1. Get the channel that will be used.
        // 2. Call add entry.
        // 3. Another thread closes the channel that is in use.
        for (int i = 0; i < tryTimes; i++) {
            long entryId = i + 1;
            long lac = i;
            // Build payload.
            int payloadLen;
            ByteBuf payload;
            if (smallPayload) {
                payloadLen = 1;
                payload = PooledByteBufAllocator.DEFAULT.buffer(1);
                payload.writeByte(1);
            } else {
                payloadLen = BookieProtoEncoding.SMALL_ENTRY_SIZE_THRESHOLD;
                payload = PooledByteBufAllocator.DEFAULT.buffer();
                // Named so that it does not shadow the "bs" (BookieServer) field of the test class.
                byte[] payloadBytes = new byte[payloadLen];
                payload.writeBytes(payloadBytes);
            }

            // Digest.
            ReferenceCounted bb = digestManager.computeDigestAndPackageForSending(entryId, lac,
                    payloadLen * entryId, payload, passwd, BookieProtocol.FLAG_NONE);
            log.info("Before send. bb.refCnf: {}", bb.refCnt());

            // Step: get the channel that will be used.
            PerChannelBookieClientPool perChannelBookieClientPool = client.lookupClient(addr);
            AtomicReference<PerChannelBookieClient> perChannelBookieClient = new AtomicReference<>();
            perChannelBookieClientPool.obtain((rc, result) -> perChannelBookieClient.set(result), ledgerId);
            Awaitility.await().untilAsserted(() -> {
                assertNotNull(perChannelBookieClient.get());
            });

            // Step: Inject a reconnect event.
            final int delayMillis = i;
            new Thread(() -> {
                if (withDelayReconnect) {
                    sleep(delayMillis);
                }
                Channel channel = WhiteboxImpl.getInternalState(perChannelBookieClient.get(), "channel");
                if (channel != null) {
                    channel.close();
                }
            }).start();
            if (withDelayAddEntry) {
                sleep(delayMillis);
            }

            // Step: add entry.
            AtomicBoolean callbackExecuted = new AtomicBoolean();
            WriteCallback callback = (rc, lId, eId, socketAddr, ctx) -> {
                log.info("Writing is finished. rc: {}, withDelayReconnect: {}, withDelayAddEntry: {}, ledgerId: {},"
                                + " entryId: {}, socketAddr: {}, ctx: {}",
                        rc, withDelayReconnect, withDelayAddEntry, lId, eId, socketAddr, ctx);
                callbackExecuted.set(true);
            };
            client.addEntry(addr, ledgerId, passwd, entryId, bb, callback, i, BookieProtocol.FLAG_NONE, false,
                    WriteFlag.NONE);
            // Wait until adding the entry has finished.
            Awaitility.await().untilAsserted(() -> assertTrue(callbackExecuted.get()));
            // The steps are explained in the method description.
            // Since the step "3-a-2" always runs before the step "3-b", "callbackExecuted" will be set
            // before the step "3-b". Add a sleep to give the step "3-b" time to finish.
            Thread.sleep(100);
            // Check the ref count.
            Awaitility.await().atMost(Duration.ofSeconds(60)).untilAsserted(() -> {
                assertEquals(1, bb.refCnt());
                // V2 will release this original data if it is small.
                if (checkPayload) {
                    assertEquals(1, payload.refCnt());
                }
            });
            bb.release();
            // V2 will release this original data if it is small.
            if (checkPayload) {
                payload.release();
            }
        }
    } finally {
        // cleanup.
        client.close();
    }
}

/**
 * Pauses the calling thread for {@code milliSeconds} milliseconds; does nothing when the value is not positive.
 *
 * <p>If the thread is interrupted while sleeping, the interruption is logged and the interrupt flag is set
 * again so that callers can still observe it.
 *
 * @param milliSeconds how long to sleep, in milliseconds.
 */
private void sleep(int milliSeconds) {
    if (milliSeconds <= 0) {
        return;
    }
    try {
        // Honor the requested duration instead of a fixed 1 ms.
        Thread.sleep(milliSeconds);
    } catch (InterruptedException e) {
        log.warn("Error occurs", e);
        // Catching InterruptedException clears the flag; restore it rather than swallowing the interruption.
        Thread.currentThread().interrupt();
    }
}

/**
 * Runs the reconnect ref-count check with the V2 wire protocol enabled, first with a large payload and then
 * with a small one.
 * Relate to https://github.com/apache/bookkeeper/pull/4289.
 */
@Test
public void testDataRefCnfWhenReconnectV2() throws Exception {
    // First pass (smallPayload = false): large payload.
    // Run this test may not reproduce the issue, you can reproduce the issue this way:
    // 1. Add two break points.
    //   a. At the line "Channel c = channel" in the method PerChannelBookieClient.addEntry.
    //   b. At the line "channel = null" in the method "PerChannelBookieClient.channelInactive".
    // 2. Make the break point b to run earlier than the break point a during debugging.
    //
    // Second pass (smallPayload = true): small payload.
    // There is no issue without https://github.com/apache/bookkeeper/pull/4289, just add a test for this scenario.
    final boolean[] payloadSizes = {false, true};
    for (boolean smallPayload : payloadSizes) {
        // No delay, then delayed reconnect, then delayed add entry.
        testDataRefCnfWhenReconnect(true, smallPayload, false, false, 10);
        testDataRefCnfWhenReconnect(true, smallPayload, true, false, 10);
        testDataRefCnfWhenReconnect(true, smallPayload, false, true, 10);
    }
}

/**
 * Runs the reconnect ref-count check with the V2 wire protocol disabled and a small payload.
 * If running this test does not reproduce the issue, see the comment of the scenario "Large payload" in
 * {@link #testDataRefCnfWhenReconnectV2()}.
 * Relate to https://github.com/apache/bookkeeper/pull/4289.
 */
@Test
public void testDataRefCnfWhenReconnectV3() throws Exception {
    // Each row is {withDelayReconnect, withDelayAddEntry}.
    final boolean[][] delayCombinations = {
        {false, false},
        {true, false},
        {false, true},
    };
    for (boolean[] delays : delayCombinations) {
        testDataRefCnfWhenReconnect(false, true, delays[0], delays[1], 10);
    }
}
}

0 comments on commit 02a06c4

Please sign in to comment.