From 37708ade69129db1ff05bfd82d1f4f5b75593053 Mon Sep 17 00:00:00 2001 From: Kang Zou Date: Fri, 22 Mar 2024 00:12:02 +0800 Subject: [PATCH] Issue 4200: fix flaky test DeferredSyncTest.testForceWillAdvanceLacOnlyUpToLastAcknoledgedWrite (#4234) * Issue 4200: fix flaky test DeferredSyncTest.testForceWillAdvanceLacOnlyUpToLastAcknoledgedWrite * add a comment --- .../client/MockBookKeeperTestCase.java | 27 ++++++++++++++----- 1 file changed, 20 insertions(+), 7 deletions(-) diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java index f43b6136c81..e1881d74771 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java @@ -303,8 +303,15 @@ protected void suspendBookieForceLedgerAcks(BookieId address) { } protected void resumeBookieWriteAcks(BookieId address) { - suspendedBookiesForForceLedgerAcks.remove(address); - List pendingResponses = deferredBookieForceLedgerResponses.remove(address); + List pendingResponses; + + // why use the BookieId instance as the object monitor? there is a date race problem if not + // see https://github.com/apache/bookkeeper/issues/4200 + synchronized (address) { + suspendedBookiesForForceLedgerAcks.remove(address); + pendingResponses = deferredBookieForceLedgerResponses.remove(address); + } + if (pendingResponses != null) { pendingResponses.forEach(Runnable::run); } @@ -656,11 +663,17 @@ protected void setupBookieClientForceLedger() { callback.forceLedgerComplete(BKException.Code.OK, ledgerId, bookieSocketAddress, ctx); }); }; - if (suspendedBookiesForForceLedgerAcks.contains(bookieSocketAddress)) { - List queue = deferredBookieForceLedgerResponses.computeIfAbsent(bookieSocketAddress, - (k) -> new CopyOnWriteArrayList<>()); - queue.add(activity); - } else { + List queue = null; + + synchronized (bookieSocketAddress) { + if (suspendedBookiesForForceLedgerAcks.contains(bookieSocketAddress)) { + queue = deferredBookieForceLedgerResponses.computeIfAbsent(bookieSocketAddress, + (k) -> new CopyOnWriteArrayList<>()); + queue.add(activity); + } + } + + if (queue == null) { activity.run(); } return null;