From 0dbad3b44add706d1b0cc84a1db174aefa585e64 Mon Sep 17 00:00:00 2001
From: Zixuan Liu
Date: Wed, 21 Feb 2024 23:39:28 +0800
Subject: [PATCH] Upgrade mockito from 3.12.4 to 4.11.0 (#4218)
---
.../apache/bookkeeper/bookie/BookieImpl.java | 26 ++++++++++---
.../org/apache/bookkeeper/bookie/Journal.java | 38 +++++++++++++------
.../bookie/BookieMultipleJournalsTest.java | 4 +-
pom.xml | 2 +-
4 files changed, 50 insertions(+), 20 deletions(-)
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java
index 654ace45476..a660a13ce84 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java
@@ -87,7 +87,7 @@
/**
* Implements a bookie.
*/
-public class BookieImpl extends BookieCriticalThread implements Bookie {
+public class BookieImpl implements Bookie {
private static final Logger LOG = LoggerFactory.getLogger(Bookie.class);
@@ -119,6 +119,8 @@ public class BookieImpl extends BookieCriticalThread implements Bookie {
protected StateManager stateManager;
+ private BookieCriticalThread bookieThread;
+
// Expose Stats
final StatsLogger statsLogger;
private final BookieStats bookieStats;
@@ -390,7 +392,6 @@ public BookieImpl(ServerConfiguration conf,
ByteBufAllocator allocator,
Supplier bookieServiceInfoProvider)
throws IOException, InterruptedException, BookieException {
- super("Bookie-" + conf.getBookiePort());
this.bookieServiceInfoProvider = bookieServiceInfoProvider;
this.statsLogger = statsLogger;
this.conf = conf;
@@ -656,7 +657,9 @@ private void replay(Journal journal, JournalScanner scanner) throws IOException
@Override
public synchronized void start() {
- setDaemon(true);
+ bookieThread = new BookieCriticalThread(() -> run(), "Bookie-" + conf.getBookiePort());
+ bookieThread.setDaemon(true);
+
ThreadRegistry.register("BookieThread", 0);
if (LOG.isDebugEnabled()) {
LOG.debug("I'm starting a bookie with journal directories {}",
@@ -717,7 +720,7 @@ public synchronized void start() {
syncThread.start();
// start bookie thread
- super.start();
+ bookieThread.start();
// After successful bookie startup, register listener for disk
// error/full notifications.
@@ -741,6 +744,20 @@ public synchronized void start() {
}
}
+ @Override
+ public void join() throws InterruptedException {
+ if (bookieThread != null) {
+ bookieThread.join();
+ }
+ }
+
+ public boolean isAlive() {
+ if (bookieThread == null) {
+ return false;
+ }
+ return bookieThread.isAlive();
+ }
+
/*
* Get the DiskFailure listener for the bookie
*/
@@ -824,7 +841,6 @@ public boolean isRunning() {
return stateManager.isRunning();
}
- @Override
public void run() {
// start journals
for (Journal journal: journals) {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
index 3b730cb476e..df10dfe5226 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
@@ -42,6 +42,7 @@
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
import org.apache.bookkeeper.bookie.stats.JournalStats;
import org.apache.bookkeeper.common.collections.BatchedArrayBlockingQueue;
@@ -66,13 +67,15 @@
/**
* Provide journal related management.
*/
-public class Journal extends BookieCriticalThread implements CheckpointSource {
+public class Journal implements CheckpointSource {
private static final Logger LOG = LoggerFactory.getLogger(Journal.class);
private static final RecyclableArrayList.Recycler entryListRecycler =
new RecyclableArrayList.Recycler();
+ private BookieCriticalThread thread;
+
/**
* Filter to pickup journals.
*/
@@ -461,13 +464,13 @@ private class ForceWriteThread extends BookieCriticalThread {
volatile boolean running = true;
// This holds the queue entries that should be notified after a
// successful force write
- Thread threadToNotifyOnEx;
+ Consumer threadToNotifyOnEx;
// should we group force writes
private final boolean enableGroupForceWrites;
private final Counter forceWriteThreadTime;
- public ForceWriteThread(Thread threadToNotifyOnEx,
+ public ForceWriteThread(Consumer threadToNotifyOnEx,
boolean enableGroupForceWrites,
StatsLogger statsLogger) {
super("ForceWriteThread");
@@ -531,7 +534,7 @@ public void run() {
// Regardless of what caused us to exit, we should notify the
// the parent thread as it should either exit or be in the process
// of exiting else we will have write requests hang
- threadToNotifyOnEx.interrupt();
+ threadToNotifyOnEx.accept(null);
}
private void syncJournal(ForceWriteRequest lastRequest) throws IOException {
@@ -652,8 +655,6 @@ public Journal(int journalIndex, File journalDirectory, ServerConfiguration conf
public Journal(int journalIndex, File journalDirectory, ServerConfiguration conf,
LedgerDirsManager ledgerDirsManager, StatsLogger statsLogger, ByteBufAllocator allocator) {
- super(journalThreadName + "-" + conf.getBookiePort());
- this.setPriority(Thread.MAX_PRIORITY);
this.allocator = allocator;
StatsLogger journalStatsLogger = statsLogger.scopeLabel("journalIndex", String.valueOf(journalIndex));
@@ -678,8 +679,8 @@ public Journal(int journalIndex, File journalDirectory, ServerConfiguration conf
this.journalWriteBufferSize = conf.getJournalWriteBufferSizeKB() * KB;
this.syncData = conf.getJournalSyncData();
this.maxBackupJournals = conf.getMaxBackupJournals();
- this.forceWriteThread = new ForceWriteThread(this, conf.getJournalAdaptiveGroupWrites(),
- journalStatsLogger);
+ this.forceWriteThread = new ForceWriteThread((__) -> this.interruptThread(),
+ conf.getJournalAdaptiveGroupWrites(), journalStatsLogger);
this.maxGroupWaitInNanos = TimeUnit.MILLISECONDS.toNanos(conf.getJournalMaxGroupWaitMSec());
this.bufferedWritesThreshold = conf.getJournalBufferedWritesThreshold();
this.bufferedEntriesThreshold = conf.getJournalBufferedEntriesThreshold();
@@ -956,7 +957,6 @@ journalFormatVersionToWrite, getBufferedChannelBuilder(),
*
* @see org.apache.bookkeeper.bookie.SyncThread
*/
- @Override
public void run() {
LOG.info("Starting journal on {}", journalDirectory);
ThreadRegistry.register(journalThreadName, 0);
@@ -1255,8 +1255,8 @@ public synchronized void shutdown() {
forceWriteThread.shutdown();
running = false;
- this.interrupt();
- this.join();
+ this.interruptThread();
+ this.joinThread();
LOG.info("Finished Shutting down Journal thread");
} catch (IOException | InterruptedException ie) {
Thread.currentThread().interrupt();
@@ -1284,7 +1284,21 @@ private static int fullRead(JournalChannel fc, ByteBuffer bb) throws IOException
*/
@VisibleForTesting
public void joinThread() throws InterruptedException {
- join();
+ if (thread != null) {
+ thread.join();
+ }
+ }
+
+ public void interruptThread() {
+ if (thread != null) {
+ thread.interrupt();
+ }
+ }
+
+ public synchronized void start() {
+ thread = new BookieCriticalThread(() -> run(), journalThreadName + "-" + conf.getBookiePort());
+ thread.setPriority(Thread.MAX_PRIORITY);
+ thread.start();
}
long getMemoryUsage() {
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieMultipleJournalsTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieMultipleJournalsTest.java
index b3622b3bb65..664e31541bf 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieMultipleJournalsTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieMultipleJournalsTest.java
@@ -74,7 +74,7 @@ public void testJournalExit() throws Exception {
Field journalList = bookie.getClass().getDeclaredField("journals");
journalList.setAccessible(true);
List journals = (List) journalList.get(bookie);
- journals.get(0).interrupt();
+ journals.get(0).interruptThread();
Awaitility.await().untilAsserted(() -> assertFalse(bookie.isRunning()));
}
@@ -92,7 +92,7 @@ public void testJournalExitAndShutdown() throws Exception {
Field journalList = bookie.getClass().getDeclaredField("journals");
journalList.setAccessible(true);
List journals = (List) journalList.get(bookie);
- journals.get(0).interrupt();
+ journals.get(0).interruptThread();
bookie.shutdown(ExitCode.OK);
Awaitility.await().untilAsserted(() -> assertFalse(bookie.isRunning()));
}
diff --git a/pom.xml b/pom.xml
index 69d1bbe56b0..215700bce0f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -153,7 +153,7 @@
1.18.30
2.18.0
1.3.0
- 3.12.4
+ 4.11.0
4.1.104.Final
0.0.24.Final
9.1.3