From d13dc202bef9dd554422e4e3a722b9c12dc74fd2 Mon Sep 17 00:00:00 2001 From: Tibor Billes Date: Mon, 13 Jul 2015 17:43:35 +0200 Subject: [PATCH] Implements interrupt-safe transaction log MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When an interrupt happened on a thread during writing the transaction log, the underlying FileChannel was closed as a result. Any further use (even from another threads) threw a java.nio.channels.ClosedChannelException rendering the transaction manager unusable since the transaction logs were never reopened. This commit changes this behavior, log file operations reopen the file if it was closed by an interrupt of another thread. fixes bitronix/btm#45 Commit by Tibor Billes, Miklós Karakó, Balázs Póka --- btm/pom.xml | 20 +- .../tm/gui/TransactionLogHeaderPanel.java | 5 +- .../InterruptibleLockedRandomAccessFile.java | 104 +++++ .../tm/journal/TransactionLogAppender.java | 41 +- .../tm/journal/TransactionLogHeader.java | 24 +- .../bitronix/tm/journal/ByteBufferUtil.java | 18 + .../tm/journal/DiskJournalInterruptTest.java | 73 ++++ .../bitronix/tm/journal/InterruptService.java | 53 +++ .../tm/journal/InterruptableThreadList.java | 40 ++ ...tibleLockedRandomAccessFileStressTest.java | 149 +++++++ ...terruptibleLockedRandomAccessFileTest.java | 387 ++++++++++++++++++ 11 files changed, 873 insertions(+), 41 deletions(-) create mode 100644 btm/src/main/java/bitronix/tm/journal/InterruptibleLockedRandomAccessFile.java create mode 100644 btm/src/test/java/bitronix/tm/journal/ByteBufferUtil.java create mode 100644 btm/src/test/java/bitronix/tm/journal/DiskJournalInterruptTest.java create mode 100644 btm/src/test/java/bitronix/tm/journal/InterruptService.java create mode 100644 btm/src/test/java/bitronix/tm/journal/InterruptableThreadList.java create mode 100644 btm/src/test/java/bitronix/tm/journal/InterruptibleLockedRandomAccessFileStressTest.java create mode 100644 btm/src/test/java/bitronix/tm/journal/InterruptibleLockedRandomAccessFileTest.java diff --git a/btm/pom.xml b/btm/pom.xml index 1322cd9e..2d5fddd6 100644 --- a/btm/pom.xml +++ b/btm/pom.xml @@ -12,7 +12,7 @@ bundle - + javax.transaction jta provided @@ -70,6 +70,24 @@ provided true + + org.apache.commons + commons-io + 1.3.2 + test + + + org.apache.commons + commons-lang3 + 3.4 + test + + + org.easytesting + fest-assert-core + 2.0M10 + test + diff --git a/btm/src/main/java/bitronix/tm/gui/TransactionLogHeaderPanel.java b/btm/src/main/java/bitronix/tm/gui/TransactionLogHeaderPanel.java index 0d334a3f..4d5a4a01 100644 --- a/btm/src/main/java/bitronix/tm/gui/TransactionLogHeaderPanel.java +++ b/btm/src/main/java/bitronix/tm/gui/TransactionLogHeaderPanel.java @@ -16,6 +16,7 @@ package bitronix.tm.gui; import bitronix.tm.journal.TransactionLogHeader; +import bitronix.tm.journal.InterruptibleLockedRandomAccessFile; import bitronix.tm.utils.Decoder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -74,8 +75,8 @@ public void setPosition(long position) { } public void read(File logFile, boolean active) throws IOException { - RandomAccessFile raf = new RandomAccessFile(logFile, "r"); - TransactionLogHeader header = new TransactionLogHeader(raf.getChannel(), 0L); + InterruptibleLockedRandomAccessFile raf = new InterruptibleLockedRandomAccessFile(logFile, "r"); + TransactionLogHeader header = new TransactionLogHeader(raf, 0L); raf.close(); if (log.isDebugEnabled()) { log.debug("read header: " + header); } setLogFile(logFile); diff --git a/btm/src/main/java/bitronix/tm/journal/InterruptibleLockedRandomAccessFile.java b/btm/src/main/java/bitronix/tm/journal/InterruptibleLockedRandomAccessFile.java new file mode 100644 index 00000000..a10c967d --- /dev/null +++ b/btm/src/main/java/bitronix/tm/journal/InterruptibleLockedRandomAccessFile.java @@ -0,0 +1,104 @@ +package bitronix.tm.journal; + +import java.io.File; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.channels.FileLock; + +public class InterruptibleLockedRandomAccessFile { + + private final File file; + private final String mode; + private RandomAccessFile openedFile; + private FileChannel fileChannel; + private FileLock fileLock; + private long currentPosition = 0; + private boolean closed; + + public InterruptibleLockedRandomAccessFile(final File file, final String mode) + throws IOException { + this.file = file; + this.mode = mode; + open(); + } + + private synchronized void open() throws IOException { + openedFile = new RandomAccessFile(file, mode); + fileChannel = openedFile.getChannel(); + + final boolean shared = false; + this.fileLock = fileChannel + .tryLock(0, TransactionLogHeader.TIMESTAMP_HEADER, shared); + if (this.fileLock == null) { + throw new IOException("File " + file.getAbsolutePath() + + " is locked. Is another instance already running?"); + } + } + + public synchronized final void close() throws IOException { + try { + if (!fileLock.isValid()) { + checkState(!fileChannel.isOpen(), "invalid/unhandled state"); + return; + } + fileLock.release(); + fileChannel.close(); + openedFile.close(); + } finally { + closed = true; + } + } + + public synchronized void position(final long newPosition) throws IOException { + checkNotClosed(); + reopenFileChannelIfClosed(); + + fileChannel.position(newPosition); + currentPosition = newPosition; + } + + private void checkNotClosed() { + checkState(!closed, "File has been closed"); + } + + private static void checkState(final boolean expression, final String errorMessage) { + if (!expression) { + throw new IllegalStateException(errorMessage); + } + } + + public synchronized void force(final boolean metaData) throws IOException { + checkNotClosed(); + reopenFileChannelIfClosed(); + + fileChannel.force(metaData); + } + + public synchronized int write(final ByteBuffer src, final long position) + throws IOException { + checkNotClosed(); + reopenFileChannelIfClosed(); + + return fileChannel.write(src, position); + } + + public synchronized void read(final ByteBuffer buffer) throws IOException { + checkNotClosed(); + reopenFileChannelIfClosed(); + + fileChannel.read(buffer); + currentPosition = fileChannel.position(); + } + + private void reopenFileChannelIfClosed() throws IOException { + if (!fileChannel.isOpen()) { + open(); + } + + if (fileChannel.position() != currentPosition) { + fileChannel.position(currentPosition); + } + } +} \ No newline at end of file diff --git a/btm/src/main/java/bitronix/tm/journal/TransactionLogAppender.java b/btm/src/main/java/bitronix/tm/journal/TransactionLogAppender.java index 55338cc7..7a96b130 100644 --- a/btm/src/main/java/bitronix/tm/journal/TransactionLogAppender.java +++ b/btm/src/main/java/bitronix/tm/journal/TransactionLogAppender.java @@ -15,17 +15,9 @@ */ package bitronix.tm.journal; -import bitronix.tm.utils.Uid; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.transaction.Status; import java.io.File; import java.io.IOException; -import java.io.RandomAccessFile; import java.nio.ByteBuffer; -import java.nio.channels.FileChannel; -import java.nio.channels.FileLock; import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; @@ -35,6 +27,13 @@ import java.util.TreeSet; import java.util.concurrent.atomic.AtomicInteger; +import javax.transaction.Status; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import bitronix.tm.utils.Uid; + /** * Used to write {@link TransactionLogRecord} objects to a log file. * @@ -53,9 +52,7 @@ public class TransactionLogAppender { public static final int END_RECORD = 0x786e7442; private final File file; - private final RandomAccessFile randomeAccessFile; - private final FileChannel fc; - private final FileLock lock; + private final InterruptibleLockedRandomAccessFile randomAccessFile; private final TransactionLogHeader header; private final long maxFileLength; private final AtomicInteger outstandingWrites; @@ -70,14 +67,10 @@ public class TransactionLogAppender { */ public TransactionLogAppender(File file, long maxFileLength) throws IOException { this.file = file; - this.randomeAccessFile = new RandomAccessFile(file, "rw"); - this.fc = randomeAccessFile.getChannel(); - this.header = new TransactionLogHeader(fc, maxFileLength); + this.randomAccessFile = new InterruptibleLockedRandomAccessFile(file, "rw"); + this.header = new TransactionLogHeader(randomAccessFile, maxFileLength); this.maxFileLength = maxFileLength; - this.lock = fc.tryLock(0, TransactionLogHeader.TIMESTAMP_HEADER, false); - if (this.lock == null) - throw new IOException("transaction log file " + file.getName() + " is locked. Is another instance already running?"); - + this.outstandingWrites = new AtomicInteger(); this.danglingRecords = new HashMap>(); @@ -144,7 +137,7 @@ protected void writeLog(TransactionLogRecord tlog) throws IOException { final long writePosition = tlog.getWritePosition(); while (buf.hasRemaining()) { - fc.write(buf, writePosition + buf.position()); + randomAccessFile.write(buf, writePosition + buf.position()); } trackOutstanding(status, gtrid, uniqueNames); @@ -273,18 +266,14 @@ public long getPosition() { return position; } - /** * Close the appender and the underlying file. * @throws IOException if an I/O error occurs. */ protected void close() throws IOException { header.setState(TransactionLogHeader.CLEAN_LOG_STATE); - fc.force(false); - if (lock != null) - lock.release(); - fc.close(); - randomeAccessFile.close(); + randomAccessFile.force(false); + randomAccessFile.close(); } /** @@ -304,7 +293,7 @@ protected TransactionLogCursor getCursor() throws IOException { */ protected void force() throws IOException { if (log.isDebugEnabled()) { log.debug("forcing log writing"); } - fc.force(false); + randomAccessFile.force(false); if (log.isDebugEnabled()) { log.debug("done forcing log"); } } diff --git a/btm/src/main/java/bitronix/tm/journal/TransactionLogHeader.java b/btm/src/main/java/bitronix/tm/journal/TransactionLogHeader.java index 748ab635..9f681aac 100644 --- a/btm/src/main/java/bitronix/tm/journal/TransactionLogHeader.java +++ b/btm/src/main/java/bitronix/tm/journal/TransactionLogHeader.java @@ -69,7 +69,7 @@ public class TransactionLogHeader { */ public final static byte UNCLEAN_LOG_STATE = -1; - private final FileChannel fc; + private final InterruptibleLockedRandomAccessFile file; private final long maxFileLength; private volatile int formatId; @@ -79,25 +79,25 @@ public class TransactionLogHeader { /** * TransactionLogHeader are used to control headers of the specified RandomAccessFile. - * @param fc the file channel to read from. + * @param randomAccessFile the file to read from. * @param maxFileLength the max file length. * @throws IOException if an I/O error occurs. */ - public TransactionLogHeader(FileChannel fc, long maxFileLength) throws IOException { - this.fc = fc; + public TransactionLogHeader(InterruptibleLockedRandomAccessFile randomAccessFile, long maxFileLength) throws IOException { + this.file = randomAccessFile; this.maxFileLength = maxFileLength; - fc.position(FORMAT_ID_HEADER); + randomAccessFile.position(FORMAT_ID_HEADER); ByteBuffer buf = ByteBuffer.allocate(4 + 8 + 1 + 8); while (buf.hasRemaining()) { - this.fc.read(buf); + this.file.read(buf); } buf.flip(); formatId = buf.getInt(); timestamp = buf.getLong(); state = buf.get(); position = buf.getLong(); - fc.position(position); + randomAccessFile.position(position); if (log.isDebugEnabled()) { log.debug("read header " + this); } } @@ -149,7 +149,7 @@ public void setFormatId(int formatId) throws IOException { buf.putInt(formatId); buf.flip(); while (buf.hasRemaining()) { - fc.write(buf, FORMAT_ID_HEADER + buf.position()); + file.write(buf, FORMAT_ID_HEADER + buf.position()); } this.formatId = formatId; } @@ -165,7 +165,7 @@ public void setTimestamp(long timestamp) throws IOException { buf.putLong(timestamp); buf.flip(); while (buf.hasRemaining()) { - fc.write(buf, TIMESTAMP_HEADER + buf.position()); + file.write(buf, TIMESTAMP_HEADER + buf.position()); } this.timestamp = timestamp; } @@ -181,7 +181,7 @@ public void setState(byte state) throws IOException { buf.put(state); buf.flip(); while (buf.hasRemaining()) { - fc.write(buf, STATE_HEADER + buf.position()); + file.write(buf, STATE_HEADER + buf.position()); } this.state = state; } @@ -202,11 +202,11 @@ public void setPosition(long position) throws IOException { buf.putLong(position); buf.flip(); while (buf.hasRemaining()) { - fc.write(buf, CURRENT_POSITION_HEADER + buf.position()); + file.write(buf, CURRENT_POSITION_HEADER + buf.position()); } this.position = position; - fc.position(position); + file.position(position); } /** diff --git a/btm/src/test/java/bitronix/tm/journal/ByteBufferUtil.java b/btm/src/test/java/bitronix/tm/journal/ByteBufferUtil.java new file mode 100644 index 00000000..92f02eda --- /dev/null +++ b/btm/src/test/java/bitronix/tm/journal/ByteBufferUtil.java @@ -0,0 +1,18 @@ +package bitronix.tm.journal; + +import java.io.UnsupportedEncodingException; +import java.nio.ByteBuffer; + +public final class ByteBufferUtil { + + private ByteBufferUtil() { + } + + public static ByteBuffer createByteBuffer(final String input) + throws UnsupportedEncodingException { + final byte[] inputArray = input.getBytes("UTF-8"); + final ByteBuffer byteBuffer = ByteBuffer.wrap(inputArray); + return byteBuffer; + } + +} diff --git a/btm/src/test/java/bitronix/tm/journal/DiskJournalInterruptTest.java b/btm/src/test/java/bitronix/tm/journal/DiskJournalInterruptTest.java new file mode 100644 index 00000000..d04b99a7 --- /dev/null +++ b/btm/src/test/java/bitronix/tm/journal/DiskJournalInterruptTest.java @@ -0,0 +1,73 @@ +package bitronix.tm.journal; + +import java.io.IOException; +import java.nio.channels.ClosedChannelException; +import java.util.HashSet; +import java.util.Set; + +import javax.transaction.Status; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import bitronix.tm.utils.UidGenerator; + +/** + * Source: http://bitronix-transaction-manager.10986.n7.nabble.com/Fix-for-BTM-138-td1701.html + * + * @author Kazuya Uno + */ +public class DiskJournalInterruptTest { + + private DiskJournal diskJournal = new DiskJournal(); + private Set names = new HashSet(); + + @Before + public void setUp() throws IOException { + diskJournal.open(); + } + + @After + public void tearDown() throws IOException { + diskJournal.close(); + } + + @Test + public void testShouldInterruptOnAThreadDontCauseOtherThreadToFail() + throws Exception { + // given: a thread writing logs + Thread thread = new Thread() { + + @Override + public void run() { + try { + writeLog(); + } catch (IOException e) { + // normal + } + }; + }; + thread.start(); + + // when thread is interrupted + thread.interrupt(); + + // this detect closed channel and reopen logs + try { + writeLog(); + } catch (ClosedChannelException cce) { + // this is expected. + } + + // then writing logs should work + writeLog(); + + } + + private void writeLog() throws IOException { + diskJournal.log(Status.STATUS_COMMITTED, UidGenerator.generateUid(), + names); + } + +} \ No newline at end of file diff --git a/btm/src/test/java/bitronix/tm/journal/InterruptService.java b/btm/src/test/java/bitronix/tm/journal/InterruptService.java new file mode 100644 index 00000000..0de8ea52 --- /dev/null +++ b/btm/src/test/java/bitronix/tm/journal/InterruptService.java @@ -0,0 +1,53 @@ +package bitronix.tm.journal; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +public class InterruptService { + + private final InterruptableThreadList interruptableThreadList; + private final AtomicLong successfulInterrupts = new AtomicLong(); + private ExecutorService executorService; + private final AtomicBoolean run = new AtomicBoolean(true); + + public InterruptService(final InterruptableThreadList interruptableThreadList) { + if (interruptableThreadList == null) { + throw new NullPointerException("threadList cannot be null"); + } + this.interruptableThreadList = interruptableThreadList; + } + + public void start() { + executorService = Executors.newSingleThreadExecutor(); + final Runnable interrupter = new Runnable() { + @Override + public void run() { + while (run.get()) { + final boolean successfulInterrupt = interruptableThreadList + .interruptRandomThread(); + if (successfulInterrupt) { + successfulInterrupts.incrementAndGet(); + } + } + } + }; + executorService.submit(interrupter); + } + + public void stop() throws InterruptedException { + run.set(false); + executorService.shutdown(); + final boolean terminated = executorService.awaitTermination(2, + TimeUnit.SECONDS); + if (!terminated) { + throw new IllegalStateException("termination"); + } + } + + public long getSuccessfulInterrupts() { + return successfulInterrupts.get(); + } +} diff --git a/btm/src/test/java/bitronix/tm/journal/InterruptableThreadList.java b/btm/src/test/java/bitronix/tm/journal/InterruptableThreadList.java new file mode 100644 index 00000000..3066942b --- /dev/null +++ b/btm/src/test/java/bitronix/tm/journal/InterruptableThreadList.java @@ -0,0 +1,40 @@ +package bitronix.tm.journal; + +import java.util.ArrayList; +import java.util.List; +import java.util.Random; + +public class InterruptableThreadList { + + private final List threads = new ArrayList(); + + public InterruptableThreadList() { + } + + public synchronized void addCurrentThread() { + final Thread currentThread = Thread.currentThread(); + threads.add(currentThread); + Thread.yield(); + } + + public synchronized void removeCurrentThread() { + final Thread currentThread = Thread.currentThread(); + threads.remove(currentThread); + } + + /** + * + * @return true on successful interruption + */ + public synchronized boolean interruptRandomThread() { + if (threads.isEmpty()) { + return false; + } + final Random random = new Random(); + final int threadIndex = random.nextInt(threads.size()); + + final Thread thread = threads.get(threadIndex); + thread.interrupt(); + return true; + } +} diff --git a/btm/src/test/java/bitronix/tm/journal/InterruptibleLockedRandomAccessFileStressTest.java b/btm/src/test/java/bitronix/tm/journal/InterruptibleLockedRandomAccessFileStressTest.java new file mode 100644 index 00000000..4bf6c746 --- /dev/null +++ b/btm/src/test/java/bitronix/tm/journal/InterruptibleLockedRandomAccessFileStressTest.java @@ -0,0 +1,149 @@ +package bitronix.tm.journal; + +import static bitronix.tm.journal.ByteBufferUtil.createByteBuffer; +import static org.fest.assertions.api.Assertions.assertThat; +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.commons.io.FileUtils; +import org.apache.commons.lang3.StringUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +public class InterruptibleLockedRandomAccessFileStressTest { + + @Rule + public final TemporaryFolder folder = new TemporaryFolder(); + + private File inputFile; + + private final InterruptableThreadList threadList = new InterruptableThreadList(); + + private final InterruptService interruptService = new InterruptService( + threadList); + + @Before + public void setUp() throws Exception { + inputFile = folder.newFile("bitronix-stresstest.log"); + interruptService.start(); + } + + @After + public void tearDown() throws Exception { + interruptService.stop(); + } + + @Test + public void stressTestWriteInterrupts() throws Exception { + final int recordLength = 15; + final int taskNumber = 10000; + initializeFileContent(recordLength, taskNumber); + + final InterruptibleLockedRandomAccessFile file = new InterruptibleLockedRandomAccessFile( + inputFile, "rw"); + + final ExecutorService executorService = Executors.newFixedThreadPool(4); + + final AtomicLong successfulWrites = new AtomicLong(); + final AtomicLong writeErrors = new AtomicLong(); + for (int i = 0; i < taskNumber; i++) { + final int taskId = i; + final String data = createRecord(taskId); + assertThat(data.length()).isLessThanOrEqualTo(recordLength); + executorService.submit(new Runnable() { + + @Override + public void run() { + threadList.addCurrentThread(); + try { + final int position = taskId * recordLength; + file.write(createByteBuffer(data), position); + successfulWrites.incrementAndGet(); + } catch (final Exception expected) { + writeErrors.incrementAndGet(); + } finally { + threadList.removeCurrentThread(); + } + } + }); + } + + shutdownExecutor(executorService, 30, TimeUnit.SECONDS); + + file.close(); + + assertThat(successfulWrites.get() + writeErrors.get()).isEqualTo( + taskNumber); + + final long writtenRecords = countWrittenRecords(taskNumber); + final long missingRecords = countMissingRecords(taskNumber); + assertThat(writtenRecords + missingRecords).isEqualTo(taskNumber); + + // System.out.println("written: " + writtenRecords); + // System.out.println("missing: " + missingRecords); + // System.out.println("successful writes: " + successfulWrites); + // System.out.println("write errors: " + writeErrors); + // System.out.println("interrupts: " + // + interruptService.getSuccessfulInterrupts()); + + assertThat(writtenRecords).isGreaterThanOrEqualTo( + successfulWrites.get()); + assertThat(missingRecords).isLessThanOrEqualTo(writeErrors.get()); + assertThat(interruptService.getSuccessfulInterrupts()) + .isGreaterThanOrEqualTo(missingRecords); + } + + private void initializeFileContent(final int recordLength, + final int taskNumber) throws Exception { + final String initialFileConent = StringUtils.repeat(".", recordLength + * taskNumber); + FileUtils.writeStringToFile(inputFile, initialFileConent); + } + + private String createRecord(final int recordId) { + return String.format("data%5dX", recordId); + } + + private long countMissingRecords(final int taskNumber) throws Exception { + final String writtenContent = FileUtils.readFileToString(inputFile, + "UTF-8"); + long missingRecords = 0; + for (int taskId = 0; taskId < taskNumber; taskId++) { + final String data = createRecord(taskId); + if (!writtenContent.contains(data)) { + missingRecords++; + } + } + return missingRecords; + } + + private long countWrittenRecords(final int taskNumber) throws Exception { + final String writtenContent = FileUtils.readFileToString(inputFile, + "UTF-8"); + long writtenRecords = 0; + for (int taskId = 0; taskId < taskNumber; taskId++) { + final String data = createRecord(taskId); + if (writtenContent.contains(data)) { + writtenRecords++; + } + } + return writtenRecords; + } + + private void shutdownExecutor(final ExecutorService executorService, + final long timeout, final TimeUnit timeoutUnit) + throws InterruptedException { + executorService.shutdown(); + final boolean terminated = executorService.awaitTermination(timeout, + timeoutUnit); + assertTrue("termination", terminated); + } +} diff --git a/btm/src/test/java/bitronix/tm/journal/InterruptibleLockedRandomAccessFileTest.java b/btm/src/test/java/bitronix/tm/journal/InterruptibleLockedRandomAccessFileTest.java new file mode 100644 index 00000000..e5f8a25f --- /dev/null +++ b/btm/src/test/java/bitronix/tm/journal/InterruptibleLockedRandomAccessFileTest.java @@ -0,0 +1,387 @@ +package bitronix.tm.journal; + +import static bitronix.tm.journal.ByteBufferUtil.createByteBuffer; +import static org.fest.assertions.api.Assertions.assertThat; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.fail; + +import java.io.File; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.io.UnsupportedEncodingException; +import java.nio.ByteBuffer; +import java.nio.channels.ClosedByInterruptException; +import java.nio.channels.ClosedChannelException; +import java.nio.channels.FileChannel; +import java.nio.channels.FileLock; +import java.nio.channels.OverlappingFileLockException; + +import org.apache.commons.io.FileUtils; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +public class InterruptibleLockedRandomAccessFileTest { + + @Rule + public final TemporaryFolder folder = new TemporaryFolder(); + + private File inputFile; + + @Before + public void setUp() throws Exception { + inputFile = folder.newFile("btmlog-test.log"); + } + + @Test + public void testOpenClose() throws Exception { + final InterruptibleLockedRandomAccessFile file = new InterruptibleLockedRandomAccessFile( + inputFile, "rw"); + file.close(); + } + + @Test + public void testLockedOpen() throws Exception { + final RandomAccessFile firstFile = new RandomAccessFile(inputFile, "rw"); + final FileChannel fileChannel = firstFile.getChannel(); + final FileLock lock = fileChannel.tryLock(); + assertNotNull("null lock", lock); + + try { + new InterruptibleLockedRandomAccessFile(inputFile, "rw"); + fail("should not open a locked file"); + } catch (OverlappingFileLockException expected) { + } finally { + lock.release(); + fileChannel.close(); + firstFile.close(); + } + } + + @Test + public void testReadAfterClose() throws Exception { + final String data = "testdata"; + FileUtils.writeStringToFile(inputFile, data, "UTF-8"); + + final InterruptibleLockedRandomAccessFile file = new InterruptibleLockedRandomAccessFile( + inputFile, "rw"); + file.close(); + + try { + readFile(file, 1); + fail("should not read after close"); + } catch (final IllegalStateException expected) { + } + } + + @Test + public void testWriteAfterClose() throws Exception { + final String data = "testdata"; + FileUtils.writeStringToFile(inputFile, data, "UTF-8"); + + final InterruptibleLockedRandomAccessFile file = new InterruptibleLockedRandomAccessFile( + inputFile, "rw"); + file.close(); + + final ByteBuffer buffer = createByteBuffer("testdata"); + final long position = 0L; + try { + file.write(buffer, position); + fail("should not write after close"); + } catch (final IllegalStateException expected) { + } + } + + @Test + public void testForceAfterClose() throws Exception { + final String data = "testdata"; + FileUtils.writeStringToFile(inputFile, data, "UTF-8"); + + final InterruptibleLockedRandomAccessFile file = new InterruptibleLockedRandomAccessFile( + inputFile, "rw"); + file.close(); + + try { + final boolean metaData = true; + file.force(metaData); + fail("should not force after close"); + } catch (final IllegalStateException expected) { + } + } + + @Test + public void testPositionAfterClose() throws Exception { + final String data = "testdata"; + FileUtils.writeStringToFile(inputFile, data, "UTF-8"); + + final InterruptibleLockedRandomAccessFile file = new InterruptibleLockedRandomAccessFile( + inputFile, "rw"); + file.close(); + + try { + file.position(1L); + fail("should not position after close"); + } catch (final IllegalStateException expected) { + } + } + + @Test + public void testRead() throws Exception { + final String data = "testdata"; + FileUtils.writeStringToFile(inputFile, data, "UTF-8"); + + final InterruptibleLockedRandomAccessFile file = new InterruptibleLockedRandomAccessFile( + inputFile, "rw"); + verifyRead(data, file); + file.close(); + } + + @Test + public void testReadTwice() throws Exception { + final String data = "testdata"; + FileUtils.writeStringToFile(inputFile, data, "UTF-8"); + + final InterruptibleLockedRandomAccessFile file = new InterruptibleLockedRandomAccessFile( + inputFile, "rw"); + + verifyRead(data, file); + file.position(0L); + verifyRead(data, file); + + file.close(); + } + + @Test + public void testReadAfterInterrupt() throws Exception { + final String data = "testdataTESTDATA"; + FileUtils.writeStringToFile(inputFile, data, "UTF-8"); + + final InterruptibleLockedRandomAccessFile file = new InterruptibleLockedRandomAccessFile( + inputFile, "rw"); + + verifyRead(data, file); + file.position(0L); + interruptCurrentThread(); + try { + verifyRead(data, file); + fail("interrupt should close the FileChannel"); + } catch (final ClosedByInterruptException expected) { + } + + clearInterruptedFlag(); + + verifyRead(data, file); + file.close(); + } + + private void verifyRead(final String expectedData, + final InterruptibleLockedRandomAccessFile file) throws Exception { + final int bytesToRead = expectedData.getBytes("UTF-8").length; + final String readData = readFile(file, bytesToRead); + + assertThat(readData).isEqualTo(expectedData); + } + + private String readFile(final InterruptibleLockedRandomAccessFile file, + final int bytesToRead) throws IOException { + final ByteBuffer inputBuffer = ByteBuffer.allocate(bytesToRead); + file.read(inputBuffer); + return toString(inputBuffer); + } + + private String toString(ByteBuffer buffer) + throws UnsupportedEncodingException { + return new String(buffer.array(), "UTF-8"); + } + + @Test + public void testWrite() throws Exception { + final String data = "testdata"; + + final InterruptibleLockedRandomAccessFile file = new InterruptibleLockedRandomAccessFile( + inputFile, "rw"); + + final ByteBuffer outputBuffer = createByteBuffer(data); + + final long position = 0L; + file.write(outputBuffer, position); + + file.close(); + + verifyFileContent(inputFile, data); + } + + @Test + public void testWriteAndForce() throws Exception { + final String data = "testdata"; + + final InterruptibleLockedRandomAccessFile file = new InterruptibleLockedRandomAccessFile( + inputFile, "rw"); + + final ByteBuffer outputBuffer = createByteBuffer(data); + + final long position = 0L; + file.write(outputBuffer, position); + file.force(true); + + file.close(); + + verifyFileContent(inputFile, data); + } + + private void verifyFileContent(final File file, final String expectedData) + throws IOException { + final String fileContent = FileUtils.readFileToString(inputFile, + "UTF-8"); + assertEquals(expectedData, fileContent); + } + + @Test + public void testTwoWrites() throws Exception { + final String dataOne = "testdata"; + final String dataTwo = "TESTDATA"; + + final InterruptibleLockedRandomAccessFile file = new InterruptibleLockedRandomAccessFile( + inputFile, "rw"); + + final ByteBuffer dataOneBuffer = createByteBuffer(dataOne); + file.write(dataOneBuffer, 0L); + file.write(createByteBuffer(dataTwo), dataOneBuffer.capacity()); + + file.close(); + + verifyFileContent(inputFile, dataOne + dataTwo); + } + + @Test + public void testWriteAfterInterrupt() throws Exception { + final String dataOne = "testdata"; + final String dataTwo = "TESTDATA"; + + final InterruptibleLockedRandomAccessFile file = new InterruptibleLockedRandomAccessFile( + inputFile, "rw"); + file.write(createByteBuffer(dataOne), 0L); + + interruptCurrentThread(); + + try { + file.write(createByteBuffer(dataTwo), dataOne.length()); + } catch (final ClosedByInterruptException expected) { + } + + clearInterruptedFlag(); + + final String dataThree = "__third__"; + file.write(createByteBuffer(dataThree), dataOne.length()); + + file.close(); + + verifyFileContent(inputFile, dataOne + dataThree); + } + + @Test + public void testForceAfterInterrupt() throws Exception { + final String dataOne = "testdata"; + final String dataTwo = "TESTDATA"; + + final InterruptibleLockedRandomAccessFile file = new InterruptibleLockedRandomAccessFile( + inputFile, "rw"); + file.write(createByteBuffer(dataOne), 0L); + + interruptCurrentThread(); + + try { + file.write(createByteBuffer(dataTwo), dataOne.length()); + } catch (final ClosedByInterruptException expected) { + } + + clearInterruptedFlag(); + + file.force(true); + + file.close(); + + verifyFileContent(inputFile, dataOne); + } + + @Test + public void testFilePositionSetOnWriteInterrupt() throws Exception { + final String dataOne = "testdata"; + FileUtils.writeStringToFile(inputFile, dataOne, "UTF-8"); + + final InterruptibleLockedRandomAccessFile file = new InterruptibleLockedRandomAccessFile( + inputFile, "rw"); + + final ByteBuffer outputBuffer = createByteBuffer(dataOne); + + long position = 2; + file.position(position); + + interruptCurrentThread(); + try { + file.write(outputBuffer, 1L); + fail("writing a FileChannel should fail on an interrupted thread"); + } catch (ClosedChannelException expected) { + } + clearInterruptedFlag(); + + final String readData = readFile(file, 5); + file.close(); + + assertEquals("read from file", "stdat", readData); + } + + @Test + public void testFilePositionSetOnReadInterrupt() throws Exception { + final String dataOne = "testdata"; + FileUtils.writeStringToFile(inputFile, dataOne, "UTF-8"); + + final InterruptibleLockedRandomAccessFile file = new InterruptibleLockedRandomAccessFile( + inputFile, "rw"); + + readFile(file, 4); + + interruptCurrentThread(); + try { + readFile(file, 2); + fail("reading a FileChannel should fail on an interrupted thread"); + } catch (ClosedChannelException expected) { + } + clearInterruptedFlag(); + + final String readData = readFile(file, 4); + file.close(); + + assertEquals("read from file", "data", readData); + } + + @Test + public void testCloseAfterInterrupt() throws Exception { + final String data = "testdata"; + FileUtils.writeStringToFile(inputFile, data, "UTF-8"); + + final InterruptibleLockedRandomAccessFile file = new InterruptibleLockedRandomAccessFile( + inputFile, "rw"); + + interruptCurrentThread(); + try { + readFile(file, 1); + fail("should not read after interrupt"); + } catch (final ClosedChannelException expected) { + } + clearInterruptedFlag(); + + file.close(); + } + + private void interruptCurrentThread() { + Thread.currentThread().interrupt(); + } + + private void clearInterruptedFlag() { + Thread.interrupted(); + } + +}