From 3048dbecfebf922f5e26e2bf36ac610b7c45609b Mon Sep 17 00:00:00 2001
From: YeJunHao <41894543+leaves12138@users.noreply.github.com>
Date: Tue, 12 Sep 2023 19:58:35 +0800
Subject: [PATCH] [core] Introduce write-buffer-for-append for writing lots of
 partitions (#1969)

---
 docs/content/concepts/append-only-table.md    |  30 ++-
 .../generated/core_configuration.html         |   6 +
 .../java/org/apache/paimon/CoreOptions.java   |  11 +
 .../paimon/append/AppendOnlyWriter.java       | 202 ++++++++++++++++--
 .../apache/paimon/disk/ExternalBuffer.java    | 136 ++----------
 .../apache/paimon/disk/InMemoryBuffer.java    | 176 +++++++++++++++
 .../apache/paimon/disk/InternalRowBuffer.java |  65 ++++++
 .../operation/AppendOnlyFileStoreWrite.java   |  11 +-
 .../paimon/append/AppendOnlyWriterTest.java   | 183 +++++++++++++++-
 .../paimon/disk/ExternalBufferTest.java       |  11 +-
 .../paimon/disk/InMemoryBufferTest.java       | 117 ++++++++++
 .../paimon/format/FileFormatSuffixTest.java   |  10 +-
 .../index/GlobalIndexAssignerOperator.java    |  16 +-
 13 files changed, 819 insertions(+), 155 deletions(-)
 create mode 100644 paimon-core/src/main/java/org/apache/paimon/disk/InMemoryBuffer.java
 create mode 100644 paimon-core/src/main/java/org/apache/paimon/disk/InternalRowBuffer.java
 create mode 100644 paimon-core/src/test/java/org/apache/paimon/disk/InMemoryBufferTest.java

diff --git a/docs/content/concepts/append-only-table.md b/docs/content/concepts/append-only-table.md
index 3ae9d88b3bdf..e8a87b699d96 100644
--- a/docs/content/concepts/append-only-table.md
+++ b/docs/content/concepts/append-only-table.md
@@ -27,7 +27,7 @@ under the License.
 # Append Only Table
 
 If a table does not have a primary key defined, it is an append-only table by default. Separated by the definition of bucket,
-we have two different append-only mode: "Append For Queue" and "Append For Scalable Table".
+we have two different append-only modes: "Append For Queue" and "Append For Scalable Table".
 
 ## Append For Queue
 
@@ -276,4 +276,30 @@ CREATE TABLE MyTable (
 ```
 {{< /tab >}}
 
-{{< /tabs >}}
\ No newline at end of file
+{{< /tabs >}}
+
+## Multiple Partitions Write
+While writing multiple partitions in a single insert job, we may get an out-of-memory error if
+too many records arrive between two checkpoints.
+
+Version 0.6 introduces a `write-buffer-for-append` option for append-only tables. Setting this
+parameter to true will cache the records in a segment pool to avoid OOM.
+
+### Influence
+If we also set `write-buffer-spillable` to true, the records can be serialized and spilled to disk.
+This may delay the checkpoint acknowledgement and slow down the sink.
+
+If we don't set `write-buffer-spillable`, then once we run out of memory in the segment pool, we
+flush the buffered records to the filesystem as complete data files. This may create too many small
+files and put more pressure on compaction, so we need to trade off carefully.
+
+### Example
+```sql
+CREATE TABLE MyTable (
+    product_id BIGINT,
+    price DOUBLE,
+    sales BIGINT
+) WITH (
+    'write-buffer-for-append' = 'true'
+);
+```
\ No newline at end of file
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index c3b877982cce..d327b9243b97 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -557,6 +557,12 @@
             MemorySize
             Target size of a file.
+
+
write-buffer-for-append
+            false
+            Boolean
+            This option only works for append-only tables. Whether the writer uses a write buffer to avoid out-of-memory errors.
+
write-buffer-size
256 mb

diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index 7ded97bba99c..e4f233ef285d 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -277,6 +277,13 @@ public class CoreOptions implements Serializable {
                     .withDescription(
                             "Whether the write buffer can be spillable. Enabled by default when using object storage.");
 
+    public static final ConfigOption<Boolean> WRITE_BUFFER_FOR_APPEND =
+            key("write-buffer-for-append")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "This option only works for append-only tables. Whether the writer uses a write buffer to avoid out-of-memory errors.");
+
     public static final ConfigOption<MemorySize> WRITE_MANIFEST_CACHE =
             key("write-manifest-cache")
                     .memoryType()
@@ -1007,6 +1014,10 @@ public boolean writeBufferSpillable(boolean usingObjectStore, boolean isStreamin
         return options.getOptional(WRITE_BUFFER_SPILLABLE).orElse(usingObjectStore || !isStreaming);
     }
 
+    public boolean useWriteBuffer() {
+        return options.get(WRITE_BUFFER_FOR_APPEND);
+    }
+
     public long sortSpillBufferSize() {
         return options.get(SORT_SPILL_BUFFER_SIZE).getBytes();
     }
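For reference, a minimal sketch of how a caller reads these two switches, using the `new CoreOptions(new HashMap<>())` pattern that the tests in this patch use — the option values below are illustrative:

```java
import org.apache.paimon.CoreOptions;

import java.util.HashMap;
import java.util.Map;

class WriteBufferOptionsSketch {
    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put("write-buffer-for-append", "true"); // buffer appended rows in a segment pool
        conf.put("write-buffer-spillable", "true");  // let the buffer spill to disk

        CoreOptions options = new CoreOptions(conf);
        boolean useBuffer = options.useWriteBuffer();                   // true
        // An explicit value wins; otherwise it defaults to usingObjectStore || !isStreaming.
        boolean spillable = options.writeBufferSpillable(false, true);  // true
        System.out.println(useBuffer + ", " + spillable);
    }
}
```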
diff --git a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
index 84fd01d739c9..7b09095688ba 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
@@ -19,8 +19,12 @@
 package org.apache.paimon.append;
 
+import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.compact.CompactManager;
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.serializer.InternalRowSerializer;
+import org.apache.paimon.disk.IOManager;
+import org.apache.paimon.disk.InternalRowBuffer;
 import org.apache.paimon.format.FileFormat;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.io.CompactIncrement;
@@ -28,6 +32,8 @@
 import org.apache.paimon.io.DataFilePathFactory;
 import org.apache.paimon.io.NewFilesIncrement;
 import org.apache.paimon.io.RowDataRollingFileWriter;
+import org.apache.paimon.memory.MemoryOwner;
+import org.apache.paimon.memory.MemorySegmentPool;
 import org.apache.paimon.statistics.FieldStatsCollector;
 import org.apache.paimon.types.RowKind;
 import org.apache.paimon.types.RowType;
@@ -38,6 +44,7 @@
 
 import javax.annotation.Nullable;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -48,7 +55,7 @@
  * A {@link RecordWriter} implementation that only accepts records which are always insert
  * operations and don't have any unique keys or sort keys.
  */
-public class AppendOnlyWriter implements RecordWriter<InternalRow> {
+public class AppendOnlyWriter implements RecordWriter<InternalRow>, MemoryOwner {
 
     private final FileIO fileIO;
     private final long schemaId;
@@ -63,12 +70,14 @@ public class AppendOnlyWriter implements RecordWriter<InternalRow> {
     private final List<DataFileMeta> compactAfter;
     private final LongCounter seqNumCounter;
     private final String fileCompression;
+    private final boolean spillable;
+    private final SinkWriter sinkWriter;
 
     private final FieldStatsCollector.Factory[] statsCollectors;
-
-    private RowDataRollingFileWriter writer;
+    private final IOManager ioManager;
 
     public AppendOnlyWriter(
             FileIO fileIO,
+            IOManager ioManager,
             long schemaId,
             FileFormat fileFormat,
             long targetFileSize,
@@ -78,6 +87,8 @@ public AppendOnlyWriter(
             boolean forceCompact,
             DataFilePathFactory pathFactory,
             @Nullable CommitIncrement increment,
+            boolean useWriteBuffer,
+            boolean spillable,
             String fileCompression,
             FieldStatsCollector.Factory[] statsCollectors) {
         this.fileIO = fileIO;
@@ -93,9 +104,11 @@ public AppendOnlyWriter(
         this.compactAfter = new ArrayList<>();
         this.seqNumCounter = new LongCounter(maxSequenceNumber + 1);
         this.fileCompression = fileCompression;
+        this.spillable = spillable;
+        this.ioManager = ioManager;
 
         this.statsCollectors = statsCollectors;
-        this.writer = createRollingRowWriter();
+        sinkWriter = createSinkWrite(useWriteBuffer);
 
         if (increment != null) {
             newFiles.addAll(increment.newFilesIncrement().newFiles());
@@ -110,12 +123,22 @@ public void write(InternalRow rowData) throws Exception {
                 rowData.getRowKind() == RowKind.INSERT,
                 "Append-only writer can only accept insert row kind, but current row kind is: %s",
                 rowData.getRowKind());
-        writer.write(rowData);
+        boolean success = sinkWriter.write(rowData);
+        if (!success) {
+            flush(false, false);
+            success = sinkWriter.write(rowData);
+            if (!success) {
+                // We should not get here, because the write buffer throws a too-big-record
+                // exception itself. We throw again in case something unexpected happens (e.g.
+                // someone changes the code in the spillable buffer).
+                throw new RuntimeException("Mem table is too small to hold a single element.");
+            }
+        }
     }
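The retry in `write` only works because some owner has already injected a memory pool via `setMemoryPool`. A minimal sketch of that `MemoryOwner` contract, patterned on the tests added later in this patch — the writer's construction is elided, and the pool sizes are illustrative:

```java
import org.apache.paimon.append.AppendOnlyWriter;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.memory.HeapMemorySegmentPool;

class MemoryOwnerSketch {
    // Assumes a writer built with useWriteBuffer = true (see AppendOnlyWriterTest below).
    static void drive(AppendOnlyWriter writer, InternalRow row) throws Exception {
        // A deliberately small pool (16 KiB, 1 KiB pages) forces frequent flushes.
        writer.setMemoryPool(new HeapMemorySegmentPool(16384L, 1024));

        writer.write(row);                      // buffered; a full pool triggers flush(false, false)
        long bytes = writer.memoryOccupancy();  // bytes the buffer currently holds in the pool
        writer.flushMemory();                   // the pool owner can also force buffered rows out
        assert bytes >= 0;
    }
}
```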
 
     @Override
     public void compact(boolean fullCompaction) throws Exception {
-        flushWriter(true, fullCompaction);
+        flush(true, fullCompaction);
     }
 
     @Override
@@ -130,19 +153,14 @@ public Collection<DataFileMeta> dataFiles() {
 
     @Override
     public CommitIncrement prepareCommit(boolean waitCompaction) throws Exception {
-        flushWriter(false, false);
+        flush(false, false);
         trySyncLatestCompaction(waitCompaction || forceCompact);
         return drainIncrement();
     }
 
-    private void flushWriter(boolean waitForLatestCompaction, boolean forcedFullCompaction)
+    private void flush(boolean waitForLatestCompaction, boolean forcedFullCompaction)
             throws Exception {
-        List<DataFileMeta> flushedFiles = new ArrayList<>();
-        if (writer != null) {
-            writer.close();
-            flushedFiles.addAll(writer.result());
-            writer = createRollingRowWriter();
-        }
+        List<DataFileMeta> flushedFiles = sinkWriter.flush();
 
         // add new generated files
         flushedFiles.forEach(compactManager::addNewFile);
@@ -169,9 +187,14 @@ public void close() throws Exception {
             fileIO.deleteQuietly(pathFactory.toPath(file.fileName()));
         }
 
-        if (writer != null) {
-            writer.abort();
-            writer = null;
+        sinkWriter.close();
+    }
+
+    private SinkWriter createSinkWrite(boolean useWriteBuffer) {
+        if (useWriteBuffer) {
+            return new BufferedSinkWriter();
+        } else {
+            return new DirectSinkWriter();
         }
     }
 
@@ -214,4 +237,149 @@ private CommitIncrement drainIncrement() {
         return new CommitIncrement(newFilesIncrement, compactIncrement);
     }
+
+    @Override
+    public void setMemoryPool(MemorySegmentPool memoryPool) {
+        sinkWriter.setMemoryPool(memoryPool);
+    }
+
+    @Override
+    public long memoryOccupancy() {
+        return sinkWriter.memoryOccupancy();
+    }
+
+    @Override
+    public void flushMemory() throws Exception {
+        flush(false, false);
+    }
+
+    @VisibleForTesting
+    InternalRowBuffer getWriteBuffer() {
+        if (sinkWriter instanceof BufferedSinkWriter) {
+            return ((BufferedSinkWriter) sinkWriter).writeBuffer;
+        } else {
+            return null;
+        }
+    }
+
+    @VisibleForTesting
+    List<DataFileMeta> getNewFiles() {
+        return newFiles;
+    }
+
+    /** Internal interface for sinking data from the input. */
+    interface SinkWriter {
+
+        boolean write(InternalRow data) throws IOException;
+
+        List<DataFileMeta> flush() throws IOException;
+
+        long memoryOccupancy();
+
+        void close();
+
+        void setMemoryPool(MemorySegmentPool memoryPool);
+    }
+
+    /**
+     * Sinks data directly to files with no in-memory cache: the ORC/Parquet/etc. writer writes
+     * each record straight out. May cause out-of-memory errors.
+     */
+    private class DirectSinkWriter implements SinkWriter {
+
+        private RowDataRollingFileWriter writer = createRollingRowWriter();
+
+        @Override
+        public boolean write(InternalRow data) throws IOException {
+            writer.write(data);
+            return true;
+        }
+
+        @Override
+        public List<DataFileMeta> flush() throws IOException {
+            List<DataFileMeta> flushedFiles = new ArrayList<>();
+            if (writer != null) {
+                writer.close();
+                flushedFiles.addAll(writer.result());
+                writer = createRollingRowWriter();
+            }
+            return flushedFiles;
+        }
+
+        @Override
+        public long memoryOccupancy() {
+            return 0;
+        }
+
+        @Override
+        public void close() {
+            if (writer != null) {
+                writer.abort();
+                writer = null;
+            }
+        }
+
+        @Override
+        public void setMemoryPool(MemorySegmentPool memoryPool) {
+            // do nothing
+        }
+    }
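Both implementations reduce to one contract: `write` returns false (rather than throwing) when memory runs out, and the caller flushes. For the buffered path this comes from `InternalRowBuffer.put`. A self-contained sketch patterned on `InMemoryBufferTest` later in this patch; the pool sizes and payload are illustrative, and `null` is passed for the `IOManager` because the non-spillable flavor never touches it:

```java
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.BinaryRowWriter;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.serializer.InternalRowSerializer;
import org.apache.paimon.disk.InternalRowBuffer;
import org.apache.paimon.memory.HeapMemorySegmentPool;
import org.apache.paimon.types.DataTypes;

class PutContractSketch {
    static void run() throws Exception {
        InternalRowBuffer buffer =
                InternalRowBuffer.getBuffer(
                        null, // the IOManager is only needed when spillable = true
                        new HeapMemorySegmentPool(4096L, 1024), // 4 pages of 1 KiB
                        new InternalRowSerializer(DataTypes.STRING()),
                        false); // non-spillable -> InMemoryBuffer

        BinaryRow row = new BinaryRow(1);
        BinaryRowWriter writer = new BinaryRowWriter(row);
        writer.writeString(0, BinaryString.fromString("some payload"));
        writer.complete();

        int accepted = 0;
        while (buffer.put(row)) { // false once the pool is exhausted: the caller must flush
            accepted++;
        }
        System.out.println("buffered " + accepted + " rows before running out of memory");
        buffer.reset(); // return the segments to the pool
    }
}
```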
+ */ + private class BufferedSinkWriter implements SinkWriter { + + InternalRowBuffer writeBuffer; + + @Override + public boolean write(InternalRow data) throws IOException { + return writeBuffer.put(data); + } + + @Override + public List flush() throws IOException { + List flushedFiles = new ArrayList<>(); + if (writeBuffer != null) { + writeBuffer.complete(); + RowDataRollingFileWriter writer = createRollingRowWriter(); + try (InternalRowBuffer.InternalRowBufferIterator iterator = + writeBuffer.newIterator()) { + while (iterator.advanceNext()) { + writer.write(iterator.getRow()); + } + } finally { + writer.close(); + } + flushedFiles.addAll(writer.result()); + // reuse writeBuffer + writeBuffer.reset(); + } + return flushedFiles; + } + + @Override + public long memoryOccupancy() { + return writeBuffer.memoryOccupancy(); + } + + @Override + public void close() { + if (writeBuffer != null) { + writeBuffer.reset(); + writeBuffer = null; + } + } + + @Override + public void setMemoryPool(MemorySegmentPool memoryPool) { + writeBuffer = + InternalRowBuffer.getBuffer( + ioManager, + memoryPool, + new InternalRowSerializer(writeSchema), + spillable); + } + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/disk/ExternalBuffer.java b/paimon-core/src/main/java/org/apache/paimon/disk/ExternalBuffer.java index 9ced8e83d50f..2503bf8e6914 100644 --- a/paimon-core/src/main/java/org/apache/paimon/disk/ExternalBuffer.java +++ b/paimon-core/src/main/java/org/apache/paimon/disk/ExternalBuffer.java @@ -20,8 +20,6 @@ import org.apache.paimon.annotation.VisibleForTesting; import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.InternalRow; -import org.apache.paimon.data.RandomAccessInputView; -import org.apache.paimon.data.SimpleCollectingOutputView; import org.apache.paimon.data.serializer.AbstractRowDataSerializer; import org.apache.paimon.data.serializer.BinaryRowSerializer; import org.apache.paimon.memory.Buffer; @@ -32,8 +30,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.Closeable; -import java.io.EOFException; import java.io.File; import java.io.IOException; import java.util.ArrayList; @@ -42,7 +38,7 @@ import static org.apache.paimon.utils.Preconditions.checkState; /** An external buffer for storing rows, it will spill the data to disk when the memory is full. */ -public class ExternalBuffer { +public class ExternalBuffer implements InternalRowBuffer { private static final Logger LOG = LoggerFactory.getLogger(ExternalBuffer.class); @@ -59,7 +55,7 @@ public class ExternalBuffer { private boolean addCompleted; - public ExternalBuffer( + ExternalBuffer( IOManager ioManager, MemorySegmentPool pool, AbstractRowDataSerializer serializer) { this.ioManager = ioManager; this.pool = pool; @@ -79,7 +75,7 @@ public ExternalBuffer( //noinspection unchecked this.inMemoryBuffer = - new InMemoryBuffer((AbstractRowDataSerializer) serializer); + new InMemoryBuffer(pool, (AbstractRowDataSerializer) serializer); } public void reset() { @@ -89,33 +85,35 @@ public void reset() { addCompleted = false; } - public void add(InternalRow row) throws IOException { + public boolean put(InternalRow row) throws IOException { checkState(!addCompleted, "This buffer has add completed."); - if (!inMemoryBuffer.write(row)) { + if (!inMemoryBuffer.put(row)) { // Check if record is too big. 
if (inMemoryBuffer.getCurrentDataBufferOffset() == 0) { throwTooBigException(row); } spill(); - if (!inMemoryBuffer.write(row)) { + if (!inMemoryBuffer.put(row)) { throwTooBigException(row); } } numRows++; + return true; } public void complete() { addCompleted = true; } - public BufferIterator newIterator() { + @Override + public InternalRowBufferIterator newIterator() { checkState(addCompleted, "This buffer has not add completed."); return new BufferIterator(); } private void throwTooBigException(InternalRow row) throws IOException { - int rowSize = inMemoryBuffer.serializer.toBinaryRow(row).toBytes().length; + int rowSize = inMemoryBuffer.getSerializer().toBinaryRow(row).toBytes().length; throw new IOException( "Record is too big, it can't be added to a empty InMemoryBuffer! " + "Record size: " @@ -142,7 +140,7 @@ private void spill() throws IOException { } LOG.info( "here spill the reset buffer data with {} records {} bytes", - inMemoryBuffer.numRecords, + inMemoryBuffer.size(), writer.getSize()); writer.close(); } catch (IOException e) { @@ -163,6 +161,10 @@ public int size() { return numRows; } + public long memoryOccupancy() { + return inMemoryBuffer.memoryOccupancy(); + } + private int memorySize() { return pool.freePages() * segmentSize; } @@ -179,7 +181,7 @@ private void clearChannels() { } /** Iterator of external buffer. */ - public class BufferIterator implements Closeable { + public class BufferIterator implements InternalRowBufferIterator { private MutableObjectIterator currentIterator; private final BinaryRow reuse = binaryRowSerializer.createInstance(); @@ -234,7 +236,7 @@ public boolean advanceNext() { } private boolean nextIterator() throws IOException { - if (currentChannelID == Integer.MAX_VALUE) { + if (currentChannelID == Integer.MAX_VALUE || numRows == 0) { return false; } else if (currentChannelID < spilledChannelIDs.size() - 1) { nextSpilledIterator(); @@ -275,109 +277,7 @@ private void newMemoryIterator() { } @VisibleForTesting - List getSpillChannels() { + public List getSpillChannels() { return spilledChannelIDs; } - - private class InMemoryBuffer { - - private final AbstractRowDataSerializer serializer; - private final ArrayList recordBufferSegments; - private final SimpleCollectingOutputView recordCollector; - - private long currentDataBufferOffset; - private int numBytesInLastBuffer; - private int numRecords = 0; - - private InMemoryBuffer(AbstractRowDataSerializer serializer) { - // serializer has states, so we must duplicate - this.serializer = (AbstractRowDataSerializer) serializer.duplicate(); - this.recordBufferSegments = new ArrayList<>(); - this.recordCollector = - new SimpleCollectingOutputView(this.recordBufferSegments, pool, segmentSize); - } - - private void reset() { - this.currentDataBufferOffset = 0; - this.numRecords = 0; - returnToSegmentPool(); - this.recordCollector.reset(); - } - - private void returnToSegmentPool() { - pool.returnAll(this.recordBufferSegments); - this.recordBufferSegments.clear(); - } - - public boolean write(InternalRow row) throws IOException { - try { - this.serializer.serializeToPages(row, this.recordCollector); - currentDataBufferOffset = this.recordCollector.getCurrentOffset(); - numBytesInLastBuffer = this.recordCollector.getCurrentPositionInSegment(); - numRecords++; - return true; - } catch (EOFException e) { - return false; - } - } - - private ArrayList getRecordBufferSegments() { - return recordBufferSegments; - } - - private long getCurrentDataBufferOffset() { - return currentDataBufferOffset; - } - - private 
int getNumRecordBuffers() { - int result = (int) (currentDataBufferOffset / segmentSize); - long mod = currentDataBufferOffset % segmentSize; - if (mod != 0) { - result += 1; - } - return result; - } - - private int getNumBytesInLastBuffer() { - return numBytesInLastBuffer; - } - - private InMemoryBufferIterator newIterator() { - RandomAccessInputView recordBuffer = - new RandomAccessInputView( - this.recordBufferSegments, segmentSize, numBytesInLastBuffer); - return new InMemoryBufferIterator(recordBuffer, serializer); - } - } - - private static class InMemoryBufferIterator - implements MutableObjectIterator, Closeable { - - private final RandomAccessInputView recordBuffer; - private final AbstractRowDataSerializer serializer; - - private InMemoryBufferIterator( - RandomAccessInputView recordBuffer, - AbstractRowDataSerializer serializer) { - this.recordBuffer = recordBuffer; - this.serializer = serializer; - } - - @Override - public BinaryRow next(BinaryRow reuse) throws IOException { - try { - return (BinaryRow) serializer.mapFromPages(reuse, recordBuffer); - } catch (EOFException e) { - return null; - } - } - - @Override - public BinaryRow next() throws IOException { - throw new RuntimeException("Not support!"); - } - - @Override - public void close() {} - } } diff --git a/paimon-core/src/main/java/org/apache/paimon/disk/InMemoryBuffer.java b/paimon-core/src/main/java/org/apache/paimon/disk/InMemoryBuffer.java new file mode 100644 index 000000000000..4758bb183215 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/disk/InMemoryBuffer.java @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.disk; + +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.data.RandomAccessInputView; +import org.apache.paimon.data.SimpleCollectingOutputView; +import org.apache.paimon.data.serializer.AbstractRowDataSerializer; +import org.apache.paimon.memory.MemorySegment; +import org.apache.paimon.memory.MemorySegmentPool; +import org.apache.paimon.utils.MutableObjectIterator; + +import java.io.EOFException; +import java.io.IOException; +import java.util.ArrayList; + +/** Only cache {@link InternalRow}s in memory. 
*/ +public class InMemoryBuffer implements InternalRowBuffer { + private final AbstractRowDataSerializer serializer; + private final ArrayList recordBufferSegments; + private final SimpleCollectingOutputView recordCollector; + private final MemorySegmentPool pool; + private final int segmentSize; + + private long currentDataBufferOffset; + private int numBytesInLastBuffer; + private int numRecords = 0; + + InMemoryBuffer(MemorySegmentPool pool, AbstractRowDataSerializer serializer) { + // serializer has states, so we must duplicate + this.serializer = (AbstractRowDataSerializer) serializer.duplicate(); + this.pool = pool; + this.segmentSize = pool.pageSize(); + this.recordBufferSegments = new ArrayList<>(); + this.recordCollector = + new SimpleCollectingOutputView(this.recordBufferSegments, pool, segmentSize); + } + + @Override + public void reset() { + this.currentDataBufferOffset = 0; + this.numRecords = 0; + returnToSegmentPool(); + this.recordCollector.reset(); + } + + private void returnToSegmentPool() { + pool.returnAll(this.recordBufferSegments); + this.recordBufferSegments.clear(); + } + + @Override + public boolean put(InternalRow row) throws IOException { + try { + this.serializer.serializeToPages(row, this.recordCollector); + currentDataBufferOffset = this.recordCollector.getCurrentOffset(); + numBytesInLastBuffer = this.recordCollector.getCurrentPositionInSegment(); + numRecords++; + return true; + } catch (EOFException e) { + return false; + } + } + + @Override + public int size() { + return numRecords; + } + + @Override + public void complete() {} + + @Override + public long memoryOccupancy() { + return currentDataBufferOffset; + } + + @Override + public InMemoryBufferIterator newIterator() { + RandomAccessInputView recordBuffer = + new RandomAccessInputView( + this.recordBufferSegments, segmentSize, numBytesInLastBuffer); + return new InMemoryBufferIterator(recordBuffer, serializer); + } + + ArrayList getRecordBufferSegments() { + return recordBufferSegments; + } + + long getCurrentDataBufferOffset() { + return currentDataBufferOffset; + } + + int getNumRecordBuffers() { + int result = (int) (currentDataBufferOffset / segmentSize); + long mod = currentDataBufferOffset % segmentSize; + if (mod != 0) { + result += 1; + } + return result; + } + + int getNumBytesInLastBuffer() { + return numBytesInLastBuffer; + } + + AbstractRowDataSerializer getSerializer() { + return serializer; + } + + private static class InMemoryBufferIterator + implements InternalRowBufferIterator, MutableObjectIterator { + + private final RandomAccessInputView recordBuffer; + private final AbstractRowDataSerializer serializer; + private final BinaryRow reuse; + private BinaryRow row; + + private InMemoryBufferIterator( + RandomAccessInputView recordBuffer, + AbstractRowDataSerializer serializer) { + this.recordBuffer = recordBuffer; + this.serializer = serializer; + this.reuse = new BinaryRow(serializer.getArity()); + } + + @Override + public boolean advanceNext() { + try { + row = next(reuse); + return row != null; + } catch (IOException ioException) { + throw new RuntimeException(ioException); + } + } + + @Override + public BinaryRow getRow() { + return row; + } + + @Override + public BinaryRow next(BinaryRow reuse) throws IOException { + try { + return (BinaryRow) serializer.mapFromPages(reuse, recordBuffer); + } catch (EOFException e) { + return null; + } + } + + @Override + public BinaryRow next() throws IOException { + return next(reuse); + } + + @Override + public void close() {} + } +} diff --git 
a/paimon-core/src/main/java/org/apache/paimon/disk/InternalRowBuffer.java b/paimon-core/src/main/java/org/apache/paimon/disk/InternalRowBuffer.java new file mode 100644 index 000000000000..776db52aa8ea --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/disk/InternalRowBuffer.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.disk; + +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.data.serializer.AbstractRowDataSerializer; +import org.apache.paimon.memory.MemorySegmentPool; + +import java.io.Closeable; +import java.io.IOException; + +/** Cache buffer for {@link InternalRow}. */ +public interface InternalRowBuffer { + + boolean put(InternalRow row) throws IOException; + + int size(); + + long memoryOccupancy(); + + void complete(); + + void reset(); + + InternalRowBufferIterator newIterator(); + + /** Iterator to fetch record from buffer. */ + interface InternalRowBufferIterator extends Closeable { + + boolean advanceNext(); + + BinaryRow getRow(); + + void close(); + } + + static InternalRowBuffer getBuffer( + IOManager ioManager, + MemorySegmentPool memoryPool, + AbstractRowDataSerializer serializer, + boolean spillable) { + if (spillable) { + return new ExternalBuffer(ioManager, memoryPool, serializer); + } else { + return new InMemoryBuffer(memoryPool, serializer); + } + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java index 784fdfb657c1..ca1d65caead4 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java @@ -52,7 +52,7 @@ import static org.apache.paimon.io.DataFileMeta.getMaxSequenceNumber; /** {@link FileStoreWrite} for {@link AppendOnlyFileStore}. 
*/ -public class AppendOnlyFileStoreWrite extends AbstractFileStoreWrite { +public class AppendOnlyFileStoreWrite extends MemoryFileStoreWrite { private final FileIO fileIO; private final AppendOnlyFileStoreRead read; @@ -65,6 +65,8 @@ public class AppendOnlyFileStoreWrite extends AbstractFileStoreWrite createWriter( return new AppendOnlyWriter( fileIO, + ioManager, schemaId, fileFormat, targetFileSize, @@ -130,6 +135,8 @@ protected RecordWriter createWriter( commitForceCompact, factory, restoreIncrement, + useWriteBuffer, + spillable, fileCompression, statsCollectors); } diff --git a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java index 680e483e2691..5699ea415db3 100644 --- a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java @@ -24,12 +24,17 @@ import org.apache.paimon.data.BinaryString; import org.apache.paimon.data.GenericRow; import org.apache.paimon.data.InternalRow; +import org.apache.paimon.disk.ChannelWithMeta; +import org.apache.paimon.disk.ExternalBuffer; +import org.apache.paimon.disk.IOManager; +import org.apache.paimon.disk.InternalRowBuffer; import org.apache.paimon.format.FieldStats; import org.apache.paimon.format.FileFormat; import org.apache.paimon.fs.Path; import org.apache.paimon.fs.local.LocalFileIO; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.io.DataFilePathFactory; +import org.apache.paimon.memory.HeapMemorySegmentPool; import org.apache.paimon.options.Options; import org.apache.paimon.stats.FieldStatsArraySerializer; import org.apache.paimon.types.DataType; @@ -42,12 +47,15 @@ import org.apache.paimon.utils.RecordWriter; import org.apache.paimon.utils.StatsCollectorFactories; +import org.assertj.core.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; +import java.io.File; import java.io.IOException; import java.nio.file.Files; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.LinkedList; @@ -306,6 +314,137 @@ public void testCloseUnexpectedly() throws Exception { assertThat(afterClosedUnexpectedly).containsExactlyInAnyOrderElementsOf(committedFiles); } + @Test + public void testExternalBufferWorks() throws Exception { + AppendOnlyWriter writer = createEmptyWriter(Long.MAX_VALUE, true); + + // we give it a small Memory Pool, force it to spill + writer.setMemoryPool(new HeapMemorySegmentPool(16384L, 1024)); + + char[] s = new char[990]; + Arrays.fill(s, 'a'); + + // set the record that much larger than the maxMemory + for (int j = 0; j < 100; j++) { + writer.write(row(j, String.valueOf(s), PART)); + } + + InternalRowBuffer buffer = writer.getWriteBuffer(); + Assertions.assertThat(buffer.size()).isEqualTo(100); + Assertions.assertThat(buffer.memoryOccupancy()).isLessThanOrEqualTo(16384L); + + writer.close(); + } + + @Test + public void testNoBuffer() throws Exception { + AppendOnlyWriter writer = createEmptyWriter(Long.MAX_VALUE); + + // we give it a small Memory Pool, force it to spill + writer.setMemoryPool(new HeapMemorySegmentPool(16384L, 1024)); + + InternalRowBuffer buffer = writer.getWriteBuffer(); + Assertions.assertThat(buffer).isNull(); + + writer.close(); + } + + @Test + public void testMultipleFlush() throws Exception { + AppendOnlyWriter writer = createEmptyWriter(Long.MAX_VALUE, true); + + 
// we give it a small Memory Pool, force it to spill + writer.setMemoryPool(new HeapMemorySegmentPool(16384L, 1024)); + + char[] s = new char[990]; + Arrays.fill(s, 'a'); + + // set the record that much larger than the maxMemory + for (int j = 0; j < 100; j++) { + writer.write(row(j, String.valueOf(s), PART)); + } + + writer.flushMemory(); + Assertions.assertThat(writer.memoryOccupancy()).isEqualTo(0L); + Assertions.assertThat(writer.getWriteBuffer().size()).isEqualTo(0); + Assertions.assertThat(writer.getNewFiles().size()).isGreaterThan(0); + long rowCount = + writer.getNewFiles().stream().map(DataFileMeta::rowCount).reduce(0L, Long::sum); + Assertions.assertThat(rowCount).isEqualTo(100); + + for (int j = 0; j < 100; j++) { + writer.write(row(j, String.valueOf(s), PART)); + } + writer.flushMemory(); + + Assertions.assertThat(writer.memoryOccupancy()).isEqualTo(0L); + Assertions.assertThat(writer.getWriteBuffer().size()).isEqualTo(0); + Assertions.assertThat(writer.getNewFiles().size()).isGreaterThan(1); + rowCount = writer.getNewFiles().stream().map(DataFileMeta::rowCount).reduce(0L, Long::sum); + Assertions.assertThat(rowCount).isEqualTo(200); + } + + @Test + public void testClose() throws Exception { + AppendOnlyWriter writer = createEmptyWriter(Long.MAX_VALUE, true); + + // we give it a small Memory Pool, force it to spill + writer.setMemoryPool(new HeapMemorySegmentPool(16384L, 1024)); + + char[] s = new char[990]; + Arrays.fill(s, 'a'); + + // set the record that much larger than the maxMemory + for (int j = 0; j < 100; j++) { + writer.write(row(j, String.valueOf(s), PART)); + } + + ExternalBuffer externalBuffer = (ExternalBuffer) writer.getWriteBuffer(); + List channel = externalBuffer.getSpillChannels(); + + writer.close(); + + for (ChannelWithMeta meta : channel) { + File file = new File(meta.getChannel().getPath()); + Assertions.assertThat(file.exists()).isEqualTo(false); + } + Assertions.assertThat(writer.getWriteBuffer()).isEqualTo(null); + } + + @Test + public void testNonSpillable() throws Exception { + AppendOnlyWriter writer = createEmptyWriter(Long.MAX_VALUE, true); + + // we give it a small Memory Pool, force it to spill + writer.setMemoryPool(new HeapMemorySegmentPool(2048L, 1024)); + + char[] s = new char[990]; + Arrays.fill(s, 'a'); + + // set the record that much larger than the maxMemory + for (int j = 0; j < 100; j++) { + writer.write(row(j, String.valueOf(s), PART)); + } + // we got only one file after commit + Assertions.assertThat(writer.prepareCommit(true).newFilesIncrement().newFiles().size()) + .isEqualTo(1); + writer.close(); + + writer = createEmptyWriter(Long.MAX_VALUE, false); + + // we give it a small Memory Pool, force it to spill + writer.setMemoryPool(new HeapMemorySegmentPool(2048L, 1024)); + + // set the record that much larger than the maxMemory + for (int j = 0; j < 100; j++) { + writer.write(row(j, String.valueOf(s), PART)); + } + // we got 100 files + Assertions.assertThat(writer.prepareCommit(true).newFilesIncrement().newFiles().size()) + .isEqualTo(100); + writer.close(); + } + private FieldStats initStats(Integer min, Integer max, long nullCount) { return new FieldStats(min, max, nullCount); } @@ -328,17 +467,48 @@ private DataFilePathFactory createPathFactory() { } private AppendOnlyWriter createEmptyWriter(long targetFileSize) { - return createWriter(targetFileSize, false, Collections.emptyList()).getLeft(); + return createWriter(targetFileSize, false, false, false, Collections.emptyList()).getLeft(); + } + + private AppendOnlyWriter 
createEmptyWriter(long targetFileSize, boolean spillable) { + return createWriter(targetFileSize, false, true, spillable, Collections.emptyList()) + .getLeft(); } private Pair> createWriter( long targetFileSize, boolean forceCompact, List scannedFiles) { - return createWriter(targetFileSize, forceCompact, scannedFiles, new CountDownLatch(0)); + return createWriter( + targetFileSize, forceCompact, true, true, scannedFiles, new CountDownLatch(0)); + } + + private Pair> createWriter( + long targetFileSize, + boolean forceCompact, + boolean useWriteBuffer, + boolean spillable, + List scannedFiles) { + return createWriter( + targetFileSize, + forceCompact, + useWriteBuffer, + spillable, + scannedFiles, + new CountDownLatch(0)); + } + + private Pair> createWriter( + long targetFileSize, + boolean forceCompact, + List scannedFiles, + CountDownLatch latch) { + return createWriter(targetFileSize, forceCompact, false, false, scannedFiles, latch); } private Pair> createWriter( long targetFileSize, boolean forceCompact, + boolean useWriteBuffer, + boolean spillable, List scannedFiles, CountDownLatch latch) { FileFormat fileFormat = FileFormat.fromIdentifier(AVRO, new Options()); @@ -358,9 +528,11 @@ private Pair> createWriter( : Collections.singletonList( generateCompactAfter(compactBefore)); }); + CoreOptions options = new CoreOptions(new HashMap<>()); AppendOnlyWriter writer = new AppendOnlyWriter( LocalFileIO.create(), + IOManager.create(tempDir.toString()), SCHEMA_ID, fileFormat, targetFileSize, @@ -370,10 +542,13 @@ private Pair> createWriter( forceCompact, pathFactory, null, + useWriteBuffer, + spillable, CoreOptions.FILE_COMPRESSION.defaultValue(), StatsCollectorFactories.createStatsFactories( - new CoreOptions(new HashMap<>()), - AppendOnlyWriterTest.SCHEMA.getFieldNames())); + options, AppendOnlyWriterTest.SCHEMA.getFieldNames())); + writer.setMemoryPool( + new HeapMemorySegmentPool(options.writeBufferSize(), options.pageSize())); return Pair.of(writer, compactManager.allFiles()); } diff --git a/paimon-core/src/test/java/org/apache/paimon/disk/ExternalBufferTest.java b/paimon-core/src/test/java/org/apache/paimon/disk/ExternalBufferTest.java index 5bd9ad5115fc..6ca73bd6a2f4 100644 --- a/paimon-core/src/test/java/org/apache/paimon/disk/ExternalBufferTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/disk/ExternalBufferTest.java @@ -165,16 +165,17 @@ private void writeHuge(ExternalBuffer buffer) throws IOException { RandomDataGenerator random = new RandomDataGenerator(); writer.writeString(0, BinaryString.fromString(random.nextHexString(500000))); writer.complete(); - buffer.add(row); + buffer.put(row); } - private void assertBuffer(List expected, ExternalBuffer buffer) { - ExternalBuffer.BufferIterator iterator = buffer.newIterator(); + private void assertBuffer(List expected, InternalRowBuffer buffer) { + InternalRowBuffer.InternalRowBufferIterator iterator = buffer.newIterator(); assertBuffer(expected, iterator); iterator.close(); } - private void assertBuffer(List expected, ExternalBuffer.BufferIterator iterator) { + private void assertBuffer( + List expected, InternalRowBuffer.InternalRowBufferIterator iterator) { List values = new ArrayList<>(); while (iterator.advanceNext()) { values.add(iterator.getRow().getLong(0)); @@ -203,7 +204,7 @@ private long randomInsert(ExternalBuffer buffer) throws IOException { writer.reset(); writer.writeLong(0, l); writer.complete(); - buffer.add(row); + buffer.put(row); return l; } } diff --git 
a/paimon-core/src/test/java/org/apache/paimon/disk/InMemoryBufferTest.java b/paimon-core/src/test/java/org/apache/paimon/disk/InMemoryBufferTest.java new file mode 100644 index 000000000000..e1b1277140b6 --- /dev/null +++ b/paimon-core/src/test/java/org/apache/paimon/disk/InMemoryBufferTest.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.disk; + +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.data.BinaryRowWriter; +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.serializer.InternalRowSerializer; +import org.apache.paimon.memory.HeapMemorySegmentPool; +import org.apache.paimon.types.DataTypes; + +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; + +import static org.apache.paimon.memory.MemorySegmentPool.DEFAULT_PAGE_SIZE; + +/** Tests for {@link InternalRowBuffer}. */ +public class InMemoryBufferTest { + + private InternalRowSerializer serializer; + + @BeforeEach + public void before() { + this.serializer = new InternalRowSerializer(DataTypes.STRING()); + } + + @Test + public void testNonSpill() throws Exception { + InMemoryBuffer buffer = + new InMemoryBuffer( + new HeapMemorySegmentPool(2 * DEFAULT_PAGE_SIZE, DEFAULT_PAGE_SIZE), + this.serializer); + + BinaryRow binaryRow = new BinaryRow(1); + BinaryRowWriter binaryRowWriter = new BinaryRowWriter(binaryRow); + + byte[] s = new byte[20 * 1024]; + Arrays.fill(s, (byte) 'a'); + binaryRowWriter.writeString(0, BinaryString.fromBytes(s)); + binaryRowWriter.complete(); + + boolean result = buffer.put(binaryRow); + Assertions.assertThat(result).isTrue(); + result = buffer.put(binaryRow); + Assertions.assertThat(result).isTrue(); + result = buffer.put(binaryRow); + Assertions.assertThat(result).isTrue(); + result = buffer.put(binaryRow); + Assertions.assertThat(result).isFalse(); + } + + @Test + public void testPutRead() throws Exception { + InMemoryBuffer buffer = + new InMemoryBuffer( + new HeapMemorySegmentPool(2 * DEFAULT_PAGE_SIZE, DEFAULT_PAGE_SIZE), + this.serializer); + + BinaryRow binaryRow = new BinaryRow(1); + BinaryRowWriter binaryRowWriter = new BinaryRowWriter(binaryRow); + + byte[] s = new byte[10]; + Arrays.fill(s, (byte) 'a'); + binaryRowWriter.writeString(0, BinaryString.fromBytes(s)); + binaryRowWriter.complete(); + for (int i = 0; i < 100; i++) { + buffer.put(binaryRow.copy()); + } + + Assertions.assertThat(buffer.size()).isEqualTo(100); + try (InternalRowBuffer.InternalRowBufferIterator iterator = buffer.newIterator()) { + while (iterator.advanceNext()) { + Assertions.assertThat(iterator.getRow()).isEqualTo(binaryRow); + } + } + } + + @Test + public void testClose() throws 
Exception { + InMemoryBuffer buffer = + new InMemoryBuffer( + new HeapMemorySegmentPool(2 * DEFAULT_PAGE_SIZE, DEFAULT_PAGE_SIZE), + this.serializer); + + BinaryRow binaryRow = new BinaryRow(1); + BinaryRowWriter binaryRowWriter = new BinaryRowWriter(binaryRow); + + byte[] s = new byte[10]; + Arrays.fill(s, (byte) 'a'); + binaryRowWriter.writeString(0, BinaryString.fromBytes(s)); + binaryRowWriter.complete(); + buffer.put(binaryRow.copy()); + + Assertions.assertThat(buffer.memoryOccupancy()).isGreaterThan(0); + buffer.reset(); + Assertions.assertThat(buffer.memoryOccupancy()).isEqualTo(0); + } +} diff --git a/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java b/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java index 9e88548d7a79..3404214d1a42 100644 --- a/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java @@ -23,12 +23,14 @@ import org.apache.paimon.append.AppendOnlyWriter; import org.apache.paimon.data.BinaryString; import org.apache.paimon.data.GenericRow; +import org.apache.paimon.disk.IOManager; import org.apache.paimon.fs.Path; import org.apache.paimon.fs.local.LocalFileIO; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.io.DataFilePathFactory; import org.apache.paimon.io.KeyValueFileReadWriteTest; import org.apache.paimon.io.KeyValueFileWriterFactory; +import org.apache.paimon.memory.HeapMemorySegmentPool; import org.apache.paimon.options.Options; import org.apache.paimon.types.DataType; import org.apache.paimon.types.IntType; @@ -64,9 +66,11 @@ public void testFileSuffix(@TempDir java.nio.file.Path tempDir) throws Exception new DataFilePathFactory(new Path(tempDir.toString()), "dt=1", 1, format); FileFormat fileFormat = FileFormat.fromIdentifier(format, new Options()); LinkedList toCompact = new LinkedList<>(); + CoreOptions options = new CoreOptions(new HashMap<>()); AppendOnlyWriter appendOnlyWriter = new AppendOnlyWriter( LocalFileIO.create(), + IOManager.create(tempDir.toString()), 0, fileFormat, 10, @@ -76,9 +80,13 @@ public void testFileSuffix(@TempDir java.nio.file.Path tempDir) throws Exception false, dataFilePathFactory, null, + false, + false, CoreOptions.FILE_COMPRESSION.defaultValue(), StatsCollectorFactories.createStatsFactories( - new CoreOptions(new HashMap<>()), SCHEMA.getFieldNames())); + options, SCHEMA.getFieldNames())); + appendOnlyWriter.setMemoryPool( + new HeapMemorySegmentPool(options.writeBufferSize(), options.pageSize())); appendOnlyWriter.write( GenericRow.of(1, BinaryString.fromString("aaa"), BinaryString.fromString("1"))); CommitIncrement increment = appendOnlyWriter.prepareCommit(true); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerOperator.java index 0c10b01ff78a..fd36d4a39f8a 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerOperator.java @@ -21,8 +21,8 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.data.InternalRow; import org.apache.paimon.data.serializer.InternalRowSerializer; -import org.apache.paimon.disk.ExternalBuffer; import org.apache.paimon.disk.IOManager; +import 
org.apache.paimon.disk.InternalRowBuffer; import org.apache.paimon.flink.FlinkRowData; import org.apache.paimon.flink.FlinkRowWrapper; import org.apache.paimon.flink.sink.RowDataPartitionKeyExtractor; @@ -60,7 +60,7 @@ public class GlobalIndexAssignerOperator extends AbstractStreamOperator toRow; private final SerializableFunction fromRow; - private transient ExternalBuffer bootstrapBuffer; + private transient InternalRowBuffer bootstrapBuffer; public GlobalIndexAssignerOperator( Table table, @@ -89,10 +89,11 @@ public void initializeState(StateInitializationContext context) throws Exception long bufferSize = options.get(CoreOptions.WRITE_BUFFER_SIZE).getBytes(); long pageSize = options.get(CoreOptions.PAGE_SIZE).getBytes(); bootstrapBuffer = - new ExternalBuffer( + InternalRowBuffer.getBuffer( IOManager.create(ioManager.getSpillingDirectoriesPaths()), new HeapMemorySegmentPool(bufferSize, (int) pageSize), - new InternalRowSerializer(table.rowType())); + new InternalRowSerializer(table.rowType()), + true); } @Override @@ -106,7 +107,9 @@ public void processElement(StreamRecord> streamRecord) break; case ROW: if (bootstrapBuffer != null) { - bootstrapBuffer.add(toRow.apply(value)); + // ignore return value, we must enable spillable for bootstrapBuffer, so return + // is always true + bootstrapBuffer.put(toRow.apply(value)); } else { assigner.process(value); } @@ -127,7 +130,8 @@ public void endInput() throws Exception { private void endBootstrap() throws Exception { if (bootstrapBuffer != null) { bootstrapBuffer.complete(); - try (ExternalBuffer.BufferIterator iterator = bootstrapBuffer.newIterator()) { + try (InternalRowBuffer.InternalRowBufferIterator iterator = + bootstrapBuffer.newIterator()) { while (iterator.advanceNext()) { assigner.process(fromRow.apply(iterator.getRow())); }
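The operator change above is the consuming side of the same abstraction: bootstrap rows go into a spillable buffer and are replayed once at the end of input. A condensed sketch of that pattern — the spill directory, pool sizes, and row type are illustrative, not the operator's actual configuration:

```java
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.serializer.InternalRowSerializer;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.disk.InternalRowBuffer;
import org.apache.paimon.memory.HeapMemorySegmentPool;
import org.apache.paimon.types.DataTypes;

class BootstrapBufferSketch {
    static void run() throws Exception {
        InternalRowBuffer bootstrapBuffer =
                InternalRowBuffer.getBuffer(
                        IOManager.create("/tmp/paimon-spill"), // illustrative spill directory
                        new HeapMemorySegmentPool(32 * 1024, 4 * 1024),
                        new InternalRowSerializer(DataTypes.STRING()),
                        true); // spillable, so put(...) always succeeds

        // Bootstrap phase: buffer every incoming row; overflow spills to disk instead of OOM.
        bootstrapBuffer.put(GenericRow.of(BinaryString.fromString("some row")));

        // End of bootstrap: replay everything once, then release memory and spill files.
        bootstrapBuffer.complete();
        try (InternalRowBuffer.InternalRowBufferIterator it = bootstrapBuffer.newIterator()) {
            while (it.advanceNext()) {
                process(it.getRow()); // stands in for assigner.process(fromRow.apply(row))
            }
        }
        bootstrapBuffer.reset();
    }

    static void process(Object row) {
        // placeholder for the operator's downstream handling
    }
}
```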