[core] Introduce write-buffer-for-append to writing lots of partitions (
leaves12138 committed Sep 12, 2023
1 parent bd178ca commit 3048dbe
Showing 13 changed files with 819 additions and 155 deletions.
30 changes: 28 additions & 2 deletions docs/content/concepts/append-only-table.md
@@ -27,7 +27,7 @@ under the License.
# Append Only Table

If a table does not have a primary key defined, it is an append-only table by default. Separated by the definition of bucket,
we have two different append-only modes: "Append For Queue" and "Append For Scalable Table".
we have two different append-only modes: "Append For Queue" and "Append For Scalable Table".

## Append For Queue

@@ -276,4 +276,30 @@ CREATE TABLE MyTable (
```
{{< /tab >}}

{{< /tabs >}}
{{< /tabs >}}

## Multiple Partitions Write
While writing multiple partitions in a single insert job, we may get an out-of-memory error if
too many records arrive between two checkpoints.

Version 0.6 introduces a `write-buffer-for-append` option for append-only tables. When this
parameter is set to true, records are cached in a Segment Pool to avoid OOM.
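
For example, this is the kind of job the option targets. This is only a sketch: the partitioned
table and `MySourceTable` are illustrative names, not part of this change.

```sql
CREATE TABLE MyPartitionedTable (
    product_id BIGINT,
    price DOUBLE,
    sales BIGINT,
    dt STRING
) PARTITIONED BY (dt) WITH (
    'write-buffer-for-append' = 'true'
);

-- A single insert that touches many partitions between two checkpoints.
INSERT INTO MyPartitionedTable SELECT product_id, price, sales, dt FROM MySourceTable;
```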

### Influence
If we also set `write-buffer-spillable` to true, the records can be serialized and spilled to disk.
However, this may delay checkpoint acknowledgement and slow down the sink.

If we do not set `write-buffer-spillable`, then once the Segment Pool runs out of memory, the buffered
records are flushed to the filesystem as complete data files. This may create too many small files and
put more pressure on compaction, so the trade-off needs to be weighed carefully.

### Example
```sql
CREATE TABLE MyTable (
product_id BIGINT,
price DOUBLE,
sales BIGINT
) WITH (
'write-buffer-for-append' = 'true'
);
```
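
To also let the buffer spill to disk, combine it with the `write-buffer-spillable` option described
above. This is a sketch; whether spilling helps depends on the checkpoint-delay versus small-file
trade-off discussed in the Influence section.

```sql
CREATE TABLE MyTable (
    product_id BIGINT,
    price DOUBLE,
    sales BIGINT
) WITH (
    'write-buffer-for-append' = 'true',
    'write-buffer-spillable' = 'true'
);
```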
6 changes: 6 additions & 0 deletions docs/layouts/shortcodes/generated/core_configuration.html
@@ -557,6 +557,12 @@
<td>MemorySize</td>
<td>Target size of a file.</td>
</tr>
<tr>
<td><h5>write-buffer-for-append</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>This option only works for append-only tables. Whether the writer uses a write buffer to avoid out-of-memory errors.</td>
</tr>
<tr>
<td><h5>write-buffer-size</h5></td>
<td style="word-wrap: break-word;">256 mb</td>
11 changes: 11 additions & 0 deletions paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -277,6 +277,13 @@ public class CoreOptions implements Serializable {
.withDescription(
"Whether the write buffer can be spillable. Enabled by default when using object storage.");

public static final ConfigOption<Boolean> WRITE_BUFFER_FOR_APPEND =
key("write-buffer-for-append")
.booleanType()
.defaultValue(false)
.withDescription(
"This option only works for append-only table. Whether the write use write buffer to avoid out-of-memory error.");

public static final ConfigOption<MemorySize> WRITE_MANIFEST_CACHE =
key("write-manifest-cache")
.memoryType()
@@ -1007,6 +1014,10 @@ public boolean writeBufferSpillable(boolean usingObjectStore, boolean isStreamin
return options.getOptional(WRITE_BUFFER_SPILLABLE).orElse(usingObjectStore || !isStreaming);
}

public boolean useWriteBuffer() {
return options.get(WRITE_BUFFER_FOR_APPEND);
}

public long sortSpillBufferSize() {
return options.get(SORT_SPILL_BUFFER_SIZE).getBytes();
}
202 changes: 185 additions & 17 deletions paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
@@ -19,15 +19,21 @@

package org.apache.paimon.append;

import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.compact.CompactManager;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.serializer.InternalRowSerializer;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.disk.InternalRowBuffer;
import org.apache.paimon.format.FileFormat;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.io.CompactIncrement;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataFilePathFactory;
import org.apache.paimon.io.NewFilesIncrement;
import org.apache.paimon.io.RowDataRollingFileWriter;
import org.apache.paimon.memory.MemoryOwner;
import org.apache.paimon.memory.MemorySegmentPool;
import org.apache.paimon.statistics.FieldStatsCollector;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;
@@ -38,6 +44,7 @@

import javax.annotation.Nullable;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -48,7 +55,7 @@
* A {@link RecordWriter} implementation that only accepts records which are always insert
* operations and don't have any unique keys or sort keys.
*/
public class AppendOnlyWriter implements RecordWriter<InternalRow> {
public class AppendOnlyWriter implements RecordWriter<InternalRow>, MemoryOwner {

private final FileIO fileIO;
private final long schemaId;
Expand All @@ -63,12 +70,14 @@ public class AppendOnlyWriter implements RecordWriter<InternalRow> {
private final List<DataFileMeta> compactAfter;
private final LongCounter seqNumCounter;
private final String fileCompression;
private final boolean spillable;
private final SinkWriter sinkWriter;
private final FieldStatsCollector.Factory[] statsCollectors;

private RowDataRollingFileWriter writer;
private final IOManager ioManager;

public AppendOnlyWriter(
FileIO fileIO,
IOManager ioManager,
long schemaId,
FileFormat fileFormat,
long targetFileSize,
@@ -78,6 +87,8 @@ public AppendOnlyWriter(
boolean forceCompact,
DataFilePathFactory pathFactory,
@Nullable CommitIncrement increment,
boolean useWriteBuffer,
boolean spillable,
String fileCompression,
FieldStatsCollector.Factory[] statsCollectors) {
this.fileIO = fileIO;
@@ -93,9 +104,11 @@ public AppendOnlyWriter(
this.compactAfter = new ArrayList<>();
this.seqNumCounter = new LongCounter(maxSequenceNumber + 1);
this.fileCompression = fileCompression;
this.spillable = spillable;
this.ioManager = ioManager;
this.statsCollectors = statsCollectors;

this.writer = createRollingRowWriter();
sinkWriter = createSinkWrite(useWriteBuffer);

if (increment != null) {
newFiles.addAll(increment.newFilesIncrement().newFiles());
@@ -110,12 +123,22 @@ public void write(InternalRow rowData) throws Exception {
rowData.getRowKind() == RowKind.INSERT,
"Append-only writer can only accept insert row kind, but current row kind is: %s",
rowData.getRowKind());
writer.write(rowData);
boolean success = sinkWriter.write(rowData);
if (!success) {
flush(false, false);
success = sinkWriter.write(rowData);
if (!success) {
// Should not get here, because the write buffer throws an exception when a record is
// too large to fit. But we throw again in case something unexpected happens (e.g. the
// code in SpillableBuffer changed).
throw new RuntimeException("Mem table is too small to hold a single element.");
}
}
}

@Override
public void compact(boolean fullCompaction) throws Exception {
flushWriter(true, fullCompaction);
flush(true, fullCompaction);
}

@Override
@@ -130,19 +153,14 @@ public Collection<DataFileMeta> dataFiles() {

@Override
public CommitIncrement prepareCommit(boolean waitCompaction) throws Exception {
flushWriter(false, false);
flush(false, false);
trySyncLatestCompaction(waitCompaction || forceCompact);
return drainIncrement();
}

private void flushWriter(boolean waitForLatestCompaction, boolean forcedFullCompaction)
private void flush(boolean waitForLatestCompaction, boolean forcedFullCompaction)
throws Exception {
List<DataFileMeta> flushedFiles = new ArrayList<>();
if (writer != null) {
writer.close();
flushedFiles.addAll(writer.result());
writer = createRollingRowWriter();
}
List<DataFileMeta> flushedFiles = sinkWriter.flush();

// add new generated files
flushedFiles.forEach(compactManager::addNewFile);
@@ -169,9 +187,14 @@ public void close() throws Exception {
fileIO.deleteQuietly(pathFactory.toPath(file.fileName()));
}

if (writer != null) {
writer.abort();
writer = null;
sinkWriter.close();
}

private SinkWriter createSinkWrite(boolean useWriteBuffer) {
if (useWriteBuffer) {
return new BufferedSinkWriter();
} else {
return new DirectSinkWriter();
}
}

@@ -214,4 +237,149 @@ private CommitIncrement drainIncrement() {

return new CommitIncrement(newFilesIncrement, compactIncrement);
}

@Override
public void setMemoryPool(MemorySegmentPool memoryPool) {
sinkWriter.setMemoryPool(memoryPool);
}

@Override
public long memoryOccupancy() {
return sinkWriter.memoryOccupancy();
}

@Override
public void flushMemory() throws Exception {
flush(false, false);
}

@VisibleForTesting
InternalRowBuffer getWriteBuffer() {
if (sinkWriter instanceof BufferedSinkWriter) {
return ((BufferedSinkWriter) sinkWriter).writeBuffer;
} else {
return null;
}
}

@VisibleForTesting
List<DataFileMeta> getNewFiles() {
return newFiles;
}

/** Internal interface for sinking data from the input. */
interface SinkWriter {

boolean write(InternalRow data) throws IOException;

List<DataFileMeta> flush() throws IOException;

long memoryOccupancy();

void close();

void setMemoryPool(MemorySegmentPool memoryPool);
}

/**
* Directly sinks data to files without any in-memory cache, using OrcWriter/ParquetWriter/etc.
* to write data. May cause out-of-memory errors.
*/
private class DirectSinkWriter implements SinkWriter {

private RowDataRollingFileWriter writer = createRollingRowWriter();

@Override
public boolean write(InternalRow data) throws IOException {
writer.write(data);
return true;
}

@Override
public List<DataFileMeta> flush() throws IOException {
List<DataFileMeta> flushedFiles = new ArrayList<>();
if (writer != null) {
writer.close();
flushedFiles.addAll(writer.result());
writer = createRollingRowWriter();
}
return flushedFiles;
}

@Override
public long memoryOccupancy() {
return 0;
}

@Override
public void close() {
if (writer != null) {
writer.abort();
writer = null;
}
}

@Override
public void setMemoryPool(MemorySegmentPool memoryPool) {
// do nothing
}
}

/**
* Uses a buffered writer whose segments are pooled from the segment pool. When spillable, it may
* delay checkpoint acknowledgement. When non-spillable, it may create too many small files.
*/
private class BufferedSinkWriter implements SinkWriter {

InternalRowBuffer writeBuffer;

@Override
public boolean write(InternalRow data) throws IOException {
return writeBuffer.put(data);
}

@Override
public List<DataFileMeta> flush() throws IOException {
List<DataFileMeta> flushedFiles = new ArrayList<>();
if (writeBuffer != null) {
writeBuffer.complete();
RowDataRollingFileWriter writer = createRollingRowWriter();
try (InternalRowBuffer.InternalRowBufferIterator iterator =
writeBuffer.newIterator()) {
while (iterator.advanceNext()) {
writer.write(iterator.getRow());
}
} finally {
writer.close();
}
flushedFiles.addAll(writer.result());
// reuse writeBuffer
writeBuffer.reset();
}
return flushedFiles;
}

@Override
public long memoryOccupancy() {
return writeBuffer.memoryOccupancy();
}

@Override
public void close() {
if (writeBuffer != null) {
writeBuffer.reset();
writeBuffer = null;
}
}

@Override
public void setMemoryPool(MemorySegmentPool memoryPool) {
writeBuffer =
InternalRowBuffer.getBuffer(
ioManager,
memoryPool,
new InternalRowSerializer(writeSchema),
spillable);
}
}
}