Skip to content

Commit

Permalink
[FLINK-36959][runtime] Fix the logic for computing readable buffer of…
Browse files Browse the repository at this point in the history
…fsets and sizes when including empty buffers.
  • Loading branch information
JunRuiLee committed Dec 25, 2024
1 parent 56ba88b commit e858969
Show file tree
Hide file tree
Showing 5 changed files with 146 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -201,15 +201,13 @@ private void updateReadableOffsetAndSize(
long endPartitionOffset = indexEntryBuf.getLong();
long endPartitionSize = indexEntryBuf.getLong();

if (startPartitionOffset != endPartitionOffset) {
if (startPartitionOffset != endPartitionOffset || startPartitionSize != endPartitionSize) {
offsetAndSizesToRead.add(
Tuple2.of(
startPartitionOffset,
endPartitionOffset + endPartitionSize - startPartitionOffset));
} else if (startPartitionSize != 0) {
checkArgument(
startPartitionSize == endPartitionSize,
"Offsets need to be either contiguous or all the same.");
// this branch is for broadcast subpartitions
for (int i = startSubpartition; i <= endSubpartition; i++) {
offsetAndSizesToRead.add(Tuple2.of(startPartitionOffset, startPartitionSize));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.util.List;

import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;

/**
Expand Down Expand Up @@ -105,14 +106,24 @@ public class PartitionedFileWriter implements AutoCloseable {
/** Whether this file writer is closed or not. */
private boolean isClosed;

public PartitionedFileWriter(int numSubpartitions, int maxIndexBufferSize, String basePath)
private final int[] writeOrder;

/** Total number of bytes written before the current region. */
private long preRegionTotalBytesWritten;

public PartitionedFileWriter(
int numSubpartitions, int maxIndexBufferSize, String basePath, int[] writeOrder)
throws IOException {
this(numSubpartitions, MIN_INDEX_BUFFER_SIZE, maxIndexBufferSize, basePath);
this(numSubpartitions, MIN_INDEX_BUFFER_SIZE, maxIndexBufferSize, basePath, writeOrder);
}

@VisibleForTesting
PartitionedFileWriter(
int numSubpartitions, int minIndexBufferSize, int maxIndexBufferSize, String basePath)
int numSubpartitions,
int minIndexBufferSize,
int maxIndexBufferSize,
String basePath,
int[] writeOrder)
throws IOException {
checkArgument(numSubpartitions > 0, "Illegal number of subpartitions.");
checkArgument(maxIndexBufferSize > 0, "Illegal maximum index cache size.");
Expand All @@ -124,6 +135,7 @@ public PartitionedFileWriter(int numSubpartitions, int maxIndexBufferSize, Strin
this.subpartitionBytes = new long[numSubpartitions];
this.dataFilePath = new File(basePath + PartitionedFile.DATA_FILE_SUFFIX).toPath();
this.indexFilePath = new File(basePath + PartitionedFile.INDEX_FILE_SUFFIX).toPath();
this.writeOrder = checkNotNull(writeOrder);

this.indexBuffer = ByteBuffer.allocate(minIndexBufferSize);
BufferReaderWriterUtil.configureByteBuffer(indexBuffer);
Expand Down Expand Up @@ -195,13 +207,45 @@ private boolean extendIndexBufferIfPossible() {

private void writeRegionIndex() throws IOException {
if (Arrays.stream(subpartitionBytes).sum() > 0) {
updateEmptySubpartitionOffsets();
for (int subpartition = 0; subpartition < numSubpartitions; ++subpartition) {
writeIndexEntry(subpartitionOffsets[subpartition], subpartitionBytes[subpartition]);
}

currentSubpartition = -1;
++numRegions;
Arrays.fill(subpartitionBytes, 0);
preRegionTotalBytesWritten = totalBytesWritten;
}
}

/**
* Updates the offsets of subpartitions, ensuring that they are contiguous.
*
* <p>This method is necessary because empty subpartitions do not trigger an update to their
* offsets during the usual process. As such, we need to ensure here that every subpartition,
* including empty ones, has its offset updated to maintain continuity. This process involves
* adjusting each subpartition's offset based on the sum of previous subpartitions' bytes,
* ensuring seamless data handling and storage alignment.
*/
private void updateEmptySubpartitionOffsets() {
for (int i = 0; i < writeOrder.length; i++) {
int currentSubPartition = writeOrder[i];

if (subpartitionBytes[currentSubPartition] == 0) {
if (i == 0) {
// For the first subpartition, set its offset to the current pre-region total
// bytes written if it's empty.
subpartitionOffsets[currentSubPartition] = preRegionTotalBytesWritten;
} else {
// For non-first subpartitions, update the offset of an empty subpartition to be
// contiguous with the previous subpartition.
int preSubPartition = writeOrder[i - 1];
subpartitionOffsets[currentSubPartition] =
subpartitionOffsets[preSubPartition]
+ subpartitionBytes[preSubPartition];
}
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,8 @@ protected void setupInternal() throws IOException {
try {
// allocate at most 4M heap memory for caching of index entries
fileWriter =
new PartitionedFileWriter(numSubpartitions, 4194304, resultFileBasePath);
new PartitionedFileWriter(
numSubpartitions, 4194304, resultFileBasePath, subpartitionOrder);
} catch (Throwable throwable) {
throw new IOException("Failed to create file writer.", throwable);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,8 +164,13 @@ public static PartitionedFile createPartitionedFile(
}
}

int[] writeOrder = new int[numSubpartitions];
for (int i = 0; i < numSubpartitions; i++) {
writeOrder[i] = i;
}

PartitionedFileWriter fileWriter =
new PartitionedFileWriter(numSubpartitions, 1024, basePath);
new PartitionedFileWriter(numSubpartitions, 1024, basePath, writeOrder);
fileWriter.startNewRegion(false);
fileWriter.writeBuffers(buffers);
return fileWriter.finish();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ void testWriteAndReadPartitionedFile() throws Exception {
numRegions,
buffersWritten,
regionStat,
createPartitionedFileWriter(numSubpartitions),
createPartitionedFileWriter(numSubpartitions, writeOrder),
subpartitionIndex -> subpartitionIndex,
random.nextBoolean(),
writeOrder);
Expand Down Expand Up @@ -151,7 +151,7 @@ void testComputeReadablePosition(boolean randomSubpartitionOrder, boolean broadc
numRegions,
buffersWritten,
regionStat,
createPartitionedFileWriter(numSubpartitions),
createPartitionedFileWriter(numSubpartitions, writeOrder),
subpartitionIndex -> subpartitionIndex,
broadcastRegion,
writeOrder);
Expand Down Expand Up @@ -286,7 +286,7 @@ void testWriteAndReadPartitionedFileForSubpartitionRange() throws Exception {
numRegions,
buffersWritten,
regionStat,
createPartitionedFileWriter(numSubpartitions),
createPartitionedFileWriter(numSubpartitions, writeOrder),
subpartitionIndex -> subpartitionIndex / 2,
false,
writeOrder);
Expand Down Expand Up @@ -419,7 +419,10 @@ private void addReadBuffer(Buffer buffer, List<Buffer> buffersRead) {
}

private static Queue<MemorySegment> allocateBuffers(int bufferSize) {
int numBuffers = 2;
return allocateBuffers(bufferSize, 2);
}

private static Queue<MemorySegment> allocateBuffers(int bufferSize, int numBuffers) {
Queue<MemorySegment> readBuffers = new LinkedList<>();
while (numBuffers-- > 0) {
readBuffers.add(MemorySegmentFactory.allocateUnpooledSegment(bufferSize));
Expand All @@ -441,7 +444,8 @@ void testWriteAndReadWithEmptySubpartition() throws Exception {
buffersRead[subpartition] = new ArrayList<>();
}

PartitionedFileWriter fileWriter = createPartitionedFileWriter(numSubpartitions);
PartitionedFileWriter fileWriter =
createPartitionedFileWriter(numSubpartitions, new int[] {0, 1, 2, 3, 4});
for (int region = 0; region < numRegions; ++region) {
fileWriter.startNewRegion(false);
for (int subpartition = 0; subpartition < numSubpartitions; ++subpartition) {
Expand Down Expand Up @@ -481,6 +485,68 @@ void testWriteAndReadWithEmptySubpartition() throws Exception {
IOUtils.closeAllQuietly(dataFileChannel, indexFileChannel);
}

@Test
void testWriteAndReadWithEmptySubpartitionForMultipleSubpartitions() throws Exception {
int numRegions = 10;
int numSubpartitions = 5;
int bufferSize = 1024;
Random random = new Random();

Queue<Buffer>[] subpartitionBuffers = new ArrayDeque[numRegions];
List<Buffer>[] buffersRead = new List[numRegions];
for (int region = 0; region < numRegions; region++) {
subpartitionBuffers[region] = new ArrayDeque<>();
buffersRead[region] = new ArrayList<>();
}

int[] writeOrder = new int[] {0, 1, 2, 3, 4};
PartitionedFileWriter fileWriter =
createPartitionedFileWriter(numSubpartitions, writeOrder);
for (int region = 0; region < numRegions; ++region) {
fileWriter.startNewRegion(false);
for (int subpartition = 0; subpartition < numSubpartitions; ++subpartition) {
if (random.nextBoolean()) {
Buffer buffer = createBuffer(random, bufferSize);
subpartitionBuffers[region].add(buffer);
fileWriter.writeBuffers(getBufferWithSubpartitions(buffer, subpartition));
}
}
}
PartitionedFile partitionedFile = fileWriter.finish();

FileChannel dataFileChannel = openFileChannel(partitionedFile.getDataFilePath());
FileChannel indexFileChannel = openFileChannel(partitionedFile.getIndexFilePath());
PartitionedFileReader fileReader =
new PartitionedFileReader(
partitionedFile,
new ResultSubpartitionIndexSet(0, numSubpartitions - 1),
dataFileChannel,
indexFileChannel,
BufferReaderWriterUtil.allocatedHeaderBuffer(),
createAndConfigIndexEntryBuffer(),
writeOrder[0]);
int regionIndex = 0;
while (fileReader.hasRemaining()) {
if (subpartitionBuffers[regionIndex].isEmpty()) {
regionIndex++;
} else {
int finalRegionIndex = regionIndex;
fileReader.readCurrentRegion(
allocateBuffers(bufferSize, 10),
FreeingBufferRecycler.INSTANCE,
buffer -> addReadBuffer(buffer, buffersRead[finalRegionIndex]));
for (Buffer buffer : buffersRead[finalRegionIndex]) {
assertBufferEquals(
checkNotNull(subpartitionBuffers[finalRegionIndex].poll()), buffer);
}

assertThat(subpartitionBuffers[finalRegionIndex]).isEmpty();
regionIndex++;
}
}
IOUtils.closeAllQuietly(dataFileChannel, indexFileChannel);
}

private void assertBufferEquals(Buffer expected, Buffer actual) {
assertThat(expected.getDataType()).isEqualTo(actual.getDataType());
assertThat(expected.getNioBufferReadable()).isEqualTo(actual.getNioBufferReadable());
Expand All @@ -498,7 +564,8 @@ private Buffer createBuffer(Random random, int bufferSize) {

@Test
void testNotWriteDataOfTheSameSubpartitionTogether() throws Exception {
PartitionedFileWriter partitionedFileWriter = createPartitionedFileWriter(2);
PartitionedFileWriter partitionedFileWriter =
createPartitionedFileWriter(2, new int[] {1, 0});
try {
MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment(1024);

Expand Down Expand Up @@ -609,7 +676,8 @@ void testMultipleThreadGetIndexEntry() throws Exception {
createPartitionedFileWriter(
numSubpartitions,
PartitionedFile.INDEX_ENTRY_SIZE * numSubpartitions,
PartitionedFile.INDEX_ENTRY_SIZE * numSubpartitions),
PartitionedFile.INDEX_ENTRY_SIZE * numSubpartitions,
writeOrder),
subpartitionIndex -> subpartitionIndex,
random.nextBoolean(),
writeOrder);
Expand Down Expand Up @@ -659,29 +727,34 @@ private List<BufferWithSubpartition> getBufferWithSubpartitions(
}

private PartitionedFile createEmptyPartitionedFile() throws IOException {
PartitionedFileWriter partitionedFileWriter = createPartitionedFileWriter(2);
PartitionedFileWriter partitionedFileWriter = createPartitionedFileWriter(2, new int[0]);
return partitionedFileWriter.finish();
}

private PartitionedFileWriter createPartitionedFileWriter(int numSubpartitions)
throws IOException {
return createPartitionedFileWriter(numSubpartitions, 640);
private PartitionedFileWriter createPartitionedFileWriter(
int numSubpartitions, int[] writeOrder) throws IOException {
return createPartitionedFileWriter(numSubpartitions, 640, writeOrder);
}

private PartitionedFileWriter createPartitionedFileWriter(
int numSubpartitions, int minIndexBufferSize, int maxIndexBufferSize)
int numSubpartitions, int minIndexBufferSize, int maxIndexBufferSize, int[] writeOrder)
throws IOException {
return new PartitionedFileWriter(
numSubpartitions, minIndexBufferSize, maxIndexBufferSize, tempPath.toString());
numSubpartitions,
minIndexBufferSize,
maxIndexBufferSize,
tempPath.toString(),
writeOrder);
}

private PartitionedFileWriter createPartitionedFileWriter(
int numSubpartitions, int maxIndexBufferSize) throws IOException {
return new PartitionedFileWriter(numSubpartitions, maxIndexBufferSize, tempPath.toString());
int numSubpartitions, int maxIndexBufferSize, int[] writeOrder) throws IOException {
return new PartitionedFileWriter(
numSubpartitions, maxIndexBufferSize, tempPath.toString(), writeOrder);
}

private PartitionedFileWriter createAndFinishPartitionedFileWriter() throws IOException {
PartitionedFileWriter partitionedFileWriter = createPartitionedFileWriter(1);
PartitionedFileWriter partitionedFileWriter = createPartitionedFileWriter(1, new int[0]);
partitionedFileWriter.finish();
return partitionedFileWriter;
}
Expand Down

0 comments on commit e858969

Please sign in to comment.