diff --git a/broker/src/test/java/io/moquette/broker/unsafequeues/QueueTest.java b/broker/src/test/java/io/moquette/broker/unsafequeues/QueueTest.java index fd9eeeb3b..98b52fd50 100644 --- a/broker/src/test/java/io/moquette/broker/unsafequeues/QueueTest.java +++ b/broker/src/test/java/io/moquette/broker/unsafequeues/QueueTest.java @@ -562,6 +562,57 @@ public void reopenQueueWithFragmentation() throws QueueException, IOException { assertEquals("(0, 0), (0, 4194304)", segmentRefs); } + @Test + public void testPageFileReuse() throws QueueException, IOException { + // Use smaller segmants and pages for quicker testing. + final int kb = 1024; + // Pages with only 4 segments, for quicker testing + final int queuePageSize = 16 * kb; + final int queueSegmentSize = 4 * kb; + // write 2 segments, consume one segment, next segment allocated should be one just freed.0 + QueuePool queuePool = QueuePool.loadQueues(tempQueueFolder, queuePageSize, queueSegmentSize); + Queue queue1 = queuePool.getOrCreate("test_external_fragmentation"); + + byte[] bytes = new byte[]{'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n'}; + + // fill seven segments (almost the full 8 we have) + // This should force two files to open. + for (int i = 0; i < 7; i++) { + queue1.enqueue(ByteBuffer.wrap(generatePayload(queueSegmentSize / 2 - LENGTH_HEADER_SIZE, bytes[i * 2]))); + queue1.enqueue(ByteBuffer.wrap(generatePayload(queueSegmentSize / 2 - LENGTH_HEADER_SIZE, bytes[i * 2 + 1]))); + } + + // Create a new queue + final Queue queue2 = queuePool.getOrCreate("test_external_fragmentation2"); + + // Write one segment (filling the second page) + queue2.enqueue(ByteBuffer.wrap(generatePayload(queueSegmentSize / 2 - LENGTH_HEADER_SIZE, bytes[0]))); + queue2.enqueue(ByteBuffer.wrap(generatePayload(queueSegmentSize / 2 - LENGTH_HEADER_SIZE, bytes[1]))); + + // Release the first queue + queue1.force(); + queue1.close(); + + // Fill another six segments. + // This should not open more files. + for (int i = 1; i < 7; i++) { + queue2.enqueue(ByteBuffer.wrap(generatePayload(queueSegmentSize / 2 - LENGTH_HEADER_SIZE, bytes[i * 2]))); + queue2.enqueue(ByteBuffer.wrap(generatePayload(queueSegmentSize / 2 - LENGTH_HEADER_SIZE, bytes[i * 2 + 1]))); + } + + queue2.force(); + queuePool.close(); + + // Verify + // checkpoint contains che correct order, (0,0), (0, 4194304) + Properties checkpointProps3 = loadCheckpointFile(tempQueueFolder); + + // We should now have segments 6, 5, 4, 3, 2, 1, 8 + // or, 2.2, 2.1, 1.4, 1.3, 1.2, 1.1, 2.4 + final String segmentRefs = checkpointProps3.getProperty("queues.0.segments"); + assertEquals("(1, 4096), (1, 0), (0, 12288), (0, 8192), (0, 4096), (0, 0), (1, 12288)", segmentRefs); + } + private Properties loadCheckpointFile(Path dir) throws IOException { final Path checkpointPath = dir.resolve("checkpoint.properties"); final FileReader fileReader = new FileReader(checkpointPath.toFile());