diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java index 654ace45476..a660a13ce84 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java @@ -87,7 +87,7 @@ /** * Implements a bookie. */ -public class BookieImpl extends BookieCriticalThread implements Bookie { +public class BookieImpl implements Bookie { private static final Logger LOG = LoggerFactory.getLogger(Bookie.class); @@ -119,6 +119,8 @@ public class BookieImpl extends BookieCriticalThread implements Bookie { protected StateManager stateManager; + private BookieCriticalThread bookieThread; + // Expose Stats final StatsLogger statsLogger; private final BookieStats bookieStats; @@ -390,7 +392,6 @@ public BookieImpl(ServerConfiguration conf, ByteBufAllocator allocator, Supplier bookieServiceInfoProvider) throws IOException, InterruptedException, BookieException { - super("Bookie-" + conf.getBookiePort()); this.bookieServiceInfoProvider = bookieServiceInfoProvider; this.statsLogger = statsLogger; this.conf = conf; @@ -656,7 +657,9 @@ private void replay(Journal journal, JournalScanner scanner) throws IOException @Override public synchronized void start() { - setDaemon(true); + bookieThread = new BookieCriticalThread(() -> run(), "Bookie-" + conf.getBookiePort()); + bookieThread.setDaemon(true); + ThreadRegistry.register("BookieThread", 0); if (LOG.isDebugEnabled()) { LOG.debug("I'm starting a bookie with journal directories {}", @@ -717,7 +720,7 @@ public synchronized void start() { syncThread.start(); // start bookie thread - super.start(); + bookieThread.start(); // After successful bookie startup, register listener for disk // error/full notifications. @@ -741,6 +744,20 @@ public synchronized void start() { } } + @Override + public void join() throws InterruptedException { + if (bookieThread != null) { + bookieThread.join(); + } + } + + public boolean isAlive() { + if (bookieThread == null) { + return false; + } + return bookieThread.isAlive(); + } + /* * Get the DiskFailure listener for the bookie */ @@ -824,7 +841,6 @@ public boolean isRunning() { return stateManager.isRunning(); } - @Override public void run() { // start journals for (Journal journal: journals) { diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java index 3b730cb476e..df10dfe5226 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java @@ -42,6 +42,7 @@ import java.util.Collections; import java.util.List; import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException; import org.apache.bookkeeper.bookie.stats.JournalStats; import org.apache.bookkeeper.common.collections.BatchedArrayBlockingQueue; @@ -66,13 +67,15 @@ /** * Provide journal related management. */ -public class Journal extends BookieCriticalThread implements CheckpointSource { +public class Journal implements CheckpointSource { private static final Logger LOG = LoggerFactory.getLogger(Journal.class); private static final RecyclableArrayList.Recycler entryListRecycler = new RecyclableArrayList.Recycler(); + private BookieCriticalThread thread; + /** * Filter to pickup journals. */ @@ -461,13 +464,13 @@ private class ForceWriteThread extends BookieCriticalThread { volatile boolean running = true; // This holds the queue entries that should be notified after a // successful force write - Thread threadToNotifyOnEx; + Consumer threadToNotifyOnEx; // should we group force writes private final boolean enableGroupForceWrites; private final Counter forceWriteThreadTime; - public ForceWriteThread(Thread threadToNotifyOnEx, + public ForceWriteThread(Consumer threadToNotifyOnEx, boolean enableGroupForceWrites, StatsLogger statsLogger) { super("ForceWriteThread"); @@ -531,7 +534,7 @@ public void run() { // Regardless of what caused us to exit, we should notify the // the parent thread as it should either exit or be in the process // of exiting else we will have write requests hang - threadToNotifyOnEx.interrupt(); + threadToNotifyOnEx.accept(null); } private void syncJournal(ForceWriteRequest lastRequest) throws IOException { @@ -652,8 +655,6 @@ public Journal(int journalIndex, File journalDirectory, ServerConfiguration conf public Journal(int journalIndex, File journalDirectory, ServerConfiguration conf, LedgerDirsManager ledgerDirsManager, StatsLogger statsLogger, ByteBufAllocator allocator) { - super(journalThreadName + "-" + conf.getBookiePort()); - this.setPriority(Thread.MAX_PRIORITY); this.allocator = allocator; StatsLogger journalStatsLogger = statsLogger.scopeLabel("journalIndex", String.valueOf(journalIndex)); @@ -678,8 +679,8 @@ public Journal(int journalIndex, File journalDirectory, ServerConfiguration conf this.journalWriteBufferSize = conf.getJournalWriteBufferSizeKB() * KB; this.syncData = conf.getJournalSyncData(); this.maxBackupJournals = conf.getMaxBackupJournals(); - this.forceWriteThread = new ForceWriteThread(this, conf.getJournalAdaptiveGroupWrites(), - journalStatsLogger); + this.forceWriteThread = new ForceWriteThread((__) -> this.interruptThread(), + conf.getJournalAdaptiveGroupWrites(), journalStatsLogger); this.maxGroupWaitInNanos = TimeUnit.MILLISECONDS.toNanos(conf.getJournalMaxGroupWaitMSec()); this.bufferedWritesThreshold = conf.getJournalBufferedWritesThreshold(); this.bufferedEntriesThreshold = conf.getJournalBufferedEntriesThreshold(); @@ -956,7 +957,6 @@ journalFormatVersionToWrite, getBufferedChannelBuilder(), *

* @see org.apache.bookkeeper.bookie.SyncThread */ - @Override public void run() { LOG.info("Starting journal on {}", journalDirectory); ThreadRegistry.register(journalThreadName, 0); @@ -1255,8 +1255,8 @@ public synchronized void shutdown() { forceWriteThread.shutdown(); running = false; - this.interrupt(); - this.join(); + this.interruptThread(); + this.joinThread(); LOG.info("Finished Shutting down Journal thread"); } catch (IOException | InterruptedException ie) { Thread.currentThread().interrupt(); @@ -1284,7 +1284,21 @@ private static int fullRead(JournalChannel fc, ByteBuffer bb) throws IOException */ @VisibleForTesting public void joinThread() throws InterruptedException { - join(); + if (thread != null) { + thread.join(); + } + } + + public void interruptThread() { + if (thread != null) { + thread.interrupt(); + } + } + + public synchronized void start() { + thread = new BookieCriticalThread(() -> run(), journalThreadName + "-" + conf.getBookiePort()); + thread.setPriority(Thread.MAX_PRIORITY); + thread.start(); } long getMemoryUsage() { diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieMultipleJournalsTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieMultipleJournalsTest.java index b3622b3bb65..664e31541bf 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieMultipleJournalsTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieMultipleJournalsTest.java @@ -74,7 +74,7 @@ public void testJournalExit() throws Exception { Field journalList = bookie.getClass().getDeclaredField("journals"); journalList.setAccessible(true); List journals = (List) journalList.get(bookie); - journals.get(0).interrupt(); + journals.get(0).interruptThread(); Awaitility.await().untilAsserted(() -> assertFalse(bookie.isRunning())); } @@ -92,7 +92,7 @@ public void testJournalExitAndShutdown() throws Exception { Field journalList = bookie.getClass().getDeclaredField("journals"); journalList.setAccessible(true); List journals = (List) journalList.get(bookie); - journals.get(0).interrupt(); + journals.get(0).interruptThread(); bookie.shutdown(ExitCode.OK); Awaitility.await().untilAsserted(() -> assertFalse(bookie.isRunning())); } diff --git a/pom.xml b/pom.xml index 69d1bbe56b0..215700bce0f 100644 --- a/pom.xml +++ b/pom.xml @@ -153,7 +153,7 @@ 1.18.30 2.18.0 1.3.0 - 3.12.4 + 4.11.0 4.1.104.Final 0.0.24.Final 9.1.3