diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionedFileReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionedFileReader.java index 6d5c1d9e58a7b..582e1637c448a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionedFileReader.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionedFileReader.java @@ -201,15 +201,13 @@ private void updateReadableOffsetAndSize( long endPartitionOffset = indexEntryBuf.getLong(); long endPartitionSize = indexEntryBuf.getLong(); - if (startPartitionOffset != endPartitionOffset) { + if (startPartitionOffset != endPartitionOffset || startPartitionSize != endPartitionSize) { offsetAndSizesToRead.add( Tuple2.of( startPartitionOffset, endPartitionOffset + endPartitionSize - startPartitionOffset)); } else if (startPartitionSize != 0) { - checkArgument( - startPartitionSize == endPartitionSize, - "Offsets need to be either contiguous or all the same."); + // this branch is for broadcast subpartitions for (int i = startSubpartition; i <= endSubpartition; i++) { offsetAndSizesToRead.add(Tuple2.of(startPartitionOffset, startPartitionSize)); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionedFileWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionedFileWriter.java index ba3582aa7f9d9..bd46b18e8cb6e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionedFileWriter.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionedFileWriter.java @@ -35,6 +35,7 @@ import java.util.List; import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; import static org.apache.flink.util.Preconditions.checkState; /** @@ -105,14 +106,24 @@ public class PartitionedFileWriter implements AutoCloseable { /** Whether this file writer is closed or not. */ private boolean isClosed; - public PartitionedFileWriter(int numSubpartitions, int maxIndexBufferSize, String basePath) + private final int[] writeOrder; + + /** Total number of bytes written before the current region. */ + private long preRegionTotalBytesWritten; + + public PartitionedFileWriter( + int numSubpartitions, int maxIndexBufferSize, String basePath, int[] writeOrder) throws IOException { - this(numSubpartitions, MIN_INDEX_BUFFER_SIZE, maxIndexBufferSize, basePath); + this(numSubpartitions, MIN_INDEX_BUFFER_SIZE, maxIndexBufferSize, basePath, writeOrder); } @VisibleForTesting PartitionedFileWriter( - int numSubpartitions, int minIndexBufferSize, int maxIndexBufferSize, String basePath) + int numSubpartitions, + int minIndexBufferSize, + int maxIndexBufferSize, + String basePath, + int[] writeOrder) throws IOException { checkArgument(numSubpartitions > 0, "Illegal number of subpartitions."); checkArgument(maxIndexBufferSize > 0, "Illegal maximum index cache size."); @@ -124,6 +135,7 @@ public PartitionedFileWriter(int numSubpartitions, int maxIndexBufferSize, Strin this.subpartitionBytes = new long[numSubpartitions]; this.dataFilePath = new File(basePath + PartitionedFile.DATA_FILE_SUFFIX).toPath(); this.indexFilePath = new File(basePath + PartitionedFile.INDEX_FILE_SUFFIX).toPath(); + this.writeOrder = checkNotNull(writeOrder); this.indexBuffer = ByteBuffer.allocate(minIndexBufferSize); BufferReaderWriterUtil.configureByteBuffer(indexBuffer); @@ -195,6 +207,7 @@ private boolean extendIndexBufferIfPossible() { private void writeRegionIndex() throws IOException { if (Arrays.stream(subpartitionBytes).sum() > 0) { + updateEmptySubpartitionOffsets(); for (int subpartition = 0; subpartition < numSubpartitions; ++subpartition) { writeIndexEntry(subpartitionOffsets[subpartition], subpartitionBytes[subpartition]); } @@ -202,6 +215,37 @@ private void writeRegionIndex() throws IOException { currentSubpartition = -1; ++numRegions; Arrays.fill(subpartitionBytes, 0); + preRegionTotalBytesWritten = totalBytesWritten; + } + } + + /** + * Updates the offsets of subpartitions, ensuring that they are contiguous. + * + *

This method is necessary because empty subpartitions do not trigger an update to their + * offsets during the usual process. As such, we need to ensure here that every subpartition, + * including empty ones, has its offset updated to maintain continuity. This process involves + * adjusting each subpartition's offset based on the sum of previous subpartitions' bytes, + * ensuring seamless data handling and storage alignment. + */ + private void updateEmptySubpartitionOffsets() { + for (int i = 0; i < writeOrder.length; i++) { + int currentSubPartition = writeOrder[i]; + + if (subpartitionBytes[currentSubPartition] == 0) { + if (i == 0) { + // For the first subpartition, set its offset to the current pre-region total + // bytes written if it's empty. + subpartitionOffsets[currentSubPartition] = preRegionTotalBytesWritten; + } else { + // For non-first subpartitions, update the offset of an empty subpartition to be + // contiguous with the previous subpartition. + int preSubPartition = writeOrder[i - 1]; + subpartitionOffsets[currentSubPartition] = + subpartitionOffsets[preSubPartition] + + subpartitionBytes[preSubPartition]; + } + } } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartition.java index 6255aab17b705..99b6856ed0476 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartition.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartition.java @@ -180,7 +180,8 @@ protected void setupInternal() throws IOException { try { // allocate at most 4M heap memory for caching of index entries fileWriter = - new PartitionedFileWriter(numSubpartitions, 4194304, resultFileBasePath); + new PartitionedFileWriter( + numSubpartitions, 4194304, resultFileBasePath, subpartitionOrder); } catch (Throwable throwable) { throw new IOException("Failed to create file writer.", throwable); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java index 994e7295e19db..20547adc2c77c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java @@ -164,8 +164,13 @@ public static PartitionedFile createPartitionedFile( } } + int[] writeOrder = new int[numSubpartitions]; + for (int i = 0; i < numSubpartitions; i++) { + writeOrder[i] = i; + } + PartitionedFileWriter fileWriter = - new PartitionedFileWriter(numSubpartitions, 1024, basePath); + new PartitionedFileWriter(numSubpartitions, 1024, basePath, writeOrder); fileWriter.startNewRegion(false); fileWriter.writeBuffers(buffers); return fileWriter.finish(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionedFileWriteReadTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionedFileWriteReadTest.java index 7177ccc122473..587b27727d669 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionedFileWriteReadTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionedFileWriteReadTest.java @@ -85,7 +85,7 @@ void testWriteAndReadPartitionedFile() throws Exception { numRegions, buffersWritten, regionStat, - createPartitionedFileWriter(numSubpartitions), + createPartitionedFileWriter(numSubpartitions, writeOrder), subpartitionIndex -> subpartitionIndex, random.nextBoolean(), writeOrder); @@ -151,7 +151,7 @@ void testComputeReadablePosition(boolean randomSubpartitionOrder, boolean broadc numRegions, buffersWritten, regionStat, - createPartitionedFileWriter(numSubpartitions), + createPartitionedFileWriter(numSubpartitions, writeOrder), subpartitionIndex -> subpartitionIndex, broadcastRegion, writeOrder); @@ -286,7 +286,7 @@ void testWriteAndReadPartitionedFileForSubpartitionRange() throws Exception { numRegions, buffersWritten, regionStat, - createPartitionedFileWriter(numSubpartitions), + createPartitionedFileWriter(numSubpartitions, writeOrder), subpartitionIndex -> subpartitionIndex / 2, false, writeOrder); @@ -419,7 +419,10 @@ private void addReadBuffer(Buffer buffer, List buffersRead) { } private static Queue allocateBuffers(int bufferSize) { - int numBuffers = 2; + return allocateBuffers(bufferSize, 2); + } + + private static Queue allocateBuffers(int bufferSize, int numBuffers) { Queue readBuffers = new LinkedList<>(); while (numBuffers-- > 0) { readBuffers.add(MemorySegmentFactory.allocateUnpooledSegment(bufferSize)); @@ -441,7 +444,8 @@ void testWriteAndReadWithEmptySubpartition() throws Exception { buffersRead[subpartition] = new ArrayList<>(); } - PartitionedFileWriter fileWriter = createPartitionedFileWriter(numSubpartitions); + PartitionedFileWriter fileWriter = + createPartitionedFileWriter(numSubpartitions, new int[] {0, 1, 2, 3, 4}); for (int region = 0; region < numRegions; ++region) { fileWriter.startNewRegion(false); for (int subpartition = 0; subpartition < numSubpartitions; ++subpartition) { @@ -481,6 +485,68 @@ void testWriteAndReadWithEmptySubpartition() throws Exception { IOUtils.closeAllQuietly(dataFileChannel, indexFileChannel); } + @Test + void testWriteAndReadWithEmptySubpartitionForMultipleSubpartitions() throws Exception { + int numRegions = 10; + int numSubpartitions = 5; + int bufferSize = 1024; + Random random = new Random(); + + Queue[] subpartitionBuffers = new ArrayDeque[numRegions]; + List[] buffersRead = new List[numRegions]; + for (int region = 0; region < numRegions; region++) { + subpartitionBuffers[region] = new ArrayDeque<>(); + buffersRead[region] = new ArrayList<>(); + } + + int[] writeOrder = new int[] {0, 1, 2, 3, 4}; + PartitionedFileWriter fileWriter = + createPartitionedFileWriter(numSubpartitions, writeOrder); + for (int region = 0; region < numRegions; ++region) { + fileWriter.startNewRegion(false); + for (int subpartition = 0; subpartition < numSubpartitions; ++subpartition) { + if (random.nextBoolean()) { + Buffer buffer = createBuffer(random, bufferSize); + subpartitionBuffers[region].add(buffer); + fileWriter.writeBuffers(getBufferWithSubpartitions(buffer, subpartition)); + } + } + } + PartitionedFile partitionedFile = fileWriter.finish(); + + FileChannel dataFileChannel = openFileChannel(partitionedFile.getDataFilePath()); + FileChannel indexFileChannel = openFileChannel(partitionedFile.getIndexFilePath()); + PartitionedFileReader fileReader = + new PartitionedFileReader( + partitionedFile, + new ResultSubpartitionIndexSet(0, numSubpartitions - 1), + dataFileChannel, + indexFileChannel, + BufferReaderWriterUtil.allocatedHeaderBuffer(), + createAndConfigIndexEntryBuffer(), + writeOrder[0]); + int regionIndex = 0; + while (fileReader.hasRemaining()) { + if (subpartitionBuffers[regionIndex].isEmpty()) { + regionIndex++; + } else { + int finalRegionIndex = regionIndex; + fileReader.readCurrentRegion( + allocateBuffers(bufferSize, 10), + FreeingBufferRecycler.INSTANCE, + buffer -> addReadBuffer(buffer, buffersRead[finalRegionIndex])); + for (Buffer buffer : buffersRead[finalRegionIndex]) { + assertBufferEquals( + checkNotNull(subpartitionBuffers[finalRegionIndex].poll()), buffer); + } + + assertThat(subpartitionBuffers[finalRegionIndex]).isEmpty(); + regionIndex++; + } + } + IOUtils.closeAllQuietly(dataFileChannel, indexFileChannel); + } + private void assertBufferEquals(Buffer expected, Buffer actual) { assertThat(expected.getDataType()).isEqualTo(actual.getDataType()); assertThat(expected.getNioBufferReadable()).isEqualTo(actual.getNioBufferReadable()); @@ -498,7 +564,8 @@ private Buffer createBuffer(Random random, int bufferSize) { @Test void testNotWriteDataOfTheSameSubpartitionTogether() throws Exception { - PartitionedFileWriter partitionedFileWriter = createPartitionedFileWriter(2); + PartitionedFileWriter partitionedFileWriter = + createPartitionedFileWriter(2, new int[] {1, 0}); try { MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment(1024); @@ -609,7 +676,8 @@ void testMultipleThreadGetIndexEntry() throws Exception { createPartitionedFileWriter( numSubpartitions, PartitionedFile.INDEX_ENTRY_SIZE * numSubpartitions, - PartitionedFile.INDEX_ENTRY_SIZE * numSubpartitions), + PartitionedFile.INDEX_ENTRY_SIZE * numSubpartitions, + writeOrder), subpartitionIndex -> subpartitionIndex, random.nextBoolean(), writeOrder); @@ -659,29 +727,34 @@ private List getBufferWithSubpartitions( } private PartitionedFile createEmptyPartitionedFile() throws IOException { - PartitionedFileWriter partitionedFileWriter = createPartitionedFileWriter(2); + PartitionedFileWriter partitionedFileWriter = createPartitionedFileWriter(2, new int[0]); return partitionedFileWriter.finish(); } - private PartitionedFileWriter createPartitionedFileWriter(int numSubpartitions) - throws IOException { - return createPartitionedFileWriter(numSubpartitions, 640); + private PartitionedFileWriter createPartitionedFileWriter( + int numSubpartitions, int[] writeOrder) throws IOException { + return createPartitionedFileWriter(numSubpartitions, 640, writeOrder); } private PartitionedFileWriter createPartitionedFileWriter( - int numSubpartitions, int minIndexBufferSize, int maxIndexBufferSize) + int numSubpartitions, int minIndexBufferSize, int maxIndexBufferSize, int[] writeOrder) throws IOException { return new PartitionedFileWriter( - numSubpartitions, minIndexBufferSize, maxIndexBufferSize, tempPath.toString()); + numSubpartitions, + minIndexBufferSize, + maxIndexBufferSize, + tempPath.toString(), + writeOrder); } private PartitionedFileWriter createPartitionedFileWriter( - int numSubpartitions, int maxIndexBufferSize) throws IOException { - return new PartitionedFileWriter(numSubpartitions, maxIndexBufferSize, tempPath.toString()); + int numSubpartitions, int maxIndexBufferSize, int[] writeOrder) throws IOException { + return new PartitionedFileWriter( + numSubpartitions, maxIndexBufferSize, tempPath.toString(), writeOrder); } private PartitionedFileWriter createAndFinishPartitionedFileWriter() throws IOException { - PartitionedFileWriter partitionedFileWriter = createPartitionedFileWriter(1); + PartitionedFileWriter partitionedFileWriter = createPartitionedFileWriter(1, new int[0]); partitionedFileWriter.finish(); return partitionedFileWriter; }